Flink Interpreter for Apache Zeppelin

Overview

Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink also builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.

Here’s a list of properties that can be configured to customize the Flink interpreter.

| Property | Default value | Description |
|----------|---------------|-------------|
| flink.execution.mode | local | Execution mode of Flink. It could be local, yarn or remote |
| flink.execution.remote.host | | Host name of the JobManager in remote mode |
| flink.execution.remote.port | | Port of the JobManager's REST service in remote mode |
| flink.yarn.appName | | Yarn app name of the Flink session |
| flink.yarn.jm.memory | 1024 | Memory (MB) of the JobManager |
| flink.yarn.tm.memory | 1024 | Memory (MB) of each TaskManager |
| flink.yarn.tm.num | 2 | Number of TaskManagers |
| flink.yarn.tm.slot | 1 | Number of slots per TaskManager |
| flink.yarn.queue | default | Queue name for the yarn app |
| zeppelin.flink.printREPLOutput | true | Whether to print REPL output |
| zeppelin.flink.maxResult | 1000 | Max number of rows of result for batch SQL output |
| zeppelin.flink.concurrentBatchSql | 10 | Max number of batch SQL statements executed concurrently |
| zeppelin.flink.concurrentStreamSql | 10 | Max number of stream SQL statements executed concurrently |
| zeppelin.flink.scala.color | true | Whether to enable colored output of the Scala shell |

Besides these properties, you can also configure any Flink property, which will override the corresponding value in flink-conf.yaml. For more information about Flink configuration, you can find it here.

By default, the Flink interpreter runs in local mode since the default value of flink.execution.mode is local. In local mode, Flink launches one MiniCluster which includes the JobManager and TaskManagers in one JVM. You can still customize the MiniCluster via the following properties:

  • local.number-taskmanager This property specifies how many TaskManagers are in the MiniCluster.
  • taskmanager.numberOfTaskSlots This property specifies how many slots each TaskManager has. By default it is 1.
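
For example, to get a local MiniCluster with 2 TaskManagers of 2 slots each, you could set the following interpreter properties (the values here are only illustrative):

flink.execution.mode            local
local.number-taskmanager        2
taskmanager.numberOfTaskSlots   2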

If you want to run Flink in yarn mode, you have to set the following properties:

  • flink.execution.mode to be yarn
  • HADOOP_CONF_DIR must be specified either in zeppelin-env.sh or in interpreter properties.

You can also customize the yarn mode via the following properties:

  • flink.yarn.jm.memory Memory of JobManager
  • flink.yarn.tm.memory Memory of TaskManager
  • flink.yarn.tm.num Number of TaskManager
  • flink.yarn.tm.slot Slot number per TaskManager
  • flink.yarn.queue Queue name of yarn app
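
For example, a yarn-mode interpreter setting might look like the following (all values, including the HADOOP_CONF_DIR path and the app name, are only illustrative):

flink.execution.mode    yarn
HADOOP_CONF_DIR         /etc/hadoop/conf
flink.yarn.appName      zeppelin-flink-session
flink.yarn.jm.memory    2048
flink.yarn.tm.memory    4096
flink.yarn.tm.num       4
flink.yarn.tm.slot      2
flink.yarn.queue        default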

You have to set query.proxy.ports and query.server.ports to a port range (e.g. 30000-30100); otherwise it is impossible to launch multiple TaskManagers on one machine.

If you want to run Flink in standalone mode, you have to set the following properties:

  • flink.execution.mode to be remote
  • flink.execution.remote.host to be the host name of JobManager
  • flink.execution.remote.port to be the port of the JobManager's REST server
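
For example (the host name and port below are only placeholders):

flink.execution.mode          remote
flink.execution.remote.host   jm-host.example.com
flink.execution.remote.port   8081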

Zeppelin’s Flink interpreter supports 3 kinds of interpreters:

  • %flink (FlinkScalaInterpreter, Run scala code)
  • %flink.bsql (FlinkBatchSqlInterpreter, Run flink batch sql)
  • %flink.ssql (FlinkStreamSqlInterpreter, Run flink stream sql)

FlinkScalaInterpreter allows users to run Scala code in Zeppelin. 4 variables are created for users:

  • senv (StreamExecutionEnvironment)
  • benv (ExecutionEnvironment)
  • stenv (StreamTableEnvironment)
  • btenv (BatchTableEnvironment)

Users can use these variables to run DataSet/DataStream/BatchTable/StreamTable related jobs.

e.g. The following code snippet uses benv to run a batch-style WordCount:

%flink

val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .groupBy(0)
  .sum(1)
  .print()

The following uses senv to run a stream-style WordCount:

%flink

val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print

senv.execute()
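
Besides benv and senv, the table environment variables can be used from Scala code directly as well. Here’s a minimal sketch (not part of the original examples) that converts a DataStream into a Table via stenv, assuming the Table API Scala implicits are already imported, as the Flink Scala shell normally preloads them:

%flink

// a small sketch: turn a DataStream into a Table via stenv
val stream = senv.fromElements(("hello", 1), ("flink", 2), ("hello", 3))
val table = stenv.fromDataStream(stream, 'word, 'cnt)
table.printSchema()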

FlinkBatchSqlInterpreter (%flink.bsql)

FlinkBatchSqlInterpreter supports running SQL to query tables registered in the BatchTableEnvironment (btenv).

e.g. We can query the wc table which is registered in the Scala code below.

%flink

val data = senv.fromElements("hello world", "hello flink", "hello hadoop").
    flatMap(line => line.split("\\s")).
    map(w => (w, 1))

btenv.registerOrReplaceBoundedStream("wc", 
    data, 
    'word,'number)
%flink.bsql

select word, sum(number) as c from wc group by word 

FlinkStreamSqlInterpreter (%flink.ssql)

The Flink interpreter also supports stream SQL via FlinkStreamSqlInterpreter (%flink.ssql) and can also visualize the streaming data.

Overall, there are 3 kinds of streaming SQL supported by %flink.ssql:

  • SingleRow
  • Retract
  • TimeSeries
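
All of the examples below query a table named log, which is assumed to be already registered in the StreamTableEnvironment. As a purely illustrative sketch (the source, schema and watermark strategy are made up; in practice the data would come from a real source such as Kafka), such a table could be registered like this:

%flink

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// hypothetical access-log stream of (eventTimeMillis, url)
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val logStream = senv
  .fromElements((1000L, "/index"), (2000L, "/index"), (3000L, "/about"))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[(Long, String)](Time.seconds(5)) {
      override def extractTimestamp(e: (Long, String)): Long = e._1
    })

// expose the stream as table "log" with a rowtime attribute and a url column
stenv.registerDataStream("log", logStream, 'rowtime.rowtime, 'url)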

SingleRow

This kind of SQL returns only one row of data, but this row will be updated continually. Usually this is used for tracking the aggregation result of some metric, e.g. total page views, total transactions, etc. This kind of SQL can be visualized via HTML. Here’s one example which calculates the total page views and visualizes the result via an HTML template.

%flink.ssql(type=single, parallelism=1, refreshInterval=3000, template=<h1>{1}</h1> until <h2>{0}</h2>, enableSavePoint=true, runWithSavePoint=true)

select max(rowtime), count(1) from log

Retract

This kind of SQL will return a fixed number of rows which are updated continually. Usually this is used for tracking the aggregation result of some metric by some dimension, e.g. total page views per page, total transactions per country, etc. This kind of SQL can be visualized via the built-in visualization charts of Zeppelin, such as the bar chart, line chart, etc. Here’s one example which calculates the total page views per page and visualizes the result via a bar chart.

%flink.ssql(type=retract, refreshInterval=2000, parallelism=2, enableSavePoint=true, runWithSavePoint=true)

select 
    url, 
    count(1) as pv
from log 
    group by url

TimeSeries

This kind of SQL returns a fixed number of rows regularly as a time series. It is usually used for tracking metrics by time window. Here’s one example which calculates the page views for each 5-second window.

%flink.ssql(type=ts, refreshInterval=2000, enableSavePoint=false, runWithSavePoint=false, threshold=60000)

select
    TUMBLE_START(rowtime, INTERVAL '5' SECOND) as start_time,
    url,
    count(1) as pv
from log
    group by TUMBLE(rowtime, INTERVAL '5' SECOND), url

Here’s a list of properties that you can use to customize Flink stream SQL:

| Property | Default value | Description |
|----------|---------------|-------------|
| type | | Type of the streaming SQL: single, retract or ts |
| refreshInterval | 3000 | How often to refresh the result, in milliseconds |
| template | {0} | Used for displaying the result of type single. `{i}` represents the placeholder of the `ith` field. You can also use HTML in the template, such as <h1>{0}</h1> |
| parallelism | | The parallelism of this stream SQL job |
| enableSavePoint | false | Whether to do a savepoint when cancelling the job |
| runWithSavePoint | false | Whether to run the job from the savepoint |
| threshold | 3600000 | How much history data to keep for a TimeSeries stream job, 1 hour by default |

Other Features

  • Job Canceling
    • Users can cancel jobs via the job cancel button
  • Flink job URL association
    • Zeppelin will display the job URL in the paragraph
  • Code completion
    • Like in other interpreters, users can use tab for code completion
  • ZeppelinContext
    • The Flink interpreter also integrates ZeppelinContext. For how to use ZeppelinContext, please refer to this link; a small example is shown below.
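
For example, here is a minimal sketch of using a ZeppelinContext dynamic form inside a %flink paragraph (the form name and default value are made up for illustration):

%flink

// create a text-input dynamic form named "maxRows" with default value "10";
// the value entered in the form is returned and can be used in the Scala code
val maxRows = z.textbox("maxRows", "10").toString.toInt
println(s"will show at most $maxRows rows")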

FAQ

  • Most of the time, you will get a clear error message when something unexpected happens. But you can still check the interpreter log in case the error message in the frontend is not clear to you. The Flink interpreter log is located in ZEPPELIN_HOME/logs