A catalog can provide information about databases and tables such as their name, schema, statistics, and information for how to access data stored in a database, table, or file. Once a catalog is registered to a
TableEnvironment, all meta-objects including databases and tables defined in a catalog can be accessed from Table API or SQL queries.
Catalogs are categorized into ReadableCatalog and ReadableWritableCatalog. The former can only read meta-objects from the catalog system, while the latter can both read and write.
Flink’s catalogs use a strict two-level structure: catalogs contain databases, and databases contain meta-objects. Thus, the full name of a meta-object is always structured as catalogName.databaseName.objectName.
All registered catalogs are managed by a
CatalogManager instance in a
TableEnvironment. In order to ease access to meta-objects,
CatalogManager has the concepts of a default catalog and a default database. Usually, users access a meta-object in a catalog by specifying its full name in the format of catalogName.databaseName.objectName. By setting a default catalog and a default database, users can use just the meta-object’s name in their queries, which greatly simplifies the user experience. For example, a query such as
select * from mycatalog.mydb.myTable
can be shortened as
select * from myTable
Querying a table in a different database under the default catalog would be
select * from mydb2.myTable
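The resolution rule above can be sketched as follows. This is a hypothetical helper for illustration only, not Flink's actual CatalogManager API:

```java
// Hypothetical sketch (not Flink's actual API) of how an object path like
// "myTable", "mydb2.myTable", or "mycatalog.mydb.myTable" could be resolved
// against a default catalog and a default database.
public class NameResolution {
    static String resolve(String name, String defaultCatalog, String defaultDatabase) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1:  // objectName only: apply both defaults
                return defaultCatalog + "." + defaultDatabase + "." + parts[0];
            case 2:  // databaseName.objectName: apply the default catalog
                return defaultCatalog + "." + parts[0] + "." + parts[1];
            case 3:  // already a full name
                return name;
            default:
                throw new IllegalArgumentException("Invalid object path: " + name);
        }
    }

    public static void main(String[] args) {
        // With default catalog "mycatalog" and default database "mydb":
        System.out.println(resolve("myTable", "mycatalog", "mydb"));
        System.out.println(resolve("mydb2.myTable", "mycatalog", "mydb"));
        System.out.println(resolve("mycatalog.mydb.myTable", "mycatalog", "mydb"));
    }
}
```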
CatalogManager always has a built-in FlinkInMemoryCatalog named builtin, which contains a built-in default database named default. If no catalog is explicitly set as the default catalog, they serve as the default catalog and default database. All temporary meta-objects are registered to this catalog. Users can set the default catalog and database via TableEnvironment.setDefaultDatabase() in the Table API or use catalog.db in Flink SQL Cli.
FlinkInMemoryCatalog implements ReadableWritableCatalog and is provided by Flink to host meta-objects defined in Flink, using memory as storage.
HiveCatalog integrates Flink with Hive at metadata level.
The ultimate goals for HiveCatalog are that:

- existing meta-objects, such as tables, views, and functions, created by Hive or other Hive-compatible applications can be used by Flink
- meta-objects created through HiveCatalog can be written back to the Hive metastore so that Hive and other Hive-compatible applications can consume them
To query Hive data with HiveCatalog, users have to use Flink’s batch mode, either by using BatchTableEnvironment in the Table API or by setting the execution type to batch in Flink SQL Cli.
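In the SQL Cli yaml config, the batch mode might be set along these lines; the exact key names (`execution`, `type`) are an assumption here and should be checked against the Flink version in use:

```yaml
# Assumed SQL Cli yaml keys -- verify against your Flink version
execution:
  type: batch
```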
Note that currently HiveCatalog only offers capabilities for reading Hive metastore metadata, including databases, tables, table partitions, simple data types, and table and column stats. Read and write capabilities for other meta-objects are either experimental or under active development.
Also note that currently only registering HiveCatalog through the Table API allows users to customize their HiveConf with additional Hive connection parameters. Users need to make sure Flink can connect to their Hive metastore within their environment.
HiveCatalog supports most simple data types. Upon reading a Hive table, HiveCatalog maps Hive data types to Flink data types according to the following mapping:
| Hive Data Type | Flink Data Type |
| -------------- | --------------- |
| decimal(p, s)  | Decimal(p, s)   |
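As an illustration of the decimal row in this mapping, a hypothetical converter (not Flink's actual code) could parse the Hive type string and produce the corresponding Flink type name:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of mapping a Hive decimal(p, s) type string
// to the corresponding Flink Decimal(p, s) type name.
public class HiveTypeMapping {
    private static final Pattern DECIMAL = Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");

    static String toFlinkType(String hiveType) {
        Matcher m = DECIMAL.matcher(hiveType);
        if (m.matches()) {
            // Carry precision and scale over unchanged
            return "Decimal(" + m.group(1) + ", " + m.group(2) + ")";
        }
        throw new IllegalArgumentException("Unmapped Hive type: " + hiveType);
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType("decimal(10, 2)"));
    }
}
```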
GenericHiveMetastoreCatalog is created to persist Flink meta-objects in the Hive metastore, using the Hive metastore purely as storage. Users do not deal with anything specific to Hive, and Hive may not understand these objects at all.
GenericHiveMetastoreCatalog is under active development. Stay tuned.
For Hive compatibility and versions, see Hive Compatibility.
FlinkInMemoryCatalog is built in.
In order to use Hive-metastore-backed catalogs in Flink, users need to include the flink-connector-hive jar in their projects.
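For Maven users, such a dependency might look like the following; the groupId and the version property are assumptions, and the exact artifact name should be checked against the Flink release in use:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive</artifactId>
    <version>${flink.version}</version>
</dependency>
```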
We will use HiveCatalog as an example in the following content.
Users can specify catalogs in the yaml config file of Flink SQL Cli. See SQL Cli for more details.
catalogs:
  - name: myHive1
    catalog:
      type: hive
      connector:
        # Hive metastore thrift uri
        hive.metastore.uris: thrift://<ip1>:<port1>,thrift://<ip2>:<port2>
  - name: myHive2
    catalog:
      type: hive
      connector:
        # Hive metastore thrift uri
        hive.metastore.uris: thrift://<ip>:<port>
Then run a few example SQL commands to access a Hive table:
Flink SQL> show catalogs;
myHive1
myHive2

# ------ Set default catalog and database ------

Flink SQL> use myHive1.myDb;

# ------ Access Hive metadata ------

Flink SQL> show databases;
myDb

Flink SQL> show tables;
myTable

Flink SQL> describe myTable;
root
 |-- name: name
 |-- type: StringType
 |-- isNullable: true
 |-- name: value
 |-- type: DoubleType
 |-- isNullable: true

Flink SQL> ......
For a full list of Flink SQL commands to access Hive meta-objects, see FLINK SQL