Apache Iceberg 0.11.0: Features and Deep Integration with Flink
By Hu Zheng (Ziyi) — a technical expert from Alibaba, an Apache Iceberg Committer, and a member of Apache HBase PMC. Currently, he is mainly responsible for the solution design and development of the Flink Data Lake. He is also the author of HBase Principle and Practice.
On January 27, 2021, Apache Iceberg version 0.11.0 was released with the following core functions implemented:
1) Apache Iceberg supports changes of partitions in the Core API. It also adds the SortOrder specification to the Iceberg Format V2 — used mainly to aggregate columns with a high hash into a few files to reduce the number of small files significantly. The read efficiency is improved at the same time. The reason is that after data is written by sort, the range of min-max at the file level and page level is smaller, which helps efficient data filtering.
2) In terms of integrating Flink and Iceberg, the community has achieved the following goals:
- The Flink Streaming Reader is supported, allowing users to incrementally pull the newly generated data from the Apache Iceberg through Flink stream processing. For the stream-batch unified storage layer such as Apache Iceberg, Apache Flink is the first computing engine that implements the stream-batch unified read and write of Iceberg. It also marks a new chapter in creating a data lake architecture with stream-batch unification using Apache Flink and Apache Iceberg.
- The limit pushdown and filter pushdown of Flink Streaming and Batch Reader are implemented.
- The CDC and Upsert events are written into Apache Iceberg through the Flink computing engine, with the correctness validated based on a medium scale of data.
write.distribution-mode=hashis supported to write data in the Flink Iceberg Sink, which can reduce a large number of small files from the source.
3) For integrating Spark3 with Iceberg, the community supports a large number of high-level SQLs:
- MERGE INTO
- DELETE FROM
- ALTER TABLE … ADD/DROP PARTITION
- ALTER TABLE … WRITE ORDERED BY
- More data management operations through Call, such as merging small files and deleting expired files.
4) In terms of ecological integration, the community has achieved the following goals:
- Introduced the AWS module for integration with cloud services, such as AWS S3 and Glue Catalog; and
- Integrate the popular open-source catalog service Nessie.
In the following sections, we discuss the details of Apache Flink integration in Apache Iceberg 0.11.0.
Apache Flink Streaming Read
In Apache Iceberg 0.10.0, the following features are supported in the Flink SQL:
1) Write data to the Apache Iceberg table in streaming tasks;
2) Write data to the Apache Iceberg table in batch tasks; and
3) Read Apache Iceberg tables in batch tasks.
The reading of Apache Iceberg tables by Flink streaming tasks has been integrated into the Apache Iceberg 0.11.0. As such, users can quickly implement the data transmission and ETL operations between different Iceberg tables. If an original table A needs to be processed or widened into table B, the Streaming Reader of Apache Iceberg would be a good choice.
Additionally, Netflix states that they use Flink Streaming Reader to implement backfill and bootstrap of historical data. This means that Iceberg should be integrated into FLIP-27 in the future. For now, Netflix has provided some related practices and design work for reference.
Currently, Flink SQL and the DataStream API are both available (we recommend Flink SQL). Users can start the Flink SQL client by reading the documentation and then can start a streaming task to access the incremental data of Apache Iceberg, described as follows:
-- Submit the Flink job in the streaming mode for the current session.
SET execution. Type = streaming;-- Enable this switch because streaming read SQL will provide few job options in Flink SQL hint options.
SET table.dynamic-table-options.enabled= true;-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS ('streaming'=' true','monitor-interval'='1 s')*/ ;-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS ('streaming'=' true','monitor-interval'='1 s ','start-snapshot-id'='3821550127947089987')*/ ;
Limit Pushdown and Filter Pushdown of Flink Source
In the Batch Source and Streaming Source of Flink, the pushdown of the Iceberg table with the Limit and Filter operations has been implemented. This means that when dealing with the following SQL that reads the Apache Iceberg table,
SELECT * FROM sample LIMIT 10;
the data can be filtered at the storage layer without reading the data from the storage layer and then throwing it to the computing engine. This significantly improves the data access efficiency.
The filter pushdown operation is similar. Currently, the following filter pushdown operations are supported, which cover nearly all typical filter pushdown operations:
SELECT * FROM sample WHERE data = 'a';
SELECT * FROM sample WHERE data != 'a';
SELECT * FROM sample WHERE data >= 'a';
SELECT * FROM sample WHERE data <= 'a';
SELECT * FROM sample WHERE data < 'a';
SELECT * FROM sample WHERE data > 'a';
SELECT * FROM sample WHERE data = 'a' AND id = 1;
SELECT * FROM sample WHERE data = 'a' OR id = 1;
SELECT * FROM sample WHERE data IS NULL;
SELECT * FROM sample WHERE NOT (id = 1);
SELECT * FROM sample WHERE data LIKE 'aaa%';
Support for CDC and Upsert Events
This feature is prevalent in the Apache Flink community, which mainly targets two core scenarios:
1) Users want to import binlog from a relational database to the Apache Iceberg data lake to provide near-real-time data analysis.
2) Users want to import the upsert stream generated by Flink streaming job AGG to the Apache Iceberg data lake. And they want to provide near-real-time data reports with the help of Apache Iceberg’s storage capability and Apache Flink’s analysis capability.
Generally speaking, each of the open source solutions available has its shortcomings: With Hive MR, only the T+1 data timeliness can be guaranteed; With Apache Kudu, HDFS and Cloud Object Storage will disconnect; With HBase, row-based storage will cause inadequate analysis capability; With Spark and Delta, the Apache Flink cannot assume full play in stream computing. Users expected Apache Iceberg to solve these problems.
We have generally divided the integration of CDC and Upsert by Flink and Iceberg into two stages:
1) In the first phase, Flink can successfully write the CDC and Upsert data into Apache Iceberg and read a correct result; and
2) In the second stage, Flink and Iceberg can pass the stability and performance tests based on a massive data set, guaranteeing the stability and performance of the whole procedure for production.
So far, we have reached the first phase in version 0.11.0 as the streaming task has been able to write the CDC and Upsert data into Apache Iceberg. And partners in China, such as Autohome and Bilibili, have verified the accuracy based on a medium volume data set.
A series of features related to performance and stability are planned for Apache Iceberg 0.12.0. Version 0.12.0 will be iconic with its Iceberg CDC and Upsert functions ready for production.
write.distribution-mode=hash is Supported to Write Data to Apache Iceberg
When writing data files of the file system in Flink streaming tasks, small files may turn out to be the problem. If the data on the source end is written to the partition without any shuffle or cluster, each task will write a large number of partitions and buckets. Consequently, there will be multiple data writing tasks for each partition, and each task generates at least one file. In a data lake architecture, such as Apache Iceberg, every checkpoint of Flink rolls over file writers to submit txn. With checkpoints submitted in minutes, a large number of small files will definitely be generated.
Currently, Apache Iceberg provides three methods to solve the problem of small files:
write.distribution-mode=hash in the Iceberg table. For example:
CREATE TABLE sample (
) PARTITIONED BY( data) WITH(
This ensures that each record is written after the shuffle is performed according to the partition key. One task at most writes each partition, which significantly reduces the number of small files. However, there is another problem: the data skew. Many business tables are partitioned by time field, and new data is written by time. Consequently, new data may be written into the same partition, resulting in a data write hotspot. We recommend users set buckets with hash under partitions so that the data of each partition is evenly distributed to each bucket. As such, only one task at most writes each bucket, which solves both the problems: small files and the data write hotspot.
Currently, creating buckets through SQL is not supported in Flink 1.11. However, users can utilize Java API to add buckets to the above table partitioned according to a data field, as follows:
.addField( Expressions.bucket(" ID", 32 bytes)
2) Perform Major Compaction on the Apache Iceberg table regularly to merge small files in the Apache Iceberg table. This is currently a Flink batch task, which users can submit through the Java API. For more information, see reference documentation.
3) Use plug-in operators to merge small files after each Flink Sink streaming task automatically. This feature is not available in the community version yet but will be released in version 0.12.0 since it involves the compaction of format v2.
Since Apache Flink’s integration with Apache Iceberg, two versions have been released in the community. The read and write capability of Flink and Iceberg featuring stream-batch unification has been implemented in both versions.
So far, there have been many successful cases related to Flink and Iceberg launching at home and abroad:
- A large amount of log data is flushed through Flink in Tencent every day and then imported to Iceberg. Tens of TBs of data is generated on a daily basis;
- Netflix imports almost all user behavior data to Iceberg through Flink’s stream computing and then stores it in AWS S3. Compared to HDFS, Flink and Iceberg help the company minimize storage costs;
- Tongcheng Yilong (merged by TravelGo and eLong) has also explored Flink and Iceberg a lot. It stored almost all the analysis data in Hive. Given Hive’s weakness in ACID and history backfill, Tongcheng Yilong studied Iceberg and found that Iceberg is very suitable for replacing their Hive storage formats. In addition, due to the excellent connection with the upper-layer computing ecosystem, it is easy to switch Hive tables to Iceberg without changing all historical computing tasks. So far, Tongcheng Yilong has switched dozens of Hive tables to Iceberg tables; and
- Autohome is one of the companies that successfully replaced Hive tables with Iceberg tables on a large scale in the production environment. At the same time, it is also the first company to use the community version of Iceberg to do CDC and Upsert data analysis PoC. They are also looking forward to more optimizations of CDC and Upsert scenarios in version 0.12.0.
In the future Apache Iceberg 0.12.0, the core functions in the figure above have been planned. Essentially, we will provide better support for CDC and Upsert scenarios using Flink and Iceberg. We will do more work to optimize stability, performance, and usability.
Finally, we talk about the status quo of Apache Iceberg applied in the computing ecosystem.
Apache Iceberg’s 0.11.0 release shows more and more advantages in ecosystem integration as a unified and universal data lake table format. Since Apache Iceberg 0.11.0 does not lean towards specific computing engines in terms of table format, the integration of computing engines is in full swing. Almost all mainstream computing engines in the big data ecosystem are connected to Iceberg in different degrees:
1) Contributors from Netflix, Tencent, and Apple significantly promote the integration of Spark and Iceberg. Tencent, Netflix, and Apple have many Spark PMCs and Spark committers in the Apache Spark community, who are very influential in the Spark and Iceberg communities. We believe that the integration of Apache Iceberg and Spark can be comparable to Databricks Delta (commercial version) in the future.
2) The Alibaba Flink team, Netflix, and the large Flink user base (both inside and outside China) are constantly promoting the integration of Flink and Iceberg;
3) The AWS Presto team and Trino team are continuously promoting the integration of Presto and Iceberg. The AWS Presto team has selected Iceberg as their table format for data lake construction. At the same time, the AWS team has done a lot of work in opening up the ecosystem of Iceberg with S3 and Glue. Apache Iceberg has become an essential part of the AWS data lake ecosystem; and
4) Cloudera has determined to select Apache Iceberg to build the commercial version of their data lake. Those who have used Hadoop may be familiar with this company. It is one of the companies with the best commercial version of Hadoop. In the future, it will launch public cloud services based on Apache Iceberg, which will bring users a perfected data lake integration experience for Flink, Spark, Hive, and Impala. Here, we focus on Apache Impala. In interactive analysis scenarios, Cloudera relies heavily on its open source Apache Impala (in fact, Impala performs better than Presto in the big data benchmark test). Apache Iceberg’s perfect abstraction of the storage layer and its inclusiveness for diversified computing engines are among the core elements that persuade Cloudera to select Apache Iceberg.