The Run-In Period for Flink and Hive

Alibaba Cloud
Aug 20, 2020

By Jason

Many readers reported bugs and compatibility issues when they tried to deploy Apache Flink (Flink) and integrate it with Hive by following the previous article, “Hive Finally Has Flink!”. The Flink-Hive integration exists, but for these readers it did not work out of the box. To address these concerns, I wrote this article to provide more detail.

Review

In the previous article, I used CDH 5.16.2 with Hive 1.1.0. Hive versions in CDH 5.x cannot be later than Hive 1.1.0, odd as that may seem. The Flink source code is not very compatible with Hive 1.1.0, so many problems occur. To work around this, I modified the Flink code for the CDH 5.16.2 environment, and then repackaged and redeployed it.

Many open-source projects, such as Apache Atlas, Apache Spark, Hive 1.2.x, and Hive 1.1.x, are compatible with Flink in most cases after some JAR packages have been replaced. In my environment, I replaced the Hive 1.1.0 JAR packages with the Hive 1.2.1 ones. I address this issue first, and then introduce some practices that the previous article did not cover.

Complex and Difficult Issues

I summarize all the questions from readers into three types:

  1. Flink is connected to Hive by APIs. Is there a command-line interface (CLI) similar to Spark SQL CLI?
  2. The Hadoop environment is not identified or the configuration file is not found.
  3. Dependency packages, classes, or methods are not found.

1. An Approach to Connecting Flink with Hive

Some readers do not know how to configure catalogs for Flink to connect with Hive. Here is a complete conf/sql-client-hive.yaml example file:

catalogs:
  - name: staginghive
    type: hive
    hive-conf-dir: /etc/hive/conf
    hive-version: 1.2.1
execution:
  planner: blink
  type: batch
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 1
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: staginghive
  current-database: ssb
  restart-strategy:
    type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
  m: yarn-cluster
  yn: 2
  ys: 5
  yjm: 1024
  ytm: 2048

The sql-client-hive.yaml file contains the following content:

  1. The path of the Hive configuration file is configured in catalogs.
  2. The YARN configuration information is configured in deployment.
  3. The Blink planner and batch type are configured in execution. The batch type is relatively stable, applicable to traditional batch processing, and fault-tolerant. Moreover, we recommend that you enable compression when you write intermediate data to a disk. In addition to batch, Flink also supports the streaming type.
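
As a quick sanity check before starting the client, the three sections described above can be verified with a small shell function. This is only a sketch under stated assumptions: the function name check_sql_client_yaml is hypothetical (it is not part of Flink), and it merely looks for the top-level keys with grep rather than parsing the YAML.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink): verify that a SQL Client
# YAML file declares the three top-level sections discussed above.
check_sql_client_yaml() {
  local file="$1"
  local section
  local missing=0
  for section in catalogs execution deployment; do
    # A top-level key starts at column 0 and is followed by a colon.
    if ! grep -q "^${section}:" "$file"; then
      echo "missing section: ${section}" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, the check could be run as check_sql_client_yaml conf/sql-client-hive.yaml before launching sql-client.sh. It does not validate indentation or values, so a file that passes may still be rejected by the client.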

Flink SQL CLI

Flink provides an SQL CLI tool, or specifically, the sql-client.sh script, which is similar to Spark SQL CLI. In Apache Flink 1.10, Flink SQL CLI has improvements in many features. I will introduce them later.

You can use sql-client.sh as follows:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. Failure to Identify the Hadoop Environment or Find the Configuration File

As mentioned in the previous article, the CDH gateway (including the Hadoop and Hive clients) must be deployed in the Flink environment. In addition, you must set the following environment variables:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HIVE_CONF_DIR=/etc/hive/conf
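
Because a missing or mistyped path only surfaces later as a confusing client error, it may help to check these variables up front. The following is a minimal sketch; the function name check_conf_dirs is hypothetical, and it only confirms that each variable is set and points to an existing directory.

```shell
#!/usr/bin/env bash
# Hypothetical helper: confirm that each variable exported above is set
# and points to an existing directory.
check_conf_dirs() {
  local name
  local value
  local status=0
  for name in HADOOP_CONF_DIR YARN_CONF_DIR HIVE_HOME HIVE_CONF_DIR; do
    # Read the variable whose name is stored in $name (empty if unset).
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "$name is not set" >&2
      status=1
    elif [ ! -d "$value" ]; then
      echo "$name points to a missing directory: $value" >&2
      status=1
    fi
  done
  return "$status"
}
```

The function reports every problem it finds instead of stopping at the first one, so all variables can be corrected in a single pass.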

3. Failure to Find Dependency Packages, Classes, or Methods

First, check the lib directory in Flink’s home directory:

$ tree  lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.1.0-cdh5.16.2.jar
├── hive-metastore-1.1.0-cdh5.16.2.jar
├── libfb303-0.9.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar

After you solve the preceding two problems, run the following command:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

An error is reported:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

Before you run the sql-client.sh script, you must specify the path of the dependency packages for the Hadoop environment. Rather than adding a package each time an error is reported, I recommend setting the HADOOP_CLASSPATH environment variable (which can be added to ~/.bash_profile):

export HADOOP_CLASSPATH=`hadoop classpath`
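
If the hadoop command is not on the PATH, the export above silently sets an empty value. A slightly more defensive variant is sketched below; the function name set_hadoop_classpath is hypothetical, and the only external command it relies on is hadoop classpath, as used above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set HADOOP_CLASSPATH from the Hadoop client and
# fail early with a clear message if the hadoop command is unavailable.
set_hadoop_classpath() {
  if ! command -v hadoop >/dev/null 2>&1; then
    echo "hadoop command not found; is the CDH gateway deployed?" >&2
    return 1
  fi
  # Abort if the hadoop command itself fails.
  HADOOP_CLASSPATH="$(hadoop classpath)" || return 1
  export HADOOP_CLASSPATH
}
```

The function could be placed in ~/.bash_profile and called there, so that a missing Hadoop client is reported at login rather than when the SQL client starts.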

Run the command again:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

Another error is reported:

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
  at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

This indicates that the JAR package of Hive 1.1.0 is not compatible with Flink. The solution is:

  1. Download apache-hive-1.2.1
  2. In Flink’s lib directory, delete hive-exec-1.1.0-cdh5.16.2.jar, hive-metastore-1.1.0-cdh5.16.2.jar, and libfb303-0.9.3.jar, and add hive-exec-1.2.1.jar, hive-metastore-1.2.1.jar, and libfb303-0.9.2.jar.

Then, check the lib directory again:

$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.2.1.jar
├── hive-metastore-1.2.1.jar
├── libfb303-0.9.2.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
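
The replacement step can be scripted so that it is repeatable across gateway nodes. The sketch below is an assumption-laden illustration, not an official procedure: the function name swap_hive_jars is hypothetical, and the directory that holds the Hive 1.2.1 JARs is passed in as a parameter because its location depends on where apache-hive-1.2.1 was unpacked.

```shell
#!/usr/bin/env bash
# Hypothetical helper: replace the CDH Hive 1.1.0 JARs in Flink's lib
# directory with the Hive 1.2.1 JARs named in the steps above.
#   $1: Flink lib directory
#   $2: directory that holds the Hive 1.2.1 JARs (assumed location)
swap_hive_jars() {
  local flink_lib="$1"
  local hive_jars="$2"
  local jar
  # Check that every replacement exists before deleting anything.
  for jar in hive-exec-1.2.1.jar hive-metastore-1.2.1.jar libfb303-0.9.2.jar; do
    if [ ! -f "$hive_jars/$jar" ]; then
      echo "missing $hive_jars/$jar" >&2
      return 1
    fi
  done
  # Remove the incompatible JARs.
  rm -f "$flink_lib/hive-exec-1.1.0-cdh5.16.2.jar" \
        "$flink_lib/hive-metastore-1.1.0-cdh5.16.2.jar" \
        "$flink_lib/libfb303-0.9.3.jar"
  # Copy in the replacements.
  for jar in hive-exec-1.2.1.jar hive-metastore-1.2.1.jar libfb303-0.9.2.jar; do
    cp "$hive_jars/$jar" "$flink_lib/" || return 1
  done
}
```

Checking for the new JARs first means a wrong source path leaves the lib directory untouched instead of half-replaced.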

Finally, run the command again:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

Now, you can see a cute squirrel holding a chestnut.

Flink SQL CLI Practices

In Apache Flink 1.10 (currently RC1), the Flink community has made many changes to SQL CLI. It now supports views, more data types and DDL statements, partition reading and writing, INSERT OVERWRITE, and more Table API features, which makes it easier to use.

Next, I will introduce Flink SQL CLI in detail.

0. Help

Run the following command to start the Flink SQL Client:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL>

Execute HELP to view the commands that Flink SQL supports. The following list includes most of the common commands:

  • CREATE TABLE
  • DROP TABLE
  • CREATE VIEW
  • DESCRIBE
  • DROP VIEW
  • EXPLAIN
  • INSERT INTO
  • INSERT OVERWRITE
  • SELECT
  • SHOW FUNCTIONS
  • USE CATALOG
  • SHOW TABLES
  • SHOW DATABASES
  • SOURCE
  • USE
  • SHOW CATALOGS

1. Hive Operations

1.1 Create a Table and Import the Data

For convenience, I use ssb-dbgen to generate test data. You can also use existing data in your test environment. For more information about how to easily create tables and insert data in Hive, see my earlier projects.

1.2 View the Hive Table

View the Hive table created in the preceding step:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;
+--------------+--+
|   tab_name   |
+--------------+--+
| customer     |
| dates        |
| lineorder    |
| p_lineorder  |
| part         |
| supplier     |
+--------------+--+

You can perform various queries on the Hive table and compare the results with those of subsequent Flink SQL queries.

2. Flink Operations

2.1 Use HiveCatalog to Access Hive Databases

Start Flink SQL CLI and list the catalogs:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> show catalogs;
default_catalog
staginghive
Flink SQL> use catalog staginghive;

Execute SHOW CATALOGS to retrieve all of the configured catalogs. I set staginghive as the default catalog in the sql-client-hive.yaml file. To switch to another catalog, execute USE CATALOG xxx.

2.2 Query Hive Metadata

Query a Hive database and table by using Flink SQL:

# Query the database
Flink SQL> show databases;
...
ssb
tmp
...
Flink SQL> use ssb;
# Query the table
Flink SQL> show tables;
customer
dates
lineorder
p_lineorder
part
supplier
# Query the table schema
Flink SQL> DESCRIBE customer;
root
|-- c_custkey: INT
|-- c_name: STRING
|-- c_address: STRING
|-- c_city: STRING
|-- c_nation: STRING
|-- c_region: STRING
|-- c_phone: STRING
|-- c_mktsegment: STRING

Note: Hive metadata names appear in lowercase in Flink catalogs.

2.3 Execute SQL Statements

Next, execute some SQL statements in Flink SQL CLI. For complete SQL statements, see the README file. Currently, some bugs may appear when Flink SQL parses the metadata of Hive views. For example, when you execute Q1.1 SQL, an error is reported because Flink SQL cannot find the fact table in the view:

Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Table 'lineorder' not found; did you mean 'LINEORDER'?

The p_lineorder table is a Hive view. The statement that created the view is:

CREATE VIEW P_LINEORDER AS
SELECT LO_ORDERKEY,
LO_LINENUMBER,
LO_CUSTKEY,
LO_PARTKEY,
LO_SUPPKEY,
LO_ORDERDATE,
LO_ORDERPRIOTITY,
LO_SHIPPRIOTITY,
LO_QUANTITY,
LO_EXTENDEDPRICE,
LO_ORDTOTALPRICE,
LO_DISCOUNT,
LO_REVENUE,
LO_SUPPLYCOST,
LO_TAX,
LO_COMMITDATE,
LO_SHIPMODE,
LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE
FROM ssb.LINEORDER;

Flink SQL does not handle this Hive view well. Here, I drop the view and recreate it in Hive with lowercase identifiers so that the subsequent SQL statements run smoothly:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as
select lo_orderkey,
lo_linenumber,
lo_custkey,
lo_partkey,
lo_suppkey,
lo_orderdate,
lo_orderpriotity,
lo_shippriotity,
lo_quantity,
lo_extendedprice,
lo_ordtotalprice,
lo_discount,
lo_revenue,
lo_supplycost,
lo_tax,
lo_commitdate,
lo_shipmode,
lo_extendedprice*lo_discount as v_revenue
from ssb.lineorder;
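
The recreated statement is the original definition written in lowercase. Assuming the uppercase identifiers are what trips up Flink SQL here (the error message suggests so, though the root cause is not confirmed), a view definition kept in a file could be converted mechanically. The function name lowercase_ddl below is hypothetical, and this is a rough sketch rather than a SQL-aware tool.

```shell
#!/usr/bin/env bash
# Hypothetical helper: lowercase every character of a view definition
# read from stdin. Caution: string literals are lowercased as well, so
# this is only safe for statements without case-sensitive literals.
lowercase_ddl() {
  tr '[:upper:]' '[:lower:]'
}
```

For example, lowercase_ddl < p_lineorder.sql > p_lineorder_lower.sql would produce a lowercase copy (both file names are placeholders). The view definition above contains no string literals, which is why such a blunt conversion would be acceptable for it.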

Continue to execute Q1.1 SQL in Flink SQL CLI:

Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
revenue
894280292647

Then, execute Q2.1 SQL:

Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join part on lo_partkey = p_partkey
> left join supplier on lo_suppkey = s_suppkey
> where p_category = 'MFGR#12' and s_region = 'AMERICA'
> group by d_year, p_brand
> order by d_year, p_brand;
lo_revenue d_year p_brand
819634128 1998 MFGR#1206
877651232 1998 MFGR#1207
754489428 1998 MFGR#1208
816369488 1998 MFGR#1209
668482306 1998 MFGR#1210
660366608 1998 MFGR#1211
862902570 1998 MFGR#1212
...

Finally, execute Q4.3 SQL:

Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join customer on lo_custkey = c_custkey
> left join supplier on lo_suppkey = s_suppkey
> left join part on lo_partkey = p_partkey
> where c_region = 'AMERICA' and s_nation = 'UNITED STATES'
> and (d_year = 1997 or d_year = 1998)
> and p_category = 'MFGR#14'
> group by d_year, s_city, p_brand
> order by d_year, s_city, p_brand;
d_year s_city p_brand profit
1998 UNITED ST9 MFGR#1440 6665681

If you are interested, you can execute the remaining SQL statements and compare their performance in Flink SQL and Spark SQL. Flink SQL also supports EXPLAIN, which displays the execution plan of a SQL statement.

2.4 Create Views

You can create and delete views in Flink SQL CLI:

Flink SQL> create view p_lineorder2 as
> select lo_orderkey,
> lo_linenumber,
> lo_custkey,
> lo_partkey,
> lo_suppkey,
> lo_orderdate,
> lo_orderpriotity,
> lo_shippriotity,
> lo_quantity,
> lo_extendedprice,
> lo_ordtotalprice,
> lo_discount,
> lo_revenue,
> lo_supplycost,
> lo_tax,
> lo_commitdate,
> lo_shipmode,
> lo_extendedprice * lo_discount as v_revenue
> from ssb.lineorder;
[INFO] View has been created.

Note that Flink currently does not support deleting Hive views:

Flink SQL> drop view p_lineorder;
[ERROR] Could not execute SQL statement. Reason:
The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.

2.5 Perform Partition Operations

Create a partition table in the Hive database:

CREATE TABLE IF NOT EXISTS flink_partition_test (
id int,
name string
) PARTITIONED BY (day string, type string)
stored as textfile;

Next, insert and query data by using Flink SQL:

# Insert data into the static partition
Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100001 Flink001 2020-02-01 Flink
# Insert data into the dynamic partition
Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink
# Static and dynamic partitions can be combined in the same way. That demo is omitted here.
# Overwrite inserted data
Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink

Because day is a keyword in Flink SQL, the day column must be escaped with backticks (`day`), as in the static-partition INSERT statement above.
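
When such statements are generated from a script, the escaping can be applied uniformly instead of per keyword. The following is a minimal sketch: the function name quote_ident is hypothetical, and it assumes the identifier itself contains no backtick.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wrap an identifier in backticks so that keywords
# such as day can be used as column names in Flink SQL statements.
# Assumption: the identifier itself contains no backtick.
quote_ident() {
  printf '`%s`' "$1"
}

# Rebuild the static-partition INSERT shown above with the keyword escaped.
sql="INSERT INTO flink_partition_test PARTITION (type='Flink', $(quote_ident day)='2020-02-01') SELECT 100001, 'Flink001';"
echo "$sql"
```

Quoting every identifier, rather than maintaining a list of keywords, avoids having to track which words a given Flink version treats as reserved.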

2.6 Other Features

  • 2.6.1 Functions

Flink SQL supports built-in functions and user-defined functions. You can execute SHOW FUNCTIONS to view the built-in functions. I will introduce how to create user-defined functions later.

  • 2.6.2 Parameter Setting

Flink SQL supports setting environment parameters. You can run the set command to view and set parameters:

Flink SQL> set;
deployment.gateway-address=
deployment.gateway-port=0
deployment.m=yarn-cluster
deployment.response-timeout=5000
deployment.yjm=1024
deployment.yn=2
deployment.ys=5
deployment.ytm=2048
execution.current-catalog=staginghive
execution.current-database=ssb
execution.max-idle-state-retention=0
execution.max-parallelism=128
execution.max-table-result-rows=1000000
execution.min-idle-state-retention=0
execution.parallelism=1
execution.periodic-watermarks-interval=200
execution.planner=blink
execution.restart-strategy.type=fallback
execution.result-mode=table
execution.time-characteristic=event-time
execution.type=batch
Flink SQL> set deployment.yjm = 2048;

Summary

In this article, I operated on a Hive database using Flink SQL and demonstrated some features provided by Flink SQL.

Currently, Flink SQL still has some problems operating on Hive databases:

  • Flink SQL only supports TextFile tables in Hive databases, and the ROW FORMAT SERDE used with the TextFile format is org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe. Flink SQL itself can handle other storage formats, such as RCFile, ORC, Parquet, and SequenceFile, but it cannot automatically identify the storage format of a Hive table. To use these formats, you have to modify the source code and recompile it. The community has already tested these storage formats, and I believe that they will be available in Flink SQL soon.
  • Flink SQL does not completely support OpenCSVSerde. When you use org.apache.hadoop.hive.serde2.OpenCSVSerde as ROW FORMAT SERDE of TextFile, field types cannot be correctly identified, and all fields in a Hive table are mapped to the String type.
  • Flink SQL does not support bucketed tables in Hive.
  • Flink SQL does not support ACID tables in Hive.
  • Flink SQL only has a few optimization features.
  • Flink SQL is similar to Spark SQL in permission control. Both rely on the access control lists (ACLs) of the Hadoop Distributed File System (HDFS) and do not support Sentry or Ranger permissions. However, Cloudera is developing a Ranger-based policy for sharing access permissions between Spark SQL and Hive, which would enable row- and column-level control and auditing.

The Flink community is developing rapidly, and these problems should be resolved in upcoming releases. Until then, we recommend using the APIs when Flink SQL cannot meet your needs.
