DAG 2.0: The Adaptive Execution Engine That Supported 150 Million Distributed Jobs in the 2020 Double 11

Alibaba Cloud
Feb 4, 2021


By Chen Yingda, Senior Staff Engineer at Alibaba Cloud

Preface

This Double 11, the largest number of jobs were run with the least manual intervention in the history of big promotional events. The technical support for this Double 11 featured dynamic adjustment, intelligent data orchestration, and the bubble execution mode. The dynamic adjustment feature reduced the number of scheduled compute nodes by 1 billion on November 11 alone. The intelligent data orchestration feature expedited the execution of a single high-priority job on an important pipeline by dozens of hours. The bubble execution mode improved the performance of millions of jobs by more than 30%.

This article introduces DAG 2.0, the next-generation execution engine of Alibaba Cloud computing platforms. Faced with the challenges of running hundreds of millions of jobs and coping with abrupt data distribution changes of this Double 11, DAG 2.0 provided adaptive dynamic execution capabilities and a brand-new execution framework to automatically guarantee the timeliness and stability of data output.

Challenge and Background

Double 11 is undoubtedly the best opportunity to verify the stability and scalability of the distributed scheduling and execution frameworks of Alibaba Cloud core computing platforms. On a regular day of this year, more than 10 million distributed jobs are scheduled and executed on the computing platforms. During Double 11 of this year (from November 1st to November 11th), the total number of jobs exceeded 150 million, the peak number of jobs per day exceeded 16 million, and the amount of data processed per day exceeded 1.7 EB. As Alibaba Group CTO Lusu said, “The ‘abnormality’ of Double 11 eventually becomes the ‘normality’ of regular days.” Take the number of distributed jobs scheduled and executed on computing platforms per day during Double 11 as an example. This number increased by more than 50% compared with last year, and the peak of one Double 11 becomes commonplace in the year that follows.

Number of distributed jobs running on computing platforms per day

Stability is the cornerstone of computing platforms, but it is not everything. Behind the large number of jobs and the huge amount of data are increasingly diversified job operation modes and data distribution characteristics. During large-scale events, the distribution of data changes more drastically. Manual O&M measures are no longer capable of supporting tens of millions of jobs on regular days, not to mention the hundreds of millions of jobs during the two sales periods of this year's "nunchucks-style" Double 11.

Faced with these challenges, the distributed scheduling and execution framework of Alibaba Cloud computing engines, such as MaxCompute and PAI, is expected to dynamically adjust jobs of various scales with different data characteristics. Based on the dynamic adjustment capabilities of the scheduling and execution framework, computing platforms can promptly process data with less manual intervention, even during Double 11.

DAG 2.0, the next-generation execution engine of Alibaba Cloud computing platforms, was developed with these needs in mind. Since making its debut on Double 11 2019 (FY20), DAG 2.0 has provided upper-level computing platforms with even more dynamic execution capabilities in 2020 (FY21). It has stronger adaptive capabilities and supports both the existing upper-level computing modes and new ones. DAG 2.0 played an important role in guaranteeing the timeliness and stability of data output during the entire period of this Double 11, where these capabilities were clearly demonstrated.

Adaptive Dynamic Execution: A “Different” Double 11

The effectiveness of an execution plan for distributed jobs depends on how an engine optimizer predicts the characteristics of data. However, online data is complex and dynamic, and different processing strategies are required for different scenarios. These factors significantly affect the prediction accuracy of an optimizer.

During Double 11, data characteristics including the amount and distribution of data are anything but normal. The standard performance tuning strategies for key jobs in day-to-day operations are less effective or even counterproductive during Double 11. It is possible to invest in manpower to manually ensure that jobs are executed as expected during large-scale events. However, a more effective method is to optimize execution plans when jobs are running instead of before jobs are submitted. This way, the plans can be adjusted based on how the data is distributed in real time. The DAG 2.0 execution engine provides dynamic plan adjustment and execution features to ensure normal job execution.

The following sections describe the adaptive dynamic execution features of the DAG 2.0 execution engine that played a key role in the success of Double 11 2020.

Adaptive Shuffle: Intelligent Data Orchestration to Avoid Data Skew

Data skew has always been a pain point of distributed systems. All the components across the data processing link, including storage devices, optimizers, and compute engines, have adopted multiple methods to avoid data skew. However, data skew is almost impossible to eliminate due to the characteristics of data distribution. During Double 11, data skew occurs more frequently. For example, the sales volume of hot items skyrockets, which causes data skew. Most transactions are completed in the early morning of November 11th, which also results in data skew.

If data skew occurs, the data processed by a single compute node can increase by hundreds of times. This prolongs job execution, sometimes to dozens of hours, which is unacceptable for high-priority jobs that have strict service-level requirements for completion, especially during Double 11. Data skew brought by changes in data distribution characteristics poses a huge challenge to MaxCompute. Currently, many companies rely on manual efforts to ensure the timely completion of jobs: manually terminating jobs, modifying SQL scripts, or running jobs again. Our solution, by contrast, is a system that dynamically and adaptively orchestrates data based on real-time data statistics. Based on the DAG 2.0 framework, the system uses adaptive shuffle to explore and orchestrate shuffled data. This allows you to handle data skew issues without generating file fragments when data is dynamically written into partitions. Adaptive shuffle has been enabled for all MaxCompute jobs. Double 11 2020 demonstrated the positive effects that adaptive shuffle can bring.
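The idea of orchestrating shuffled data by its observed size can be illustrated with a minimal sketch. It is not MaxCompute's implementation: the function name `plan_shuffle`, the partition names, and the `target_bytes_per_node` parameter are all hypothetical. Under these assumptions, a partition that stays within the target keeps a single compute node (and therefore a single output file), while a skewed partition is spread over several nodes.

```python
import math

def plan_shuffle(partition_bytes, target_bytes_per_node):
    """Return {partition: number of compute nodes} from observed sizes.

    Hypothetical helper: partitions at or below the target keep one node
    (one output file, no fragments); skewed partitions are spread over
    ceil(size / target) nodes.
    """
    plan = {}
    for name, size in partition_bytes.items():
        plan[name] = max(1, math.ceil(size / target_bytes_per_node))
    return plan

# Made-up sizes: one hot partition and two small ones.
observed = {"item=hot": 5500, "item=a": 10, "item=b": 12}
plan = plan_shuffle(observed, target_bytes_per_node=100)

# Largest amount of data any single node has to process under this plan.
max_load = max(size / plan[name] for name, size in observed.items())
print(plan)
print(max_load)
```

In this toy example the hot partition no longer lands on a single node, while the two small partitions are still written by one node each.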

Prolonged job execution caused by data skew
Intelligent data orchestration based on adaptive shuffle to avoid data skew

Performance during Double 11: The DAG 2.0 execution engine intelligently detects data skew issues and uses adaptive shuffle to schedule compute nodes and resolve such issues. Adaptive shuffle covered all production jobs related to dynamic partitions during Double 11. During peak hours on November 11th, 2020, adaptive shuffle ensured the proper running of more than 130,000 distributed jobs. For high-priority jobs, adaptive shuffle reduced data skew by a factor of up to 550. In practical terms, jobs that originally required 59 hours completed in only 6 minutes, without manual intervention. For standard jobs, adaptive shuffle reduced data skew by a factor of up to several thousand. This intelligently and effectively eliminated long tails and ensured the timely completion of baseline jobs in large-scale events.

Distribution of the 130,000 jobs for which adaptive shuffle eliminated data skew issues

Adaptive Parallelism Adjustment: Dynamic Data Distribution at the Partition Level to Optimize Resource Usage

In the industry, the most common method to dynamically adjust parallelism for distributed jobs is based on the application master (AM). To estimate parallelism, the AM obtains the total amount of output data from upstream nodes and divides the total data amount by the data amount that each compute node is expected to process. If the number of compute nodes in the original execution plan exceeds the number of nodes required to process the actual output data, the AM performs a scale-down of the parallelism for dynamic adjustment. The specific scale-down ratio is based on the original parallelism and adjusted parallelism. The AM also uses a single compute node to merge adjacent data partitions. This is a simple method and is suitable only when data is evenly distributed. However, it is almost impossible to achieve an even data distribution for production jobs, especially during Double 11. If this method is used to process jobs such as these, serious data skew may occur after the data is merged.
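A minimal sketch of this simple method shows why it breaks down on uneven data. The function name `simple_merge` and the sizes are made up for illustration, and the sketch assumes one partition per originally planned node.

```python
import math

def simple_merge(partition_bytes, target_bytes_per_node):
    """Scale parallelism down by a fixed ratio and merge adjacent partitions.

    Assumes the original plan has one node per partition. Returns the
    amount of data each node processes after the merge.
    """
    planned = len(partition_bytes)
    total = sum(partition_bytes)
    # Nodes needed = total output / data each node is expected to process.
    needed = min(planned, max(1, math.ceil(total / target_bytes_per_node)))
    # Every `ratio` adjacent partitions are merged onto one node.
    ratio = math.ceil(planned / needed)
    return [sum(partition_bytes[i:i + ratio]) for i in range(0, planned, ratio)]

even_loads = simple_merge([10] * 8, target_bytes_per_node=40)
skewed_loads = simple_merge([38, 36, 2, 2, 2, 36, 2, 2], target_bytes_per_node=40)
print(even_loads)    # evenly distributed input
print(skewed_loads)  # uneven input
```

With evenly sized partitions every node receives the target amount. With the uneven input, one node receives 76 units against a target of 40 while another receives only 4, which is the skew-after-merge problem described above.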

For key baseline jobs, the MaxCompute platform uses history-based optimization (HBO) to generate execution plans. However, during large-scale events, data characteristics change significantly and the platform cannot determine the parallelism based on historical information of the baseline jobs. To overcome this challenge, DAG 2.0 implements adaptive parallelism adjustment at the partition level based on real-time data distribution. This way, DAG 2.0 can make accurate decisions on parallelism adjustment in real time and prevent the negative impacts, such as data skew, caused by general parallelism adjustment policies. This allows DAG 2.0 to be applied to a wide variety of scenarios. During Double 11 2020, the platform implemented dynamic parallelism adjustment for HBO-enabled baseline jobs for the first time, which greatly increased the coverage of dynamic parallelism adjustment.
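One way to picture partition-level adjustment is a greedy merge that looks at the actual size of each partition instead of applying a fixed ratio. The sketch below is only an illustration under that assumption; `adaptive_merge` and the sizes are hypothetical, and the article does not specify the algorithm DAG 2.0 actually uses.

```python
def adaptive_merge(partition_bytes, target_bytes_per_node):
    """Greedily merge adjacent partitions without exceeding the target.

    A partition that is already larger than the target gets a node of
    its own; merging alone cannot split it.
    """
    groups = []
    current = []
    for size in partition_bytes:
        if current and sum(current) + size > target_bytes_per_node:
            groups.append(current)
            current = []
        current.append(size)
    if current:
        groups.append(current)
    return groups

sizes = [38, 36, 2, 2, 2, 36, 2, 2]
adaptive_loads = [sum(g) for g in adaptive_merge(sizes, target_bytes_per_node=40)]

# For comparison: merging every 3 adjacent partitions regardless of size.
fixed_ratio_loads = [sum(sizes[i:i + 3]) for i in range(0, len(sizes), 3)]
print(adaptive_loads)
print(fixed_ratio_loads)
```

On this input the size-aware merge keeps every node at or below the target of 40, whereas the fixed-ratio merge puts 76 units on one node.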

Simple dynamic parallelism adjustment vs adaptive dynamic parallelism adjustment

Performance during Double 11: Based on the execution capabilities provided by improved dynamic graphs in DAG 2.0, the MaxCompute platform implemented adaptive dynamic parallelism adjustment for its batch jobs during traffic peaks. During Double 11, adaptive dynamic parallelism adjustment was applied to over 10 million large-scale distributed jobs. When traffic peaked on November 1st, 2020, it avoided the scheduling of 780 million compute nodes. When traffic peaked on November 11th, 2020, it avoided the scheduling of nearly 1 billion compute nodes. In addition, adaptive dynamic parallelism adjustment ensures that data is evenly distributed among compute nodes and maximizes the usage of cluster resources.

Conditional Join: The Optimal Choice for Real-Time Join Operations

For traditional big data jobs, the execution plan is typically determined by the optimizer before the jobs are submitted and cannot be adjusted when the jobs are in progress. This mechanism requires the optimizer to predict data characteristics accurately in advance, which is often not possible for production jobs.

Each join algorithm in SQL has its own advantages. For example, Broadcast Join has superior performance, whereas Sort Merge Join is more universal, which enables it to join more types of data. For production jobs, the optimizer must be able to accurately judge the size of intermediate data to choose the optimal execution plan. This is difficult to achieve and misjudgment often occurs.

Many factors contribute to misjudgment, such as inaccurate statistical data, varied data processing logic, and complex, changing data characteristics. In large-scale events like Double 11, the data characteristics change dramatically. For example, a small table may grow into a large table due to a sudden increase in data volume. If this table is still processed in the same way as a small table by using broadcast joins without any manual intervention, Out Of Memory (OOM) errors may occur, jeopardizing the stability of production pipelines.

Different join algorithms in distributed SQL

By leveraging its capabilities in dynamic logic adjustment, the DAG 2.0 execution engine offers an alternative solution. If the optimizer cannot determine the optimal join algorithm before job submission, DAG 2.0 uses conditional joins to generate multiple candidate execution plans. While jobs are in progress, the DAG 2.0 execution engine collects as much real-time data distribution information as possible to dynamically tune the execution plan towards the optimal choice.
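The decision a conditional join defers to run time can be sketched as follows. This is a simplified illustration, not the DAG 2.0 API: the class `ConditionalJoin`, its `broadcast_limit_bytes` field, and the byte values are assumptions made for the example.

```python
from dataclasses import dataclass

@dataclass
class ConditionalJoin:
    """A join whose algorithm is chosen only after the input size is known."""
    broadcast_limit_bytes: int  # assumed memory budget for the broadcast side

    def resolve(self, observed_small_side_bytes: int) -> str:
        # Broadcast is faster but only safe if the table fits in memory;
        # otherwise fall back to the more general sort merge join.
        if observed_small_side_bytes <= self.broadcast_limit_bytes:
            return "broadcast_join"
        return "sort_merge_join"

join = ConditionalJoin(broadcast_limit_bytes=512 * 1024 * 1024)

# The same table on a regular day and after a sudden growth in volume.
regular_day = join.resolve(64 * 1024 * 1024)
peak_day = join.resolve(8 * 1024 ** 3)
print(regular_day, peak_day)
```

Because the choice is made from the observed size rather than a pre-submission estimate, a table that has outgrown the broadcast budget is routed to the sort merge branch instead of risking an OOM error.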

Implementation of conditional joins on DAG 2.0

Dynamic logic adjustment (as shown in the following figure) applies to single joins, multiple joins, and nested joins.

Dynamic execution plan adjustment by using nested conditional joins

Performance during Double 11: On November 11th, a total of 570,000 jobs used conditional joins, accounting for 40% of batch jobs that use join operations. DAG 2.0 enabled the optimizer to make correct dynamic adjustments for all of these jobs. According to the statistics collected, the optimizer correctly selected broadcast joins for almost every job (98.5%). No OOM errors were reported.

On November 11th of this year, the number of jobs that use broadcast joins nearly doubled and the percentage of such jobs among all jobs that use join operations increased by 20% compared with last year. This real-time information-based dynamic adjustment enables the optimizer to continuously pursue optimal execution plans, without being affected by a small portion of inapplicable scenarios (1.5%). In addition, for jobs with soaring data volume, DAG 2.0 automatically detects inefficient broadcast joins and changes them to merge joins as soon as possible without requiring manual intervention.

Next-Generation Execution Framework: Higher Flexibility and Efficiency

This year, the computing platforms upgraded the quasi-real-time execution framework to 2.0 based on the DAG 2.0 execution engine. In framework 2.0, resources and jobs are managed separately and the job-managing system component supports horizontal scaling, which contributed to the successful handling of the data generated on Double 11. In addition, the DAG 2.0 execution engine tunes performance based on the implementation logic of the core state machine that is used for event processing, so as to optimize the execution of jobs of various scales. This optimization enhances the performance of the tens of millions of jobs that run every day.

System Metric Optimization

As the quasi-real-time execution framework 2.0 was widely used and continuously optimized, the overheads caused by job execution on offline computing nodes in Double 11 2020 decreased by a factor of 2.8 compared with last year. The share of compute-node running time that is spent on overheads also dropped severalfold. The reduction in scheduling overheads saved Alibaba Group the cost of thousands of physical machines. The total number of quasi-real-time jobs running on a single day exceeded 10 million, up by 65% compared with last year, whereas the overheads decreased by a factor of 1.9. The proportion of sub-second jobs increased by a factor of 2.4. The probability of quasi-real-time jobs falling back to the batch mode decreased by a factor of 2.9.

Centralized Management of High-Load Clusters

The stress testing conducted before Double 11 2020 found that in some large clusters, the QPS of quasi-real-time services reached the upper limit due to the soaring number of jobs. This caused a large number of jobs to return to the batch mode. If the job parallelism on a Fuxi master reaches the upper limit, these batch jobs need to queue for a long time before being processed. To address this issue, the quasi-real-time execution framework 2.0 allows the job-managing system component to scale horizontally, so that more jobs can be executed in quasi-real-time mode, which significantly improves job performance and shortens queuing time. This enables the system to efficiently cope with the sharp increase in data volume on Double 11 without requiring additional cluster resources.
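The effect of scaling the job-managing component horizontally can be sketched with a toy admission model. It assumes, purely for illustration, that each manager instance has the same QPS limit and that capacity grows linearly with the number of instances; `route_jobs` and all numbers are hypothetical.

```python
def route_jobs(incoming_qps, qps_limit_per_manager, managers):
    """Split incoming jobs between quasi-real-time and batch execution.

    Jobs beyond the combined QPS limit of all manager instances fall
    back to the batch mode.
    """
    capacity = qps_limit_per_manager * managers
    quasi_real_time = min(incoming_qps, capacity)
    fallback_to_batch = incoming_qps - quasi_real_time
    return quasi_real_time, fallback_to_batch

# Same peak load with one manager instance and with three.
single = route_jobs(incoming_qps=250, qps_limit_per_manager=100, managers=1)
scaled = route_jobs(incoming_qps=250, qps_limit_per_manager=100, managers=3)
print(single)
print(scaled)
```

In this model, adding manager instances raises the admission limit, so jobs that previously fell back to the batch queue stay in quasi-real-time mode without any change to the cluster's compute resources.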

Quasi-real-time execution framework 2.0

Take a large cluster as an example. After the horizontal scaling feature of the job-managing system component is enabled, no jobs return to the batch mode even when the QPS reaches the previous upper limit. The system can process 130,000 quasi-real-time jobs every day, delivering an execution performance 10 times higher than that of the batch mode. In addition, the number of batch jobs is reduced by 23%. The horizontal scaling feature greatly eases the pressure and reduces the latency caused by a large number of batch jobs.

The DAG 2.0 execution engine also works with Nvwa, the Apsara Name Service and Distributed Lock Synchronization System that coordinates jobs for Alibaba Group clusters, and redefines how the quasi-real-time framework 2.0 calls the Nvwa interface. This change protects Nvwa from the peak traffic and job accumulation that result from a large number of service restarts, and it ensures the stability of other distributed components during Double 11.

Prevented impact of peak traffic on Apsara Name Service and Distributed Lock Synchronization System

Support for the PAI Elastic-Inference Engine

In addition to MaxCompute jobs, DAG 2.0 provides native support for TensorFlow and PyTorch jobs running on the PAI platform. Compared with DAG 1.0, DAG 2.0 provides accurate semantic descriptions of jobs and offers greatly improved dynamic capabilities and fault tolerance. Before Double 11 2020, the PAI team worked together with the scheduling and execution framework team to develop the PAI elastic-inference engine that is implemented based on resource scaling. This brand-new engine significantly improves the overall performance of inference jobs on the PAI platform.

Prior to the release of the PAI elastic-inference engine, inference jobs were executed based on the TensorFlow model that is used by deep learning training jobs, including the logic for applying for and using resources, job scheduling, and failover handling. However, inference jobs differ greatly from training jobs. Inference jobs feature a predictable amount of computation, high concurrency, and high scalability of computing resources. In addition, inference jobs run on independent compute nodes. The PAI elastic-inference engine adopts a number of methods to optimize the execution of inference jobs. These methods include elastic scaling of compute nodes, oversold and oversold upgrades for guaranteed GPU resources, and dynamic data allocation.
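Because inference workers are independent, data does not have to be divided among them in advance. The sketch below illustrates one possible reading of dynamic data allocation, a pull-based scheme in which each worker takes the next shard as soon as it is free. The article does not describe the actual mechanism, so the function `dynamic_allocation`, the shard counts, and the per-shard times are assumptions.

```python
import heapq

def dynamic_allocation(num_shards, seconds_per_shard):
    """Simulate workers that pull the next shard as soon as they are free.

    seconds_per_shard[w] is how long worker w needs for one shard.
    Returns (shards processed per worker, time the last shard finishes).
    """
    # Min-heap of (time the worker becomes free, worker id).
    free_at = [(0, w) for w in range(len(seconds_per_shard))]
    heapq.heapify(free_at)
    done = [0] * len(seconds_per_shard)
    finish = 0
    for _ in range(num_shards):
        t, w = heapq.heappop(free_at)
        t += seconds_per_shard[w]
        done[w] += 1
        finish = max(finish, t)
        heapq.heappush(free_at, (t, w))
    return done, finish

speeds = [1, 2, 4]  # a fast, a medium, and a slow worker
done, finish = dynamic_allocation(num_shards=12, seconds_per_shard=speeds)

# Static split for comparison: 4 shards per worker, bounded by the slowest.
static_finish = max(4 * s for s in speeds)
print(done, finish, static_finish)
```

In this simulation the fast worker ends up processing most of the shards, and the job finishes in half the time of an equal static split, which is the kind of benefit a pull-based allocation is meant to provide when workers run at different speeds or are added and removed elastically.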

The PAI elastic-inference engine was enabled through PAI-EVPredict before Double 11 2020. This engine provides services across business units (BUs) within Alibaba, including search engines, Alimama, Taobao, Youku Tudou Inc., ICBU, Amap, Ele.me, and new retail. After this engine was enabled, the average queuing duration of tasks on PAI-EVPredict decreased by 95%, and the average task execution duration decreased by over 50%. The following figure shows the trend of execution duration of video feature extraction tasks performed by an algorithm recommendation team before and after the PAI elastic-inference engine was used.

Trend of execution duration of video feature extraction tasks performed by an algorithm recommendation team

Support for the Bubble Execution Mode

DAG 2.0 uses the all-in-one execution mode to unify the batch execution mode and quasi-real-time mode based on the flexible DAG hierarchical model. This DAG model lays the foundation for the bubble execution mode that was developed during FY21 of Alibaba. In this mode, a variety of jobs can be executed with balanced resource consumption and superior performance.

Bubble execution mode based on the DAG model

The bubble execution mode is being implemented across all BUs within Alibaba. Right before Double 11 2020, the bubble execution mode had been applied to over 3,200 projects across the entire Alibaba network, covering 70% of all batch jobs. These projects do not include high-priority jobs on important pipelines. On November 11th, 2020, approximately 1.5 million jobs were executed in bubble execution mode. The following figure shows the comparison results obtained on the business days of a week before Double 11 2020.

Latency and resource consumption comparison between the batch execution mode and bubble execution mode

When the bubble splitting algorithm is used, the actual job statistics show that the bubble execution mode improves the execution performance of production jobs by 30% to 40% with balanced resource consumption. The bubble execution mode is being applied to high-priority jobs on important pipelines and is expected to improve the execution performance of jobs on the entire Alibaba network on Double 12 2020 and Double 11 2021.

Outlook

DAG 2.0 features an upgraded architecture, and the execution engine is developed with the goal to become the foundation for the long-term development of computing platforms. DAG 2.0 also aims to support the combination of upper-layer computing engines and distributed scheduling to implement various innovations and create a whole new computing ecosystem. The architecture upgrade was completed during Double 11 2019, marking a big step forward. In 2020, a number of new features were developed based on the new DAG 2.0 architecture and have been implemented, including adaptive dynamic execution and new computing modes. These new features have helped Alibaba Group once again withstand the test of the Double 11 Global Shopping Festival.

However, while the improvements helped us handle existing problems effectively, we identified specific issues that persisted during large-scale events. For example, after data characteristics change, some data skew issues still persist, which affects operations, such as join queries. To address these issues, the execution engine team and the computing engine team are working together to promote the launch of an adaptive solution that uses dynamic monitoring based on adaptive skew join. The new architecture will be further integrated with upper-layer computing engines and other system components to allow even more services to reap the benefits of the upgrade and improve the platform as a whole.
