Data Warehouse: In-depth Interpretation of Flink Resource Management Mechanism
By Song Xintong (Wuzang)
Edited by Wang Wenjie (volunteer of Flink community)
This article is based on a live broadcast in the Apache Flink series, presented by Song Xintong, a senior development engineer at Alibaba. It helps developers understand the Flink resource management mechanism in depth from three aspects: basic concepts, current mechanisms and policies, and future development directions.
- Basic Concepts
- Current Mechanisms and Policies
- Future Development Directions
1. Basic Concepts
1.1 Related Components
This section introduces the components related to Flink resource management. A Flink cluster consists of one Flink master and multiple task managers. The Flink master and the task managers are process-level components; all other components run inside these processes.
As shown in Figure 1, a Flink master has one resource manager and multiple job managers. Each job manager manages one specific job. Within a job manager, the scheduler component schedules and runs all tasks in the job's DAG and issues resource requests, while the slot pool component holds all resources allocated to the job. The single resource manager in the Flink master schedules resources for the entire Flink cluster and interfaces with external scheduling systems, that is, resource management systems such as Kubernetes, Mesos, and YARN.
A task manager is responsible for task execution. A slot is a subset of the resources of the task manager, and is also the basic unit of Flink resource management. The slot concept runs throughout the resource scheduling process.
1.2 Logical Hierarchy
With the components introduced, we now look at the logical relationships between them, which fall into four layers:
- Operator: Operators are the most basic data processing units.
- Task: Tasks are the smallest units that are actually scheduled in the Flink runtime. Each task consists of a series of chained operators. Note: Operators in the same task are scheduled together, so if one operator of a task is running, the other operators of that task have already been scheduled as well.
- Job: Each one corresponds to a job graph.
- Flink Cluster: 1 Flink Master + N Task Managers
The scope of resource scheduling is the content in the red box in Figure 2. Among the components related to resource scheduling, the job manager, scheduler, and slot pool are job-level components, whereas the resource manager, slot manager, and task manager are cluster-level components.
Three relationships connect the layers. Chaining, between operators and tasks, specifies how operators form a task. Slot sharing, between tasks and jobs, specifies how multiple tasks of a job share one slot; slots are never shared across jobs. Slot allocation, between jobs and the Flink cluster, specifies how slots in the cluster are allocated to different jobs.
1.3 Two-layer Resource Scheduling Model
Resource scheduling in Flink follows a classic two-layer model. The slot manager allocates resources from the cluster to jobs. Within each job, the scheduler allocates resources to tasks. As shown in Figure 3, the scheduler sends a slot request to the slot pool. If the slot pool cannot meet the request, it in turn requests resources from the slot manager in the resource manager.
Two ways are available for tasks to reuse slots:
- Slot Caching
  - Batch jobs
  - Failover of streaming jobs
  - Multiple tasks using slot resources in sequence or in turns
- Slot Sharing
  - Multiple tasks sharing the same slot under certain conditions
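The two-layer request path and slot caching described above can be sketched as follows. This is a simplified Python model rather than Flink's actual implementation: the class names mirror the components in the text, but the methods (`request_slot`, `allocate`, `release`) and the slot identifiers are hypothetical.

```python
class SlotManager:
    """Cluster-level layer: hands free cluster slots to jobs (hypothetical model)."""

    def __init__(self, free_slots):
        self.free_slots = list(free_slots)

    def request_slot(self):
        # Return a free cluster slot, or None if the cluster has none left.
        return self.free_slots.pop(0) if self.free_slots else None


class SlotPool:
    """Job-level layer: owns the slots already allocated to one job."""

    def __init__(self, slot_manager):
        self.slot_manager = slot_manager
        self.available = []  # slots owned by the job and currently unused
        self.in_use = {}     # task name -> slot

    def allocate(self, task):
        if self.available:
            # First layer: serve the request from slots the job already owns.
            slot = self.available.pop(0)
        else:
            # Second layer: fall back to the cluster-level slot manager.
            slot = self.slot_manager.request_slot()
            if slot is None:
                raise RuntimeError("cluster has no free slot for " + task)
        self.in_use[task] = slot
        return slot

    def release(self, task):
        # Slot caching: a released slot stays in the pool for later tasks.
        self.available.append(self.in_use.pop(task))


slot_manager = SlotManager(["tm1-slot0", "tm1-slot1"])
pool = SlotPool(slot_manager)
first = pool.allocate("map")      # served by the slot manager
pool.release("map")
second = pool.allocate("reduce")  # served from the pool's cached slot
```

In this sketch the second request never reaches the slot manager, which is the effect slot caching has when tasks use a slot in sequence.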
2. Current Mechanisms and Policies
The following details the current resource management mechanisms and policies in Flink 1.10.
2.1 Resources of a Task Manager
Task manager (TM) resources are determined by configuration. In standalone deployment mode, different task managers may have different resources. In other deployment modes, all task managers have the same resources. Extended resources such as GPUs are also supported (FLIP-108, available in Flink 1.11).
2.2 Resources of a Slot
Each task manager has a fixed number of slots, which is determined by configuration. All slots in the same task manager are identical: they have the same size, that is, the same amount of resources.
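Because all slots of a task manager are identical, the size of one slot follows from simple division. The sketch below assumes that every resource dimension is split evenly across the slots; the resource names and numbers are made up for illustration, and the slot count corresponds to the `taskmanager.numberOfTaskSlots` option.

```python
def slot_resources(tm_resources, num_slots):
    """Split a task manager's resources evenly across its slots."""
    if num_slots <= 0:
        raise ValueError("num_slots must be positive")
    return {name: amount / num_slots for name, amount in tm_resources.items()}


# Hypothetical task manager with 4 slots.
tm = {"cpu_cores": 4.0, "managed_memory_mb": 2048.0}
per_slot = slot_resources(tm, num_slots=4)
# per_slot == {"cpu_cores": 1.0, "managed_memory_mb": 512.0}
```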
2.3 How Many Task Managers Does a Flink Cluster Have?
Standalone Deployment Mode
In standalone deployment mode, the number of task managers is fixed. If the start-cluster.sh script is used to start a cluster, the number of task managers can be adjusted by modifying the configuration in the following file. You can also manually run the taskmanager.sh script to start a task manager.
Deployment Mode of Active Resource Managers
In this mode, the number of task managers is determined dynamically, on demand, by the slot manager and the resource manager. If the current number of slots cannot meet a new slot request, a new task manager is requested and started. A task manager is released if it stays idle for a specified period of time.
Note: The on-YARN deployment mode no longer supports specifying a fixed number of task managers. That is, the following command parameters no longer take effect:
yarn-session.sh -n <num>
flink run -yn <num>
2.4 Process of Scheduling Resources from a Cluster to a Job
As shown in Figure 6, the process of resource scheduling from a cluster to a job includes two sub-processes.
- Slot Allocation (Illustrated by red arrows in figure 6)
The scheduler sends a slot request to the slot pool. If the slot pool has enough slots, it fulfills the request directly. Otherwise, the slot pool sends a resource request to the slot manager; that is, the job requests resources from the cluster. If the slot manager determines that the cluster has sufficient resources, it sends an Assign command to a task manager, which then offers slots to the slot pool. The slot pool then fulfills the scheduler's request.
- Starting Task Managers (Illustrated by blue arrows in figure 6)
In the deployment mode of active resource managers, if the resource manager determines that the Flink cluster does not have enough resources to meet a request, it requests more resources from the underlying resource scheduling system. The scheduling system starts a new task manager, which then registers with the resource manager. In this way, new slots are added.
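The on-demand behavior of an active resource manager, together with the idle release described in section 2.3, can be modeled roughly as follows. This is a toy Python sketch, not Flink code: the class `ActiveResourceManager`, its methods, and the explicit `now` timestamps are all hypothetical, and starting a task manager is reduced to creating a dictionary entry.

```python
class ActiveResourceManager:
    """Toy model of on-demand task manager start-up and idle release."""

    def __init__(self, slots_per_tm, idle_timeout):
        self.slots_per_tm = slots_per_tm
        self.idle_timeout = idle_timeout
        self.task_managers = {}  # tm id -> {"used": int, "idle_since": float or None}
        self.next_id = 0

    def _start_task_manager(self, now):
        # Stands in for asking Kubernetes, Mesos, or YARN for a new container.
        tm_id = "tm-%d" % self.next_id
        self.next_id += 1
        self.task_managers[tm_id] = {"used": 0, "idle_since": now}
        return tm_id

    def request_slot(self, now):
        # Prefer a free slot on an already registered task manager.
        for tm_id, tm in self.task_managers.items():
            if tm["used"] < self.slots_per_tm:
                break
        else:
            # No free slot anywhere: start a new task manager.
            tm_id = self._start_task_manager(now)
            tm = self.task_managers[tm_id]
        tm["used"] += 1
        tm["idle_since"] = None
        return tm_id

    def free_slot(self, tm_id, now):
        tm = self.task_managers[tm_id]
        tm["used"] -= 1
        if tm["used"] == 0:
            tm["idle_since"] = now

    def release_idle(self, now):
        # Release task managers that have been idle for at least the timeout.
        expired = [tm_id for tm_id, tm in self.task_managers.items()
                   if tm["idle_since"] is not None
                   and now - tm["idle_since"] >= self.idle_timeout]
        for tm_id in expired:
            del self.task_managers[tm_id]
        return expired


rm = ActiveResourceManager(slots_per_tm=2, idle_timeout=30.0)
placements = [rm.request_slot(now=0.0) for _ in range(3)]  # third request starts tm-1
rm.free_slot("tm-1", now=10.0)
released_early = rm.release_idle(now=20.0)  # idle for only 10 time units
released_late = rm.release_idle(now=45.0)   # idle for 35 time units
```

Here the first two requests fit into one task manager, the third triggers a new one, and that second task manager is released only after it has been idle longer than the timeout.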
2.5 Process of Scheduling Resources from a Job to a Task
- Scheduler: Determines the next task to be scheduled based on the execution graph and task execution status. Initiates a slot request and determines the allocation between tasks and slots.
- Slot Sharing: Tasks in the same slot sharing group can share slots. By default, all nodes are in the same slot sharing group. A slot can hold at most one parallel instance of the same task.
- Advantages: The number of slots required to run a job equals the maximum concurrency (parallelism) among its tasks, and the load is relatively balanced across slots.
The slot sharing process is shown in Figure 7. Each row shows the parallel instances of one task; from the bottom up, the tasks are A, B, and C, with concurrency numbers of 4, 4, and 3, respectively. These tasks belong to the same slot sharing group, so different tasks can run in the same slot. As illustrated on the right side of Figure 7, three slots each run A, B, and C, and the fourth slot runs only A and B. It follows that the number of slots required for this job is 4, which is also the maximum concurrency.
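The slot count in this example can be reproduced with a few lines of Python. The sketch assumes a single slot sharing group and a simple placement rule (instance i of every task goes to slot i); the function name is hypothetical and the rule is only one way to satisfy the constraint that a slot holds at most one instance of the same task.

```python
def assign_with_slot_sharing(parallelism):
    """Place the parallel instances of each task into shared slots.

    Instance i of every task goes to slot i, so a slot never holds
    two instances of the same task.
    """
    num_slots = max(parallelism.values())
    slots = [[] for _ in range(num_slots)]
    for task, count in sorted(parallelism.items()):
        for index in range(count):
            slots[index].append(task)
    return slots


slots = assign_with_slot_sharing({"A": 4, "B": 4, "C": 3})
# slots == [["A", "B", "C"], ["A", "B", "C"], ["A", "B", "C"], ["A", "B"]]
```

The result has four slots, matching the maximum concurrency of 4, and the last slot runs only A and B.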
2.6 Resource Optimization
From the mechanisms described above, you can see that Flink adopts top-down resource management. You configure the overall resources of the job, and Flink uses the slot sharing mechanism to control the number of slots and balance the load. The resource requirements of a slot sharing group are met by adjusting the resources of task managers or slots. This approach to resource management and configuration is simple, easy to use, and suitable for jobs with simple topologies or small scales.
3. Future Development Directions
3.1 Fine-grained Resource Management
Limitations of Slot Sharing
Non-Optimal Resource Utilization
From the slot sharing mechanism, you can see that the resource utilization is not optimal, because the slot resources are configured based on the maximum concurrency number. As a result, some resources are wasted, as shown in figure 8.
As shown in Figure 9, the concurrency number of A is 2, and the concurrency number of B and C is 1. Both allocation modes in Figure 9 meet the requirements of the slot sharing mechanism, which makes the outcome uncertain. Consider the following situation: during a test, the job happens to be allocated as shown in the right part of Figure 9, and we tune the slot size for that allocation. After we submit the job to the production environment, it is allocated as shown in the left part of Figure 9, and the tuned slots are too small. As a result, the job cannot be executed.
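The uncertainty can be made concrete with a small calculation. The per-task resource demands below are invented for illustration, and the two allocations are assumed to be the two placements that slot sharing permits for this job (one spreads B and C across slots, the other stacks them in the same slot).

```python
def required_slot_size(allocation, demand):
    """Smallest uniform slot size that fits every slot of an allocation."""
    return max(sum(demand[task] for task in slot) for slot in allocation)


# Hypothetical resource demand per task instance (arbitrary units).
demand = {"A": 1.0, "B": 2.0, "C": 2.0}

spread = [["A", "B"], ["A", "C"]]    # B and C land in different slots
stacked = [["A", "B", "C"], ["A"]]   # B and C land in the same slot

tuned_size = required_slot_size(spread, demand)    # 3.0
needed_size = required_slot_size(stacked, demand)  # 5.0
fits = needed_size <= tuned_size                   # False
```

A slot size tuned against the spread allocation is too small for the stacked one, which is exactly the failure described above.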
Fine-grained Resource Management
Based on the limitations of the slot sharing mechanism described previously, we propose the idea of fine-grained resource management.
- If the resource requirements of the operators are known, the slot size can be measured through empirical estimation and semi-automated or automated tools.
- Each task exclusively occupies a slot for resource scheduling.
3.2 Dynamic Slot Segmentation
As shown in Figure 10, the size of each circle indicates the amount of resources required by the task. If the slot sharing group mechanism is not used, each task occupies its own slot, and the existing resource management mechanism of Flink requires all slots to be the same size, so every slot must be large enough for the largest task. This yields the slot resource configuration shown in the right part of the figure, which requires four task managers.
If the size of each slot can be dynamically determined based on different tasks, task managers can be divided as shown in figure 11. Only three task managers are required.
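The difference between the two approaches can be illustrated with a small packing calculation. The task demands and task manager capacity below are invented (they are not read from Figures 10 and 11), and first-fit decreasing is used only as a simple stand-in packing heuristic, not as the policy Flink applies.

```python
import math


def task_managers_with_fixed_slots(demands, tm_capacity):
    """Count task managers when every slot has the size of the largest task."""
    slot_size = max(demands)
    slots_per_tm = int(tm_capacity // slot_size)
    if slots_per_tm == 0:
        raise ValueError("largest task does not fit into a task manager")
    return math.ceil(len(demands) / slots_per_tm)


def task_managers_with_dynamic_slots(demands, tm_capacity):
    """Count task managers when each slot is sized to its task (first-fit decreasing)."""
    free = []  # remaining capacity of each started task manager
    for demand in sorted(demands, reverse=True):
        if demand > tm_capacity:
            raise ValueError("task does not fit into a task manager")
        for i, room in enumerate(free):
            if room >= demand:
                free[i] = room - demand
                break
        else:
            free.append(tm_capacity - demand)
    return len(free)


# Hypothetical workload: two large tasks and six small ones.
demands = [2, 2, 1, 1, 1, 1, 1, 1]
fixed = task_managers_with_fixed_slots(demands, tm_capacity=4)      # 4
dynamic = task_managers_with_dynamic_slots(demands, tm_capacity=4)  # 3
```

With these made-up numbers, fixed-size slots need four task managers while dynamically sized slots need three.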
Dynamic slot segmentation (FLIP-56)
Figure 12 shows the static management mode of task managers with fixed-size slots. With the execution of tasks, slots can only be simply occupied or released, without additional adjustments.
As shown in Figure 13, after a task manager is started, its resources form a single undivided pool. Each time a resource request is received, a slot of the requested size is dynamically carved out of that pool. However, this approach has its own flaw. No matter how the resources are split, a small amount is often left over that is too small to serve any request. This is commonly referred to as the resource fragmentation problem.
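A minimal sketch of dynamic slot allocation and the resulting fragment is shown below. The class `DynamicTaskManager` and the resource amounts are hypothetical, and resources are reduced to a single number for simplicity.

```python
class DynamicTaskManager:
    """Task manager that carves slots of the requested size out of one pool."""

    def __init__(self, total):
        self.free = total
        self.slots = []

    def allocate(self, size):
        # Reject the request if the remaining pool is too small.
        if size > self.free:
            return False
        self.free -= size
        self.slots.append(size)
        return True


tm = DynamicTaskManager(total=4.0)
granted = [tm.allocate(size) for size in (1.5, 1.5, 1.5)]
fragment = tm.free  # 1.0 unit remains, too small for another 1.5 request
```

The third request is rejected even though one unit of resources is still free; that leftover is the fragment.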
3.3 Resource Fragmentation Problem
One proposed solution to the resource fragmentation problem is to customize task manager resources based on the slot resource requests. In Flink 1.10, all task managers have the same amount of resources. However, if fine-grained resource management is adopted and resource requirements are known, task managers can be customized to fit them. In theory, this can completely eliminate resource fragmentation.
The cost is a longer job scheduling time. To customize a task manager, the system has to wait until it receives the slot requests, and starting a task manager is time-consuming. In addition, customized task managers may be difficult to reuse: it may be necessary to release the old task manager and start a new one, which also takes a lot of time.
Different solutions can be used in different scenarios:
- Jobs that are scheduled once and then run for a long time (typically streaming jobs)
  - Higher benefits from improved resource utilization
  - Suitable for scheduling policies that use custom task manager resources
- Batch (batch processing, especially for short queries)
  - Frequent scheduling and short task running time
  - Sensitive to scheduling latency
  - Suitable for scheduling policies that use non-custom task manager resources
3.4 Ease-of-Use Issues
In contrast to the current top-down approach, resource optimization under fine-grained resource management is bottom-up. You no longer configure the overall resources of a job. Instead, you configure the specific resource requirements of each task, as close as possible to its actual requirements, to improve resource utilization. The drawback is that such configuration is difficult. Therefore, this policy is more suitable for jobs with complex topologies or large scales.
Neither approach is strictly superior to the other. Different optimization policies suit different scenarios, and from the community's perspective, both have their own value.
3.5 Making Resource Scheduling Policies Plug-ins (FLINK-14106)
Both the static and the fine-grained resource management mechanisms require scheduling policies that are adapted to different scenarios. The work of turning the scheduling policies into plug-ins has been completed in Flink 1.11.
- Resource scheduling policies
  - Number of task managers
  - When to apply for or release a task manager
  - Size of task manager resources
  - Adaptation between slot requests and task manager resources
Making these resource scheduling policies pluggable brings the following advantages:
- Provides different resource scheduling policies for stream processing and batch processing
- Provides fine-grained and non-fine-grained resource management to meet diverse requirements
- Allows the possibility of more resource scheduling policies in the future
  - For example, Spark performs elastic scaling on clusters based on the load.
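The idea of a pluggable policy can be illustrated with a small strategy interface. The sketch below covers only one of the decisions listed above, the size of task manager resources, and every name in it (`TaskManagerSizingPolicy`, `UniformSizing`, `CustomSizing`, `plan_task_manager`) is hypothetical; it does not reflect the interfaces introduced by FLINK-14106.

```python
from abc import ABC, abstractmethod


class TaskManagerSizingPolicy(ABC):
    """Hypothetical plug-in point: decides how large a new task manager should be."""

    @abstractmethod
    def size_for(self, pending_requests):
        """Return the resource size of the next task manager to start."""


class UniformSizing(TaskManagerSizingPolicy):
    """Non-custom policy: every task manager has the same configured size."""

    def __init__(self, tm_size):
        self.tm_size = tm_size

    def size_for(self, pending_requests):
        return self.tm_size


class CustomSizing(TaskManagerSizingPolicy):
    """Custom policy: size the task manager to fit the pending slot requests."""

    def size_for(self, pending_requests):
        return sum(pending_requests)


def plan_task_manager(policy, pending_requests):
    # The caller depends only on the interface, so policies are interchangeable.
    return policy.size_for(pending_requests)


pending = [1.0, 0.5, 0.5]  # hypothetical slot request sizes
uniform = plan_task_manager(UniformSizing(4.0), pending)  # 4.0
custom = plan_task_manager(CustomSizing(), pending)       # 2.0
```

Swapping the policy object changes the scheduling behavior without touching the calling code, which is what allows different policies for streaming and batch workloads.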
As Flink supports more application scenarios, flexible resource scheduling policies are essential to ensure high performance and resource efficiency. More Flink enthusiasts and developers are welcome to join our community and work together.
About the Author
Song Xintong (Wuzang), senior development engineer at Alibaba. In 2018, he graduated from the Institute of Network and Information Systems of Peking University and joined Alibaba’s real-time computing team. He was primarily responsible for the research and development of the resource scheduling and management mechanism in Apache Flink and Alibaba Enterprise Edition Blink.