The Run-In Period for Flink and Hive

Review

In the previous article, I used CDH 5.16.2, which ships Hive 1.1.0; CDH 5.x does not offer any Hive version later than 1.1.0, which is somewhat surprising. The Flink source code does not work well with Hive 1.1.0, so many problems occur. To make the two compatible, I modified the Flink code for the CDH 5.16.2 environment, repackaged it, and deployed it again.
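Part of that packaging work is producing a Hadoop dependency that matches the CDH version. As a rough sketch (my assumption, following the flink-shaded build conventions rather than anything from the previous article), the shaded Hadoop JAR that appears later in Flink's lib directory can be built like this:

# Sketch: build flink-shaded against the CDH Hadoop version (flags and profile names may differ in your checkout).
git clone https://github.com/apache/flink-shaded.git
cd flink-shaded
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
# Copy the resulting flink-shaded-hadoop-2-*.jar into Flink's lib directory.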

Complex and Difficult Issues

I summarize all the questions from readers into three types:

  1. How to configure Flink to connect with Hive.
  2. The Hadoop environment is not identified, or the configuration file is not found.
  3. Dependency packages, classes, or methods are not found.

1. An Approach to Connecting Flink with Hive

Some readers do not know how to configure catalogs for Flink to connect with Hive. Here is a complete conf/sql-client-hive.yaml example file:

catalogs:
  - name: staginghive
    type: hive
    hive-conf-dir: /etc/hive/conf
    hive-version: 1.2.1

execution:
  planner: blink
  type: batch
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 1
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: staginghive
  current-database: ssb
  restart-strategy:
    type: fallback

deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
  m: yarn-cluster
  yn: 2
  ys: 5
  yjm: 1024
  ytm: 2048
  1. The YARN settings are configured in the deployment section.
  2. The Blink planner and the batch execution type are configured in the execution section. Batch mode is relatively stable, suits traditional batch workloads, and is fault-tolerant. We also recommend enabling compression when intermediate data is written to disk (a hedged sketch of the corresponding configuration follows). In addition to batch, Flink also supports the streaming type.
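For reference, here is a sketch of how that compression could be turned on in the same environment file. Flink 1.10's SQL client environment file accepts a configuration section for table options; the option names below are my assumption based on the Blink planner's settings, so verify them against your Flink version:

configuration:
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb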

Flink SQL CLI

Flink provides a SQL CLI tool, the sql-client.sh script, which is similar to the Spark SQL CLI. Apache Flink 1.10 improves many features of the SQL CLI; I will introduce them later.

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. Failure to Identify the Hadoop Environment or Find the Configuration File

As I mentioned previously, the CDH gateway (including the Hadoop and Hive clients) must be deployed on the node where Flink runs, and the following environment variables must be set:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HIVE_CONF_DIR=/etc/hive/conf

3. Failure to Find Dependency Packages, Classes, or Methods

First, check the lib directory in Flink’s home directory:

$ tree  lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.1.0-cdh5.16.2.jar
├── hive-metastore-1.1.0-cdh5.16.2.jar
├── libfb303-0.9.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
Starting the SQL CLI with this lib directory fails with a ClassNotFoundException:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

The missing class is provided by the Hadoop classpath, so export it before starting the CLI:

export HADOOP_CLASSPATH=`hadoop classpath`

Run the CLI again. This time it fails while creating the Hive Metastore client:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

The Hive 1.1.0 CDH JARs in the lib directory do not match the hive-version: 1.2.1 declared in conf/sql-client-hive.yaml, so the Hive Metastore client cannot be created. To fix this, replace the Hive JARs in Flink's lib directory: delete hive-exec-1.1.0-cdh5.16.2.jar, hive-metastore-1.1.0-cdh5.16.2.jar, and libfb303-0.9.3.jar, and add hive-exec-1.2.1.jar, hive-metastore-1.2.1.jar, and libfb303-0.9.2.jar.
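The replacement JARs can be taken from an Apache Hive 1.2.1 distribution or from Maven Central. A minimal shell sketch follows; the download URLs assume the standard Maven Central layout, so verify them before use:

# Run from Flink's home directory (sketch; paths and URLs are assumptions).
rm lib/hive-exec-1.1.0-cdh5.16.2.jar lib/hive-metastore-1.1.0-cdh5.16.2.jar lib/libfb303-0.9.3.jar
wget -P lib https://repo1.maven.org/maven2/org/apache/hive/hive-exec/1.2.1/hive-exec-1.2.1.jar
wget -P lib https://repo1.maven.org/maven2/org/apache/hive/hive-metastore/1.2.1/hive-metastore-1.2.1.jar
wget -P lib https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.2/libfb303-0.9.2.jar

After replacing the JARs, check the lib directory again: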
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.2.1.jar
├── hive-metastore-1.2.1.jar
├── libfb303-0.9.2.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
Run the SQL CLI again; this time it starts successfully:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

Flink SQL CLI Practices

In Apache Flink 1.10 (currently RC1), the Flink community has made many changes to the SQL CLI. It now supports views, more data types and DDL statements, partition reads and writes, INSERT OVERWRITE, and more Table API features, so it is easier to use.

0. Help

Run the following command to log on to the Flink SQL Client. The HELP command lists the statements the CLI supports:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> help;
  • DROP TABLE
  • CREATE VIEW
  • DESCRIBE
  • DROP VIEW
  • EXPLAIN
  • INSERT INTO
  • INSERT OVERWRITE
  • SELECT
  • SHOW FUNCTIONS
  • USE CATALOG
  • SHOW TABLES
  • SHOW DATABASES
  • SOURCE
  • USE
  • SHOW CATALOGS

1. Hive Operations

1.1 Create a Table and Import the Data

For convenience, I use ssb-dbgen to generate test data. You can also use existing data in your test environment. For more information about how to create the tables and load the data in Hive, see my earlier projects.
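As a minimal sketch (not the exact DDL from those projects), the customer table could be created and loaded in Hive as follows. The schema matches the DESCRIBE output shown later, while the file path and the pipe delimiter of the ssb-dbgen output are assumptions:

-- Sketch: create one SSB table in Hive and load ssb-dbgen output into it.
CREATE TABLE IF NOT EXISTS ssb.customer (
  c_custkey    int,
  c_name       string,
  c_address    string,
  c_city       string,
  c_nation     string,
  c_region     string,
  c_phone      string,
  c_mktsegment string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/tmp/ssb-dbgen/customer.tbl' INTO TABLE ssb.customer;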

1.2 View the Hive Table

View the Hive table created in the preceding step:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;
+--------------+--+
| tab_name |
+--------------+--+
| customer |
| dates |
| lineorder |
| p_lineorder |
| part |
| supplier |
+--------------+--+

2. Flink Operations

2.1 Use HiveCatalog to Access Hive Databases

Log on to Flink SQL CLI and query catalogs:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> show catalogs;
default_catalog
staginghive
Flink SQL> use catalog staginghive;

2.2 Query Hive Metadata

Query a Hive database and table by using Flink SQL:

# Query the database
Flink SQL> show databases;
...
ssb
tmp
...
Flink SQL> use ssb;
# Query the table
Flink SQL> show tables;
customer
dates
lineorder
p_lineorder
part
supplier
# Query the table schema
Flink SQL> DESCRIBE customer;
root
|-- c_custkey: INT
|-- c_name: STRING
|-- c_address: STRING
|-- c_city: STRING
|-- c_nation: STRING
|-- c_region: STRING
|-- c_phone: STRING
|-- c_mktsegment: STRING

2.3 Execute SQL Statements

Next, execute some SQL statements in the Flink SQL CLI. For the complete SQL statements, see the README file. Currently, some bugs appear when Flink SQL parses the metadata of Hive views. For example, when you execute the Q1.1 query, an error is reported because Flink SQL cannot find the fact table referenced inside the view:

Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Table 'lineorder' not found; did you mean 'LINEORDER'?

The p_lineorder view was created in Hive with uppercase identifiers, and Flink SQL cannot resolve the fact table when it expands the view definition:

CREATE VIEW P_LINEORDER AS
SELECT LO_ORDERKEY,
LO_LINENUMBER,
LO_CUSTKEY,
LO_PARTKEY,
LO_SUPPKEY,
LO_ORDERDATE,
LO_ORDERPRIOTITY,
LO_SHIPPRIOTITY,
LO_QUANTITY,
LO_EXTENDEDPRICE,
LO_ORDTOTALPRICE,
LO_DISCOUNT,
LO_REVENUE,
LO_SUPPLYCOST,
LO_TAX,
LO_COMMITDATE,
LO_SHIPMODE,
LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE
FROM ssb.LINEORDER;
As a workaround, drop the view in Hive and recreate it with lowercase identifiers:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as
select lo_orderkey,
lo_linenumber,
lo_custkey,
lo_partkey,
lo_suppkey,
lo_orderdate,
lo_orderpriotity,
lo_shippriotity,
lo_quantity,
lo_extendedprice,
lo_ordtotalprice,
lo_discount,
lo_revenue,
lo_supplycost,
lo_tax,
lo_commitdate,
lo_shipmode,
lo_extendedprice*lo_discount as v_revenue
from ssb.lineorder;
Run the Q1.1 query again in the Flink SQL CLI; it now returns a result:

Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
revenue
894280292647
Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join part on lo_partkey = p_partkey
> left join supplier on lo_suppkey = s_suppkey
> where p_category = 'MFGR#12' and s_region = 'AMERICA'
> group by d_year, p_brand
> order by d_year, p_brand;
lo_revenue d_year p_brand
819634128 1998 MFGR#1206
877651232 1998 MFGR#1207
754489428 1998 MFGR#1208
816369488 1998 MFGR#1209
668482306 1998 MFGR#1210
660366608 1998 MFGR#1211
862902570 1998 MFGR#1212
...
Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join customer on lo_custkey = c_custkey
> left join supplier on lo_suppkey = s_suppkey
> left join part on lo_partkey = p_partkey
> where c_region = 'AMERICA'and s_nation = 'UNITED STATES'
> and (d_year = 1997 or d_year = 1998)
> and p_category = 'MFGR#14'
> group by d_year, s_city, p_brand
> order by d_year, s_city, p_brand;
d_year s_city p_brand profit
1998 UNITED ST9 MFGR#1440 6665681

2.4 Create Views

You can create and delete views in Flink SQL CLI:

Flink SQL> create view p_lineorder2 as
> select lo_orderkey,
> lo_linenumber,
> lo_custkey,
> lo_partkey,
> lo_suppkey,
> lo_orderdate,
> lo_orderpriotity,
> lo_shippriotity,
> lo_quantity,
> lo_extendedprice,
> lo_ordtotalprice,
> lo_discount,
> lo_revenue,
> lo_supplycost,
> lo_tax,
> lo_commitdate,
> lo_shipmode,
> lo_extendedprice * lo_discount as v_revenue
> from ssb.lineorder;
[INFO] View has been created.
Flink SQL> drop view p_lineorder;
[ERROR] Could not execute SQL statement. Reason:
The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.

The p_lineorder view was created in Hive, not in the current CLI session, so it cannot be dropped here; only views created in the current session can be dropped.
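By contrast, a view created in the current CLI session, such as p_lineorder2 above, can be dropped (minimal example; the confirmation output is omitted):

Flink SQL> drop view p_lineorder2;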

2.5 Perform Partition Operations

Create a partitioned table in the Hive database:

CREATE TABLE IF NOT EXISTS flink_partition_test (
id int,
name string
) PARTITIONED BY (day string, type string)
stored as textfile;
# Insert data into the static partition
Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100001 Flink001 2020-02-01 Flink
# Insert data into the dynamic partition
Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink
# Static and dynamic partitions can also be combined; a minimal sketch follows.
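# Sketch (my own example, not part of the original demo): `day` is a static partition, type is dynamic
Flink SQL> INSERT INTO flink_partition_test PARTITION (`day`='2020-02-03') SELECT 100003, 'Hive001', 'HiveSQL';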
# Overwrite inserted data
Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink

2.6 Other Features

  • 2.6.1 Functions
  • 2.6.2 Parameters

Running set lists the current session parameters; set <key> = <value> changes one for the session, as shown at the end of the listing:

Flink SQL> set;
deployment.gateway-address=
deployment.gateway-port=0
deployment.m=yarn-cluster
deployment.response-timeout=5000
deployment.yjm=1024
deployment.yn=2
deployment.ys=5
deployment.ytm=2048
execution.current-catalog=staginghive
execution.current-database=ssb
execution.max-idle-state-retention=0
execution.max-parallelism=128
execution.max-table-result-rows=1000000
execution.min-idle-state-retention=0
execution.parallelism=1
execution.periodic-watermarks-interval=200
execution.planner=blink
execution.restart-strategy.type=fallback
execution.result-mode=table
execution.time-characteristic=event-time
execution.type=batch
Flink SQL> set deployment.yjm = 2048;

Summary

In this article, I used Flink SQL to operate on a Hive database and demonstrated some of the features Flink SQL provides, along with its current limitations:

  • Flink SQL only supports TextFile tables in Hive databases, and the ROW FORMAT SERDE used with the TextFile format must be org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe (a minimal table sketch follows this list). Although Flink SQL supports storage formats such as RCFile, ORC, Parquet, and SequenceFile, it cannot automatically identify the storage format of a Hive table. To use other storage formats, you have to modify the source code and recompile it. The community has tested these storage formats, and I believe they will be supported in Flink SQL soon.
  • Flink SQL does not fully support OpenCSVSerde. When you use org.apache.hadoop.hive.serde2.OpenCSVSerde as the ROW FORMAT SERDE of a TextFile table, field types are not correctly identified, and all fields in the Hive table are mapped to the STRING type.
  • Flink SQL does not support bucketed tables in Hive.
  • Flink SQL does not support ACID tables in Hive.
  • Flink SQL only has a few optimization features.
  • Flink SQL is similar to Spark SQL in permission control.
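As a minimal sketch of a Hive table layout that this setup can read, the following DDL (with a made-up table name) uses TEXTFILE storage with the default LazySimpleSerDe:

-- Hypothetical example: TEXTFILE plus the default LazySimpleSerDe is readable by Flink SQL in this setup.
CREATE TABLE IF NOT EXISTS flink_readable_test (
  id   int,
  name string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- A table declared with org.apache.hadoop.hive.serde2.OpenCSVSerde instead would expose every column to Flink SQL as STRING.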

Original Source:
