Stream SQL Examples

Submit SQL Query via SQL Client

The following example programs showcase different applications of Flink SQL, from simple word counting to PV-UV statistics, submitted via the SQL Client and run on a local standalone cluster.

Start the local cluster first:

$ ./bin/start-cluster.sh

Check the web UI at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

Note: Make sure the local cluster is up and running before running the examples below.

Word Count

Prepare the input data:

$ cat /tmp/input.csv
hello
flink
hello
sql
hello
world

Then start SQL Client shell:

$ ./bin/sql-client.sh embedded

You should see the welcome message of the Flink SQL Client.

Paste the following SQL DDL statement into the shell. (For more information about SQL DDL, refer to SQL and Supported DDL.)

create table csv_source (
  a varchar
) with (
  type = 'csv',
  path = 'file:///tmp/input.csv'
);

Press ‘Enter’, then paste the following SQL DDL statement.

create table csv_sink (
  a varchar,
  c bigint
) with (
  type = 'csv',
  updatemode = 'upsert',
  path = 'file:///tmp/output.csv'
);

Press ‘Enter’ and paste the following SQL DML statement.

insert into csv_sink
select
  a,
  count(*)
from csv_source
group by a;

After pressing ‘Enter’, the SQL is submitted to the standalone cluster, and the log is printed in the shell.

SQLClient Example: WordCount run

Open http://localhost:8081 to see the dashboard.

SQL Example: WordCount web

Click the job name “default: insert into…” to open the detailed info page:

SQL Example: WordCount detail

Run the following command to see the output:

$ cat /tmp/output.csv

SQL Example: WordCount result
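
Given the sample input above, the final counts written to /tmp/output.csv should be hello = 3 and flink, sql, world = 1 each. Because the sink runs in upsert mode, intermediate counts may also appear while the job is running; the expected final rows look roughly like this (columns a, c; exact formatting depends on the CSV sink):

hello,3
flink,1
sql,1
world,1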

Note that paths on non-local file systems require a scheme prefix, such as hdfs://.

PV-UV Statistics (Input: Kafka)

This example reads website access records from Kafka, each in the format visit_time,user_id,visit_page,browser_type, and gathers PV (page view) and UV (unique visitor) statistics in real time.

Start SQL Client shell:

$ ./bin/sql-client.sh embedded

You should see the welcome message of the Flink SQL Client.

Paste the following SQL DDL statement into the shell. (For more information about SQL DDL, refer to SQL and Supported DDL.)

Note: Replace bootstrap.servers and group.id with the values for your own environment.

create table kafka_source (
  messageKey varbinary, 
  message varbinary, 
  topic varchar,
  `partition` int,
  `offset` bigint
) with (
  type = 'kafka010',   
  topic = 'pvuv_demo',
  bootstrap.servers = 'YOUR_BROKER_IP:YOUR_BROKER_PORT',
  `group.id` = 'kafka_consumer_demo_group'
);

Press ‘Enter’, then paste the following SQL DML statement.

select
    date_format (visit_time, 'yyyy-MM-dd HH:mm') as `visit_time`,
    count (user_id) as pv,
    count (distinct user_id) as uv
from (
        select
            split_index (cast(message as varchar), ',', 0) as visit_time,
            split_index (cast(message as varchar), ',', 1) as user_id,
            split_index (cast(message as varchar), ',', 2) as visit_page,
            split_index (cast(message as varchar), ',', 3) as browser_type
        from
            kafka_source
    )
group by
    date_format (visit_time, 'yyyy-MM-dd HH:mm');

After pressing ‘Enter’, the SQL is submitted to the standalone cluster. Since there is no data in the Kafka topic yet, there is no output.

Then run the kafka-console-producer.sh script from your local Kafka installation. Note: Replace the --broker-list value with your own environment.

$ ./bin/kafka-console-producer.sh --topic pvuv_demo --broker-list YOUR_BROKER_IP:YOUR_BROKER_PORT
>2018-10-16 09:00:00,1001,/page1,chrome
>2018-10-16 09:00:02,1001,/page2,safari
>2018-10-16 09:00:07,1005,/page1,safari
>2018-10-16 09:01:30,1001,/page1,chrome

PV-UV Statistics (Input: Kafka) input

You can see output like this:

PV-UV Statistics (Input: Kafka) output
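
For the four sample records above, the aggregation should eventually report pv = 3 and uv = 2 for the minute 2018-10-16 09:00 (three visits from two distinct users) and pv = 1, uv = 1 for 2018-10-16 09:01. Since this is a continuous streaming query, the displayed result is updated as each new record arrives.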

Open http://localhost:8081 to see the dashboard.

SQL Example: PV-UV Statistics (Input: Kafka) web

Click the job name “default: select date_format…” to open the detailed info page:

SQL Example: PV-UV Statistics (Input: Kafka) detail

Animated demo of the example: the Flink SQL Client CLI running the PV-UV SQL on a cluster

For more information please refer to SQL and SQL Client.

Submit SQL Query Programmatically

SQL queries can be submitted programmatically using the sqlQuery() method of the TableEnvironment.

The StreamJoinSQLExample shows the usage of SQL JOIN on stream tables. It computes the orders that were shipped within one hour of being placed. The algorithm works in two steps: first, join the Order table and the Shipment table on the orderId field; second, filter the join result by createTime.

// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Order> order = env.fromElements(
    new Order(Timestamp.valueOf("2018-10-15 09:01:20"), 2, 1, 7),
    new Order(Timestamp.valueOf("2018-10-15 09:05:02"), 3, 2, 9),
    new Order(Timestamp.valueOf("2018-10-15 09:05:02"), 1, 3, 9),
    new Order(Timestamp.valueOf("2018-10-15 10:07:22"), 1, 4, 9),
    new Order(Timestamp.valueOf("2018-10-15 10:55:01"), 5, 5, 8));
DataStream<Shipment> shipment = env.fromElements(
    new Shipment(Timestamp.valueOf("2018-10-15 09:11:00"), 3),
    new Shipment(Timestamp.valueOf("2018-10-15 10:01:21"), 1),
    new Shipment(Timestamp.valueOf("2018-10-15 11:31:10"), 5));

// register the DataStreams under the names "t_order" and "t_shipment"
tEnv.registerDataStream("t_order", order, "createTime, unit, orderId, productId");
tEnv.registerDataStream("t_shipment", shipment, "createTime, orderId");

// run a SQL query to get orders whose ship date is within one hour of the order date
Table table = tEnv.sqlQuery(
    "SELECT o.createTime, o.productId, o.orderId, s.createTime AS shipTime" +
        " FROM t_order AS o" +
        " JOIN t_shipment AS s" +
        "  ON o.orderId = s.orderId" +
        "  AND s.createTime BETWEEN o.createTime AND o.createTime + INTERVAL '1' HOUR");

DataStream<Row> resultDataStream = tEnv.toAppendStream(table, Row.class);
resultDataStream.print();

// execute program
env.execute();
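
The Java program above assumes Order and Shipment POJOs whose field names match the columns passed to registerDataStream; their definitions are not shown in the snippet. A minimal sketch, with field types inferred from the Scala case classes below, might look like this (declared as public static nested classes of the example so that Flink can analyze them as POJO types):

// hypothetical POJO sketches; requires: import java.sql.Timestamp;
// public fields and a public no-argument constructor make them valid Flink POJOs
public static class Order {
    public Timestamp createTime;
    public int unit;
    public long orderId;
    public long productId;

    public Order() {}

    public Order(Timestamp createTime, int unit, long orderId, long productId) {
        this.createTime = createTime;
        this.unit = unit;
        this.orderId = orderId;
        this.productId = productId;
    }
}

public static class Shipment {
    public Timestamp createTime;
    public long orderId;

    public Shipment() {}

    public Shipment(Timestamp createTime, long orderId) {
        this.createTime = createTime;
        this.orderId = orderId;
    }
}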

The Scala version, StreamJoinSQLExample.scala, implements the same algorithm:

// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val t_order: DataStream[Order] = env.fromCollection(Seq(
  Order(Timestamp.valueOf("2018-10-15 09:01:20"), 2, 1, 7),
  Order(Timestamp.valueOf("2018-10-15 09:05:02"), 3, 2, 9),
  Order(Timestamp.valueOf("2018-10-15 09:05:02"), 1, 3, 9),
  Order(Timestamp.valueOf("2018-10-15 10:07:22"), 1, 4, 9),
  Order(Timestamp.valueOf("2018-10-15 10:55:01"), 5, 5, 8)))

val t_shipment: DataStream[Shipment] = env.fromCollection(Seq(
  Shipment(Timestamp.valueOf("2018-10-15 09:11:00"), 3),
  Shipment(Timestamp.valueOf("2018-10-15 10:01:21"), 1),
  Shipment(Timestamp.valueOf("2018-10-15 11:31:10"), 5)))

// register the DataStreams under the names "t_order" and "t_shipment"
tEnv.registerDataStream("t_order", t_order, 'createTime, 'unit, 'orderId, 'productId)
tEnv.registerDataStream("t_shipment", t_shipment, 'createTime, 'orderId)

// run a SQL query to get orders whose ship date is within one hour of the order date
val result = tEnv.sqlQuery(
  "SELECT o.createTime, o.productId, o.orderId, s.createTime AS shipTime" +
    " FROM t_order AS o" +
    " JOIN t_shipment AS s" +
    "  ON o.orderId = s.orderId" +
    "  AND s.createTime BETWEEN o.createTime AND o.createTime + INTERVAL '1' HOUR")

result.toAppendStream[Row].print()

// execute program
env.execute()

// user-defined case classes
case class Order(createTime: Timestamp, unit: Int, orderId: Long, productId: Long)

case class Shipment(createTime: Timestamp, orderId: Long)
    


To run the StreamJoinSQLExample, issue the following command:

$ ./bin/flink run ./examples/table/StreamJoinSQLExample.jar

Open http://localhost:8081 to see the dashboard.

SQL Example: Stream Join SQL web

Run the following command to see the result:

$ tail -f ./log/flink-*-taskexecutor*.out

Stream SQL Example: Stream Join SQL result
