Batch SQL Examples

Submit SQL Query via SQL Client

First, modify the sql client config file ./conf/sql-client-defaults.yaml to set execution type to batch.

Batch SQL Example: config

Then, start local cluster:

$ ./bin/

Check the web at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

Prepare the input data:

$ cat /tmp/pagevisit.csv
2018-10-16 09:00:00,1001,/page1,chrome
2018-10-16 09:00:20,1001,/page2,safari
2018-10-16 09:03:20,1005,/page1,chrome
2018-10-16 09:05:50,1005,/page1,safari
2018-10-16 09:05:56,1005,/page2,safari
2018-10-16 09:05:57,1006,/page2,chrome

Then start SQL Client shell:

$ ./bin/ embedded

You can see the welcome message for flink sql client.

Paste the following sql ddl text into the shell. (For more information about sql ddl refer to SQL and Supported DDL)

create table pagevisit (
    visit_time varchar,
    user_id bigint,
    visit_page varchar,
    browser_type varchar
) with (
    type = 'csv',
    path = 'file:///tmp/pagevisit.csv'

Press ‘Enter’ and paste the following sql dml text.

  date_format(visit_time, 'yyyy-MM-dd HH:mm') as `visit_time`,
  count(user_id) as pv, 
  count(distinct user_id) as uv
from pagevisit
group by date_format(visit_time, 'yyyy-MM-dd HH:mm');

After press ‘Enter’ the sql will be submitted to the standalone cluster. The result will print on the shell.

Batch SQL Example: result

Open http://localhost:8081 and you can see the job information.

Batch SQL Example: web

For more information please refer to SQL and SQL Client.

Submit SQL Query Programmatically

SQL queries can be submitted using the sqlQuery() method of the TableEnvironment programmatically.

WordCountSQL shows how the Batch SQL API is used in Java.

// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);
BatchTableEnvironment tEnv = TableEnvironment.getBatchTableEnvironment(env);

DataStreamSource<WC> input = env.fromElements(
    new WC("Hello", 1),
    new WC("Ciao", 1),
    new WC("Hello", 1));

// register the BoundedStream as table "WordCount"

tEnv.registerBoundedStream("WC", input, "word, frequency");

// run a SQL query on the Table and retrieve the result as a new Table
Table table = tEnv.sqlQuery(
    "SELECT word, SUM(frequency) as frequency FROM WC GROUP BY word");


// user-defined data types

 * Simple POJO containing a word and its respective count.
public static class WC {
    public String word;
    public long frequency;

    // public constructor to make it a Flink POJO
    public WC() {}

    public WC(String word, long frequency) {
        this.word = word;
        this.frequency = frequency;

    public String toString() {
        return "WC " + word + " " + frequency;

The WordCountSQL implements the above described algorithm.

// set up the execution environment
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getBatchTableEnvironment(execEnv)
tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
val input = execEnv.fromCollection(List(WC("hello", 1), WC("hello", 1), WC("ciao", 1)))

// register the BoundedStream as table "WordCount"
tEnv.registerBoundedStream("WordCount", input, 'word, 'frequency)

// run a SQL query on the Table and retrieve the result as a new Table
tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word").print()

// user-defined data types
case class WC(word: String, frequency: Long)

The WordCountSQL.scala implements the above described algorithm.

To run the example, issue the following command and the result will print on the shell:

$ ./bin/flink run ./examples/table/WordCountSQL.jar

Batch SQL Example: WordCountSQL result

Open http://localhost:8081 and you can see the dashboard.

SQL Example: WordCountSQL web

Clink the job name: “Flink Exec Table Job”, and you can see the detailed info page:

SQL Example: WordCountSQL detail

Back to top