Introduction: The Apache Flink community has worked hard to integrate with Hive, and the integration is now in good shape. Apache Flink 1.10.0 RC1 has been released; if you are interested, you can try it out and explore its features.
When did Apache Spark start integrating Hive? I believe that if you have ever used Spark, you would say the integration started a long time ago.
When did Apache Flink (Flink) start integrating with Hive? You may have some doubts: perhaps you think Flink has not done this yet, or that the latest Flink version supports the integration with Hive but the relevant features are still weak.
Comparing the communities in this way makes little sense, as each community has its own development goals, and Flink has invested heavily in real-time stream computing. What I do want to point out is that Apache Hive has become a focal point of the data warehousing ecosystem: it is not only an SQL engine for big data analytics and extract-transform-load (ETL), but also a data management platform. Therefore, Spark, Flink, Impala, and Presto all actively support integration with Hive.
If you need to use Flink to read data from and write data to Hive, you will find that Flink has supported the integration with Hive since Apache Flink 1.9.0.
First, I will describe the architecture for integrating Flink with Hive, drawing on public materials and blogs from the community. Flink's integration with Hive mainly targets access to metadata and table data.
To access metadata from external systems, Flink initially provided ExternalCatalog. However, the definition of ExternalCatalog was incomplete and it was essentially non-functional. Apache Flink 1.10 removed the ExternalCatalog API (FLINK-13697), including ExternalCatalog and all of its dependent classes.
In response to the problems with
ExternalCatalog, the Flink community proposed a completely new Catalog API to replace the
ExternalCatalog API. The new Catalog API has the following features:
- Supports various metadata objects, such as databases, tables, and partitions.
- Maintains multiple catalog instances in a user session and supports concurrent access to multiple external systems.
- Connects to Flink in a pluggable manner and supports user-defined implementations.
The following figure shows the overall architecture of the new Catalog API:
When you create a TableEnvironment, a CatalogManager is created simultaneously to manage the different catalog instances. The TableEnvironment uses catalogs to provide metadata services for users of the Table API and SQL Client. The following Scala example registers a HiveCatalog and sets it as the current catalog:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)
val name = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir = "/opt/hive-conf" // a local path
val version = "2.3.4"
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")
Currently, catalogs have two implementations:
- GenericInMemoryCatalog maintains Flink's original metadata management mechanism and stores all metadata in memory.
- HiveCatalog connects to a Hive Metastore instance and provides metadata persistence capabilities.
To use Flink to interact with Hive, you must configure a HiveCatalog and access metadata in Hive through it.
HiveCatalog can also be used to process Flink metadata. In this scenario, HiveCatalog uses only Hive Metastore as persistent storage, and the metadata written to Hive Metastore is not necessarily in a format supported by Hive. A HiveCatalog instance supports both modes. You do not need to create different instances to manage Hive and Flink metadata.
HiveShim is designed to support different Hive Metastore versions. For more information about supported versions, see the official documentation.
Flink provides the Hive Data Connector to read data from and write data to Hive tables. The Hive Data Connector reuses Hive's own classes, such as InputFormat, OutputFormat, and SerDe, as much as possible. This reduces code duplication and, more importantly, makes Flink compatible with Hive to the maximum extent: Hive can read data written by Flink, and Flink can read data written by Hive.
The Hive integration was released as an experimental feature in Apache Flink 1.9.0, but has many limitations. The upcoming stable Apache Flink 1.10 will further improve the integration and allow enterprises to use it.
I will compile Apache Flink 1.10.0 RC1 based on Cloudera CDH and perform a comprehensive test to help you experience the Hive integration in Apache Flink 1.10.0 in advance.
Prepare the Environment
CDH version: cdh5.16.2
Flink version: release-1.10.0-rc1
The Flink RC version is used for testing purposes only. We recommend that you do not use it in the production environment. Cloudera Data Platform has officially integrated Flink as its stream computing product and it is very convenient to use.
Sentry and Kerberos are enabled in the CDH environment.
Download and Compile Flink
$ wget https://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz
$ tar zxvf release-1.10.0-rc1.tar.gz
$ cd flink-release-1.10.0-rc1/
$ mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
When you compile the
flink-hadoop-fs module, the following error message may appear:
[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to [HDPReleases](https://repo.hortonworks.com/content/repositories/releases/): Remote host closed connection during handshake: SSL peer shut down incorrectly
flink-shaded-hadoop-2 cannot be found. If you check the Maven repository, you will find the root cause: the Maven central repository does not provide a flink-shaded-hadoop-2 build that matches CDH. Therefore, you must build and install the flink-shaded-hadoop-2 package that Flink depends on before compiling Flink itself.
Solve the Problem
- Obtain the source code of flink-shaded.
- According to the preceding error message, switch to the flink-shaded-hadoop-2 version that Flink 1.10 depends on: flink-shaded-hadoop-2 9.0 is missing when Flink is compiled, so check out the release-9.0 branch.
git checkout release-9.0
- Configure the CDH repository. Modify the pom.xml file in the flink-shaded project and add the CDH Maven repository; otherwise, the related CDH packages cannot be found during compilation.
In ⋯, enter the vendor repository entries.
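A minimal sketch of what the added entries might look like (the profile and repository ids are illustrative; the URL is Cloudera's publicly documented repository):
<!-- Add vendor maven repositories -->
<profile>
    <id>vendor-repos</id>
    <repositories>
        <!-- Cloudera -->
        <repository>
            <id>cloudera-releases</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>
</profile>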
Then compile flink-shaded:
mvn clean install -DskipTests -Drat.skip=true -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
We recommend compiling
flink-shaded by using a VPN. If you have a problem with network connectivity, you can retry or change the repository URL of the dependent component. After the compilation is completed,
flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar is installed in the local Maven repository. The following is the final log of compilation:
Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/... /.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/... /.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom
After flink-shaded is installed, compile Flink again:
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
The compilation takes a while, so you can do other things in the meantime. The following error message may appear during the compilation:
[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard
[WARNING] npm WARN prepare removing existing node_modules/ before installation
[ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/mime/-/mime-2.4.0.tgz failed, reason: read ECONNRESET
[ERROR] WARN registry Using stale package data from https://registry.npmjs.org/ due to a request error during revalidation.
[ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/optimist/-/optimist-0.6.1.tgz failed, reason: read ECONNRESET
As you can see, the flink-runtime-web module introduces a dependency on the frontend-maven-plugin, which downloads and installs Node.js, npm, and the front-end dependencies.
If you do not use a VPN, you can modify the flink-runtime-web/pom.xml file to add download/mirror information so that the frontend-maven-plugin can fetch Node.js, npm, and the front-end packages. The plugin executions involved are "install node and npm", the npm install step with arguments "ci --cache-max=0 --no-save", and "npm run build"; a sketch of the modified configuration follows.
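A minimal sketch, assuming you point the frontend-maven-plugin at a Node.js/npm mirror (the mirror URLs below are placeholders, not values from the original article; substitute a mirror reachable from your network):
<plugin>
    <groupId>com.github.eirslett</groupId>
    <artifactId>frontend-maven-plugin</artifactId>
    <configuration>
        <!-- placeholders: substitute a Node.js / npm mirror reachable from your network -->
        <nodeDownloadRoot>https://npm.taobao.org/mirrors/node/</nodeDownloadRoot>
        <npmDownloadRoot>https://npm.taobao.org/mirrors/npm/</npmDownloadRoot>
    </configuration>
    <executions>
        <!-- the executions already defined in flink-runtime-web/pom.xml -->
        <execution>
            <id>install node and npm</id>
            <goals><goal>install-node-and-npm</goal></goals>
        </execution>
        <execution>
            <id>npm install</id>
            <goals><goal>npm</goal></goals>
            <configuration>
                <arguments>ci --cache-max=0 --no-save</arguments>
            </configuration>
        </execution>
        <execution>
            <id>npm run build</id>
            <goals><goal>npm</goal></goals>
            <configuration>
                <arguments>run build</arguments>
            </configuration>
        </execution>
    </executions>
</plugin>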
After the compilation is completed, the Flink installation file is located in the
flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin directory. Package it and upload it to the node where Flink will be deployed:
$ cd flink-dist/target/flink-1.10.0-bin
$ tar zcvf flink-1.10.0.tar.gz flink-1.10.0
Deploy and Configure Flink
You can deploy Flink by unzipping the package. You can also configure soft links and environment variables. The core configuration file of Flink is
flink-conf.yaml. A typical configuration is:
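The following is a minimal sketch of common flink-conf.yaml parameters; the values are illustrative and should be adjusted for your cluster (the hosts, paths, and Kerberos principal are assumptions):
jobmanager.rpc.address: flink-master-host
jobmanager.heap.size: 2048m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink@EXAMPLE.COM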
I list only a few common configuration parameters. You can modify them as needed. The parameters are relatively easy to understand. I will expound on them through practices in another article.
Dependencies of Hive Integration
If you want to use the Hive integration, you must add the following dependencies in addition to the preceding configuration:
- If you need to use SQL Client, copy the corresponding dependency jars to Flink’s lib directory.
- If you need to use the Table API, add the corresponding dependencies (the Flink Hive connector and the Hive dependencies) to your project's pom.xml, as sketched below.
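A minimal sketch of the Maven dependencies, assuming Flink 1.10.0 with Scala 2.11 and Hive 1.1.0 (the versions are assumptions for this setup):
<!-- Flink Dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>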
In this example, SQL Client is used. CDH 5.16.2, Hadoop 2.6.0, and Hive 1.1.0 are used. Therefore, you must copy the following jars to the lib directory in the home directory for Flink deployment.
- Flink’s Hive connector
- Hadoop dependencies
- Hive dependencies
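For this setup (Flink 1.10.0 with Scala 2.11, CDH 5.16.2 with Hive 1.1.0, and the flink-shaded jar built earlier), the jars copied into lib would look roughly as follows; treat the exact file names as assumptions and match them to your build and distribution:
# Flink's Hive connector (from the Flink build)
flink-connector-hive_2.11-1.10.0.jar
# Hadoop dependencies (the flink-shaded jar built earlier)
flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
# Hive dependencies (CDH jar names may carry a -cdh5.16.2 suffix)
hive-metastore-1.1.0.jar
hive-exec-1.1.0.jar
libfb303-0.9.2.jar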
flink-shaded-hadoop-2-uber contains Hive's dependency on Hadoop. If you do not use the package provided by Flink, you can add the Hadoop package used in your cluster. You must ensure that the Hadoop version you add is compatible with the Hive version.
You can also use the Hive jars in your cluster as the dependent Hive jars
(hive-exec and hive-metastore). For more information, see Support for different Hive versions.
You must add Hadoop, YARN, and Hive clients to the node used to deploy Flink.
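In practice this usually means installing the client packages on that node and exporting the client configuration directories; a minimal sketch, assuming a standard CDH client layout (the paths are assumptions):
# assumed CDH client configuration locations; adjust to your installation
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf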
Over the years, Hive Metastore has evolved into the de facto metadata center of the Hadoop ecosystem. Many companies run a separate Hive Metastore service instance in their production environments to manage all of their metadata (Hive or non-Hive).
If both Hive and Flink are deployed, you can use Hive Metastore to manage Flink metadata by using HiveCatalog.
For users who have only a Flink deployment, HiveCatalog is the only persistent catalog that Flink provides out of the box. Without a persistent catalog, if you use Flink SQL CREATE DDL, you have to repeatedly create meta-objects, such as a Kafka table, in every session, which wastes a lot of time. HiveCatalog fills this gap by letting you create tables and other meta-objects only once and then reference and manage them conveniently across later sessions.
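For example, assuming myhive is the current catalog, a Kafka source table defined once with DDL stays visible in later sessions; the table name, topic, and broker address below are hypothetical, and the WITH options follow the Flink 1.10 connector property style:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'kafka-broker:9092',
    'format.type' = 'json'
);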
If you want to use the SQL Client, you must specify the required catalogs in the sql-client-defaults.yaml file. You can specify one or more catalog instances in the catalogs list of this file. The following example shows how to specify a HiveCatalog:
execution:
    planner: blink
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session

catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
     hive-version: 2.3.4
In this example:
- name is the name that you specify for each catalog instance. The catalog name and the database name constitute the namespace for Flink SQL metadata. Therefore, the name of each catalog must be unique.
- type is the catalog type. For a HiveCatalog, the type must be set to hive.
- hive-conf-dir is used to read the Hive configuration file. You can set it to the Hive configuration file directory of the cluster.
- hive-version is used to specify the Hive version.
After you specify the HiveCatalog, start
sql-client and run the following commands to verify whether the specified HiveCatalog has been loaded correctly.
Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;
In this example, show catalogs lists all loaded catalog instances. Note: apart from the catalog that you have configured in the sql-client-defaults.yaml file, Flink SQL automatically loads a GenericInMemoryCatalog instance as the built-in catalog. The default name of this catalog is default_catalog.
Read and Write Hive Tables
After you set up the HiveCatalog, you can read and write Hive tables with the Table API or SQL Client. Assume that you have a Hive table named
mytable. You can execute the following SQL statements to read and write this table:
Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;
Flink SQL> show databases;
default
Flink SQL> show tables;
mytable
Flink SQL> describe mytable;
root
 |-- name: name
 |-- type: STRING
 |-- name: value
 |-- type: DOUBLE
Flink SQL> SELECT * FROM mytable;
      name      value

Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

# Static partition
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# Dynamic partition
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

# Static partition and dynamic partition
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
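The same reads and writes can be done from the Table API. A minimal Scala sketch, assuming the HiveCatalog parameters used in this article (the default database and job name are illustrative):
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

// register and use the HiveCatalog, as in the earlier example
val hive = new HiveCatalog("myhive", "default", "/opt/hive-conf", "1.1.0")
tableEnv.registerCatalog("myhive", hive)
tableEnv.useCatalog("myhive")

// read from the Hive table; the resulting Table can be further transformed
val result = tableEnv.sqlQuery("SELECT * FROM mytable")

// write to the Hive table and submit the job
tableEnv.sqlUpdate("INSERT INTO mytable SELECT 'Tom', 25")
tableEnv.execute("flink-hive-example")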
In this article, I first introduced the architecture of the Hive integration in Flink. Then, I compiled Flink from the source code and solved the problems I encountered. Next, I described how to deploy and configure the Flink environment and integrate it with Hive. Finally, I walked through some official examples of reading data from and writing data to Hive tables.
Later, I will explain how to use Flink SQL to operate Hive in the production environment.
- (Content in Chinese) https://ververica.cn/developers/flink1-9-hive/