Flink 1.10 vs. Hive 3.0 — A Performance Comparison

  • Optimized Row Columnar (ORC) is a common production file format, which provides a high compression ratio and good reading performance.
  • This article uses the TPC-DS benchmark with a 10-TB dataset, a common production scale. A 1-TB dataset can be handled by traditional databases and is therefore not a good test for big data systems.
  • Geometric Mean: This metric represents the central tendency of a set of numbers. Compared with the arithmetic mean, it reduces the influence of individual outlier queries on the overall average.
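To illustrate why the geometric mean is less sensitive to a single slow query than the arithmetic mean, here is a minimal Python sketch. The runtimes are made up for illustration and are not benchmark results.

```python
import math


def geometric_mean(values):
    """Return the n-th root of the product of positive values.

    Computed as exp(mean(log(x))) so that multiplying many runtimes cannot overflow.
    """
    if not values or any(v <= 0 for v in values):
        raise ValueError("geometric mean needs a non-empty list of positive numbers")
    return math.exp(sum(math.log(v) for v in values) / len(values))


def arithmetic_mean(values):
    return sum(values) / len(values)


# Hypothetical query runtimes in seconds: four typical queries and one outlier.
runtimes = [10.0, 10.0, 10.0, 10.0, 1000.0]

print(arithmetic_mean(runtimes))  # 208.0, dominated by the single slow query
print(geometric_mean(runtimes))   # about 25.1, much closer to the typical query
```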

Overview of Results

1) Flink 1.10 vs. Hive 3.0 on MapReduce

  • The geometric mean performance of Flink queries was 780% better than that of Hive on MapReduce.

2) Flink 1.10 vs. Hive 3.0 on Tez

  • The geometric mean performance of Flink queries was 200% better than that of Hive on Tez.
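When two engines run the same query set, the ratio of their geometric-mean runtimes equals the geometric mean of the per-query speedups, so no single query dominates the comparison. The following Python sketch checks this property; the runtimes and the engine labels are hypothetical and do not reproduce the results above.

```python
import math


def geometric_mean(values):
    # n-th root of the product, computed in log space for numerical stability
    return math.exp(sum(math.log(v) for v in values) / len(values))


# Hypothetical per-query runtimes (seconds) for the same four queries on two engines.
baseline = [120.0, 45.0, 300.0, 60.0]   # e.g. a slower engine
candidate = [30.0, 15.0, 50.0, 20.0]    # e.g. a faster engine

# Relative performance as the ratio of the two geometric means...
ratio_of_means = geometric_mean(baseline) / geometric_mean(candidate)
# ...is the same as the geometric mean of the per-query speedups (4, 3, 6, 3 here).
mean_of_ratios = geometric_mean([b / c for b, c in zip(baseline, candidate)])

print(round(ratio_of_means, 6), round(mean_of_ratios, 6))
```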

Benchmark Details

Benchmark Environment

Environment and Optimizations

  • Cluster Environment: YARN + HDFS + Hive.
  • Flink Parameters: See flink-conf.yaml [2].
  • Hive Parameters: Tune the MapJoin threshold to improve performance while avoiding out-of-memory (OOM) errors.
  • Versions: Use the most recent versions of Hadoop (3.x), Hive, and Tez.

Benchmark Procedure

1) Environment Preparation

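  • Prepare the Hive environment: for each table, create a copy in the target file format from the source database, then compute column statistics.
create table ${NAME} stored as ${FILE} as select * from ${SOURCE}.${NAME};
analyze table ${NAME} compute statistics for columns;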
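Because the two statements are a template, they have to be expanded once per table. The Python sketch below does this by plain string substitution. The source database name `tpcds_text`, the default format `orc`, and the helper `render` are hypothetical placeholders, and only a few of the 24 TPC-DS tables are listed.

```python
# Hypothetical expansion of the ${NAME} / ${FILE} / ${SOURCE} template above.
TEMPLATE = (
    "create table {name} stored as {file_format} as select * from {source}.{name};\n"
    "analyze table {name} compute statistics for columns;"
)

# A subset of the TPC-DS tables; extend the list to cover all of them.
TABLES = ["store_sales", "store_returns", "date_dim", "item", "customer"]


def render(tables, file_format="orc", source="tpcds_text"):
    """Return one CTAS statement plus one ANALYZE statement per table."""
    return "\n".join(
        TEMPLATE.format(name=t, file_format=file_format, source=source) for t in tables
    )


print(render(TABLES))
```

The generated script can then be run through any Hive client.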

Benchmark Analysis

Flink 1.10

Flink 1.9 had already done a lot of work while incorporating the Blink code, including deep code generation, binary storage and computation, an improved cost-based optimizer (CBO), and a batch shuffler. This laid a solid foundation for future performance breakthroughs.

  • Vectorized ORC Reading: Flink 1.10 can read ORC files in a vectorized manner. This feature is currently enabled by default only for Hive 2.0 and later versions.
  • Support for Hive 1.x Is in Progress: See FLINK-14802 [3].
  • Vectorized Reading Support for Parquet Is Under Development: See FLINK-11899 [4].
  • Proportional Allocation of Elastic Memory: This not only lets operators use more memory but also greatly simplifies configuration. Users no longer need to configure operator memory; instead, each operator elastically acquires memory based on its slot, which makes Flink easier to use out of the box. For more information, see FLIP-53 [5].
  • Shuffle Compression: By default, Flink batch jobs write intermediate data to disk, which avoids scheduling deadlocks and provides a good fault tolerance mechanism. However, when a large amount of data is written to disk, disk throughput can become the job's bottleneck. Flink 1.10 therefore implements shuffle compression, spending CPU cycles to reduce I/O.
  • New Scheduling Framework: Flink 1.10 also introduces a new scheduling framework, which improves the scheduling performance of the JobMaster and keeps it from becoming a bottleneck at high parallelism.
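Shuffle compression trades CPU time for fewer bytes written to disk. The sketch below imitates the idea by splitting data into fixed-size 32 KB buffers (Flink's default memory segment size) and compressing each buffer independently. It assumes Python's `zlib` as a stand-in codec, not Flink's own block compression codec, and uses synthetic, highly repetitive records, so the ratio it prints says nothing about real TPC-DS shuffle data.

```python
import zlib

# Assumption: 32 KB buffers; zlib stands in for Flink's block compression codec.
BUFFER_SIZE = 32 * 1024


def compress_buffers(data: bytes, buffer_size: int = BUFFER_SIZE):
    """Split data into fixed-size buffers and compress each one independently."""
    return [zlib.compress(data[i:i + buffer_size]) for i in range(0, len(data), buffer_size)]


def decompress_buffers(buffers):
    """Reverse of compress_buffers: decompress every buffer and concatenate."""
    return b"".join(zlib.decompress(b) for b in buffers)


# Hypothetical, highly repetitive shuffle records (real data compresses less well).
records = b"".join(b"%d,store_sales,2020-01-01\n" % (i % 1000) for i in range(100_000))

compressed = compress_buffers(records)
written = sum(len(b) for b in compressed)
print(len(records), written, round(written / len(records), 3))
```

Compressing per buffer keeps each buffer independently decodable, at the cost of a slightly worse ratio than compressing the whole stream at once.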

Flink Parameter Analysis

Flink 1.10 includes many parameter optimizations that improve the out-of-the-box experience. However, because of some limitations of unified batch and stream processing, some parameters still need to be configured separately. This section gives a general analysis of these parameters.

  • table.optimizer.join.broadcast-threshold = 10 * 1024 * 1024 (10485760 bytes): Raise this value from the default 1 MB to 10 MB. The default is 1 MB because Flink's broadcast mechanism still needs improvement. However, when parallelism is low, the threshold can be raised to at most 10 MB.
  • table.exec.resource.default-parallelism = 800: This sets the default operator parallelism. For a 10-TB input, we recommend a parallelism of 800. We do not recommend an overly large value, because higher parallelism puts more pressure on every part of the system.
  • TaskManager memory parameters: TaskManager memory is divided into three types: managed memory, network memory, and other JVM-related memory. Refer to the official Flink documentation to set these parameters properly.
  • taskmanager.memory.process.size = 15000m: This sets the total process memory of the TaskManager. The JVM heap gets what remains after the other memory types are subtracted; generally, leave 3 to 5 GB for the heap.
  • taskmanager.memory.managed.size = 8000m: This sets the managed memory, which operators use for computation. A reasonable configuration gives each slot 300 to 800 MB.
  • taskmanager.network.memory.max = 2200mb: Each point-to-point connection between tasks requires four buffers. Depending on the parallelism, about 2 GB is needed; determine the exact value through testing. If buffer memory is insufficient, the job fails with an exception.
  • taskmanager.network.blocking-shuffle.compression.enabled = true: This enables shuffle compression. The parameter is shared by batch and stream processing. We strongly recommend enabling compression for batch jobs; otherwise, the disk becomes a bottleneck.
  • jobmanager.execution.failover-strategy = region: By default, a failure triggers a global restart of the whole job. Set this to region so that only the failed region is restarted.
  • restart-strategy = fixed-delay: The restart strategy must be set explicitly; by default, failed jobs are not restarted.
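The values above can be collected and sanity-checked with a short script. This Python sketch is illustrative only: it takes the parameter names and values from the list above, renders them as flink-conf.yaml-style `key: value` lines, and does a rough memory budget. The helper names are hypothetical, and the budget simply subtracts managed and maximum network memory from the process size; Flink's real memory model also reserves JVM metaspace and overhead, so the remaining figure is only an upper bound. For the configuration actually used in the benchmark, see flink-conf.yaml [2].

```python
MB = 1024 * 1024

# Parameter values from the list above (rough sketch, not Flink's memory model).
params = {
    "table.optimizer.join.broadcast-threshold": 10 * MB,  # 10485760 bytes
    "table.exec.resource.default-parallelism": 800,
    "taskmanager.memory.process.size": "15000m",
    "taskmanager.memory.managed.size": "8000m",
    "taskmanager.network.memory.max": "2200mb",
    "taskmanager.network.blocking-shuffle.compression.enabled": "true",
    "jobmanager.execution.failover-strategy": "region",
    "restart-strategy": "fixed-delay",
}


def to_megabytes(value: str) -> int:
    """Parse simple sizes such as '15000m' or '2200mb' into megabytes."""
    v = value.lower()
    for suffix in ("mb", "m"):
        if v.endswith(suffix):
            return int(v[: -len(suffix)])
    raise ValueError(f"unsupported size: {value}")


def remaining_mb(p) -> int:
    """Process size minus managed and maximum network memory (upper bound for heap and the rest)."""
    return (to_megabytes(p["taskmanager.memory.process.size"])
            - to_megabytes(p["taskmanager.memory.managed.size"])
            - to_megabytes(p["taskmanager.network.memory.max"]))


def slot_range(managed_mb: int, per_slot_min: int = 300, per_slot_max: int = 800):
    """Slot counts that keep managed memory per slot within 300 to 800 MB."""
    return managed_mb // per_slot_max, managed_mb // per_slot_min


def render_conf(p) -> str:
    """Render the parameters as flink-conf.yaml-style 'key: value' lines."""
    return "\n".join(f"{k}: {v}" for k, v in p.items())


print(remaining_mb(params))  # 4800
print(slot_range(to_megabytes(params["taskmanager.memory.managed.size"])))  # (10, 26)
print(render_conf(params))
```

The 4800 MB left over is consistent with the advice to keep 3 to 5 GB for the heap, but the exact split should be checked against the Flink memory documentation.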

Flink 1.11 and Future Plans

Going forward, the Flink community will continue to refine Flink's functionality while further improving its performance:

  • They will provide a Hive syntax compatibility mode to make Flink more convenient for Hive users.
  • They will improve the vectorized reading of ORC and Parquet.
  • N-Ary stream operator [8]: They will develop an operator chaining framework at the table layer to further reduce the overhead of writing shuffle data to disk.

References

[1] https://github.com/ververica/flink-sql-benchmark
[2] https://github.com/ververica/flink-sql-benchmark/blob/master/flink-tpcds/flink-conf.yaml
[3] http://jira.apache.org/jira/browse/FLINK-14802
[4] https://issues.apache.org/jira/browse/FLINK-11899
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[6] https://github.com/ververica/flink-sql-gateway
[7] https://github.com/ververica/flink-jdbc-driver
[8] https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink
