Using Hive in Apache Flink 1.9
By Li Rui, Alibaba Technical Expert, Apache Hive PMC
Overview — Flink on Hive
SQL is an important piece of the big data field. To improve the Flink ecosystem and explore the potential of Apache Flink in batch data processing, we decided to enhance FlinkSQL to enable users to complete more tasks through Apache Flink.
Hive is the earliest SQL engine in the big data field. It provides a variety of functions and has a wide user base. All subsequent SQL engines, such as Spark SQL and Impala, support integration with Hive to allow users to conveniently use existing data warehouses and perform job migration. Therefore, interaction with Hive is very important for FlinkSQL.
The integration with Hive includes access to metadata and actual table data. Therefore, this article introduces the architecture of this project from those two aspects.
To access metadata from external systems, Apache Flink provides the “ExternalCatalog” concept. However, the definitions of ExternalCatalog are incomplete and basically do not work. As a result, we propose a completely new Catalog API to replace the existing ExternalCatalog. The new Catalog supports various metadata objects, such as databases, tables, and partitions. It allows you to maintain multiple Catalog instances in a user session to access multiple external systems simultaneously. In addition, the Catalog can be connected to Flink in a pluggable manner, allowing users to provide custom implementations. The following figure depicts the overall architecture of the Catalog API.
Creating a TableEnvironment, also creates a CatalogManager simultaneously to manage different Catalog instances. TableEnvironment provides metadata services for Table API and SQL Client users through Catalog.
Currently, Catalog has two implementations: GenericInMemoryCatalog and HiveCatalog. GenericInMemoryCatalog maintains the original metadata management mechanism of Flink and stores all metadata in memory. HiveCatalog is connected to a Hive Metastore instance to provide metadata persistence capabilities. To use Flink to interact with Hive, you must configure a HiveCatalog and access the metadata in Hive through the HiveCatalog. HiveCatalog can also be used to process Flink metadata. In this case, HiveCatalog only uses Hive Metastore as persistent storage, and the metadata written to Hive Metastore is not necessarily in a format supported by Hive. A HiveCatalog instance supports both modes. There is no need to create different instances to manage metadata for both Hive and Flink.
In addition, we have designed HiveShim to support different versions of Hive Metastore. The currently supported Hive versions are 2.3.4 and 1.2.1.
2 Table Data
We provide the Hive Data Connector to read and write Hive table data. Hive Data Connector reuses Hive classes, such as Input Format, Output Format, and SerDe, as much as possible. This reduces code duplication, and more importantly, makes Flink compatible with Hive to the maximum extent. That is, it allows Hive to read data written by Flink, and vice versa.
Similar to HiveCatalog, the Hive Data Connector currently supports Hive 2.3.4 and 1.2.1.
The Flink-Hive integration has been released as an experimental feature in Apache Flink 1.9.0. Users can interact with Hive through the Table API or SQL Client mode. Functions supported in Apache Flink 1.9.0 are listed as follows:
- Apache Flink provides simple Data Definition Language (DDL) statements to read Hive metadata, such as show databases, show tables, and describe tables.
- Allows using the Catalog API to modify Hive metadata, such as create table and drop table.
- Allows reading Hive data in both partition tables and non-partition tables.
- Supports writing Hive data in non-partitioned tables.
- Supports file formats, such as Text, ORC, Parquet, and SequenceFile
- Allows to call UDFs created in Hive
As a trial feature, it is not perfect. Functions that are not supported in Apache Flink 1.9.0 are listed as follows:
- INSERT and OVERWRITE are not supported.
- Writing data to the partition table is not supported.
- ACID tables are not supported.
- Bucket tables are not supported.
- Data view is not supported.
Some data types are not supported, such as Decimal, Char, Varchar, Date, Time, Timestamp, Interval, and Union.
1 Add Dependencies
To use the Flink-Hive integration feature, you must first add corresponding dependencies. If you use SQL Client, add the dependency JAR package to the Flink lib directory. If you use Table API, add the corresponding dependency to the project file (for example, pom.xml).
The currently supported Hive versions are 2.3.4 and 1.2.1. The following table lists the dependencies required for different versions.
Note: Flink-shaded-Hadoop-2-uber contains the Hadoop dependencies of Hive. If you do not want to use the package provided by Flink, add the Hadoop package used in the cluster. Make sure that the version of the Hadoop package you want to add is compatible with Hive (Hive 2.3.4 depends on Hadoop 2.7.2, and Hive 1.2.1 depends on Hadoop 2.6.0).
You can also use Hive JAR packages in your cluster, as the dependent packages (hive-exec and hive-metastore). For more information, refer to “Support for Different Hive Versions.”
2 Configure HiveCatalog
To interact with Hive, use a HiveCatalog. HiveCatalog configuration is described as follows.
2.1 SQL Client
When using the SQL Client, specify the required catalog in the
sql-client-defaults.yaml file. Specify one or more catalog instances in the "catalogs" list of the
sql-client-defaults.yaml file. The following example shows how to specify a HiveCatalog:
Catalogs: A typical catalog definition looks like:
- name: myhive
In this example code, name is what you specify for each catalog instance. The catalog name and the database name constitute the namespace for Flink SQL metadata. Therefore, the name of each catalog must be unique. type is the catalog type. For a HiveCatalog, the type must be set to Hive.
hive-conf-dir is used to read the Hive configuration file. Set it to the Hive configuration file directory of the cluster. hive-version is used to specify the Hive version that you use, which can be either 2.3.4 or 1.2.1.
After specifying the HiveCatalog, you can start the SQL Client and run the following command to verify whether the specified HiveCatalog has been correctly loaded.
Flink SQL> show catalogs;
myhiveFlink SQL> use catalog myhive;
The show catalogs command is used to list all loaded catalog instances.
Note: Apart from the configured catalog in the
sql-client-defaults.yaml file, FlinkSQL automatically loads a GenericInMemoryCatalog instance as the built-in catalog. The default name of this catalog is
Use catalog command to specify the current catalog of your session. If you do not specify the catalog name when you access metadata objects (such as databases and tables) by using SQL statements, FlinkSQL searches the objects in the current catalog.
2.2 Table API
The following example code shows how to create a HiveCatalog through Table API and register it in the TableEnvironment.
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive_conf_dir";
String version = "2.3.4";TableEnvironment tableEnv = ⋯; // create TableEnvironment
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,
After registering HiveCatalog in the TableEnvironment, access the metadata in HiveCatalog by submitting SQL statements through TableEnvironment. Similar to SQL Client, TableEnvironment also provides the useCatalog API, which allows you to specify the current catalog.
3 Read and Write Hive Tables
After setting up HiveCatalog, you can read and write Hive tables through SQL Client or Table API.
3.1 SQL Client
Assume that you have a Hive table named src. Use the following SQL statement to read and write this table.
Flink SQL> describe src;
|-- key: STRING
|-- value: STRING
Flink SQL> select * from src; key value
⋯⋯ ⋯⋯Flink SQL> insert into src values ('newKey','newVal');
3.2 Table API
Likewise, you can use the Table API to read and write the above table. The following code shows you how to implement this operation.
TableEnvironment tableEnv = ⋯; // create TableEnvironment
// set myhive as current catalog
tableEnv.useCatalog("myhive");Table src = tableEnv.sqlQuery("select * from src");
// write src into a sink or do further analysis
⋯⋯tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");
tableEnv.execute("insert into src");
4 Support for Different Hive Versions
Apache Flink 1.9.0 supports Hive 2.3.4 and 1.22.1. Currently, only these two versions are tested. When you use the SQL Client, if you do not specify the Hive version in the
sql-client-defaults.yaml file, Flink automatically detects the Hive version in classpath. If the detected Hive version is neither 2.3.4 nor 1.2.1, an error is returned.
Based on strong Hive compatibility, some different minor versions may also work properly. Therefore, if you use a different minor version of Hive from the ones that we support, you may specify a supported version and try the Flink-Hive integration feature. For example, if you use Hive 2.3.3, you can specify Hive 2.3.4 in the
sql-client-defaults.yaml file or in the code.
5 Selection of Execution Mode and Planner
In Apache Flink 1.9.0, Hive TableSink only works in batch mode. Therefore, set the execution mode to batch if you want to use Hive TableSink.
Apache Flink 1.9.0 provides a new Blink planner, which provides more comprehensive functions than the original planner. We recommend that you use Blink planner when integrating FlinkSQL with Hive. In the future, some new functions may only support Blink planner.
When you use SQL Client, you can specify the execution mode and planner in the
sql-client-defaults.yaml file as follows:
# select the implementation responsible for planning table programs
# possible values are 'old' (used by default) or 'blink'
# 'batch' or 'streaming' execution
If you use Table API, use the following code:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
We will further improve the Flink-Hive integration feature in subsequent Flink versions. We hope this feature is Production-Ready in Apache Flink 1.10.0. Work to be carried out in subsequent versions include:
- Adding support for more data types
- Adding support for writing data to partition tables, including static and dynamic partitions
- Adding support for INSERT and OVERWRITE
- Support for View
- Providing better support for DDL and DML statements
- Adding support for TableSink in streaming mode to allow users to conveniently write stream data to Hive
- Testing and supporting more Hive versions
- Adding support for Bucket tables
- Adding support for performance testing and optimization
We suggest you to try Hive integration functions in Apache Flink 1.9 today!