Yuan Haisheng, Senior Technical Expert at Alibaba Cloud, gave a talk titled “The Technologies and Stories between MaxCompute and the Big Data Query Engine.” He explained the similarities and differences between MaxCompute and Massively Parallel Processing (MPP) query engines and how Join is implemented on distributed systems. He also described how the Hash Clustering Table and Range Clustering Table introduced in MaxCompute optimize Join and aggregation operations.
This article is based on his speech and the related presentation slides.
Comparison between MaxCompute and MPP
MaxCompute significantly differs from Massively Parallel Processing (MPP) query engines mainly in performance, cost, scalability, and flexibility.
- Performance: Performance is the most important metric of a data warehouse. Typical MPP products include Greenplum, Vertica, and Redshift. They are mainly used for online real-time data analysis, which generally requires millisecond-level response times. MaxCompute is mostly applied in offline data scenarios. MaxCompute starts processes and packages data dynamically, and when a job runs as MapReduce stages, intermediate data has to be written to disk. Therefore, MaxCompute is relatively slow and unsuitable for real-time data analysis. However, MaxCompute has an advantage when the amount of data is large: it can dynamically adjust the number of instances so that enough instances are available for data processing. An MPP database, by contrast, runs on a fixed number of clusters and nodes, so its computing resources may run short when the amount of data is relatively large.
- Cost: MaxCompute has a great cost advantage over MPP. Data is stored on Alibaba Cloud. MaxCompute only charges users for the computing resources that they actually use when performing computations. When no computation is performed, users only need to pay for storage resources. However, when a certain amount of resources are enabled in an MPP database, users will be billed even when they are not using those resources.
- Scalability: Alibaba Cloud also used MPP at first, with a fixed number of clusters. However, as the amount of internal Alibaba Cloud business data kept increasing, the computing resources fell far short of what was required. MaxCompute dynamically allocates resources and adjusts the number of instances in real time based on the computational complexity to ensure high scalability.
- Flexibility: In addition to processing SQL queries, MaxCompute can run MapReduce jobs and machine learning nodes. Thanks to its high scalability and flexibility, MaxCompute supports 95% of the internal data computations of Alibaba Cloud and runs a large number of tasks.
Join Implementation on Distributed Systems
Query Plan Generation: First, a user submits a SQL statement to the parser, which compiles it into a relational expression. The relational expression is then sent to the optimizer for a series of optimization operations, including logical and physical transformations. The cost model selects the physical execution plan with the lowest cost. The transformer transforms this optimal plan into a physical operator tree and sends it to the manager. The manager starts the instances and sends the plan to the runtime to run the query. In this process, the cost model obtains statistics (such as table width and number of rows) from the metadata to decide on an optimal plan. Apache Calcite is used as the framework for the optimizer.
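The cost-based choice described above can be illustrated with a toy model. This is only a sketch under simple assumptions: the `TableStats` class, the two cost formulas, and the plan names are hypothetical and are not MaxCompute's actual cost model. It only shows how statistics such as row count and row width can drive the choice of the cheapest plan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableStats:
    """Statistics a cost model might read from metadata (hypothetical names)."""
    rows: int
    row_width_bytes: int

    @property
    def size_bytes(self) -> int:
        return self.rows * self.row_width_bytes


def shuffle_join_cost(left: TableStats, right: TableStats) -> int:
    # Assumption: both inputs are repartitioned, so both cross the network once.
    return left.size_bytes + right.size_bytes


def broadcast_join_cost(left: TableStats, right: TableStats, instances: int) -> int:
    # Assumption: the smaller input is copied to every instance; the larger stays put.
    return min(left.size_bytes, right.size_bytes) * instances


def choose_join_plan(left: TableStats, right: TableStats, instances: int) -> str:
    candidates = {
        "ShuffleMergeJoin": shuffle_join_cost(left, right),
        "BroadcastHashJoin": broadcast_join_cost(left, right, instances),
    }
    # Pick the candidate with the lowest estimated cost.
    return min(candidates, key=candidates.get)
```

With one very large and one very small input, the broadcast plan wins in this model; with two large inputs, the shuffle plan does.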
Optimizer Core Components: Logical operators describe what tasks to perform, such as LogicalInnerJoin for defining InnerJoin, LogicalScan for defining a scan, and LogicalFilter for defining filtering. Physical operators describe how to perform tasks. For example, HashJoin, MergeJoin, and NLJoin represent different algorithms; IndexScan represents an index scan and TableScan represents a full-table scan; PhysicalFilter is a physical filter. A logical operator can be transformed to a new logical operator through the logical transformation rules or a physical operator through the logical implementation rules, for example, transforming InnerJoin to HashJoin. A physical operator can produce a new physical operator through the physical property enforcement, such as Distribution for implementing the distribution property and Sort for the sorting property.
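To make the relationship between logical and physical operators concrete, here is a minimal sketch that enumerates the physical alternatives for a small logical tree. The operator names come from the text; the tuple-based tree encoding and the `physical_plans` function are assumptions made for illustration and do not reflect how Calcite or MaxCompute represents plans.

```python
from itertools import product

# Implementation rules: logical operator -> physical alternatives named in the text.
IMPLEMENTATION_RULES = {
    "LogicalInnerJoin": ["HashJoin", "MergeJoin", "NLJoin"],
    "LogicalScan": ["TableScan", "IndexScan"],
    "LogicalFilter": ["PhysicalFilter"],
}


def physical_plans(node):
    """Enumerate physical trees for a logical tree given as (operator, *children).

    Leaves that are plain strings (table names) are kept unchanged.
    """
    if isinstance(node, str):
        return [node]
    op, *children = node
    child_options = [physical_plans(child) for child in children]
    return [
        (physical_op, *combo)
        for physical_op in IMPLEMENTATION_RULES[op]
        for combo in product(*child_options)
    ]
```

For a join of two scans this yields 3 × 2 × 2 = 12 candidate physical plans, which is the kind of search space the cost model then has to rank.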
The following diagram shows how MaxCompute generates a Join plan. First, the PartitionedJoinRule transforms the Inner Join into a new physical plan that uses Sort Merge Join. The input data, which is stored in the Apsara Distributed File System, does not meet the distribution property that Sort Merge Join requires. Therefore, MaxCompute needs to perform an exchange operation.
That is, MaxCompute shuffles the data by T1.a and T2.b and then runs Sort Merge Join. Rows whose T1.a and T2.b values are equal are placed in the same bucket. Multiple instances are started, one per bucket, to implement distributed computing. In this process, the Shuffle operation consumes a relatively large amount of resources, because it involves not only data reads and writes but also sorting. The key to the optimization is reducing sorting operations to speed up data processing.
Assume that one of the tables, say T2, is relatively small. Then the full T2 table can be broadcast to T1 for Hash Join. In this way, T1 does not have to be shuffled, and hashing and sorting are not required for T2. The Join plan then consists of only two stages. In the M2 stage, T2 is scanned and then broadcast to T1. Shuffle is not required for T1. Each instance builds a hash table from the full T2 table and probes it with its part of the T1 data to obtain the final results of Hash Join.
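The shuffle-then-merge flow can be sketched in a few lines. This is a single-process illustration, not MaxCompute code: the function names are hypothetical, rows are plain dictionaries, Python's built-in `hash` stands in for the real hash function, and the loop over buckets stands in for the parallel instances.

```python
from collections import defaultdict


def shuffle(rows, key, num_buckets):
    """Hash-partition rows by `key`, then sort each bucket by `key`."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    for bucket in buckets.values():
        bucket.sort(key=lambda r: r[key])
    return buckets


def merge_join(left, right, left_key, right_key):
    """Join two lists that are already sorted by their join keys."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lv, rv = left[i][left_key], right[j][right_key]
        if lv < rv:
            i += 1
        elif lv > rv:
            j += 1
        else:
            # Find the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][right_key] == lv:
                j_end += 1
            # Pair every matching left row with that run.
            while i < len(left) and left[i][left_key] == lv:
                out.extend({**left[i], **right[k]} for k in range(j, j_end))
                i += 1
            j = j_end
    return out


def distributed_merge_join(t1, t2, num_buckets=4):
    b1 = shuffle(t1, "a", num_buckets)
    b2 = shuffle(t2, "b", num_buckets)
    result = []
    for bucket_id in range(num_buckets):  # one instance per bucket
        result.extend(merge_join(b1.get(bucket_id, []), b2.get(bucket_id, []), "a", "b"))
    return result
```

Both the partitioning pass and the per-bucket sort happen before any join work starts, which is why the text singles out the Shuffle stage as the expensive part.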
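A broadcast Hash Join can be sketched as follows, assuming T2 is the small table. The function names and the representation of T1 as a list of splits are hypothetical; the point is that the large table is never repartitioned and each instance joins its own split against a full copy of the small table.

```python
from collections import defaultdict


def build_hash_table(small_rows, key):
    """Build phase: index every row of the small (broadcast) table by its join key."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return table


def probe(large_split, key, table):
    """Probe phase: look up each row of one split of the large table."""
    return [{**l, **s} for l in large_split for s in table.get(l[key], [])]


def broadcast_hash_join(t1_splits, t2_rows):
    results = []
    for split in t1_splits:  # each split is handled by its own instance
        table = build_hash_table(t2_rows, "b")  # every instance holds a full copy of T2
        results.extend(probe(split, "a", table))
    return results
```

No sorting is involved on either side, and only the small table crosses the network.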
Hash Clustering Table and Range Clustering Optimization Introduced in MaxCompute for Join and Aggregation
Hash Clustering Table
The Join implementation on a distributed system involves many Shuffle operations. To reduce them, MaxCompute introduced Hash Clustering Tables. A Hash Clustering Table hashes the selected columns and assigns rows to buckets by hash value. This means that shuffling and sorting have already been performed when the Hash Clustering Table is created. The basic syntax is shown in the following picture, where CLUSTERED BY indicates the columns to shuffle by and SORTED BY indicates the columns to sort by. The recommended number of buckets is a power of two (2ⁿ), which makes it easy to join with another table. We also recommend that the columns in CLUSTERED BY and SORTED BY be the same, or that the columns in SORTED BY be a subset of the columns in CLUSTERED BY. This is because a Hash Clustering Table is usually used for Join and Shuffle Remove: the table's own properties let the optimizer avoid unnecessary shuffling and sorting and thus improve performance.
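What "shuffling and sorting at creation time" means can be sketched as follows. This is an illustration only: `bucket_of` uses CRC32 as a stand-in hash, and the returned dictionary is a hypothetical stand-in for the table's stored data and metadata, not MaxCompute's storage format. The sketch clusters and sorts by the same column, as the text recommends.

```python
import zlib


def bucket_of(value, num_buckets):
    # Stand-in hash function (assumption); deterministic across runs.
    return zlib.crc32(repr(value).encode("utf-8")) % num_buckets


def write_hash_clustered(rows, cluster_key, num_buckets):
    """Place each row in a bucket by hash of `cluster_key`, then sort every bucket."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[bucket_of(row[cluster_key], num_buckets)].append(row)
    for bucket in buckets:
        bucket.sort(key=lambda r: r[cluster_key])
    # The properties are stored with the data so a later query can rely on them.
    return {
        "clustered_by": cluster_key,
        "sorted_by": cluster_key,
        "num_buckets": num_buckets,
        "buckets": buckets,
    }
```

A later Join or aggregation on the clustering column can read the stored properties and skip its own shuffle and sort.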
The detailed steps are shown in the following picture. Merge Join sends a request to T1 to pull the properties of T1. Assume that T1 is a Hash Clustering Table. The feedback of T1 is hashing by T1.a. T1 has 100 buckets and sorting is performed by T1.a. The same is true for T2. At this point, the generated Join plan can meet the sorting requirement for M1, M2, and R3. Finally, all the operators only need one stage (M1) and unnecessary shuffling is not required.
In contrast, if the feedback of T2 is None, Merge Join will send a request to hash and sort T2 by T2.b into 100 buckets. The Join plan generated at this point consists of two stages (M1 and M2). The Shuffle operation is required for T2 but not for T1, eliminating the Shuffle operation in one stage.
Consider this scenario: The feedback of T2 is hashing by T2.b, 100 buckets are ready, and the sorting is not by T2.b. In this case, Merge Join still requests T2 to sort by T2.b. At this time, the Join plan only has one stage (M1) and only a Sort operator is added, without redundant Shuffle.
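The three cases above follow one decision rule, sketched here. The `TableProps` class, the `enforcers` function, and the operator names `Exchange` and `Sort` are illustrative assumptions, not the optimizer's real interfaces. The sketch also assumes that bucket counts where one is a multiple of the other are compatible.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class TableProps:
    """Properties a table reports to the join (None/empty means 'no property')."""
    hash_keys: Optional[Tuple[str, ...]] = None
    num_buckets: int = 0
    sort_keys: Tuple[str, ...] = ()


def buckets_compatible(a: int, b: int) -> bool:
    # Assumption: bucket counts match when one is a multiple of the other.
    return a > 0 and b > 0 and (a % b == 0 or b % a == 0)


def enforcers(props: TableProps, join_key: str, required_buckets: int) -> List[str]:
    """Return the extra operators needed before a Merge Join on `join_key`."""
    distributed_ok = (
        props.hash_keys == (join_key,)
        and buckets_compatible(props.num_buckets, required_buckets)
    )
    if not distributed_ok:
        # Data must be re-shuffled, and a shuffle destroys any existing order.
        return ["Exchange", "Sort"]
    if props.sort_keys[:1] != (join_key,):
        # Distribution is fine; only the order inside each bucket is missing.
        return ["Sort"]
    return []
```

An empty result corresponds to the single-stage plan, `["Sort"]` to the single-stage plan with an added Sort operator, and `["Exchange", "Sort"]` to the plan with an extra stage.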
If 200 buckets are set for T2, the 100 buckets of T1 will be read twice for filtering. Each bucket in T1 corresponds to two buckets in T2. Still, no shuffle is required.
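Why a 100-bucket table lines up with a 200-bucket table can be checked with a little modular arithmetic. The sketch assumes that a row's bucket is its hash value modulo the bucket count, which is an assumption about the bucketing scheme; `matching_buckets` is a hypothetical helper.

```python
def matching_buckets(bucket_id, small_n, large_n):
    """Buckets of the larger table that can hold keys from `bucket_id` of the smaller one.

    Assumes bucket = hash % num_buckets and that large_n is a multiple of small_n.
    """
    if large_n % small_n != 0:
        raise ValueError("bucket counts must be multiples of each other")
    return [bucket_id + k * small_n for k in range(large_n // small_n)]
```

For example, bucket 3 of the 100-bucket table pairs with buckets 3 and 103 of the 200-bucket table. If the bucket counts were not multiples of each other, a key's bucket in one table would not determine a small fixed set of buckets in the other, which is one reason powers of two are convenient.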
Limitations of Hash Clustering Tables: A Hash Clustering Table has obvious limitations when data is skewed. Hashing a very large amount of data to one bucket slows down the computing for the entire cluster. Hash Clustering Tables only support bucket pruning for equality conditions. If the table is clustered by column a and the filter condition is a = 5, we can compute the hash value of 5 and find the bucket that contains the tuples with a = 5. For non-equality conditions, a Hash Clustering Table cannot support bucket pruning. In addition, all the clustering keys must be present in the aggregation or join keys. In the case of “CLUSTERED BY C1, C2; GROUP BY C1”, a Hash Clustering Table cannot support the optimization, because rows with the same C1 value but different C2 values may be in different buckets. Similarly, in a case like CLUSTERED BY C1, C2; … Join… ON a.C1 == b.C1, the optimization cannot be applied either: the Join keys must include both C1 and C2.
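Both limitations can be expressed as small checks. The helper names are hypothetical and Python's built-in `hash` stands in for the table's hash function.

```python
def bucket_of(value, num_buckets):
    # Stand-in for the table's hash function (assumption).
    return hash(value) % num_buckets


def buckets_to_scan(predicate_op, value, num_buckets):
    """Bucket pruning: only an equality predicate identifies a single bucket."""
    if predicate_op == "=":
        return [bucket_of(value, num_buckets)]
    # For <, >, BETWEEN and so on, matching rows may be in any bucket.
    return list(range(num_buckets))


def can_reuse_clustering(cluster_keys, operation_keys):
    """The shuffle can be skipped only if every clustering key is a join/group key."""
    return set(cluster_keys) <= set(operation_keys)
```

`can_reuse_clustering(["C1", "C2"], ["C1"])` is false, which matches the GROUP BY C1 example in the text.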
Range Clustering Table
As its name implies, a Range Clustering Table is sorted by range. MaxCompute automatically decides the range of each bucket.
How does a Range Clustering Table decide the range of a bucket? As shown in the following diagram, the upper layer is the Mappers, the middle layer is the Job Manager, and the bottom layer is the Reducers. First, sorting is performed in Stage 1. Then histograms are generated, and each Worker sends its histogram to the Job Manager. The Job Manager merges the histograms and decides how many buckets to create according to the data volume. The Job Manager then sends the bucket ranges to the Mappers, which decide which bucket each piece of data is sent to. Finally, the Reducers produce the actual aggregation results.
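The histogram-based choice of bucket ranges can be sketched as follows. This is a simplified, single-process illustration: the function names are hypothetical, histograms are exact value counts, and boundaries are chosen so that buckets hold roughly equal numbers of rows, which is an assumption about the balancing goal.

```python
import bisect
from collections import Counter


def local_histogram(values):
    """Each worker counts how often each key value occurs in its data."""
    return Counter(values)


def merge_histograms(histograms):
    """The job manager adds up the per-worker histograms."""
    total = Counter()
    for h in histograms:
        total.update(h)
    return total


def choose_boundaries(histogram, num_buckets):
    """Pick inclusive upper bounds so that buckets hold roughly equal row counts."""
    target = sum(histogram.values()) / num_buckets
    boundaries, running = [], 0
    for value in sorted(histogram):
        running += histogram[value]
        if len(boundaries) < num_buckets - 1 and running >= target * (len(boundaries) + 1):
            boundaries.append(value)
    return boundaries


def bucket_for(value, boundaries):
    """Mapper side: route a value to the first bucket whose upper bound covers it."""
    return bisect.bisect_left(boundaries, value)
```

With two workers holding the values 1 to 4 and 5 to 8 and four buckets requested, the boundaries come out as 2, 4, and 6, so each bucket receives two values.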
Range Clustering Tables have obvious advantages. Range Clustering Tables support range comparison. Meanwhile, Range Clustering Tables support aggregation and join on prefix keys. That means Range Clustering Tables can support performance optimization even in case of “CLUSTERED BY C1, C2; GROUP BY C1”.
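Range comparison support means that a range predicate can be mapped to a contiguous run of buckets. A minimal sketch, assuming each bucket is described by an inclusive upper bound and the last bucket is unbounded (`buckets_for_range` is a hypothetical helper):

```python
import bisect


def buckets_for_range(boundaries, low, high):
    """Buckets that may contain values v with low <= v <= high.

    `boundaries` holds the inclusive upper bound of every bucket except the last.
    """
    first = bisect.bisect_left(boundaries, low)
    last = bisect.bisect_left(boundaries, high)
    return list(range(first, last + 1))
```

With boundaries 2, 4, and 6, a filter such as a BETWEEN 3 AND 5 only needs buckets 1 and 2. A hash-clustered table would have to scan every bucket for the same filter.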
How a Range Clustering Table enables the Join implementation: Assume that the ranges of T1 and T2 are different, as shown in the following picture. Therefore, T1 and T2 cannot be joined directly. In this case, it is necessary to split the ranges and send the split ranges to the Join Workers, which read new ranges. As shown in the following picture, w0 reads the first split range of T1 and prunes the range intervals in T2 that are not overlapped with the first range of T1.
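The range splitting can be sketched as an interval intersection. The half-open `(low, high)` range representation and the function names are assumptions made for this illustration. Each worker takes one range of T1 and keeps only the parts of T2's buckets that overlap it.

```python
def overlapping_ranges(t1_range, t2_ranges):
    """Return (T2 bucket id, clipped range) for every T2 bucket that overlaps t1_range.

    Ranges are half-open intervals [low, high).
    """
    low, high = t1_range
    pieces = []
    for bucket_id, (b_low, b_high) in enumerate(t2_ranges):
        start, end = max(low, b_low), min(high, b_high)
        if start < end:  # non-overlapping T2 buckets are pruned
            pieces.append((bucket_id, (start, end)))
    return pieces


def plan_join_workers(t1_ranges, t2_ranges):
    """Worker i joins T1 range i with the overlapping pieces of T2."""
    return [overlapping_ranges(r, t2_ranges) for r in t1_ranges]
```

For T1 ranges [0, 10) and [10, 20) and T2 ranges [0, 5), [5, 15), and [15, 30), worker 0 reads T2 buckets 0 and 1 (clipped to [0, 5) and [5, 10)) and never touches bucket 2.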
How Range Clustering Supports the Join on Prefix Keys: The Join on prefix keys requires the reassignment of histograms and buckets. Assume that clustering is based on a and b. From the histograms, we know at which points a is split. After the buckets are reassigned, the ranges of the buckets are also updated. The new bucket ranges are sent to the Join Worker.
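One way to picture the reassignment is the following sketch. It is an interpretation of the text rather than the actual algorithm: bucket boundaries on (a, b) are reduced to boundaries on a alone, so that all rows with the same a value fall into one range. The function names are hypothetical, and boundaries are treated as inclusive upper bounds.

```python
import bisect


def prefix_boundaries(boundaries):
    """Reduce (a, b) boundaries to distinct boundaries on the prefix key a."""
    result = []
    for a_value, _b_value in boundaries:
        if not result or result[-1] != a_value:
            result.append(a_value)
    return result


def bucket_for_prefix(a_value, a_boundaries):
    """Route a row by its prefix key only, using inclusive upper bounds."""
    return bisect.bisect_left(a_boundaries, a_value)
```

With original boundaries (1, 5), (3, 2), and (3, 9), the prefix boundaries become 1 and 3. All rows with a = 3 then land in the same range, regardless of b, which is what a Join or GROUP BY on a alone needs.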
The following picture shows the time comparison of the TPCH queries in a range table and a normal table. As the picture shows, the overall query speed in the range table is increased by 60–70%. Queries 5, 17, and 21 in the range table are several times faster than in the normal table.
Tips for Clustering Tables
How can we choose proper clustering keys to save resources and improve execution performance? Here are several tips. If you have Join condition keys, Distinct values, or Where condition keys (EQUAL/IN, GT/GE/LT/BETWEEN), you can create a Clustering Table on these keys. For Aggregate keys, you can create a Range Clustering Table. For Window keys, you can choose the clustering keys and sort keys based on the Partition keys and Order keys. For example, if the query is SELECT row_number() OVER (PARTITION BY a ORDER BY b) FROM foo, the layout that matches the plan the optimizer generates for the Window is CLUSTERED BY a SORTED BY a, b INTO x BUCKETS, that is, shuffle by a and sort by a and b. Within a bucket, not all the a values are the same. Each group of rows that share the same a value can be seen as a frame, and within the frame the rows need to be sorted by b. Therefore, each instance needs its data sorted by a and b. With such a table, this preparatory work is already done, and shuffling and sorting are not required.
Note that the Shuffle operation is still needed when two Hash tables are of different types, even if they have the same distribution, the same sorting, and the same number of buckets. In addition, creating a Clustering Table takes a long time, so resources are wasted if a Clustering Table is created but rarely queried. Avoid data skew in a Clustering Table as much as possible. A query that uses FULL OUTER Join for incremental updates needs to be rewritten.
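The window example can be sketched as a single streaming pass. The sketch assumes the bucket is already sorted by a and then b, as a table CLUSTERED BY a SORTED BY a, b would provide. The `row_numbers` function is a hypothetical illustration, not MaxCompute's window operator.

```python
from itertools import groupby


def row_numbers(sorted_bucket):
    """Compute row_number() OVER (PARTITION BY a ORDER BY b) in one pass.

    Requires the input to be sorted by (a, b); no shuffle or sort happens here.
    """
    out = []
    for _a_value, frame in groupby(sorted_bucket, key=lambda r: r["a"]):
        # Rows with the same a form one frame, already ordered by b.
        for n, row in enumerate(frame, start=1):
            out.append((row["a"], row["b"], n))
    return out
```

Because equal a values are adjacent and ordered by b, the row number is just a counter that restarts whenever a changes.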
Use FULL OUTER Join for incremental updates: The example in the following picture involves two tables, a snapshot table and a delta table. The Join keys are s.key and d.key. However, when the result is inserted into a new partition, the optimizer cannot determine whether the output of the SQL expression meets the data sorting requirement, so the data has to be shuffled again. In the picture, the query is rewritten with ANTI JOIN and UNION ALL. ANTI JOIN uses the sorting property, and UNION ALL preserves distribution and sorting by the original keys. In this way, Shuffle Remove can be fully applied.
Suggestions on Clustering Table partitions: Consider the partition size when creating a Clustering Table. Partitions that are too small do not leave much room for optimization and may produce too many small files. If 1,000 buckets are set, 1,000 small files are produced, placing a heavy burden on the Mappers. A Clustering Table is more advantageous when a table's partitions are read much more often than they are written: because creating a Clustering Table takes a relatively long time, the cost pays off only if reads are frequent. Likewise, the higher the field utilization rate of a table (that is, the less often column pruning applies), the more advantageous a Clustering Table is. If only a small fraction of the data is used after column pruning, most of the time spent creating the Clustering Table is wasted, so a Clustering Table is of little benefit in this case.
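The idea behind the rewrite can be sketched as follows. Since the SQL in the picture is not reproduced in the text, this is only an interpretation: the new snapshot is assumed to consist of the snapshot rows whose key does not appear in the delta (the ANTI JOIN part) plus all delta rows (the UNION ALL part), and both inputs are assumed to be sorted by key with unique keys. The `apply_delta` function is hypothetical.

```python
import heapq


def apply_delta(snapshot, delta):
    """Merge a delta into a snapshot while keeping the output sorted by key.

    Both inputs must be sorted by "key"; delta rows win over snapshot rows.
    """
    delta_keys = {row["key"] for row in delta}
    # ANTI JOIN: keep snapshot rows that have no match in the delta.
    kept = [row for row in snapshot if row["key"] not in delta_keys]
    # UNION ALL of two sorted inputs, merged so the key order is preserved.
    return list(heapq.merge(kept, delta, key=lambda r: r["key"]))
```

Neither branch computes a new key expression, so the output is still ordered by the original key, which is the property the rewrite relies on.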