The Run-In Period for Flink and Hive

Review

Complex and Difficult Issues

  1. Flink connects to Hive through APIs. Is there a command-line interface (CLI) similar to the Spark SQL CLI?
  2. The Hadoop environment is not identified, or the configuration file is not found.
  3. Dependency packages, classes, or methods are not found.

1. An Approach to Connecting Flink with Hive

Flink SQL connects to Hive through a catalog defined in the SQL client configuration file, conf/sql-client-hive.yaml:

catalogs:
  - name: staginghive
    type: hive
    hive-conf-dir: /etc/hive/conf
    hive-version: 1.2.1

execution:
  planner: blink
  type: batch
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 1
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: staginghive
  current-database: ssb
  restart-strategy:
    type: fallback

deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
  m: yarn-cluster
  yn: 2
  ys: 5
  yjm: 1024
  ytm: 2048
  1. The path of the Hive configuration file is configured in catalogs.
  2. The YARN configuration information is configured in deployment.
  3. The Blink planner and the batch execution type are configured in execution. The batch type is relatively stable, suits traditional batch processing, and is fault-tolerant. We also recommend enabling compression when intermediate data is written to disk (see the sketch after this list). Besides batch, Flink also supports the streaming type.
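As a minimal sketch of enabling that compression, assuming Flink 1.10, the blocking-shuffle option below can be set in flink-conf.yaml (verify the option name against your Flink version):

# flink-conf.yaml: compress intermediate data spilled during blocking (batch) shuffles
taskmanager.network.blocking-shuffle.compression.enabled: true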

Flink SQL CLI

Start the Flink SQL CLI in embedded mode with this configuration file:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. Failure to Identify the Hadoop Environment or Find the Configuration File

If the SQL CLI cannot identify the Hadoop environment or locate the Hive configuration file, export the following environment variables (for example, in /etc/profile) before starting it:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HIVE_CONF_DIR=/etc/hive/conf

3. Failure to Find Dependency Packages, Classes, or Methods

The Flink lib directory initially contains the following JAR packages:

$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.1.0-cdh5.16.2.jar
├── hive-metastore-1.1.0-cdh5.16.2.jar
├── libfb303-0.9.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
Starting the SQL CLI now fails with a ClassNotFoundException:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

The missing class ships with the Hadoop client libraries, so add the Hadoop classpath to Flink's classpath:

export HADOOP_CLASSPATH=`hadoop classpath`

The next attempt fails while creating the Hive Metastore client:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
The catalog is configured with hive-version: 1.2.1, but the lib directory contains the CDH Hive 1.1.0 JAR packages. To resolve the version mismatch:

  1. Download the apache-hive-1.2.1 release.
  2. Replace the Hive JAR packages in Flink's lib directory: delete hive-exec-1.1.0-cdh5.16.2.jar, hive-metastore-1.1.0-cdh5.16.2.jar, and libfb303-0.9.3.jar, then add hive-exec-1.2.1.jar, hive-metastore-1.2.1.jar, and libfb303-0.9.2.jar. Check the lib directory again:
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.2.1.jar
├── hive-metastore-1.2.1.jar
├── libfb303-0.9.2.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

This time, the SQL CLI starts successfully.

Flink SQL CLI Practices

0. Help

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> help;

The CLI lists the statements it supports:
  • CREATE TABLE
  • DROP TABLE
  • CREATE VIEW
  • DESCRIBE
  • DROP VIEW
  • EXPLAIN
  • INSERT INTO
  • INSERT OVERWRITE
  • SELECT
  • SHOW FUNCTIONS
  • USE CATALOG
  • SHOW TABLES
  • SHOW DATABASES
  • SOURCE
  • USE
  • SHOW CATALOGS

1. Hive Operations

1.1 Create a Table and Import the Data
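As a minimal Beeline sketch, assuming the SSB data is available as '|'-delimited text files under an illustrative path /tmp/ssb (both are assumptions), one of the six tables can be created and loaded like this:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create table if not exists customer (
c_custkey int,
c_name string,
c_address string,
c_city string,
c_nation string,
c_region string,
c_phone string,
c_mktsegment string
)
row format delimited fields terminated by '|'  -- assumption: SSB dbgen's default delimiter
stored as textfile;                            -- Flink SQL currently reads only TextFile tables
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> load data local inpath '/tmp/ssb/customer.tbl' into table customer;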

1.2 View the Hive Table

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;
+--------------+--+
| tab_name |
+--------------+--+
| customer |
| dates |
| lineorder |
| p_lineorder |
| part |
| supplier |
+--------------+--+

2. Flink Operations

2.1 Use HiveCatalog to Access Hive Databases

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> show catalogs;
default_catalog
staginghive
Flink SQL> use catalog staginghive;

2.2 Query Hive Metadata

# List the databases
Flink SQL> show databases;
...
ssb
tmp
...
Flink SQL> use ssb;
# List the tables
Flink SQL> show tables;
customer
dates
lineorder
p_lineorder
part
supplier
# Query the table schema
Flink SQL> DESCRIBE customer;
root
|-- c_custkey: INT
|-- c_name: STRING
|-- c_address: STRING
|-- c_city: STRING
|-- c_nation: STRING
|-- c_region: STRING
|-- c_phone: STRING
|-- c_mktsegment: STRING

2.3 Execute SQL Statements

Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Table 'lineorder' not found; did you mean 'LINEORDER'?

The query fails because the view p_lineorder was originally created in Hive with uppercase identifiers, which Flink SQL, unlike Hive, resolves case-sensitively. The view definition stored in the Hive metastore looks like this:

CREATE VIEW P_LINEORDER AS
SELECT LO_ORDERKEY,
LO_LINENUMBER,
LO_CUSTKEY,
LO_PARTKEY,
LO_SUPPKEY,
LO_ORDERDATE,
LO_ORDERPRIOTITY,
LO_SHIPPRIOTITY,
LO_QUANTITY,
LO_EXTENDEDPRICE,
LO_ORDTOTALPRICE,
LO_DISCOUNT,
LO_REVENUE,
LO_SUPPLYCOST,
LO_TAX,
LO_COMMITDATE,
LO_SHIPMODE,
LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE
FROM ssb.LINEORDER;

Dropping the view in Hive and recreating it with all identifiers in lowercase resolves the problem:

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as
select lo_orderkey,
lo_linenumber,
lo_custkey,
lo_partkey,
lo_suppkey,
lo_orderdate,
lo_orderpriotity,
lo_shippriotity,
lo_quantity,
lo_extendedprice,
lo_ordtotalprice,
lo_discount,
lo_revenue,
lo_supplycost,
lo_tax,
lo_commitdate,
lo_shipmode,
lo_extendedprice*lo_discount as v_revenue
from ssb.lineorder;
After the view is recreated, the query runs successfully in the Flink SQL CLI:

Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
revenue
894280292647
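To see how the Blink planner translates such a query, the EXPLAIN statement from the command list above can be prepended to it; the plan output is omitted here:

Flink SQL> explain select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;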
Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join part on lo_partkey = p_partkey
> left join supplier on lo_suppkey = s_suppkey
> where p_category = 'MFGR#12' and s_region = 'AMERICA'
> group by d_year, p_brand
> order by d_year, p_brand;
lo_revenue d_year p_brand
819634128 1998 MFGR#1206
877651232 1998 MFGR#1207
754489428 1998 MFGR#1208
816369488 1998 MFGR#1209
668482306 1998 MFGR#1210
660366608 1998 MFGR#1211
862902570 1998 MFGR#1212
...
Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join customer on lo_custkey = c_custkey
> left join supplier on lo_suppkey = s_suppkey
> left join part on lo_partkey = p_partkey
> where c_region = 'AMERICA' and s_nation = 'UNITED STATES'
> and (d_year = 1997 or d_year = 1998)
> and p_category = 'MFGR#14'
> group by d_year, s_city, p_brand
> order by d_year, s_city, p_brand;
d_year s_city p_brand profit
1998 UNITED ST9 MFGR#1440 6665681

2.4 Create Views

Flink SQL> create view p_lineorder2 as
> select lo_orderkey,
> lo_linenumber,
> lo_custkey,
> lo_partkey,
> lo_suppkey,
> lo_orderdate,
> lo_orderpriotity,
> lo_shippriotity,
> lo_quantity,
> lo_extendedprice,
> lo_ordtotalprice,
> lo_discount,
> lo_revenue,
> lo_supplycost,
> lo_tax,
> lo_commitdate,
> lo_shipmode,
> lo_extendedprice * lo_discount as v_revenue
> from ssb.lineorder;
[INFO] View has been created.
Flink SQL> drop view p_lineorder;
[ERROR] Could not execute SQL statement. Reason:
The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.
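By contrast, the view created earlier in this CLI session can be dropped without error:

Flink SQL> drop view p_lineorder2;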

2.5 Perform Partition Operations

# Create a partitioned table in Hive first
CREATE TABLE IF NOT EXISTS flink_partition_test (
id int,
name string
) PARTITIONED BY (day string, type string)
stored as textfile;
# Insert data into the static partition
Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100001 Flink001 2020-02-01 Flink
# Insert data into the dynamic partition
Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 Flink001 2020-02-01 Flink
# Static and dynamic partitions can also be combined; a sketch with illustrative values follows
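# The static key (`day`) must precede the dynamic one (type)
Flink SQL> INSERT INTO flink_partition_test PARTITION (`day`='2020-02-05') SELECT 100003, 'Hive', 'HiveSQL';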
# Overwrite inserted data
Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';
# Query
Flink SQL> select * from flink_partition_test;
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 Flink001 2020-02-01 Flink
# Note: the result set is unchanged, i.e. the overwrite did not replace the previously inserted rows in this test
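The resulting partition layout can be double-checked from the Hive side (output omitted):

0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show partitions flink_partition_test;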

2.6 Other Features

2.6.1 Functions

Flink SQL supports both built-in functions and user-defined functions, which can be listed with the SHOW FUNCTIONS statement shown below.
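A quick check (the function list output is omitted):

Flink SQL> show functions;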
2.6.2 Parameter Setting

All properties of the current session can be displayed with the set command:
Flink SQL> set;
deployment.gateway-address=
deployment.gateway-port=0
deployment.m=yarn-cluster
deployment.response-timeout=5000
deployment.yjm=1024
deployment.yn=2
deployment.ys=5
deployment.ytm=2048
execution.current-catalog=staginghive
execution.current-database=ssb
execution.max-idle-state-retention=0
execution.max-parallelism=128
execution.max-table-result-rows=1000000
execution.min-idle-state-retention=0
execution.parallelism=1
execution.periodic-watermarks-interval=200
execution.planner=blink
execution.restart-strategy.type=fallback
execution.result-mode=table
execution.time-characteristic=event-time
execution.type=batch
Flink SQL> set deployment.yjm = 2048;

A property set this way overrides the value from the YAML configuration file for the current CLI session.

Summary

  • Flink SQL only supports TextFile tables in Hive, and the ROW FORMAT SERDE used with the TextFile format must be org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe. Although Flink itself supports storage formats such as RCFile, ORC, Parquet, and SequenceFile, it cannot automatically identify the storage format of a Hive table; to use other storage formats, you have to modify the source code and recompile it. The community has tested these storage formats, and I believe they will be usable in Flink SQL soon.
  • Flink SQL does not completely support OpenCSVSerde. When you use org.apache.hadoop.hive.serde2.OpenCSVSerde as the ROW FORMAT SERDE of a TextFile table, field types are not correctly identified, and all fields in the Hive table are mapped to the String type.
  • Flink SQL does not support bucketed tables in Hive.
  • Flink SQL does not support ACID tables in Hive.
  • Flink SQL offers only a few optimization features.
  • Flink SQL's permission control is similar to Spark SQL's.
