Hive Finally Has Flink!

By Jason

Introduction: The Apache Flink community has worked hard to integrate Hive. Currently, the integration is going smoothly. Apache Flink 1.10.0 RC1 has been released. If you are interested, you can learn more and check out its features.

When did Apache Spark start integrating Hive? I believe that if you have ever used Spark, you would say the integration started a long time ago.

When did Apache Flink (Flink) start integrating Hive? Here you may hesitate, or even think that Flink has not done so yet. In fact, recent Flink versions do support Hive integration, but the relevant features are still immature.

Comparing the two communities is not very meaningful. Each community has its own development goals, and Flink has invested heavily in real-time stream computing. Still, Apache Hive has become a focal point of the data warehousing ecosystem. It is not only an SQL engine for big data analytics and extract-transform-load (ETL), but also a data management platform. This is why Spark, Flink, Impala, and Presto all actively support integration with Hive.

If you need to use Flink to read data from and write data to Hive, you will find that Flink has integrated Hive since Apache Flink 1.9.0.


First, I will describe the architecture for integrating Flink with Hive, drawing on public materials and blogs from the community. The integration serves two main purposes: accessing Hive metadata and accessing Hive table data.

Metadata


To access metadata from external systems, Flink initially provided the ExternalCatalog API. However, the definition of ExternalCatalog was incomplete and it was barely usable. Apache Flink 1.10 removed the ExternalCatalog API (FLINK-13697), which includes the following content:

  • ExternalCatalog and all dependent classes such as ExternalTable
  • SchematicDescriptor, MetadataDescriptor, and StatisticsDescriptor

In response to the problems with ExternalCatalog, the Flink community proposed a completely new Catalog API to replace the old one. The new Catalog API has the following features:

  • Supports various metadata objects, such as databases, tables, and partitions.
  • Maintains multiple catalog instances in a user session and supports concurrent access to multiple external systems.
  • Connects to Flink in a pluggable manner and supports user-defined implementations.

The following figure shows the overall architecture of the new Catalog API:

[Figure: overall architecture of the new Catalog API]

When you create a TableEnvironment, a CatalogManager is created at the same time to manage the different catalog instances. The TableEnvironment uses these catalogs to provide metadata services for users of the Table API and SQL Client.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

val name = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir = "/opt/hive-conf" // a local path
val version = "2.3.4"

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")

Currently, catalogs have two implementations: GenericInMemoryCatalog and HiveCatalog. GenericInMemoryCatalog maintains Flink's original metadata management mechanism and stores all metadata in memory. HiveCatalog is connected to a Hive Metastore instance and provides metadata persistence capabilities. To use Flink to interact with Hive, you must configure a HiveCatalog and access metadata in Hive through it.

HiveCatalog can also be used to process Flink metadata. In this scenario, HiveCatalog uses only Hive Metastore as persistent storage, and the metadata written to Hive Metastore is not necessarily in a format supported by Hive. A HiveCatalog instance supports both modes. You do not need to create different instances to manage Hive and Flink metadata.

HiveShim is designed to support different Hive Metastore versions. For more information about supported versions, see the official documentation.

Table Data

Flink provides the Hive Data Connector to read data from and write data to Hive tables. The Hive Data Connector reuses Hive classes, such as InputFormat, OutputFormat, and SerDe, as much as possible. This reduces code duplication and, more importantly, makes Flink as compatible with Hive as possible: Hive can read data written by Flink, and Flink can read data written by Hive.

Hive Integration

The Hive integration was released as an experimental feature in Apache Flink 1.9.0, but has many limitations. The upcoming stable Apache Flink 1.10 will further improve the integration and allow enterprises to use it.

I will compile Apache Flink 1.10.0 RC1 based on Cloudera CDH and perform a comprehensive test to help you experience the Hive integration in Apache Flink 1.10.0 in advance.

Prepare the Environment

CDH version: cdh5.16.2

Flink version: release-1.10.0-rc1

The Flink RC version is used for testing purposes only. We recommend that you do not use it in the production environment. Cloudera Data Platform has officially integrated Flink as its stream computing product and it is very convenient to use.

Sentry and Kerberos are enabled in the CDH environment.

Download and Compile Flink

$ wget
$ tar zxvf release-1.10.0-rc1.tar.gz
$ cd flink-release-1.10.0-rc1/
$ mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

When you compile the flink-hadoop-fs module, the following error message may appear:

[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases: Remote host closed connection during handshake: SSL peer shut down incorrectly

The flink-shaded-hadoop-2 artifact cannot be found. If you look in the Maven repository, you will find the root cause: the Maven central repository contains no compiled flink-shaded-hadoop-2 version that corresponds to the Hadoop version of CDH. Therefore, you must package the flink-shaded-hadoop-2 artifact that Flink depends on before you compile Flink.
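Before rebuilding anything, it can help to confirm whether the artifact is really absent from the local Maven repository. The following is a minimal shell sketch, assuming the default repository location ~/.m2/repository and the standard Maven layout (group ID with dots turned into directories, then artifact ID, then version). The has_artifact helper is a hypothetical name used only for illustration; the coordinates are taken from the error message above.

```shell
#!/bin/sh
# has_artifact REPO GROUP_ID ARTIFACT_ID VERSION
# Succeeds if the jar exists under the standard Maven repository layout.
# Hypothetical helper for illustration; it is not part of Maven or Flink.
has_artifact() {
    repo="$1"; group="$2"; artifact="$3"; version="$4"
    # org.apache.flink -> org/apache/flink
    group_path=$(printf '%s' "$group" | tr '.' '/')
    [ -f "$repo/$group_path/$artifact/$version/$artifact-$version.jar" ]
}

# Assumption: the local repository is in the default location.
if has_artifact "${HOME}/.m2/repository" org.apache.flink \
        flink-shaded-hadoop-2 2.6.0-cdh5.16.2-9.0; then
    echo "flink-shaded-hadoop-2 is already installed locally"
else
    echo "flink-shaded-hadoop-2 is missing; build flink-shaded first"
fi
```

If the check reports the artifact as missing, continue with the steps in the next section.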

Solve the Problem

  • Obtain the source code of flink-shaded
git clone
  • According to the preceding error message, switch to the flink-shaded version that Flink 1.10 depends on. Version 9.0 of flink-shaded-hadoop-2 is the one that was missing when Flink was compiled.
git checkout release-9.0
  • Configure the CDH repository

Modify the pom.xml file in the project and add the CDH Maven repository. Otherwise, the related CDH packages cannot be found during compilation.

In ⋯, enter the following content:

<!-- Add vendor maven repositories -->
<!-- Cloudera -->


Start compilation:

mvn clean install -DskipTests -Drat.skip=true -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

We recommend compiling over a VPN. If you run into network connectivity problems, you can retry or change the repository URL of the dependent component. After the compilation is completed, flink-shaded-hadoop-2-uber is installed in the local Maven repository. The following is the final part of the compilation log:

Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom

Recompile Flink

mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

You can do other things while waiting. The following error message may appear during the compilation:

[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard
[WARNING] npm WARN prepare removing existing node_modules/ before installation
[ERROR] WARN registry Unexpected warning for Miscellaneous Warning ECONNRESET: request to failed, reason: read ECONNRESET
[ERROR] WARN registry Using stale package data from due to a request error during revalidation.
[ERROR] WARN registry Unexpected warning for Miscellaneous Warning ECONNRESET: request to failed, reason: read ECONNRESET

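As you can see, the flink-runtime-web module introduces a dependency on frontend-maven-plugin, which needs to download and install node, npm, and the dependent components.

If you do not use a VPN, you can modify the flink-runtime-web/pom.xml file so that the node and npm downloads point to a mirror that you can reach.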

<id>install node and npm</id>
<id>npm install</id>
<arguments>ci --cache-max=0 --no-save</arguments>
<id>npm run build</id>
<arguments>run build</arguments>

After the compilation is completed, the Flink installation files are located in the flink-dist/target/flink-1.10.0-bin directory. Package them and upload the package to the node where Flink will be deployed.

$ cd flink-dist/target/flink-1.10.0-bin
$ tar zcvf flink-1.10.0.tar.gz flink-1.10.0

Deploy and Configure Flink

You can deploy Flink by unpacking the package. You can also configure soft links and environment variables. The core configuration file of Flink is flink-conf.yaml. A typical configuration is:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs:///user/flink110/checkpoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
taskmanager.memory.preallocate: false
classloader.resolve-order: parent-first
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.principal: flink_user
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000

I list only a few common configuration parameters. You can modify them as needed. The parameters are relatively easy to understand, and I will walk through them with hands-on examples in another article.
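To double-check which value a deployment will actually pick up, you can read a key back from the configuration file. The following is a minimal sketch, assuming a flat "key: value" file like the one listed above. The conf_value helper is a hypothetical name, and /opt/flink is an assumed installation path used when FLINK_HOME is not set.

```shell
#!/bin/sh
# conf_value KEY FILE: print the value of a flat "key: value" entry.
# Hypothetical helper for illustration; it is not part of Flink.
conf_value() {
    # Escape dots so that "rest.port" does not match "restXport".
    key_re=$(printf '%s' "$1" | sed 's/\./\\./g')
    # Keep the text after "key:", drop trailing comments and spaces.
    # If the key appears more than once, print the last occurrence.
    sed -n "s/^[ ]*${key_re}[ ]*:[ ]*\([^#]*\).*\$/\1/p" "$2" \
        | sed 's/[ ]*$//' | tail -n 1
}

# Assumed location of the configuration file; adjust to your deployment.
conf_file="${FLINK_HOME:-/opt/flink}/conf/flink-conf.yaml"
if [ -f "$conf_file" ]; then
    echo "slots per TaskManager: $(conf_value taskmanager.numberOfTaskSlots "$conf_file")"
fi
```

The helper only handles simple one-line entries, which is enough for the parameters shown here.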

Dependencies of Hive Integration

If you want to use the Hive integration, you must add the following dependencies in addition to the preceding configuration:

  • If you need to use SQL Client, copy the corresponding dependency jars to Flink’s lib directory.
  • If you need to use the Table API, add the corresponding dependencies to the project, for example in pom.xml.
<!-- Flink Dependency -->
<!-- Hive Dependency -->

In this example, SQL Client is used. CDH 5.16.2, Hadoop 2.6.0, and Hive 1.1.0 are used. Therefore, you must copy the following jars to the lib directory in the home directory for Flink deployment.

  • Flink’s Hive connector

  • Hadoop dependencies

  • Hive dependencies


flink-shaded-hadoop-2-uber contains Hive's dependency on Hadoop. If you do not use the package provided by Flink, you can add the Hadoop package used in your cluster. You must ensure that the Hadoop version you add is compatible with the Hive version.

You can also use the Hive jars in your cluster as the dependent Hive jars. For more information, see Support for different Hive versions.

You must add Hadoop, YARN, and Hive clients to the node used to deploy Flink.
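A quick way to confirm that the jars were copied is to check the lib directory against a list of file name patterns. The sketch below is only an illustration under stated assumptions: missing_jars is a hypothetical helper, /opt/flink is an assumed installation path used when FLINK_HOME is not set, and the single pattern shown is derived from the uber jar built earlier in this article. Add one pattern for each jar that your setup needs.

```shell
#!/bin/sh
# missing_jars LIB_DIR PATTERN...
# Prints every pattern that matches no file in LIB_DIR.
# Hypothetical helper for illustration; it is not part of Flink.
missing_jars() {
    lib_dir="$1"; shift
    for pattern in "$@"; do
        found=0
        # $pattern is left unquoted on purpose so the shell expands the glob.
        for f in "$lib_dir"/$pattern; do
            [ -e "$f" ] && found=1 && break
        done
        [ "$found" -eq 1 ] || echo "$pattern"
    done
}

# Assumed installation path; adjust to your deployment.
missing=$(missing_jars "${FLINK_HOME:-/opt/flink}/lib" \
    'flink-shaded-hadoop-2-uber-*.jar')
if [ -n "$missing" ]; then
    echo "missing from lib: $missing"
fi
```

An empty result means that every pattern matched at least one file. It does not verify that the jar versions are compatible with each other.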

Configure HiveCatalog

Over the years, Hive Metastore has evolved into a de facto metadata center in the Hadoop ecosystem. Many companies run a separate Hive Metastore service instance in their production environments to manage all their metadata (Hive or non-Hive metadata).

If both Hive and Flink are deployed, you can use Hive Metastore to manage Flink metadata by using HiveCatalog.

For users who have only Flink deployed, HiveCatalog is the only persistent catalog that Flink provides out of the box. Without a persistent catalog, when you use Flink SQL CREATE DDL, you have to repeatedly create meta-objects, such as a Kafka table, in each session, which wastes a lot of time. HiveCatalog fills this gap by letting you create tables and other meta-objects only once, and then reference and manage them conveniently across sessions.

If you want to use the SQL Client, you must specify the required catalog in the sql-client-defaults.yaml file. You can specify one or more catalog instances in the catalogs list of sql-client-defaults.yaml.

The following example shows how to specify a HiveCatalog:

execution:
  planner: blink
  type: streaming
  current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
  current-database: mydatabase

catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf  # contains hive-site.xml

In this example:

  • name is the name that you specify for each catalog instance. The catalog name and the database name constitute the namespace for Flink SQL metadata. Therefore, the name of each catalog must be unique.
  • type is the catalog type. For a HiveCatalog, the type must be set to hive.
  • hive-conf-dir is used to read the Hive configuration file. You can set it to the Hive configuration file directory of the cluster.
  • hive-version is used to specify the Hive version.

After you specify the HiveCatalog, start the SQL Client and run the following commands to verify that the specified HiveCatalog has been loaded correctly.

Flink SQL> show catalogs;
Flink SQL> use catalog myhive;

In this example, show catalogs lists all loaded catalog instances. Note: Apart from the catalogs that you have configured in the sql-client-defaults.yaml file, Flink SQL automatically loads a GenericInMemoryCatalog instance as the built-in catalog. The default name of this catalog is default_catalog.

Read and Write Hive Tables

After you set up the HiveCatalog, you can read and write Hive tables with the Table API or SQL Client. Assume that you have a Hive table named mytable. You can execute the following SQL statements to read and write this table:

Read Data

Flink SQL> show catalogs;
Flink SQL> use catalog myhive;
Flink SQL> show databases;
Flink SQL> show tables;
Flink SQL> describe mytable;
|-- name: name
|-- type: STRING
|-- name: value
|-- type: DOUBLE
Flink SQL> SELECT * FROM mytable;
name       value
__________ __________
Tom        4.72
John       8.0
Tom        24.2
Bob        3.14
Bob        4.72
Tom        34.9
Mary       4.79
Tiff       2.72
Bill       4.33
Mary       77.7

Write Data

Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
# Static partition
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
# Dynamic partition
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
# Static partition and dynamic partition
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';


In this article, I first introduced the architecture of Hive integration in Flink. Then, I compiled Flink from the source code and solved the problems I encountered along the way. Next, I showed how to deploy and configure the Flink environment and integrate Hive. Finally, I used some official examples to read data from and write data to Hive tables.

Later, I will explain how to use Flink SQL to operate Hive in the production environment.

