Flink 1.10 vs. Hive 3.0 — A Performance Comparison


By Li Jinsong (Zhixin)

As a unified computing engine, Flink is designed to provide a unified stream and batch processing experience and technology stack. Flink 1.9 incorporated the Blink code, and Flink 1.10 added many functional improvements and performance optimizations. Flink now runs all TPC-DS queries and is very competitive in performance. With 1.10, Flink is a production-ready SQL engine with unified batch and stream processing capabilities.

In big data batch computing, as the Hive data warehouse has matured, the most popular model has become Hive Metastore plus a computing engine. Common choices include Hive on MapReduce, Hive on Tez, Hive on Spark, Spark integrated with Hive, Presto integrated with Hive, and, now that it has reached production availability with the release of Flink 1.10, Flink Batch SQL.

The performance and cost of the selected computing engine are critical when building a computing platform. Therefore, Ververica’s flink-sql-benchmark [1] project provides a TPC-DS benchmark tool based on Hive Metastore so that tests more closely resemble real production jobs.

  • In the test, all input tables are standard Hive tables, and all data is stored in a Hive data warehouse, which is consistent with production conditions. Other computing engines can therefore analyze the same tables easily.
  • The tables use Optimized Row Columnar (ORC), a common production file format that provides a high compression ratio and good read performance.
  • This article uses the 10 TB TPC-DS dataset, which is a common production scale. A data volume of only 1 TB can be handled by traditional databases, so it is not suitable for big data testing.

The test uses 20 hosts to compare three engines: Flink 1.10, Hive 3.0 on MapReduce, and Hive 3.0 on Tez. Engine performance is measured with two metrics:

  • Total Duration: This metric provides intuitive performance data but may be significantly affected by individual queries.
  • Geometric Mean: This metric represents the central tendency of a group of numbers and greatly reduces the influence of individual queries, which gives a more representative average.
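
The difference between the two metrics can be illustrated with a minimal Python sketch. The per-query durations below are made-up values chosen only to show the effect; they are not results from this benchmark, and the function names are hypothetical.

```python
import math


def total_duration(durations):
    """Sum of all per-query durations (dominated by the slowest queries)."""
    return sum(durations)


def geometric_mean(durations):
    """n-th root of the product, computed via logs to avoid overflow."""
    if not durations or any(d <= 0 for d in durations):
        raise ValueError("durations must be a non-empty list of positive numbers")
    return math.exp(sum(math.log(d) for d in durations) / len(durations))


# Hypothetical per-query durations in seconds for two imaginary engines.
engine_a = [10.0, 10.0, 10.0, 1000.0]  # fast on most queries, one very slow query
engine_b = [20.0, 20.0, 20.0, 500.0]   # slower on most queries, better on the outlier

for name, runs in (("A", engine_a), ("B", engine_b)):
    print(name, total_duration(runs), round(geometric_mean(runs), 1))
```

In this made-up case, engine B wins on total duration (560 s against 1030 s) only because of a single query, while engine A has the lower geometric mean (about 31.6 s against about 44.7 s). This is why the article reports both metrics.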

Overview of Results

1) Flink 1.10 vs. Hive 3.0 on MapReduce

  • The total duration performance of Flink was 870% better than that of Hive on MapReduce.
  • The geometric mean performance of Flink queries was 780% better than that of Hive on MapReduce.

2) Flink 1.10 vs. Hive 3.0 on Tez

  • The total duration performance of Flink was 210% better than that of Hive on Tez.
  • The geometric mean performance of Flink queries was 200% better than that of Hive on Tez.

The total duration performance of the three engines is shown in the following figure:

[Figure: total duration of Flink 1.10, Hive 3.0 on MapReduce, and Hive 3.0 on Tez]

The geometric mean results are shown in the following figure:

[Figure: geometric mean of query durations for the three engines]

This article tests only the preceding engines, and only on a 10 TB dataset. You can choose a dataset size that suits your cluster and use the flink-sql-benchmark tool to run comparison tests for more engines.

Benchmark Details

Environment and Optimizations

  • Computing Environment: 20 servers, each with 64-core Intel processors, 256 GB of memory, 1 SSD for the computing engine, multiple SATA disks for HDFS, and a 10-Gigabit network adapter.
  • Cluster Environment: YARN + HDFS + Hive.
  • Flink Parameters: See flink-conf.yaml [2].
  • Hive Parameters: The MapJoin threshold is tuned to improve performance while avoiding out-of-memory (OOM) errors.
  • Versions: The most recent Hadoop (3.x), Hive, and Tez versions are used.

1) Environment Preparation

  • Prepare the Hadoop (HDFS + YARN) environment.
  • Prepare the Hive environment.

2) Dataset Generation

i) Generate the TPC-DS dataset and load the TEXT dataset into Hive. The raw data is in CSV format. This step is relatively time-consuming, so we recommend generating the data in a distributed manner (the flink-sql-benchmark tool integrates the TPC-DS tool).
ii) Convert the Hive TEXT tables to ORC tables. ORC is a common Hive data file format. Its row-column hybrid storage facilitates quick analysis and provides a high compression ratio. Run the following statement:

create table ${NAME} stored as ${FILE} as select * from ${SOURCE}.${NAME};

[Figure: the generated TPC-DS tables]

As shown in the figure, the 7 fact tables and 17 dimension tables defined by TPC-DS are generated.

iii) Analyze the Hive tables. Statistics are very important for optimizing analytical queries. For complex SQL statements, the execution efficiency of different plans may vary greatly. Flink reads both Hive table statistics and Hive partition statistics and performs cost-based optimization (CBO) based on them. Run the following command:

analyze table ${NAME} compute statistics for columns;
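
Applying the conversion and analysis statements to every table can be scripted. The following Python sketch only renders the SQL text from the two templates quoted above. The database name `tpcds_text`, the function name, and the choice of example tables are assumptions for illustration; the flink-sql-benchmark project ships its own scripts for this step, and they may look different.

```python
from string import Template

# The two statements quoted in this article, as templates.
CONVERT = Template(
    "create table ${NAME} stored as ${FILE} as select * from ${SOURCE}.${NAME};"
)
ANALYZE = Template("analyze table ${NAME} compute statistics for columns;")


def statements_for(table, file_format="orc", source_db="tpcds_text"):
    """Return the conversion and analysis statements for one table.

    source_db is a hypothetical name for the database holding the TEXT tables.
    """
    return [
        CONVERT.substitute(NAME=table, FILE=file_format, SOURCE=source_db),
        ANALYZE.substitute(NAME=table),
    ]


# Two of the TPC-DS tables, used here only as examples.
for table in ("store_sales", "date_dim"):
    for statement in statements_for(table):
        print(statement)
```

The printed statements can then be passed to the Hive CLI or Beeline in the target ORC database.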

3) Run Queries on Flink

i) Prepare the Flink environment and set up a Flink YARN session. We recommend using Standalone or Session mode so that the Flink processes are reused, which speeds up analytical jobs.
ii) Write code to run the queries and collect execution time statistics. You can directly reuse the flink-tpcds project in flink-sql-benchmark.
iii) Run FLINK_HOME/flink to start the program, execute all queries, wait for the process to end, and calculate the execution time.
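
The timing logic in step ii) can be sketched in a few lines of Python. This is not the code of the flink-tpcds project, which should be reused in practice. The `execute` callable is a hypothetical stand-in for whatever submits a query and blocks until it finishes, and the query texts are placeholders.

```python
import time


def run_queries(queries, execute, clock=time.perf_counter):
    """Run each query once and return {query name: elapsed seconds}.

    queries: dict mapping a query name to its SQL text.
    execute: callable that submits the SQL and returns when the job is done
             (hypothetical; supplied by the caller).
    clock:   time source, injectable so the function can be tested.
    """
    timings = {}
    for name, sql in queries.items():
        start = clock()
        execute(sql)
        timings[name] = clock() - start
    return timings


if __name__ == "__main__":
    # Placeholder queries and a no-op executor, just to show the call shape.
    demo = {"q1": "select 1", "q2": "select 2"}
    print(run_queries(demo, execute=lambda sql: None))
```

The resulting dictionary is exactly the input needed to compute the total duration and the geometric mean described earlier.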


4) Run Queries on Other Engines

i) Set up the environment according to the information on the official website of another engine.
ii) Because the datasets are standard Hive tables, other engines can read them directly.
iii) During the run, check that the cluster actually reaches a bottleneck, such as CPU or disk. If the running method and parameters are reasonable, some resource should be saturated. If none is, performance tuning is required.

Benchmark Analysis

Flink 1.9 had already done a lot of work while incorporating the Blink code, including deep code generation, binary storage and computing, an improved CBO, and a batch shuffler. This laid a solid foundation for later performance breakthroughs.

Flink 1.10 continues to improve Hive integration and now meets the production-level Hive integration standard. The following improvements were also made in performance and out-of-the-box usability:

  • Support for Multiple Hive Versions: Flink supports all major Hive versions from 1.0 onward.
  • Vectorized ORC Reading: This feature is currently enabled by default only for Hive 2.0 and later versions.
  • Vectorized ORC Reading for Hive 1.x Is in Progress: See FLINK-14802 [3].
  • Vectorized Reading Support for Parquet Is Under Development: See FLINK-11899 [4].
  • Proportional Allocation of Elastic Memory: This not only helps the Operator use more memory resources, but also greatly facilitates user configuration. With this, users no longer need to configure Operator memory. Instead, the Operator elastically acquires memory based on the Slot, making it easier to use Flink out-of-the-box. For more information, see FLIP-53 [5].
  • Shuffle Compression: By default, Flink flushes intermediate data to disk for batch jobs, which avoids scheduling deadlocks and provides a good fault tolerance mechanism. However, when a large amount of data is flushed, disk throughput may become the job bottleneck. Therefore, Flink 1.10 implements shuffle compression, which trades CPU for I/O.
  • New Scheduling Framework: Flink 1.10 also introduces a new scheduling framework, which improves the scheduling performance of JobMaster and prevents it from becoming a bottleneck when the parallelism is high.

Flink Parameter Analysis

Flink 1.10 has implemented many parameter optimizations to improve the overall out-of-the-box experience. However, due to some limitations of batch and stream integration, some parameters need to be configured separately. This section provides a general analysis of these parameters.

Table Layer Parameters

  • table.optimizer.join-reorder-enabled = true: This parameter needs to be enabled manually. Currently, major engines rarely enable JoinReorder by default. Enable this parameter when statistics are relatively complete. Generally, reorder errors are rare.
  • table.optimizer.join.broadcast-threshold = 10*1024*1024: Change the value of this parameter from the default 1 MB to 10 MB. The Flink broadcast mechanism still needs improvement, which is why the default is only 1 MB. When the parallelism is low, the threshold can be raised to as much as 10 MB.
  • table.exec.resource.default-parallelism = 800: This is the operator parallelism setting. For an input of 10 TB, we recommend a parallelism of 800. We do not recommend an overly large value, because the higher the parallelism, the greater the pressure on all parts of the system.
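
The arithmetic behind two of these values can be checked with a short Python sketch. It assumes that each parallel pipeline occupies one slot, which is Flink's default slot sharing behavior but may not hold for every job, and it takes the slots per TaskManager from the taskmanager.numberOfTaskSlots setting listed in the TaskManager section. The helper names are hypothetical.

```python
def broadcast_threshold_bytes(megabytes):
    """Convert a threshold in MB to the byte value used by the parameter."""
    return megabytes * 1024 * 1024


def task_managers_needed(parallelism, slots_per_tm):
    """Minimum number of TaskManagers, assuming one slot per parallel pipeline."""
    return -(-parallelism // slots_per_tm)  # ceiling division


print(broadcast_threshold_bytes(10))   # byte value for a 10 MB threshold
print(task_managers_needed(800, 10))   # TaskManagers for parallelism 800
```

Under these assumptions, 10 MB corresponds to 10485760 bytes, and a parallelism of 800 with 10 slots per TaskManager needs at least 80 TaskManagers, or 4 per host on a 20-host cluster.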

TaskManager Parameter Analysis

  • taskmanager.numberOfTaskSlots = 10: This parameter sets the number of slots in a single TM.
  • TaskManager memory parameters: TaskManager memory is divided into three types: managed memory, network memory, and other JVM-related memory. Refer to the documentation on the official website to set these parameters properly.
  • taskmanager.memory.process.size = 15000m: This parameter sets the total memory of a TaskManager. After the other memory types are subtracted, 3 to 5 GB should generally be left for the heap.
  • taskmanager.memory.managed.size = 8000m: This parameter sets the managed memory, which is used for operator computing. A reasonable configuration allocates 300 to 800 MB of memory for each slot.
  • taskmanager.network.memory.max = 2200mb: Each point-to-point connection between tasks requires four buffers. At this parallelism, about 2 GB is required. Obtain the value through testing. If the buffer memory is insufficient, an exception occurs.
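
As a rough sanity check of these values, the following Python sketch subtracts managed and network memory from the process size. It is a simplification: Flink's actual memory model also reserves JVM metaspace and overhead, so the usable heap is smaller than the remainder computed here, and the network value is an upper bound rather than a fixed size. The function name is hypothetical.

```python
def memory_budget_mb(process_mb, managed_mb, network_max_mb, slots):
    """Split a TaskManager's process memory into the parts discussed above."""
    return {
        # Managed memory available to the operators in one slot.
        "managed_per_slot_mb": managed_mb / slots,
        # What is left for the heap plus other JVM-related memory.
        "remaining_mb": process_mb - managed_mb - network_max_mb,
    }


budget = memory_budget_mb(process_mb=15000, managed_mb=8000,
                          network_max_mb=2200, slots=10)
print(budget)
```

With the values from this article, each slot gets 800 MB of managed memory, which is at the top of the recommended 300 to 800 MB range, and 4800 MB remains for the heap and other JVM memory.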

Network Parameter Analysis

  • taskmanager.network.blocking-shuffle.type = mmap: Shuffle reads use mmap so that the operating system manages the memory. This is a convenient approach.
  • taskmanager.network.blocking-shuffle.compression.enabled = true: This enables compression for shuffle data. The parameter is shared by batch and stream processing. We strongly recommend enabling compression for batch jobs. Otherwise, the disk becomes a bottleneck.

Scheduling Parameter Analysis

  • cluster.evenly-spread-out-slots = true: This parameter sets the system to evenly schedule tasks to each TaskManager, which helps achieve full resource utilization.
  • jobmanager.execution.failover-strategy = region: Global retry is the default. Set this parameter to region so that a single-node failure restarts only the affected region instead of the whole job.
  • restart-strategy = fixed-delay: The retry policy needs to be set manually. By default, no retry is performed.

Other timeout-related parameters are set to tolerate the network jitter caused by large amounts of data during scheduling and operation, so that it does not lead to job failures.

Going forward, the Flink community will continue to refine Flink's features while improving its performance:

  • SQL Gateway and JDBC Driver: These are currently provided as independent repositories for Flink 1.10. [6] [7]
  • Hive syntax compatibility mode: This will make Flink more convenient for Hive users.
  • Vectorized reading: The vectorized reading of ORC and Parquet will be improved.
  • N-Ary stream operator [8]: The chain framework will be developed at the table layer to further avoid the overhead caused by flushing shuffle data to disk.


[1] https://github.com/ververica/flink-sql-benchmark
[2] https://github.com/ververica/flink-sql-benchmark/blob/master/flink-tpcds/flink-conf.yaml
[3] http://jira.apache.org/jira/browse/FLINK-14802
[4] https://issues.apache.org/jira/browse/FLINK-11899
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[6] https://github.com/ververica/flink-sql-gateway
[7] https://github.com/ververica/flink-jdbc-driver
[8] https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink
