DAG 2.0: The Adaptive Execution Engine That Supported 150 Million Distributed Jobs in the 2020 Double 11

By Chen Yingda, Senior Staff Engineer at Alibaba Cloud


This article introduces DAG 2.0, the next-generation execution engine of Alibaba Cloud computing platforms. During this year's Double 11, DAG 2.0 had to run hundreds of millions of jobs and cope with abrupt changes in data distribution. It did so with adaptive dynamic execution capabilities and a brand-new execution framework that automatically guaranteed the timeliness and stability of data output.

Challenge and Background

Number of distributed jobs running on computing platforms per day

Stability is the cornerstone of computing platforms, but it is not everything. Behind the large number of jobs and the huge amount of data lie increasingly diversified job operation modes and data distribution characteristics. During large-scale events, data distribution changes even more drastically. Manual O&M measures can no longer keep up with the tens of millions of jobs that run on regular days. They are even less able to handle the hundreds of millions of jobs during the two sales periods of this year's "nunchucks-style" Double 11, which had two separate sales peaks.

Faced with these challenges, the distributed scheduling and execution framework of Alibaba Cloud computing engines, such as MaxCompute and PAI, must dynamically adjust jobs of various scales and data characteristics. With these dynamic adjustment capabilities, computing platforms can process data promptly and with less manual intervention, even during Double 11.

DAG 2.0, the next-generation execution engine of Alibaba Cloud computing platforms, was developed with these needs in mind. It debuted on Double 11 2019 (FY20), and in 2020 (FY21) it provided upper-level computing platforms with even more dynamic execution capabilities. DAG 2.0 is more adaptive and supports a variety of existing and new upper-level computing modes. It played an important role in guaranteeing the timeliness and stability of data output throughout this Double 11, where its capabilities were clearly demonstrated.

Adaptive Dynamic Execution: A “Different” Double 11

During Double 11, data characteristics including the amount and distribution of data are anything but normal. The standard performance tuning strategies for key jobs in day-to-day operations are less effective or even counterproductive during Double 11. It is possible to invest in manpower to manually ensure that jobs are executed as expected during large-scale events. However, a more effective method is to optimize execution plans when jobs are running instead of before jobs are submitted. This way, the plans can be adjusted based on how the data is distributed in real time. The DAG 2.0 execution engine provides dynamic plan adjustment and execution features to ensure normal job execution.

The following sections describe the adaptive dynamic execution features of the DAG 2.0 execution engine that played a key role in the success of Double 11 2020.

Adaptive Shuffle: Intelligent Data Orchestration to Avoid Data Skew

If data skew occurs, a single compute node can end up processing hundreds of times more data than usual, stretching job execution to dozens of hours. This is unacceptable for high-priority jobs with strict service-level requirements on completion time, especially during Double 11. Data skew caused by changes in data distribution characteristics poses a huge challenge to MaxCompute.

Many companies currently rely on manual effort to get jobs done on time: terminating jobs, modifying SQL scripts, and rerunning them. Our solution is instead a system that dynamically and adaptively orchestrates data based on real-time data statistics. Built on the DAG 2.0 framework, the system uses adaptive shuffle to explore and orchestrate shuffled data at run time. This resolves data skew without generating file fragments when data is dynamically written into partitions. Adaptive shuffle has been enabled for all MaxCompute jobs, and Double 11 2020 demonstrated its benefits.
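To make the idea concrete, here is a minimal Python sketch of how a planner could use per-partition size statistics when data is dynamically written into partitions. It assumes, for illustration only, two things:

- Upstream tasks report how many bytes are destined for each output partition.
- A hypothetical `target_bytes` caps the data that one writer should handle.

Large partitions are spread across several writers, which removes the skew. Small partitions are packed onto a shared writer, which keeps the number of output files low. The function names are hypothetical and do not reflect MaxCompute's actual implementation.

```python
import math
import zlib


def plan_writers(partition_bytes, target_bytes):
    """Map each output partition to the list of writer IDs that will write it.

    partition_bytes: dict of partition name -> bytes observed at run time.
    target_bytes: hypothetical cap on the data a single writer should handle.
    """
    plan = {}
    next_writer = 0
    shared_writer = None  # writer currently collecting small partitions
    shared_used = 0
    # Handle the largest partitions first so they get dedicated writers.
    for part, size in sorted(partition_bytes.items(), key=lambda kv: -kv[1]):
        if size > target_bytes:
            # Skewed partition: split it across several writers.
            count = math.ceil(size / target_bytes)
            plan[part] = list(range(next_writer, next_writer + count))
            next_writer += count
        else:
            # Small partition: pack it with others to avoid tiny files.
            if shared_writer is None or shared_used + size > target_bytes:
                shared_writer = next_writer
                next_writer += 1
                shared_used = 0
            plan[part] = [shared_writer]
            shared_used += size
    return plan


def route(plan, partition, row_key):
    """Pick a writer for one row; crc32 keeps the choice stable across processes."""
    writers = plan[partition]
    return writers[zlib.crc32(row_key.encode("utf-8")) % len(writers)]


sizes = {"2020-11-11": 1000, "2020-11-10": 30, "2020-11-09": 20, "2020-11-08": 10}
plan = plan_writers(sizes, target_bytes=100)
print(plan["2020-11-11"])  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# One output file per (writer, partition) pair.
print(sum(len(writers) for writers in plan.values()))  # 13
```

Compare this with two simpler approaches. Hashing on the partition column alone sends all 1,000 units of the hot partition to one writer. A plain random shuffle across the same 11 writers could produce up to 44 files, one per writer per partition. The sketch keeps every writer within the target and produces 13 files.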

Prolonged job execution caused by data skew
Intelligent data orchestration based on adaptive shuffle to avoid data skew

Performance during Double 11: The DAG 2.0 execution engine intelligently detects data skew and uses adaptive shuffle to schedule compute nodes accordingly. Adaptive shuffle covered all production jobs involving dynamic partitions during Double 11. During peak hours on November 11th, 2020, it ensured that more than 130,000 distributed jobs ran properly. For high-priority jobs, adaptive shuffle reduced the degree of data skew by up to 550 times. For example, jobs that originally required 59 hours completed in only 6 minutes, without manual intervention. For standard jobs, adaptive shuffle reduced data skew by up to thousands of times. This effectively eliminated long tails and ensured that baseline jobs finished on time during the event.

Distribution of the 130,000 jobs for which adaptive shuffle eliminated data skew issues

Adaptive Parallelism Adjustment: Dynamic Data Distribution at the Partition Level to Optimize Resource Usage

For key baseline jobs, the MaxCompute platform uses history-based optimization (HBO) to generate execution plans. During large-scale events, however, data characteristics change significantly, so historical information about baseline jobs is no longer a reliable basis for choosing parallelism. To overcome this, DAG 2.0 adjusts parallelism adaptively at the partition level, based on real-time data distribution. As a result, DAG 2.0 can make accurate parallelism decisions in real time. It also avoids side effects that general parallelism adjustment policies can cause, such as data skew. This makes the approach applicable to a wide variety of scenarios. During Double 11 2020, the platform applied dynamic parallelism adjustment to HBO-enabled baseline jobs for the first time, greatly increasing its coverage.
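The difference between simple and partition-level adjustment can be sketched in a few lines of Python. Both functions below assume, hypothetically, that every upstream partition reports its actual output size and that `target` is the desired amount of data per downstream task.

- **Simple policy:** derives parallelism from the total size, then assigns an equal number of adjacent partitions to each task.
- **Adaptive policy:** merges adjacent partitions according to their real sizes.

This illustrates the general technique and is not DAG 2.0's code.

```python
import math


def simple_merge(partition_sizes, target):
    """Parallelism from total size only; each task gets the same number of partitions."""
    n_tasks = max(1, math.ceil(sum(partition_sizes) / target))
    per_task = math.ceil(len(partition_sizes) / n_tasks)
    return [partition_sizes[i:i + per_task] for i in range(0, len(partition_sizes), per_task)]


def adaptive_merge(partition_sizes, target):
    """Merge adjacent partitions by actual size so no task exceeds `target`
    (unless a single partition is larger than `target` on its own)."""
    groups, current, current_size = [], [], 0
    for size in partition_sizes:
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


sizes = [10, 10, 10, 10, 90, 10, 10, 10]  # one hot partition among eight
print([sum(g) for g in simple_merge(sizes, 100)])    # [40, 120]
print([sum(g) for g in adaptive_merge(sizes, 100)])  # [40, 100, 20]
```

Both policies schedule fewer than the original eight downstream tasks. The simple policy, however, puts the hot partition together with three others and overloads one task. The adaptive policy keeps every task within the target.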

Simple dynamic parallelism adjustment vs adaptive dynamic parallelism adjustment

Performance during Double 11: Building on the improved dynamic graph capabilities of DAG 2.0, the MaxCompute platform applied adaptive dynamic parallelism adjustment to its batch jobs during traffic peaks. During Double 11, the feature was applied to over 10 million large-scale distributed jobs. At the traffic peak on November 1st, 2020, it avoided scheduling 780 million compute nodes. At the peak on November 11th, 2020, it avoided scheduling nearly 1 billion compute nodes. It also distributed data evenly across compute nodes and maximized the usage of cluster resources.

Conditional Join: The Optimal Choice for Real-Time Join Operations

Each join algorithm in distributed SQL has its own advantages. For example, Broadcast Join performs very well when one input is small enough to be copied to every compute node. Sort Merge Join is more general and applies to a wider range of data. For production jobs, the optimizer must accurately estimate the size of intermediate data to choose the optimal execution plan. This is difficult, and misjudgments often occur.
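The two algorithms behave very differently, as a single-process Python sketch of each shows.

- **Broadcast hash join:** in a distributed engine, the hash table built by `broadcast_hash_join` would be copied to every compute node, so it must fit in memory.
- **Sort merge join:** `sort_merge_join` relies on both inputs being shuffled and sorted by the join key. It works for inputs of any size at the cost of more I/O.

The function names are illustrative and are not MaxCompute APIs.

```python
def broadcast_hash_join(small, large):
    """Join (key, value) rows by building an in-memory hash table on `small`.

    In a distributed engine this table is broadcast to every task that
    scans a slice of `large`, so `small` must fit in each node's memory.
    """
    table = {}
    for key, value in small:
        table.setdefault(key, []).append(value)
    return [(key, small_value, large_value)
            for key, large_value in large
            for small_value in table.get(key, [])]


def sort_merge_join(left, right):
    """Join (key, value) rows by sorting both sides and merging equal-key runs."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Find the run of rows sharing this key on each side.
            i_end = i
            while i_end < len(left) and left[i_end][0] == lkey:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == lkey:
                j_end += 1
            out.extend((lkey, a[1], b[1]) for a in left[i:i_end] for b in right[j:j_end])
            i, j = i_end, j_end
    return out


dim = [(1, "shop-a"), (2, "shop-b")]
orders = [(1, "o-1"), (1, "o-2"), (3, "o-3")]
print(broadcast_hash_join(dim, orders))  # [(1, 'shop-a', 'o-1'), (1, 'shop-a', 'o-2')]
print(sort_merge_join(dim, orders) == broadcast_hash_join(dim, orders))  # True
```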

Many factors contribute to misjudgment, such as inaccurate statistics, varied data processing logic, and complex, changing data characteristics. In large-scale events like Double 11, data characteristics change dramatically. For example, a small table may grow into a large table because of a sudden increase in data volume. If this table is still broadcast as a small table without manual intervention, out-of-memory (OOM) errors may occur, jeopardizing the stability of production pipelines.

Different join algorithms in distributed SQL

By leveraging its dynamic logic adjustment capabilities, the DAG 2.0 execution engine offers an alternative. If the optimizer cannot determine the optimal join algorithm before job submission, it uses conditional joins to generate multiple candidate execution plans. While the job is running, DAG 2.0 collects as much real-time data distribution information as possible and dynamically steers the execution plan toward the optimal choice.
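A conditional join can be pictured as a plan node whose algorithm stays undecided until the stage producing its smaller input has finished. The Python sketch below makes several assumptions:

- A single size threshold, `broadcast_limit_bytes`, decides the algorithm. The value used here, 512 MB, is hypothetical.
- The real decision logic in DAG 2.0 and the optimizer is not described in the article and is likely more involved.
- The table names are made up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConditionalJoin:
    """A join whose algorithm stays open until its build side has been produced."""
    name: str
    broadcast_limit_bytes: int  # hypothetical threshold, not a MaxCompute setting
    chosen: Optional[str] = None

    def resolve(self, observed_build_bytes: int) -> str:
        # Decide only once the upstream stage reports its real output size.
        if observed_build_bytes <= self.broadcast_limit_bytes:
            self.chosen = "broadcast_join"
        else:
            self.chosen = "sort_merge_join"
        return self.chosen


MB = 1 << 20

# Nested case: the output of the inner join is the build side of the outer join,
# so the outer decision can only be made after the inner join has run.
inner = ConditionalJoin("orders_x_dim_shop", broadcast_limit_bytes=512 * MB)
outer = ConditionalJoin("inner_x_dim_item", broadcast_limit_bytes=512 * MB)

print(inner.resolve(observed_build_bytes=40 * MB))    # broadcast_join
print(outer.resolve(observed_build_bytes=3000 * MB))  # sort_merge_join
```

In the nested case, the outer decision depends on the inner join's actual output size. That is precisely the information the optimizer lacks before the job is submitted.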

Implementation of conditional joins on DAG 2.0

As shown in the following figure, dynamic logic adjustment applies to single joins, multiple joins, and nested joins.

Dynamic execution plan adjustment by using nested conditional joins

Performance during Double 11: On November 11th, a total of 570,000 jobs used conditional joins, accounting for 40% of batch jobs with join operations. DAG 2.0 enabled the optimizer to make the correct dynamic adjustment for all of these jobs. According to the statistics collected, broadcast join was the correct choice for almost every job (98.5%). No OOM errors were reported.

On November 11th of this year, compared with last year, the number of jobs using broadcast joins nearly doubled. Their share among all jobs with join operations also increased by 20%. Because decisions are based on real-time information, the optimizer can keep pursuing optimal execution plans without being held back by the small share of scenarios (1.5%) where broadcast join does not apply. In addition, for jobs whose data volume soars, DAG 2.0 automatically detects inefficient broadcast joins and switches them to merge joins as early as possible, without manual intervention.

Next-Generation Execution Framework: Higher Flexibility and Efficiency

System Metric Optimization

Centralized Management of High-Load Clusters

Quasi-real-time execution framework 2.0

Take a large cluster as an example. After horizontal scaling is enabled for the job-managing system component, no jobs fall back to batch mode, even when QPS reaches the upper limit. The system can process 130,000 quasi-real-time jobs every day, with execution performance 10 times that of batch mode. In addition, the number of batch jobs is reduced by 23%. Horizontal scaling greatly eases the pressure on the system and reduces the latency caused by a large number of batch jobs.
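The article does not describe how the job-managing component is scaled out. The following Python sketch only illustrates why adding instances removes the fallback. It assumes, hypothetically, that each instance accepts at most `qps_limit` submissions per second and that a job falls back to batch mode only when every instance is saturated. `JobManager` and `submit_burst` are made-up names.

```python
class JobManager:
    """Hypothetical job-managing instance with a per-second admission limit."""

    def __init__(self, qps_limit):
        self.qps_limit = qps_limit
        self.accepted = 0  # submissions accepted in the current second

    def try_accept(self):
        if self.accepted < self.qps_limit:
            self.accepted += 1
            return True
        return False


def submit_burst(n_jobs, managers):
    """Route one second of submissions; return (quasi_real_time, fell_back_to_batch)."""
    quasi_real_time = batch = 0
    for _ in range(n_jobs):
        # Send each job to the least-loaded instance; if even that one is full,
        # every instance is full and the job falls back to batch mode.
        manager = min(managers, key=lambda m: m.accepted)
        if manager.try_accept():
            quasi_real_time += 1
        else:
            batch += 1
    return quasi_real_time, batch


print(submit_burst(150, [JobManager(100)]))                    # (100, 50)
print(submit_burst(150, [JobManager(100) for _ in range(3)]))  # (150, 0)
```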

The DAG 2.0 execution engine works with Apsara Name Service and Distributed Lock Synchronization System (Nvwa), which coordinates jobs for Alibaba Group clusters. Together they redefined how quasi-real-time framework 2.0 calls the Nvwa interface. A large number of service restarts could otherwise hit Nvwa with traffic peaks and job accumulation. This change effectively prevents that impact and ensured the stability of other distributed components during Double 11.

Prevented impact of peak traffic on Apsara Name Service and Distributed Lock Synchronization System

Support for the PAI Elastic-Inference Engine

Before the PAI elastic-inference engine was released, inference jobs were executed with the same TensorFlow-based execution model used by deep learning training jobs. This covered how resources are requested and used, how jobs are scheduled, and how failover is handled. However, inference jobs differ greatly from training jobs:

- Their computation is predictable.
- They are highly concurrent.
- Their computing resources are highly scalable.
- Each compute node runs independently.

The PAI elastic-inference engine optimizes the execution of inference jobs in several ways. These include elastic scaling of compute nodes, overselling of GPU resources with upgrades from oversold to guaranteed resources, and dynamic data allocation.
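The article does not explain how dynamic data allocation is implemented. One common approach, assumed here for illustration, is to let each worker pull the next data shard as soon as it becomes idle instead of splitting the shards evenly up front. The Python simulation below compares the two strategies on workers with different speeds, for example heterogeneous GPUs. The shard counts, costs, and speeds are made-up inputs.

```python
import heapq


def static_makespan(n_shards, shard_cost, worker_speeds):
    """Split shards evenly up front, ignoring how fast each worker is."""
    n = len(worker_speeds)
    finish_times = []
    for i, speed in enumerate(worker_speeds):
        count = n_shards // n + (1 if i < n_shards % n else 0)
        finish_times.append(count * shard_cost / speed)
    return max(finish_times)


def dynamic_makespan(n_shards, shard_cost, worker_speeds):
    """Each worker pulls the next shard as soon as it becomes idle."""
    # Min-heap of (time the worker becomes idle, worker index).
    heap = [(0.0, i) for i in range(len(worker_speeds))]
    heapq.heapify(heap)
    end = 0.0
    for _ in range(n_shards):
        idle_at, i = heapq.heappop(heap)
        done_at = idle_at + shard_cost / worker_speeds[i]
        end = max(end, done_at)
        heapq.heappush(heap, (done_at, i))
    return end


speeds = [1.0, 1.0, 2.0, 2.0]  # two slow and two fast workers
print(static_makespan(12, 1.0, speeds))   # 3.0
print(dynamic_makespan(12, 1.0, speeds))  # 2.0
```

With pull-based allocation, faster workers simply take more shards, so the job finishes when the total work is done rather than when the slowest worker finishes its fixed share.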

PAI-EVPredict adopted the PAI elastic-inference engine before Double 11 2020. The engine serves business units (BUs) across Alibaba, including search engines, Alimama, Taobao, Youku Tudou Inc., ICBU, Amap, Ele.me, and new retail. After the engine was enabled, the average queuing duration of tasks on PAI-EVPredict decreased by 95%, and the average task execution duration decreased by more than 50%. The following figure shows the execution duration of video feature extraction tasks run by an algorithm recommendation team, before and after the PAI elastic-inference engine was adopted.

Trend of execution duration of video feature extraction tasks performed by an algorithm recommendation team

Support for the Bubble Execution Mode

Bubble execution mode based on the DAG model

The bubble execution mode is being rolled out across all BUs within Alibaba. By the eve of Double 11 2020, it had been applied to over 3,200 projects across the entire Alibaba network, covering 70% of all batch jobs. These projects did not include high-priority jobs on important pipelines. On November 11th, 2020, approximately 1.5 million jobs ran in bubble execution mode. The following figure compares the two modes, based on data collected on business days in the week before Double 11 2020.

Latency and resource consumption comparison between the batch execution mode and bubble execution mode

With the bubble splitting algorithm, actual job statistics show that bubble execution mode improves the execution performance of production jobs by 30% to 40% at comparable resource consumption. The bubble execution mode is now being extended to high-priority jobs on important pipelines. It is expected to improve the execution performance of jobs across the entire Alibaba network on Double 12 2020 and Double 11 2021.
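The article does not describe the bubble splitting algorithm itself. The Python sketch below is a simplified, hypothetical greedy version that conveys the general idea under the following assumptions:

- Stages are grouped into bubbles whose tasks are scheduled together within a slot budget.
- Data is pipelined between stages inside a bubble.
- Data crossing bubble boundaries is persisted, as in batch mode.

`max_slots`, the slot counts, and the stage names are made up.

```python
def split_bubbles(stages, deps, max_slots):
    """Greedily group stages into bubbles.

    stages: dict of stage name -> slots it needs, listed in topological order.
    deps: dict of stage name -> list of upstream stage names.
    max_slots: hypothetical budget of slots that one bubble may hold at once.
    """
    bubble_of = {}
    bubble_slots = []
    for name, slots in stages.items():
        upstream_bubbles = {bubble_of[u] for u in deps.get(name, [])}
        target = None
        # Join the upstream bubble only when all inputs come from that one bubble
        # (this keeps the graph of bubbles acyclic) and the slot budget still fits.
        if len(upstream_bubbles) == 1:
            (candidate,) = upstream_bubbles
            if bubble_slots[candidate] + slots <= max_slots:
                target = candidate
        if target is None:
            target = len(bubble_slots)
            bubble_slots.append(0)
        bubble_of[name] = target
        bubble_slots[target] += slots
    return bubble_of


def edge_modes(deps, bubble_of):
    """Edges inside a bubble are pipelined; edges between bubbles are persisted."""
    return {
        (up, down): "pipelined" if bubble_of[up] == bubble_of[down] else "persisted"
        for down, ups in deps.items()
        for up in ups
    }


stages = {"M1": 40, "M2": 40, "J3": 50, "A4": 30, "R5": 60}
deps = {"J3": ["M1", "M2"], "A4": ["J3"], "R5": ["A4"]}
bubbles = split_bubbles(stages, deps, max_slots=100)
print(bubbles)  # {'M1': 0, 'M2': 1, 'J3': 2, 'A4': 2, 'R5': 3}
print(edge_modes(deps, bubbles))
```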


These improvements helped us resolve existing problems effectively, but some issues persisted during large-scale events. For example, after data characteristics change, some data skew remains and affects operations such as joins. To address this, the execution engine team and the computing engine team are working together to launch an adaptive skew join solution based on dynamic monitoring. The new architecture will be further integrated with upper-layer computing engines and other system components. This will let more services benefit from the upgrade and improve the platform as a whole.
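The article does not describe how the upcoming adaptive skew join will work. As a rough illustration only, the Python sketch below follows the general idea behind runtime skew-join handling in other engines, such as the skew-join optimization in Apache Spark's adaptive query execution:

1. Shuffle statistics become available at run time.
2. A partition that is much larger than the median is split into slices.
3. Each slice is joined with a full copy of the matching partition from the other side.

The `skew_factor` value and the task tuple layout are assumptions.

```python
import math
import statistics


def plan_skew_join(left_sizes, skew_factor=4.0):
    """Plan join tasks from per-partition sizes of the left input.

    Returns (partition, slice_index, slice_count) tuples. A task reads slice
    `slice_index` of `slice_count` from the left partition and the whole
    matching right partition. Only the left side is split in this sketch.
    """
    median = statistics.median(left_sizes)
    tasks = []
    for partition, size in enumerate(left_sizes):
        if median > 0 and size > skew_factor * median:
            # Skewed partition: cut it into roughly median-sized slices.
            count = math.ceil(size / median)
            tasks.extend((partition, i, count) for i in range(count))
        else:
            tasks.append((partition, 0, 1))
    return tasks


tasks = plan_skew_join([10, 12, 8, 100])
print(len(tasks))  # 13: partitions 0-2 unchanged, partition 3 split into 10 slices
```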
