Flink 1.10 vs. Hive 3.0 — A Performance Comparison

  • In the test, all input tables are standard Hive tables, and all data is stored in a Hive data warehouse that is consistent with production conditions. Meanwhile, other computing engines may easily analyze these tables.
  • Optimized Row Columnar (ORC) is a common production file format, which provides a high compression ratio and good reading performance.
  • This article uses the TPC-DS Benchmark 10-TB dataset. A 10-TB dataset is a common production scale. The data volume of only 1 TB can be handled by traditional databases and is not suitable for big data testing.
  • Total Duration: This metric provides intuitive performance data but may be significantly affected by individual queries.
  • Geometric Mean: This metric represents the central tendency of a group of numbers and effectively eliminates the influence of individual queries for a better average value.

Overview of Results

  • The total duration performance of Flink was 870% better than that of Hive on MapReduce.
  • The geometric mean performance of Flink Queries was 780% better than that of Hive on MapReduce.
  • The total duration performance of Flink was 210% better than that of Hive on Tez.
  • The geometric mean performance of Flink Queries was 200% greater than that of Hive on MapReduce.

Benchmark Details

Benchmark Environment

  • Computing Environment: It contains 20 servers with 64-core Intel processors, 256 GB memory, 1 SSD disk for the computing engine, multiple SATA disks for HDFS, and a 10-Gigabit network adapter.
  • Cluster Environment: Yarn + HDFS + Hive.
  • Flink Parameters: See flink-conf.yaml [2].
  • Hive Parameters: Optimize the threshold of MapJoin to improve the performance while avoiding any out of memory (OOM) errors.
  • Select the most recent Hadoop version (3.X), Hive and Tez versions.

Benchmark Procedure

  • Prepare the Hadoop (HDFS + YARN) environment.
  • Prepare the Hive environment.
create table ${NAME} stored as ${FILE} as select * from ${SOURCE}. ${NAME};
analyze table ${NAME} compute statistics for columns;

Benchmark Analysis

Flink 1.10

  • Support for Multiple Hive Versions: Supports all major versions later than Hive 1.0.
  • Vectorized ORC Reading: This feature is currently enabled by default only in Hive 2.0 and later versions.
  • Support for Hive 1.X is In-Progress: See FLINK-14802 [3].
  • The Vectorized Reading Support for Parquet is Under Development: See FLINK-11899 [4].
  • Proportional Allocation of Elastic Memory: This not only helps the Operator use more memory resources, but also greatly facilitates user configuration. With this, users no longer need to configure Operator memory. Instead, the Operator elastically acquires memory based on the Slot, making it easier to use Flink out-of-the-box. For more information, see FLIP-53 [5].
  • Shuffle Compression: By default, Flink allows intermediate data to be flushed to disks for batch jobs, which helps to avoid the possibility of scheduling deadlocks and provides a good fault tolerance mechanism. However, when a large amount of data is flushed to a disk, the disk throughput may result in a job bottleneck. Therefore, Flink 1.10 implements Shuffle compression and use CPUs for I/O performance compensation.
  • New Scheduling Framework: A new scheduling framework has also been introduced in Flink 1.10, which improves the scheduling performance of JobMaster and prevents JobMaster from becoming a performance bottleneck when the concurrency is high.

Flink Parameter Analysis

  • table.optimizer.join-reorder-enabled = true: This parameter needs to be enabled manually. Currently, major engines rarely enable JoinReorder by default. Enable this parameter when statistical information is relatively complete. Generally, reorder errors are rare.
  • table.optimizer.join.broadcast-threshold = 1010241024: Change the value of this parameter from the default 1 MB to 10 MB. Currently, the Flink broadcasting mechanism needs to be improved, so the default value is 1 MB. However, when the concurrency is low, this parameter may be set to a maximum of 10 MB.
  • table.exec.resource.default-parallelism = 800: This is the Operator's concurrency setting. For an input of 10 TB, we recommend setting the concurrency to 800. We do not recommend setting an overly large value. The higher the concurrency, the greater the pressure on all parts of the system.
  • taskmanager.numberOfTaskSlots = 10: This parameter sets the number of slots in a single TM.
  • TaskManager memory parameters: The memory of TaskManager is divided into three types: management memory, network memory, and other JVM-related memory. Refer to the documents on the official website to set these parameters properly.
  • taskmanager.memory.process.size = 15000m: This parameter sets the total memory of TaskManager. After subtracting the size of other memory. Generally, leave 3 to 5 GB of memory in the heap.
  • taskmanager.memory.managed.size = 8000m: This parameter sets the management memory, which is used for Operator computing. A reasonable configuration allocates 300 to 800 MB memory for each slot.
  • taskmanager.network.memory.max = 2200mb: Task point-to-point communication requires four buffers. Based on the concurrency, about 2 GB is required for this parameter. Obtain this value through testing. If the buffer memory is insufficient, an exception occurs.
  • taskmanager.network.blocking-shuffle.type = mmap: Shuffle read uses mmap to get the system manage the memory. This is a convenient approach.
  • taskmanager.network.blocking-shuffle.compression.enabled = true: This enables compression for Shuffle. This parameter is reused for batch and stream processing. We strongly recommend enabling compression for batch jobs. Otherwise, the disk will incur a bottleneck.
  • cluster.evenly-spread-out-slots = true: This parameter sets the system to evenly schedule tasks to each TaskManager, which helps achieve full resource utilization.
  • jobmanager.execution.failover-strategy = region: Global retry is enabled by default. Enable region retry to enable single-node failover.
  • restart-strategy = fixed-delay: The retry policy needs to be set manually. By default, no retry is performed.

Flink 1.11 and Future Plans

  • They will provide SQL Gateway and JDBC Driver, which are currently provided as independent warehouses for Flink 1.10. [6] [7]
  • They will provide a Hive syntax compatibility mode for the greater convenience of Hive users.
  • They will improve the vectorized reading of ORC and Parquet.
  • N-Ary stream operator [8]: They will develop the chain framework at the table layer to further avoid overhead caused by Shuffle disk flushing.

References

Original Source:

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com