Flink Is Attempting to Build a Data Warehouse Simply by Using a Set of SQL Statements


A data warehouse service is a fundamental requirement for a company whose data volume has grown to a certain magnitude, and it is an essential part of data intelligence. Fast feedback on data not only helps improve product quality and user experience but also enables companies to make scientific decisions. This makes real-time access to data particularly important.

Currently, most enterprises build one offline data warehouse and one real-time data warehouse. They use the real-time data warehouse for businesses that require low latency and the offline data warehouse for complex businesses. This architecture involves many systems and computing frameworks and is therefore rather complex. To maintain such an architecture, enterprises must hire many skilled engineers, resulting in high labor costs. In addition, problems in the architecture are difficult to locate, and end users must learn multiple syntaxes. By analyzing the current data warehouse architecture, this article explores whether offline and real-time data warehouses can be integrated and whether Flink's unified architecture can resolve most of these issues.

Data Warehouse Architecture

[Figure: data warehouse architecture, with data conversion steps 1 to 9 annotated]

A data warehouse can be divided into the operational data store (ODS) layer, the data warehouse (DW) layer, and the application data service (ADS) layer.

1. ODS Layer

The data at this layer is raw data collected from business databases and logs, stored as-is before any cleansing.

2. DW Layer

  • Data warehouse detail (DWD) layer: The data at this layer has been cleansed and is clean and correct. The data contains the same information as the ODS layer but complies with the standard schema definitions for data warehouses and databases.
  • Data warehouse service (DWS) layer: The data at this layer may have been aggregated in a lightweight manner or organized in the star or snowflake schema. Some service-layer computing is performed at this layer. Based on this layer, you can calculate the data required by data services.

3. ADS Layer

The data services are mainly provided in the following pipelines:

  1. Business databases and logs → Kafka → Real-time data warehouse (Kafka + dimension tables) → BI database → Data services
  2. Business databases and logs → Kafka → Offline data warehouse (Hive metastore (HMS) + HDFS) → BI database → Data services

Lambda architectures remain the most popular data warehouse architectures. Despite their complexity, they are the most flexible choice for businesses because each business scenario can be served by the pipeline that suits it.

Lambda Architecture Provides Two Pipelines

  • The conventional offline data warehouse pipeline features high stability and supports complex computing and flexible analysis. Batch processing ensures that T+1 reports are generated and flexible ad hoc queries are supported.
  • The real-time data warehouse pipeline provides latency-sensitive data services. Conventional offline data warehouses always incur T+1 latency, so analysts cannot make real-time decisions. In contrast, the latency of the entire real-time data warehouse pipeline can be minimized to seconds. This not only accelerates analysis and decision-making but also creates possibilities for more services, such as real-time monitoring and alerting. Flink excels at real-time processing and stream processing, while Kafka is the core of real-time data warehousing.

The preceding figure indicates data conversion steps 1 to 9 during big data computing. This article will analyze these data conversions and explore the role Flink can play in this process.

Flink’s One-Stack Computing

Metadata

1. Confluent Schema Registry

2. HiveCatalog

In Flink, HiveCatalog is used in both offline and real-time data warehouses.

use catalog my_hive;
-- build the streaming database and tables
create database stream_db;
use stream_db;
create table order_table (
    id bigint,
    amount double,
    user_id bigint,
    status string,
    ts timestamp(3),
    ... -- possibly dozens of additional fields
    ts_day string,
    ts_hour string
) with (
    'connector.type' = 'kafka',
    ... -- Kafka table-related configuration
);
-- build the batch database and tables
create database batch_db;
use batch_db;
create table order_table
partitioned by (ts_day, ts_hour)
with (
    'connector.type' = 'hive',
    ... -- Hive table-related configuration
)
like stream_db.order_table (excluding options);

When HiveCatalog is used, batches and streams can be fully reused for subsequent processing, delivering the same user experience.
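For example, once both databases live in the same HiveCatalog, the same query text can be pointed at either table. A minimal sketch (the aggregation itself is illustrative): run against the Kafka-backed table it is a streaming job, and run against the Hive-backed table it is a batch job.

use catalog my_hive;
-- streaming job: continuously updated counts from the Kafka-backed table
select ts_day, count(*) from stream_db.order_table group by ts_day;
-- batch job: a one-shot count over the Hive-backed table
select ts_day, count(*) from batch_db.order_table group by ts_day;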

Data Warehousing

Previously, most data was imported by using DataStream and StreamingFileSink. However, Apache ORC was not supported, and the Hive metastore (HMS) could not be updated.

After Flink streaming was integrated with Hive, Hive's streaming sink[3] became available, making data import easy to express in SQL: you can use built-in functions and user-defined functions (UDFs), and the same SQL can be reused to run stream processing jobs and batch processing jobs side by side.

insert into [stream_db.|batch_db.]order_table select ⋯ from log_table;
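For the offline pipeline, the streaming sink also has to tell HMS when a partition is complete. A hedged sketch of how this could be configured, assuming the partition-commit options that shipped with the Flink 1.11 filesystem and Hive connectors (the exact option names and values here are illustrative):

-- assumed Flink 1.11-style partition-commit options; illustrative values
alter table batch_db.order_table set (
    'sink.partition-commit.trigger' = 'partition-time',  -- commit once the watermark passes the partition time
    'sink.partition-commit.delay' = '1 h',               -- tolerate up to one hour of late data
    'sink.partition-commit.policy.kind' = 'metastore,success-file'  -- update HMS and write a _SUCCESS file
);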

Data Processing

  1. ETL: As with data import, the ETL logic in batch processing is the same as in stream processing (see the sketch after this list).
  2. Dimension table joins: Enriching data with attributes from dimension tables is common in data warehouses. In an offline data warehouse, you can simply execute an SQL statement to join a Hive table, but the operation is different in streaming jobs. We will describe this in detail later.
  3. Aggregation: Streaming jobs produce constantly changing values during stateful computing.
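To make point 1 concrete, here is a minimal ETL sketch; dwd_order_table and the filter conditions are hypothetical, and the same statement can run as either a batch or a streaming job:

-- hypothetical cleansing step from the ODS table into a DWD table
insert into dwd_order_table
select id, amount, user_id, status, ts, ts_day, ts_hour
from order_table
where status is not null  -- drop malformed records
and amount >= 0;          -- drop obviously invalid amounts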

Dimension Table Joins

To improve the efficiency of the join operation, a streaming job usually joins a dimension table stored in a database rather than joining a Hive table directly.

Example:

-- stream dimension table
use stream_db;
create table user_info (
    user_id bigint,
    age int,
    address string,
    primary key(user_id)
) with (
    'connector.type' = 'jdbc',
    ...
);

-- import the offline warehouse's dimension table into the real-time warehouse
insert into user_info select * from batch_db.user_info;

-- dimension table join; the same SQL is reused for batch and streaming
insert into order_with_user_age
select *
from order_table
join user_info for system_time as of order_table.proctime
on order_table.user_id = user_info.user_id;

In a real-time data warehouse, you must periodically schedule jobs to update the dimension tables in a real-time dimension table database. This raises a question: can a streaming job directly join the Hive dimension tables of the offline data warehouse? The community is currently working on Hive dimension table support; the challenges, and the solutions under consideration, are listed below, followed by a hypothetical sketch:

  • Hive dimension tables are too large to fit in the cache:
    ◦ Consider shuffling by key and performing distributed dimension table joins to reduce the amount of data cached by each task.
    ◦ Consider storing the dimension table data in Flink state.
  • Dimension tables need to be updated:
    ◦ A simple solution is to set a time-to-live (TTL) timeout for the cache.
    ◦ A more complex solution is to implement a Hive streaming source and integrate Flink's watermark mechanism.
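If this work lands, the streaming join could target the Hive table directly, with a TTL controlling how often the cached copy is reloaded. A hypothetical sketch (the TTL behavior is the proposal described above, not a confirmed API):

-- hypothetical: join the offline warehouse's Hive dimension table directly;
-- a cache TTL would decide when the cached table is reloaded
insert into order_with_user_age
select o.*, u.age
from stream_db.order_table as o
join batch_db.user_info for system_time as of o.proctime as u
on o.user_id = u.user_id;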

Stateful Computing and Data Output

select age, avg(amount) from order_with_user_age group by age;

A simple SQL statement with an aggregate function is executed differently during stream processing and batch processing.

The greatest difference is that the result of a streaming job is a dynamic table[4], which outputs constantly changing results. Simply put, during stream processing, the output of a dynamic table is driven by the input data, whereas during batch processing, the table is output only once, after all the input has been consumed. Because a streaming result changes dynamically:

  • If the dynamic table is converted in an SQL statement, the retract mechanism in Flink ensures that the result of the SQL statement is the same as the result output during batch processing.
  • If the dynamic table is stored externally, challenges are posed for the sink.

After stateful computing is performed, the data is output as follows:

  • If the sink is an updatable database, such as an HBase, Redis, or JDBC database, we just need to keep updating the sink.
  • However, if the sink is non-updatable storage, we cannot update data that has already been written. To address this issue, Flink proposed Changelog support[5], which handles such sinks internally and outputs data in a specific schema, improving the experience of downstream consumers.

Example:

-- batch: after the computation completes, the result is written to mysql in one shot; each key has only one record
-- streaming: the data in mysql is continuously updated and keeps changing
insert into mysql_table select age, avg(amount) from order_with_user_age group by age;
-- batch: each key has only one record, so an append is sufficient
insert into hive_table select age, avg(amount) from order_with_user_age group by age;
-- streaming: records are continuously appended to kafka, with an extra column marking each message as an upsert; downstream Flink consumers automatically handle these upsert messages
insert into kafka_table select age, avg(amount) from order_with_user_age group by age;

Ad Hoc and OLAP

Currently, one of the major weaknesses of real-time data warehouses is the lack of support for ad hoc queries, because real-time data warehouses do not store historical data. Kafka can retain data for three days or more, but this leads to high storage costs and low query efficiency.

Our idea is to provide unified sink components for batch processing and stream processing in online analytical processing (OLAP) databases (a sketch follows the list):

  • Druid sink
  • Doris sink
  • ClickHouse sink
  • HBase or Phoenix sink
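As a sketch of what such a unified sink could look like, assume a hypothetical 'clickhouse' connector (the connector name and options below are illustrative, not a confirmed API). The same table then takes a one-shot result from a batch job and continuous upserts from a streaming job:

-- hypothetical unified OLAP sink table
create table ads_age_amount (
    age int,
    avg_amount double,
    primary key(age)
) with (
    'connector.type' = 'clickhouse',  -- hypothetical connector
    ...
);
-- identical statement for batch (one-shot output) and streaming (continuous upserts)
insert into ads_age_amount select age, avg(amount) from order_with_user_age group by age;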

Summary

With Flink's one-stack approach, offline and real-time data warehouses gain:

  • Unified metadata
  • Unified SQL development
  • Unified data import and export
  • Unified storage in the future

