The Table API and SQL are integrated in a joint API. The central concept of this API is a Table which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table, how to query a Table, and how to emit a Table.
All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
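A minimal Scala sketch of this structure is given below; the table names table1 and outputTable and the elided (…) details are placeholders rather than part of any specific API.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// create an execution environment and a TableEnvironment for streaming queries
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register an input Table, e.g., from a TableSource or a DataStream
// tableEnv.registerTableSource("table1", ...)
// register an output Table, e.g., a TableSink
// tableEnv.registerTableSink("outputTable", ...)

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// or create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1")

// emit the result Table to the registered output Table
tapiResult.insertInto("outputTable")

// trigger execution of the program
env.execute()
```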
Table API and SQL queries can be easily integrated with and embedded into DataStream programs. Have a look at the Integration with DataStream API section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
For a bounded stream job, you should always use TableEnvironment.execute() to submit and run the job.
The TableEnvironment is a central concept of the Table API and SQL integration. It is responsible for:
Registering a Table in the internal catalog
Registering a catalog
Executing SQL queries
Registering a user-defined (scalar, table, or aggregation) function
Converting a DataStream into a Table
Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
A TableEnvironment is created by calling the static TableEnvironment.getTableEnvironment() method with a StreamExecutionEnvironment or an ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process (see Query Optimization).
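For example (a sketch; the streaming variant uses a StreamExecutionEnvironment and the batch variant an ExecutionEnvironment):

```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

// ***************** STREAMING QUERIES *****************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ******************* BATCH QUERIES *******************
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
```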
Tables registered via the TableEnvironment are actually registered in the default catalog and database of the CatalogManager. Flink provides a built-in FlinkInMemoryCatalog, which implements ReadableWritableCatalog, as the default catalog. Users can also change the default catalog and database through either the Table API or Flink SQL.
Note that Flink tables cannot be registered in every ReadableWritableCatalog. Currently, Flink only supports registering tables in the FlinkInMemoryCatalog.
There are two types of Flink tables, input tables and output tables. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.
An input table can be registered from various sources:
an existing Table object, usually the result of a Table API or SQL query.
a TableSource, which accesses external data, such as a file, database, or messaging system.
An output table can be registered using a TableSink.
Register a Table
A Table is registered in a TableEnvironment as follows:
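For example (a sketch; the table "X" and the projected fields are assumed to exist already):

```scala
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// projTable is the result of a simple projection query on a registered table "X"
val projTable: Table = tableEnv.scan("X").select('a, 'b)

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
```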
Note: A registered Table is treated similarly to a VIEW as known from relational database systems, i.e., the query that defines the Table is not optimized but will be inlined when another query references the registered Table. If multiple queries reference the same registered Table, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered Table will not be shared.
A TableSource provides access to external data which is stored in a storage system such as a database (MySQL, HBase, …), a file with a specific encoding (CSV, Apache [Parquet, Avro, ORC], …), or a messaging system (Apache Kafka, RabbitMQ, …).
Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the Table Sources and Sinks page for a list of supported TableSources and instructions for how to build a custom TableSource.
A TableSource is registered in a TableEnvironment as follows:
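For example, with the built-in CsvTableSource (the path and the elided schema definition are placeholders):

```scala
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSource, e.g., a CsvTableSource (field names and types elided)
val csvSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
```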
A registered TableSink can be used to emit the result of a Table API or SQL query to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache [Parquet, Avro, ORC], …).
Flink aims to provide TableSinks for common data formats and storage systems. Please see the Table Sources & Sinks page for details about available sinks and instructions for how to implement a custom TableSink.
A TableSink is registered in a TableEnvironment as follows:
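For example, with the built-in CsvTableSink (path, field names, and types are illustrative; the exact registration overload may vary between versions):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink with "|" as the field delimiter
val csvSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// define the field names and types of the sink table
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
```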
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.
The API is based on the Table class which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table object, which represents the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table, and select(...) the projection on the grouping of table.
The Table API document describes all Table API operations that are supported on streaming and batch tables.
The following example shows a simple Table API aggregation query:
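A sketch of such a query (the Orders table and its schema are assumed to be registered already):

```scala
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// scan the registered Orders table
val orders: Table = tableEnv.scan("Orders")

// compute the revenue for all customers from France
val revenue: Table = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum as 'revSum)

// emit or convert the result Table ...
```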
Note: The Scala Table API uses Scala Symbols, which start with a single tick (') to reference the attributes of a Table. The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ in order to use Scala implicit conversions.
A Table is emitted by writing it to a TableSink. A TableSink is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.
Please see the documentation about Table Sources & Sinks for details about available sinks and instructions for how to implement a custom TableSink.
There are two ways to emit a table:
The Table.writeToSink(TableSink sink) method emits the table using the provided TableSink and automatically configures the sink with the schema of the table to emit.
The Table.insertInto(String sinkTable) method looks up a TableSink that was registered with a specific schema under the provided name in the TableEnvironment’s catalog. The schema of the table to emit is validated against the schema of the registered TableSink.
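Both variants are sketched below (the result Table, the sink path, and the registered sink name outputTable are illustrative):

```scala
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sinks.CsvTableSink

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// result is a Table computed with the Table API and/or SQL
val result: Table = ...

// Option 1: emit the Table directly through a TableSink instance
val sink = new CsvTableSink("/path/to/file", fieldDelim = "|")
result.writeToSink(sink)

// Option 2: emit the Table to a TableSink registered as "outputTable"
// (the registered sink's schema must match the schema of result)
result.insertInto("outputTable")
```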
Table API and SQL queries are translated into DataStream programs, regardless of whether their input is streaming or batch. A query is internally represented as a logical query plan and is translated in two phases:
optimization of the logical plan,
translation into a DataStream program.
A Table API or SQL query is translated when:
a Table is emitted to a TableSink, i.e., when Table.writeToSink() or Table.insertInto() is called.
a SQL update query is specified, i.e., when TableEnvironment.sqlUpdate() is called.
Table API and SQL queries can be easily integrated with and embedded into DataStream programs. For instance, it is possible to query an external table (for example from an RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream API (and any of the libraries built on top of it, such as CEP or Gelly). Conversely, a Table API or SQL query can also be applied on the result of a DataStream program.
This interaction can be achieved by converting a DataStream into a Table and vice versa. In this section, we describe how these conversions are done.
Implicit Conversion for Scala
The Scala Table API features implicit conversions for the DataStream and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.
Register a DataStream as Table
A DataStream can be registered in a TableEnvironment as a Table. The schema of the resulting table depends on the data type of the registered DataStream. Please check the section about mapping of data types to table schema for details.
Note: The name of a DataStreamTable must not match the ^_DataStreamTable_[0-9]+ pattern and the name of a DataSetTable must not match the ^_DataSetTable_[0-9]+ pattern. These patterns are reserved for internal use only.
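For example (a sketch; the stream as well as the chosen table and field names are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// get a StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as table "myTable" with the default field names "_1", "_2"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with field names "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
```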
A Table can be converted into a DataStream. In this way, custom DataStream programs can be run on the result of a Table API or SQL query.
When converting a Table into a DataStream, you need to specify the data type of the resulting DataStream, i.e., the data type into which the rows of the Table are to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:
Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
Case Class: fields are mapped by position, no support for null values, type-safe access.
Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
Atomic Type: Table must have a single field, no support for null values, type-safe access.
Convert a Table into a DataStream (unbounded)
A Table that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query’s input streams. Hence, the DataStream into which such a dynamic query is converted needs to encode the updates of the table.
There are two modes to convert a Table into a DataStream:
Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
Note: A detailed discussion about dynamic tables and their properties is given in the Streaming Queries document.
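A sketch of both modes (the Table and its schema are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// get a StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// a Table with two fields (String name, Int age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)];
// the Boolean flag indicates the type of the change:
// true is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
```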
Convert a Table into a DataStream (bounded)
A Table is converted into a bounded DataStream as follows:
Flink’s DataStream APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink’s Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.
The mapping of a data type to a table schema can happen in two ways: based on the field positions or based on the field names.
Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types with a defined field order as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section).
When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used (or f0 for atomic types).
Name-based mapping can be used for any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be renamed using an alias (as). Fields can be reordered and projected out.
Flink treats primitives (Integer, Double, String) or generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type and the name of the attribute can be specified.
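For example (a sketch):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// get a StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...

// convert the DataStream of the atomic type Long into a Table with attribute "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
```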
Tuples (Scala and Java) and Case Classes (Scala only)
Flink supports Scala’s built-in tuples and provides its own tuple classes for Java. DataStreams of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, … for Flink Tuples and _1, _2, … for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (as).
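For example (a sketch; the Person case class is illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// get a StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with renamed fields "myLong", "myString" (position-based)
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert the DataStream into a Table with reordered fields "_2", "_1" (name-based)
val table2: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// an illustrative case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert the DataStream into a Table with the default field names "name", "age"
val tableCC: Table = tableEnv.fromDataStream(streamCC)

// convert the DataStream into a Table with reordered and aliased fields (name-based)
val tableCC2: Table = tableEnv.fromDataStream(streamCC, 'age as 'myAge, 'name as 'myName)
```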
POJO (Java and Scala)
Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.
When converting a POJO DataStream into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by positions. Fields can be renamed using an alias (with the as keyword), reordered, and projected.
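For example (a sketch; Person is assumed to be a POJO with fields name and age):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

// get a StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with fields "name" and "age"
val stream: DataStream[Person] = ...

// convert the DataStream into a Table with the original POJO field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with renamed fields "myAge", "myName" (name-based)
val table2: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert the DataStream into a Table with the projected field "name" (name-based)
val table3: Table = tableEnv.fromDataStream(stream, 'name)
```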
The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream into a Table. The Row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name).
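For example (a sketch; the field names "name" and "age" are assumed to be specified in the RowTypeInfo of the stream):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// get a StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Rows with two fields "name" and "age" specified in a RowTypeInfo
val stream: DataStream[Row] = ...

// convert the DataStream into a Table with the default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with renamed fields "myName", "myAge" (position-based)
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert the DataStream into a Table with renamed fields "myName", "myAge" (name-based)
val table3: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert the DataStream into a Table with the projected field "name" (name-based)
val table4: Table = tableEnv.fromDataStream(stream, 'name)
```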
The foundation of Apache Flink's query optimization is Apache Calcite. In addition to applying Calcite for optimization, Flink adds many enhancements of its own.
First of all, Flink performs a series of rule-based and cost-based optimizations, including:
special subquery rewriting, which has two parts: (1) IN and EXISTS are converted into left semi-joins, and (2) NOT IN and NOT EXISTS are converted into left anti-joins. Note: only IN/EXISTS/NOT IN/NOT EXISTS in conjunctive conditions are supported.
normal subquery decorrelation based on Calcite
filter push down
join reordering, if enabled (sql.optimizer.join-reorder.enabled is true)
skew join optimization
other kinds of query rewriting
Secondly, Flink collects rich statistics from the data sources and propagates them through the whole plan via various extended MetadataHandlers. The optimizer can choose a better plan based on this metadata.
Finally, Flink provides a fine-grained cost for each operator that takes IO, CPU, network, and memory into account. Cost-based optimization can choose a better plan based on this fine-grained cost definition.
It is possible to customize the optimization programs (see FlinkBatchPrograms for the default batch optimization programs and FlinkStreamPrograms for the default streaming ones) and to replace the defaults by providing a CalciteConfig object. It can be created via a builder by calling CalciteConfig.createBuilder() and is provided to the TableEnvironment by calling tableEnv.getConfig.setCalciteConfig(calciteConfig).
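A minimal sketch (the concrete builder methods for replacing optimization programs depend on the planner version, so they are only hinted at in a comment):

```scala
import org.apache.flink.table.calcite.CalciteConfig

// create a CalciteConfig via its builder; the builder offers methods to replace
// or extend the default optimization programs (see FlinkBatchPrograms /
// FlinkStreamPrograms for the defaults)
val calciteConfig: CalciteConfig = CalciteConfig.createBuilder()
  // ... customize rule sets / optimization programs here ...
  .build()

// provide the CalciteConfig to the TableEnvironment
tableEnv.getConfig.setCalciteConfig(calciteConfig)
```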
Flink will try to find duplicate sub-plans by the digest of the physical sub-plans and reuse them if sub-plan reuse is enabled (sql.optimizer.reuse.sub-plan.enabled is true; the default is false).
Note: Sub-plan reuse is currently supported for batch.
The following code example shows the physical plan when reuse sub-plan is enabled.
The explain result is as follows (only the physical plan is shown here; the sub-plan of the SortAggregate is reused):
The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table.
This is done through the TableEnvironment.explain(table) method. It returns a String describing three plans:
the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
the optimized logical query plan, and
the physical execution plan.
The following code shows an example and the corresponding output:
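A sketch of such an explain call (the output String is omitted here; it contains the three plans listed above):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tableEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tableEnv, 'count, 'word)

// union of a filtered table with a second table
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

// print the AST, the optimized logical plan, and the physical execution plan
val explanation: String = tableEnv.explain(table)
println(explanation)
```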
Alter Table Statistics
The Table API provides a mechanism to fetch or modify a Table's statistics, which are represented by the FlinkStatistic structure.
The FlinkStatistic contains the following information about a Flink Table:
The columns which can be seen as unique keys.
Statistics of skewed column names and values.
TableStats which contains a ColumnStats of each column.
The ColumnStats contains the following information about a Table column:
The number of distinct values (NDV).
The number of null values.
Average length of the column values.
Max length of the column values.
Max value of the column values.
Min value of the column values.
These statistics are very important for query optimization; Flink will try to choose the best query plan based on them.
The following code shows how to interact with the FlinkStatistic: