A Deep Dive into Apache Flink 1.11: Stream-Batch Integrated Hive Data Warehouse

Data Warehouse Architecture

Offline Data Warehouses

Real-Time Data Warehouses

Real-Time Hive Data Warehouses

Hive Streaming Sink

-- Use Hive DDL syntax by switching to the Hive dialect.
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (
  dt STRING,
  hour STRING
) STORED AS PARQUET TBLPROPERTIES (
  -- Commit a partition based on the time extracted from the partition values and the watermark.
  'sink.partition-commit.trigger'='partition-time',
  -- Hour-level partition time extraction: dt is a day in yyyy-MM-dd format and hour ranges from 0 to 23.
  -- timestamp-pattern defines how to assemble a complete timestamp from the two partition fields.
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  -- Commit delay at the hour level: a partition is committed once the watermark passes the partition time plus one hour.
  'sink.partition-commit.delay'='1 h',
  -- Partition commit policy: first update the metastore (addPartition), then write the _SUCCESS file.
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector'='kafka',
  ...  -- remaining Kafka connector properties (topic, brokers, format) omitted
);

-- Streaming SQL: continuously insert data from the Kafka table into the Hive table.
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
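
If the pipeline has no event-time watermark, partition commits can instead be driven by processing time. A minimal sketch of the alternative trigger, with illustrative property values:

SET table.sql-dialect=hive;
-- Alternative: trigger partition commits on processing time instead of watermarks.
ALTER TABLE hive_table SET TBLPROPERTIES (
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 s'
);
SET table.sql-dialect=default;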

Hive Streaming Source

-- Use table hints [3] to dynamically specify table properties.
SELECT * FROM hive_table
/*+ OPTIONS('streaming-source.enable'='true',
            'streaming-source.consume-start-offset'='2020-05-20') */;
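
Beyond the start offset, the streaming source can also be tuned with a monitoring interval and a consume order. A sketch with illustrative values:

-- Scan for new partitions every minute and consume them in partition-time order
-- ('create-time' is the alternative order; values are illustrative).
SELECT * FROM hive_table
/*+ OPTIONS('streaming-source.enable'='true',
            'streaming-source.monitor-interval'='1 min',
            'streaming-source.consume-order'='partition-time') */;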

Real-Time Data Association with Hive Tables

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency;
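
For this query to resolve LatestRates as a Hive dimension table, a HiveCatalog must be registered first. A minimal sketch, assuming an illustrative catalog name, database, and Hive configuration directory:

-- Register a Hive catalog (names and paths are illustrative).
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'mydatabase',
  'hive-conf-dir' = '/opt/hive-conf'
);
USE CATALOG myhive;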

Hive Enhancements

Hive Dialect Syntax Compatibility
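
As a small illustration, with the Hive dialect enabled, common Hive DDL such as partition management should run unchanged from Flink SQL (the table and partition values below are illustrative):

SET table.sql-dialect=hive;
-- Hive-style partition DDL, executed from Flink.
ALTER TABLE hive_table ADD IF NOT EXISTS PARTITION (dt='2020-05-20', hour='12');
SET table.sql-dialect=default;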

Vectorized Reading
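
Vectorized reading of Hive tables stored as ORC or Parquet is enabled by default in Flink 1.11. If a job needs the old behavior, reading can fall back to the Hadoop MapReduce record readers:

-- Disable vectorized reading and fall back to the MapReduce readers.
SET table.exec.hive.fallback-mapred-reader=true;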

Simplification of Hive Dependencies

Flink Enhancements

Flink Filesystem Connector

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- Insert: works in both streaming and batch environments.
INSERT INTO TABLE fs_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

-- Query with partition pruning.
SELECT * FROM fs_table WHERE dt='2020-05-20' AND `hour`='12';

Max Slot

slotmanager.number-of-slots.max
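
In Flink 1.11, this option caps the total number of slots a session cluster may allocate, so that jobs running on resource managers such as YARN cannot request resources without bound. A sketch of the setting in flink-conf.yaml, with an illustrative limit:

# flink-conf.yaml: allocate at most 100 slots in total (value illustrative).
slotmanager.number-of-slots.max: 100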

Summary

References:

About the Authors

Original Source:
