SQL Sources & Sinks

Flink SQL provides access to data stored in external systems (databases, key-value stores, message queues) or in files.

With a CREATE TABLE statement, such data can be accessed as a SQL table in subsequent DML statements and is automatically translated into a TableSource or TableSink.

The WITH clause describes the information necessary to access the external system.

Provided Connectors

CSV connector

Support Matrix

Mode        Source   Sink   Temporal Join
Batch       Y        Y      Y
Streaming   Y        Y      Y

-- Create a table named `Orders` which includes a primary key, and is stored as a CSV file
CREATE TABLE Orders (
    orderId BIGINT NOT NULL,
    customId VARCHAR NOT NULL,
    itemId BIGINT NOT NULL,
    totalPrice BIGINT NOT NULL,
    orderTime TIMESTAMP NOT NULL,
    description VARCHAR,
    PRIMARY KEY(orderId)
) WITH (
    type='csv',
    path='file:///abc/csv_file1'
)

Required configuration

  • type : use CSV to create a CSV table that reads from or writes to CSV files.
  • path : locations of the CSV files. Accepts standard Hadoop globbing expressions. To read a directory of CSV files, specify a directory.

Optional Configuration

  • enumerateNestedFiles : when set to true, the reader recursively descends into directories to find CSV files. By default true.
  • fieldDelim : the field delimiter. By default ,, but can be set to any character.
  • lineDelim : the line delimiter. By default \n, but can be set to any character.
  • charset : defaults to UTF-8, but can be set to other valid charset names.
  • override : when set to true, existing files are overwritten. By default false.
  • emptyColumnAsNull : when set to true, any empty column is set to null. By default false.
  • quoteCharacter : by default no quote character, but can be set to any character.
  • firstLineAsHeader : when set to true, the first line of each file is used to name the columns and is not included in the data. All types are assumed to be string. By default false.
  • parallelism : the number of files to write to.
  • timeZone : timeZone to parse DateTime columns. Defaults to UTC, but can be set to other valid time zones.
  • commentsPrefix : skip lines beginning with this character. By default no commentsPrefix, but can be set to any string.
  • updateMode : the way changes of a dynamic table are encoded. By default append. See Table to Stream Conversion.
    • append : encodes INSERT changes only.
    • upsert : encodes INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages.
    • retract : encodes an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row followed by an add message for the updating (new) row.
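
For example, a CSV source can combine several of these optional settings. The following is a minimal sketch; the path, delimiter, and column set are illustrative rather than required values:

-- Sketch: a CSV source with a custom delimiter and header handling (illustrative values)
CREATE TABLE OrdersCsv (
    orderId BIGINT NOT NULL,
    customId VARCHAR NOT NULL,
    totalPrice BIGINT NOT NULL,
    PRIMARY KEY(orderId)
) WITH (
    type='csv',
    path='file:///abc/csv_dir',        -- a directory: all CSV files under it are read
    fieldDelim='|',                    -- fields separated by '|' instead of the default ','
    firstLineAsHeader='true',          -- first line names the columns and is excluded from the data
    emptyColumnAsNull='true',          -- treat empty columns as null
    updateMode='append'                -- encode INSERT changes only
)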

HBase Connector

Support Matrix

Mode        Source   Sink   Temporal Join
Batch       I        Y      Y
Streaming   N        Y      Y

Legend:

  • Y: supported
  • N: not supported
  • I: coming soon

CREATE TABLE testSinkTable (
     ROWKEY BIGINT,
     `family1.col1` VARCHAR,
     `family2.col1` INTEGER,
     `family2.col2` VARCHAR,
     `family3.col1` DOUBLE,
     `family3.col2` DATE,
     `family3.col3` BIGINT,
     PRIMARY KEY(ROWKEY)
) WITH (
    type='HBASE',
    connector.property-version='1.4.3',
    tableName='hbase_table_name',                 -- name of the HBase table (placeholder value)
    hbase.zookeeper.quorum='test_hostname:2181'
)

Note : the HBase table schema (used for writing or temporal joins) must have a single-column primary key named ROWKEY, and the remaining column names must follow the columnFamily.qualifier format.

Required Configuration

  • type : use HBASE to create an HBase table to read/write data.
  • connector.property-version : specify the HBase client version. Currently only ‘1.4.3’ is available; more versions will be supported later.
  • tableName : specify the name of the table in HBase.
  • hbase.zookeeper.quorum : specify the ZooKeeper quorum for accessing the HBase cluster. Note : either set this parameter or ensure that a valid default hbase-site.xml is on the classpath.

Optional Configuration

  • hbase.* : support all the parameters that have the ‘hbase.’ prefix, e.g., ‘hbase.client.operation.timeout’.
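
For example, an HBase client setting such as hbase.client.operation.timeout can be passed straight through in the WITH block. The sketch below is illustrative; the table name, quorum address, and timeout value are placeholders:

CREATE TABLE hbaseSinkWithTimeout (
     ROWKEY BIGINT,
     `family1.col1` VARCHAR,
     PRIMARY KEY(ROWKEY)
) WITH (
    type='HBASE',
    connector.property-version='1.4.3',
    tableName='hbase_table_name',                 -- placeholder HBase table name
    hbase.zookeeper.quorum='test_hostname:2181',
    hbase.client.operation.timeout='30000'        -- forwarded to the HBase client
)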

Kafka Connector

Support Matrix

Mode        Source   Sink   Temporal Join
Batch       I        Y      N
Streaming   Y        Y      N

Legend:

  • Y: supported
  • N: not supported
  • I: coming soon

Create Source Tables

CREATE TABLE kafka_source (
     key VARBINARY, 
     msg VARBINARY, 
     `topic` VARCHAR, 
     `partition` INT, 
     `offset` BIGINT
) WITH (
    type = 'KAFKA010',
    `bootstrap.servers` = 'test_hostname:9092',
    `group.id` = 'test-group-id',
    `topic` = 'source-topic',
    startupMode = 'EARLIEST'
)

Note: currently, the Kafka source table must be created with exactly the five columns above.
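
Because the key and message payloads arrive as raw bytes (VARBINARY), they are usually decoded downstream. The query below is only a sketch; ParseOrder stands for a hypothetical user-defined function that decodes the message payload and is not part of the connector:

SELECT
    `topic`,
    `partition`,
    `offset`,
    ParseOrder(msg) AS orderInfo   -- ParseOrder is a hypothetical UDF, not provided by the connector
FROM kafka_source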

Kafka Source Table Configurations in WITH block
  • type (all versions; required) : the connector type, which includes the Kafka version. Valid values are KAFKA08, KAFKA09, KAFKA010 or KAFKA011.
  • topic (all versions; required if topicPattern is not set) : the single topic to read from Kafka.
  • topicPattern (all versions; required if topic is not set) : a regular expression matching the topic names to read from Kafka. Must be set if, and only if, `topic` is not configured.
  • zookeeper.connect (KAFKA08; required) : the ZooKeeper connect address. Only used by Kafka 0.8.
  • bootstrap.servers (all versions; required) : the Kafka cluster address.
  • group.id (all versions; required) : the consumer group id, used to commit offsets back to Kafka for reporting purposes.
  • startupMode (all versions; optional) : the position to start reading from the Kafka topic.
    • EARLIEST : start reading from the first available message.
    • GROUP_OFFSETS : start reading from the last committed offset.
    • LATEST (default) : start reading from the latest offset.
    • TIMESTAMP : start reading from a given timestamp (only supported by KAFKA010 and KAFKA011).
  • partitionDiscoveryIntervalMS (all versions; optional) : the interval at which to check whether new partitions have been added to the topic. The default value is 60 seconds.
  • extraConfig (all versions; optional) : additional configurations, using the syntax extraConfig='key1=value1;key2=value2;'.

Additional Kafka Source Table Configurations

When defining a Kafka source table, users can also set in the WITH block any configuration supported by the Kafka consumer of the corresponding Kafka version. See the following links for all configurations supported by Kafka consumers.

KAFKA09

KAFKA010

KAFKA011
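
For example, a consumer property can be set either directly in the WITH block or via extraConfig. The sketch below assumes a KAFKA010 source; max.poll.records and fetch.max.bytes are standard Kafka consumer properties, and the values shown are illustrative:

CREATE TABLE kafka_source_tuned (
     key VARBINARY,
     msg VARBINARY,
     `topic` VARCHAR,
     `partition` INT,
     `offset` BIGINT
) WITH (
    type = 'KAFKA010',
    `bootstrap.servers` = 'test_hostname:9092',
    `group.id` = 'test-group-id',
    `topic` = 'source-topic',
    startupMode = 'GROUP_OFFSETS',
    `max.poll.records` = '250',                   -- consumer property set directly
    extraConfig = 'fetch.max.bytes=52428800;'     -- alternative syntax for pass-through settings
)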

Create Sink Tables

CREATE TABLE kafka_sink (
    messageKey VARBINARY,
    messageValue VARBINARY,
    PRIMARY KEY (messageKey)
) WITH (
    type = 'KAFKA010',
    topic = 'sink-topic',
    `bootstrap.servers` = 'test_hostname:9092',
    retries = '3'
)

Note: a primary key is mandatory when a Kafka table is used as a sink.
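
For example, a minimal sketch that forwards the raw key and message bytes from the kafka_source table defined above into this sink:

INSERT INTO kafka_sink
SELECT key AS messageKey, msg AS messageValue
FROM kafka_source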

Kafka Sink Table Configurations in WITH block
  • type (all versions; required) : the connector type, which includes the Kafka version. Valid values are KAFKA08, KAFKA09, KAFKA010 or KAFKA011.
  • topic (all versions; required) : the single Kafka topic to produce messages to.
  • bootstrap.servers (all versions; required) : the Kafka cluster address. Used by Kafka 0.9 and above.
  • extraConfig (all versions; optional) : additional configurations, using the syntax extraConfig='key1=value1;key2=value2;'.

Additional Sink Table Configurations

When defining a Kafka sink table, users can also set in the WITH block any configuration supported by the Kafka producer of the corresponding Kafka version. See the following links for all configurations supported by Kafka producers.

KAFKA09

KAFKA010

KAFKA011

PARQUET Connector

Support Matrix

Mode        Source   Sink   Temporal Join
Batch       Y        Y      I
Streaming   N        N      N

Legend:

  • Y: supported
  • N: not supported
  • I: coming soon

CREATE TABLE testSinkTable (
     `family1.col1` VARCHAR,
     `family2.col1` INTEGER,
     `family2.col2` VARCHAR
) WITH (
    type='PARQUET',
    filePath='schema://file1/file2.csv'
)

Required Configuration

  • type : use PARQUET to declare that this table is stored in the Parquet format.
  • filePath : the path to write the data to or consume from.

Optional Configuration

  • enumerateNestedFiles : whether to read all data files under filePath recursively. Defaults to true. Only applies to a table source.
  • writeMode : whether to overwrite an existing file of the same name at the target path. Defaults to no_overwrite, meaning existing files are not overwritten and an error is thrown if a file with the same name already exists. Only applies to a table sink.
  • compressionCodecName : the compression codec used when writing Parquet files. The options are uncompressed, snappy, gzip and lzo; defaults to snappy. Only applies to a table sink.
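
For example, a Parquet sink can choose a different compression codec. The sketch below uses placeholder column names and an illustrative output path:

CREATE TABLE parquetSink (
     id BIGINT,
     name VARCHAR
) WITH (
    type='PARQUET',
    filePath='hdfs:///path/to/parquet_output',   -- placeholder output path
    compressionCodecName='gzip'                  -- write gzip-compressed Parquet files
)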

ORC Connector

Support Matrix

Mode        Source   Sink   Temporal Join
Batch       Y        Y      I
Streaming   N        N      N

Legend:

  • Y: supported
  • N: not supported
  • I: coming soon

CREATE TABLE testSinkTable (
     `family1.col1` VARCHAR,
     `family2.col1` INTEGER,
     `family2.col2` VARCHAR,
     PRIMARY KEY(`family1.col1`)
) WITH (
    type='ORC',
    filePath='schema://file1/file2.csv'
)

Required Configuration

  • type : use ORC to declare that this table is stored in the ORC format.
  • filePath : the path to write the data to or consume from.

Optional Configuration

  • enumerateNestedFiles : whether to read all data files under filePath recursively. Defaults to true. Only applies to a table source.
  • writeMode : whether to overwrite an existing file of the same name at the target path. Defaults to no_overwrite, meaning existing files are not overwritten and an error is thrown if a file with the same name already exists. Only applies to a table sink.
  • compressionCodecName : the compression codec used when writing ORC files. The options are uncompressed, snappy, gzip and lzo; defaults to snappy. Only applies to a table sink.
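
As with Parquet, a typical batch job reads an ORC source and writes an ORC sink. The following sketch uses placeholder table definitions and paths:

-- Sketch: read ORC files recursively and copy them to another location (placeholder paths)
CREATE TABLE orcSource (
     id BIGINT,
     name VARCHAR
) WITH (
    type='ORC',
    filePath='hdfs:///path/to/orc_input',
    enumerateNestedFiles='true'
);

CREATE TABLE orcSink (
     id BIGINT,
     name VARCHAR
) WITH (
    type='ORC',
    filePath='hdfs:///path/to/orc_output',
    compressionCodecName='uncompressed'
);

INSERT INTO orcSink
SELECT id, name FROM orcSource;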