The Secrets Behind the Optimized SQL Performance of EMR Spark

By Lin Xuewei, a technical expert from the Alibaba Cloud intelligent E-MapReduce (EMR) team. Currently, Mr. Lin focuses on the optimization of open-source computing engines in EMR products.


The Alibaba Cloud E-MapReduce (EMR) team recently submitted its latest results to TPC-DS Perf, which showed performance and cost efficiency more than double those of our nearest rival. In fact, our nearest rival was ourselves, back in 2019. For more information, click here or check the image below.

The Alibaba Cloud EMR team has invested a great deal of effort and R&D resources in product features, ease of use, and security to make Alibaba Cloud EMR a popular big data product. The team has also made long-term investments and continuous efforts to develop engines that are fully compatible with open-source software, and it uses sophisticated technologies to give our products a technical edge. This gives customers higher cost efficiency when using open-source software stacks. Customers can also smoothly migrate their businesses to the cloud and minimize the costs of business operations on the cloud.

The outstanding achievements made in TPC-DS Perf by the Alibaba Cloud EMR team prove the team’s technical depth and prowess in Spark engine development. A series of articles will be published to introduce the optimizations and ideas that allowed us to perform so well in TPC-DS Perf in 2020. If you are developing a Spark engine or related applications in the community, you can read these articles and tell us what you think. You are also welcome to send us your resume for an opportunity to join the Alibaba Cloud EMR team.

Determination to Reach the Top of the TPC-DS-Perf Rankings for the Third Time

Based on the information you can find by clicking the TPC-DS Perf link above, the EMR team has submitted three sets of results at the 10 TB level. There is a story behind the third submission, which is described in this article. On the Perf page, TPC-DS focuses on two key metrics: performance and cost efficiency. We set a difficult goal for the third submission: to double performance and cost efficiency using only software optimization, on the same hardware as before.

Comparing Open-Source Spark and EMR Spark

After submitting the results, we used open-source Spark 2.4.3 to test 99 TPC-DS queries. You can compare the performance data in the following figures.

Nearly 300% Performance Improvement in the Load Phase

Nearly 600% Performance Improvement in the PT Phase

Note: Spark community edition 2.4.3 could not complete Query 14 and Query 95 because of out-of-memory (OOM) errors, so these two queries were excluded from our calculations.

The queries that took Spark community edition 2.4.3 more than 200 seconds to execute were singled out for comparison with the corresponding queries executed by EMR Spark.

Note: Among these queries, Query 78 saw a 300% performance improvement in EMR Spark, which was the smallest improvement of any of them. Query 57 ran nearly 100 times faster.



Common table expression (CTE) materialization based on InMemoryTable Cache

Simply put, we needed to make better use of InMemoryTable Cache to avoid unnecessary repeated computations. For example, the scalar computation in Query 23A/B is expensive and is evaluated repeatedly. We can use CTE pattern matching to identify time-consuming operations that would otherwise be recomputed, and then use InMemoryTable Cache to reduce the E2E duration.
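The idea can be illustrated with a minimal sketch in plain Python. It is not EMR Spark's implementation; `CteCache` and `frequent_items` are hypothetical names that stand in for an in-memory table cache and an expensive CTE body that two branches of a query both reference.

```python
from typing import Callable, Dict, List, Tuple

Row = Tuple[str, int]


class CteCache:
    """Toy stand-in for an in-memory table cache keyed by CTE name (hypothetical)."""

    def __init__(self) -> None:
        self._tables: Dict[str, List[Row]] = {}
        self.computations = 0  # how many times a CTE body was actually evaluated

    def get_or_compute(self, name: str, body: Callable[[], List[Row]]) -> List[Row]:
        # Evaluate the CTE body only on first use; later references reuse the result.
        if name not in self._tables:
            self.computations += 1
            self._tables[name] = body()
        return self._tables[name]


def frequent_items() -> List[Row]:
    # Hypothetical expensive CTE body: total quantity sold per item.
    sales = [("a", 3), ("b", 1), ("a", 4), ("c", 5), ("b", 2)]
    totals: Dict[str, int] = {}
    for item, qty in sales:
        totals[item] = totals.get(item, 0) + qty
    return sorted(totals.items())


cache = CteCache()
# Two branches of the same query reference the same CTE.
branch_1 = [r for r in cache.get_or_compute("frequent_items", frequent_items) if r[1] > 3]
branch_2 = max(cache.get_or_compute("frequent_items", frequent_items), key=lambda r: r[1])
```

Here the aggregation runs once even though it is referenced twice, which is the effect that materializing a CTE is meant to achieve.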

More effective filter-related optimization

Dynamic partition pruning:

In open-source Spark, this feature is available only in the latest community edition, 3.0.
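As a rough illustration of what dynamic partition pruning does in general, the following plain-Python sketch filters a small dimension table first and then uses the surviving keys to skip fact-table partitions at run time. The tables, the `day_type` column, and `pruned_join_total` are made up for this example and do not reflect Spark internals.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical fact table, physically partitioned by date_id.
FACT_PARTITIONS: Dict[int, List[float]] = {
    1: [10.0, 5.0],
    2: [7.5],
    3: [2.5, 1.0],
    4: [8.0],
}
# Hypothetical dimension table: (date_id, day_type).
DATE_DIM: List[Tuple[int, str]] = [
    (1, "holiday"), (2, "workday"), (3, "holiday"), (4, "workday"),
]


def pruned_join_total(day_type: str) -> Tuple[float, List[int]]:
    """Sum fact amounts that join to dimension rows of the given day_type.

    Returns the total and the fact partitions that were actually read.
    """
    # Step 1: evaluate the selective filter on the small dimension table.
    wanted: Set[int] = {date_id for date_id, kind in DATE_DIM if kind == day_type}
    # Step 2: use those keys at run time to skip partitions that cannot match.
    scanned = [pid for pid in sorted(FACT_PARTITIONS) if pid in wanted]
    total = sum(amount for pid in scanned for amount in FACT_PARTITIONS[pid])
    return total, scanned


total, scanned = pruned_join_total("holiday")
```

Only two of the four partitions are read in this toy case; the join result is the same as scanning everything and filtering afterwards.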

Small table broadcast reuse:

If a small, filtered table can be used to filter two or more fact tables, the filtering effect of this table can be reused. Query 64 is a typical example.

Bloom filter before SMJ:

A Bloom filter is inserted before the sort-merge join (SMJ) to reduce the data volume involved in the join and to make spilling to disk less likely.
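The sketch below shows the general technique in plain Python, under the assumption that the filter is built from the join keys of the smaller side and applied to the larger side before sorting. `BloomFilter` and `sort_merge_join` are illustrative helpers written for this example, not EMR Spark code, and the sizes (1024 bits, 3 hash functions) are arbitrary.

```python
import hashlib
from typing import Iterable, List, Tuple


class BloomFilter:
    """Small Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8 + 1)

    def _positions(self, key: int) -> Iterable[int]:
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: int) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: int) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


def sort_merge_join(left: List[Tuple[int, object]], right: List[Tuple[int, object]]):
    """Inner join on the first tuple element by sorting both sides and merging."""
    left = sorted(left, key=lambda r: r[0])
    right = sorted(right, key=lambda r: r[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Emit the cross product of the two runs that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out.extend((lk, left[i][1], right[r][1]) for r in range(j, j_end))
                i += 1
            j = j_end
    return out


small = [(k, f"dim{k}") for k in (3, 7, 42)]
large = [(k, k * 10) for k in range(100)]

bloom = BloomFilter()
for key, _ in small:
    bloom.add(key)

# Drop rows that cannot possibly match before the expensive sort and merge.
prefiltered = [row for row in large if bloom.might_contain(row[0])]
joined = sort_merge_join(prefiltered, small)
```

Because a Bloom filter never produces false negatives, the join over `prefiltered` returns the same rows as the join over `large`, while far fewer rows have to be sorted.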

Primary key (PK) and foreign key (FK) constraints optimization:

PK and FK information is used to provide more optimization recommendations for optimizers.

RI-Join removal:

Fact tables and dimension tables are joined by using the PK and FK. However, the Join operation is unnecessary when the columns of dimension tables are not projected.
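A small plain-Python sketch of why the join can be dropped, assuming the foreign key is non-null and every foreign-key value has exactly one matching primary key. The tables and function names are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical fact table: (sale_id, item_fk, amount). item_fk references ITEM_DIM.
FACT: List[Tuple[int, int, float]] = [(1, 10, 5.0), (2, 20, 7.0), (3, 10, 1.5)]
# Hypothetical dimension table keyed by its primary key: item_pk -> item_name.
ITEM_DIM: Dict[int, str] = {10: "book", 20: "pen"}


def select_with_join() -> List[Tuple[int, float]]:
    # SELECT sale_id, amount FROM fact JOIN item_dim ON item_fk = item_pk
    return [(sale_id, amount) for sale_id, fk, amount in FACT if fk in ITEM_DIM]


def select_without_join() -> List[Tuple[int, float]]:
    # Same projection with the join removed. This is valid only because the
    # PK/FK constraint guarantees each fact row matches exactly one dimension row.
    return [(sale_id, amount) for sale_id, _, amount in FACT]


with_join = select_with_join()
without_join = select_without_join()
```

If the constraint did not hold (for example, a fact row with a dangling foreign key), the two queries would differ, which is why the optimizer needs the PK and FK information.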

Removal of non-PK columns from GroupBy keys:

When GroupBy keys include PK columns and non-PK columns, the non-PK columns do not affect the GroupBy results because the PK columns contain implicit Unique information.
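The following plain-Python sketch illustrates the claim with made-up data: because the primary key determines every other column of its row, grouping by `(item_pk, item_name)` yields the same groups and sums as grouping by `item_pk` alone. `group_sum` is a helper written only for this example.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Tuple

# Hypothetical joined rows: (item_pk, item_name, amount). item_name depends on item_pk.
ROWS: List[Tuple[int, str, float]] = [
    (1, "book", 5.0), (2, "pen", 1.0), (1, "book", 2.5), (2, "pen", 3.0),
]


def group_sum(rows, key_fn: Callable[[Tuple[int, str, float]], Hashable]) -> Dict[Hashable, float]:
    """Sum the amount column per group key."""
    totals: Dict[Hashable, float] = defaultdict(float)
    for row in rows:
        totals[key_fn(row)] += row[2]
    return dict(totals)


by_pk_and_name = group_sum(ROWS, lambda r: (r[0], r[1]))  # GROUP BY item_pk, item_name
by_pk_only = group_sum(ROWS, lambda r: r[0])              # GROUP BY item_pk
```

The shorter key means less data to hash and compare per row, while the non-PK column can still be returned by reading it from any row of the group.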

GroupBy Push Down before Join

Fast Decimal

Based on Table Analyze and Stat information during runtime, an optimizer can convert data of the decimal type to the long or int type for computing, which significantly improves performance. Many decimal computations are performed when the 99 TPC-DS queries are executed.
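As a minimal sketch of the underlying idea (not the EMR Spark implementation), a decimal with a known fixed scale can be held as an unscaled integer, so that additions become plain integer arithmetic. The helpers `to_unscaled`, `from_unscaled`, and `fits_in_int64` are hypothetical names introduced for this example; the range check stands in for the statistics-based decision described above.

```python
from decimal import Decimal
from typing import List


def to_unscaled(value: Decimal, scale: int) -> int:
    """Represent a fixed-scale decimal as the integer value * 10**scale."""
    scaled = value.scaleb(scale)
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{value} has more than {scale} fractional digits")
    return int(scaled)


def from_unscaled(unscaled: int, scale: int) -> Decimal:
    """Convert an unscaled integer back to a decimal with the given scale."""
    return Decimal(unscaled).scaleb(-scale)


def fits_in_int64(unscaled: int) -> bool:
    # In this sketch, the fast path is only safe if the value fits a 64-bit long.
    return -(2 ** 63) <= unscaled < 2 ** 63


SCALE = 2
prices: List[Decimal] = [Decimal("19.99"), Decimal("0.01"), Decimal("100.50")]

unscaled_prices = [to_unscaled(p, SCALE) for p in prices]
unscaled_total = sum(unscaled_prices)  # integer addition only
fast_total = from_unscaled(unscaled_total, SCALE)
slow_total = sum(prices, Decimal("0"))  # reference result using decimal arithmetic
```

Both paths give the same total; the integer path avoids arbitrary-precision arithmetic as long as the statistics show that values and intermediate results stay within range.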


We introduced native runtime as an inclusive optimization method that applies to many scenarios, in contrast to the optimizations made to optimizers, which only apply to special scenarios. According to statistics, native runtime generally reduces the E2E duration of SQL queries by 15% to 20%. This represents a significant performance improvement in TPC-DS Perf.

The following section briefly introduces native runtime.

Building on the Whole-Stage Code Generation framework of open-source Spark, we replace the generated Java code with Weld IR code for actual execution. For more information about Weld, click here. The replacement with Weld IR is just a small part of our project. To allow Weld IR to run properly, we need to do the following:

- Expression Weld IR CodeGen (fully supported in TPC-DS)

- Operator Weld IR CodeGen (SortMergeJoin is implemented in C++, and the other operators can be replaced with Weld IR)

- Unified memory layout (OffHeap UnsafeRow -> C++ and Weld Runtime)

- Batch execution framework (in the Java runtime, the generated code processes one row at a time; calling into native code at that granularity through the Java Native Interface [JNI] and the Weld runtime would be too expensive, so rows are handed over in batches)

- Other high-performance native operators, including SortMergeJoin, PartitionBy, and CSV Parsing (which cannot be expressed directly through the interfaces provided by Weld IR but can be executed natively by using C++)
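The motivation for the batch execution framework in the list above can be sketched in plain Python. `NativeKernel` is a hypothetical stand-in for a native function behind a costly boundary such as JNI; the sketch only counts boundary crossings and does not measure real JNI or Weld overhead.

```python
from typing import List


class NativeKernel:
    """Stand-in for native code; each call represents one boundary crossing."""

    def __init__(self) -> None:
        self.calls = 0

    def add_one(self, values: List[int]) -> List[int]:
        self.calls += 1  # fixed per-call overhead, independent of batch size
        return [v + 1 for v in values]


def run_row_at_a_time(kernel: NativeKernel, rows: List[int]) -> List[int]:
    out: List[int] = []
    for row in rows:
        out.extend(kernel.add_one([row]))  # one crossing per row
    return out


def run_batched(kernel: NativeKernel, rows: List[int], batch_size: int) -> List[int]:
    out: List[int] = []
    for start in range(0, len(rows), batch_size):
        out.extend(kernel.add_one(rows[start:start + batch_size]))  # one crossing per batch
    return out


rows = list(range(10))
row_kernel, batch_kernel = NativeKernel(), NativeKernel()
row_result = run_row_at_a_time(row_kernel, rows)
batch_result = run_batched(batch_kernel, rows, batch_size=4)
```

Both variants produce the same output, but the batched one crosses the boundary 3 times instead of 10, so the fixed per-call cost is amortized over many rows.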


This article introduces some optimizations that we made for our third submission to TPC-DS Perf. More articles in the same series will describe and analyze each optimization. If you are interested in Spark SQL, you can check back when these articles are published.

To learn more about E-MapReduce, visit the official product page.
