By Ren Chunde
Apache Flink, considered by some to represent the next generation of big data computing engines, is advancing along its development path faster than ever. The internal architecture of the Flink framework is also continually being optimized and refactored to adapt to an increasing number of runtime environments and to accommodate a much larger computing scale. Flink Improvement Proposal 6 (FLIP-6) introduces a redesigned, unified architecture for resource scheduling across various cluster management systems (among them Standalone, YARN, and Kubernetes).
In this blog, we’re going to take a look into new developments for Apache Flink, including how standalone clusters are being implemented, how the Per-Job and Session modes on YARN are now supported, and also how Apache Flink is gaining native integration with Kubernetes.
Apache Flink Standalone Clusters
As shown in Figure 1, a Flink Standalone cluster is deployed in a master-slave architecture: the master, the JobManager (JM), schedules the job's computing units (tasks), while each TaskManager (TM) reports to the JobManager and executes tasks internally in threads.
It is called standalone because it does not rely on an underlying resource scheduling system and can be deployed and started directly on bare machine nodes. Automated O&M tools can make deployment and management more convenient, but the following problems remain:
- Isolation: When multiple jobs run in one cluster, tasks of different jobs may be executed on the same TM. The resources used by their threads (CPU/MEM) cannot be controlled, so the tasks affect each other, and a single task can even cause the entire TM to run out of memory and take down every job on it. All jobs are also scheduled by the same JM, so one problematic job can affect the others there as well.
- Multi-Tenant Resource Usage (Quota) Management: The total amount of Job resources used by users cannot be controlled, and the coordinated management of resources among tenants is lacking.
- Cluster Availability: Although the JM can be deployed with standby machines to support high availability, the JM and TM processes themselves are not maintained. Given the isolation problems above, too many processes can go down and leave the entire cluster unavailable.
- Cluster O&M: Version upgrades, capacity scaling, and similar tasks require complex O&M operations.
To solve the above problems, Flink needs to be run on popular and mature resource scheduling systems, such as YARN, Kubernetes, and Mesos.
Native Integration of Flink and YARN
Apache Flink Standalone Clusters on YARN
A simple and effective deployment method is to deploy the Flink Standalone cluster to the YARN cluster by using the features supported by YARN, as shown below:
- Multiple YARN applications can be started for multiple jobs. Each app is a standalone cluster and runs independently. Moreover, the isolation mechanisms supported by YARN, such as cgroups, prevent tasks from affecting each other, which solves the isolation problem.
- Apps of different users can also be run in different YARN scheduling queues to solve the multi-tenant problem through the Queue Quota management capability.
- At the same time, the YARN policy for restarting, retrying, and rescheduling the app process can be used to make the Flink Standalone cluster highly available.
- With simple changes to parameters and the configuration file, Flink jars can be distributed through the Distributed Cache of YARN, which makes upgrades and capacity scaling easier.
Although this approach addresses the problems above, starting a Standalone cluster for each job, or for a small number of jobs, makes it difficult to use resources efficiently. The reasons are as follows:
- The cluster scale (the number of TMs) is statically specified by parameters when the YARN app is started. Flink's own compilation optimization makes it difficult to estimate resource demand before running, so it is hard to choose a reasonable number of TMs. If there are too many TMs, resources are wasted; if there are too few, the job runs slowly or may not be able to run at all.
- The resource size of each TM is also statically specified by parameters, and the actual need is equally difficult to estimate. TMs of different sizes cannot be requested dynamically to match the resource requirements of different tasks. Only TMs of one uniform size can be configured, so tasks rarely fill a TM exactly and the remaining resources are wasted.
- App startup (1. Submit YARN App) and Flink job submission (7. Submit Job) need to be completed in two stages, which lowers the submission efficiency of each job and reduces the resource flow rate of the cluster.
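The waste caused by statically sized TMs can be made concrete with a little arithmetic. The following is a minimal sketch, not Flink code: `TaskManagerSpec`, `tasks_per_tm`, and `static_cluster_report` are hypothetical names, all tasks are assumed to be identical, and the numbers are invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskManagerSpec:
    """Uniform size of every TM in a statically sized cluster (hypothetical units)."""
    cpu_cores: float
    memory_mb: int


def tasks_per_tm(tm: TaskManagerSpec, task_cpu: float, task_memory_mb: int) -> int:
    """Number of identical tasks that fit into one TM, limited by the scarcer resource."""
    return min(int(tm.cpu_cores // task_cpu), tm.memory_mb // task_memory_mb)


def static_cluster_report(tm: TaskManagerSpec, num_tms: int, num_tasks: int,
                          task_cpu: float, task_memory_mb: int) -> dict:
    """Summarize unplaced tasks and idle resources for a fixed number of fixed-size TMs."""
    per_tm = tasks_per_tm(tm, task_cpu, task_memory_mb)
    placed = min(num_tasks, per_tm * num_tms)
    return {
        "tasks_per_tm": per_tm,
        "unplaced_tasks": num_tasks - placed,
        "wasted_cpu_cores": tm.cpu_cores * num_tms - placed * task_cpu,
        "wasted_memory_mb": tm.memory_mb * num_tms - placed * task_memory_mb,
    }


if __name__ == "__main__":
    tm = TaskManagerSpec(cpu_cores=4.0, memory_mb=8192)
    # Too many TMs: every task is placed, but much of the cluster sits idle.
    print(static_cluster_report(tm, num_tms=5, num_tasks=7, task_cpu=1.5, task_memory_mb=3000))
    # Too few TMs: one task cannot be placed, and the leftover space is still unusable.
    print(static_cluster_report(tm, num_tms=3, num_tasks=7, task_cpu=1.5, task_memory_mb=3000))
```

With these made-up numbers, only two tasks fit into each TM, so part of every TM stays idle regardless of how many TMs are started.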
The more Flink jobs there are in a large-scale YARN cluster, the more resources are wasted and the higher the cost. Moreover, these problems are not limited to YARN: the same problems exist when the Flink Standalone cluster runs directly on other resource scheduling systems. Therefore, the Alibaba real-time computing team took the lead in improving the Flink resource utilization model based on practical production experience with YARN, and subsequently designed and implemented, together with the community, a common architecture that is applicable to different resource scheduling systems.
FLIP-6: Deployment and Process Model
FLIP-6 fully documents the refactoring of the deployment architecture. The new modules are shown in Figure 3. Similar to the upgrade from the MapReduce-1 architecture to YARN+MapReduce-2, resource scheduling and the scheduling of a job's logical computing units (tasks) are separated into two layers, so that two modules (systems), the ResourceManager (RM) and the JobManager (JM), each perform their own function. As a result, coupling with the underlying resource scheduling system is reduced (only a different pluggable ResourceManager needs to be implemented for each system), the logic becomes simpler and therefore easier to develop and maintain, and the JM can request resources according to task requirements. This solves the low resource utilization of Standalone on YARN/K8S and makes it easier to scale the cluster and job size.
- Dispatcher: It communicates with the Client to receive job submissions and creates a JobManager for each job. Its lifecycle can span multiple jobs.
- ResourceManager: It integrates with different resource scheduling systems to implement resource scheduling (application/release) and to manage Containers/TaskManagers. Its lifecycle can likewise span multiple jobs.
- JobManager: It is responsible for scheduling and executing the computing logic of the job. Each job has one instance.
- TaskManager: It registers with the RM and reports the status of its resources. It receives tasks to execute from the JM and reports their status.
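The division of labor among these four components can be sketched as a toy model. This is only an illustration under simplifying assumptions, not Flink's implementation: the class and method names below are hypothetical, there is no RPC, and "starting a container" is just creating an object.

```python
from typing import Dict, List


class TaskManager:
    """Worker that offers a fixed number of slots and runs tasks in them."""

    def __init__(self, tm_id: str, slots: int) -> None:
        self.tm_id = tm_id
        self.free_slots = slots
        self.running: List[str] = []

    def run_task(self, task: str) -> None:
        if self.free_slots == 0:
            raise RuntimeError(f"{self.tm_id} has no free slot")
        self.free_slots -= 1
        self.running.append(task)


class ResourceManager:
    """Resource layer: hands out slots and starts TaskManagers on demand."""

    def __init__(self, slots_per_tm: int = 2) -> None:
        self.slots_per_tm = slots_per_tm
        self.task_managers: List[TaskManager] = []

    def request_slot(self) -> TaskManager:
        for tm in self.task_managers:
            if tm.free_slots > 0:
                return tm
        # Stand-in for requesting a new container from YARN or Kubernetes;
        # appending to the list models the new TM registering with the RM.
        tm = TaskManager(f"tm-{len(self.task_managers)}", self.slots_per_tm)
        self.task_managers.append(tm)
        return tm


class JobManager:
    """One instance per job: schedules the job's tasks into slots."""

    def __init__(self, job_name: str, tasks: List[str], rm: ResourceManager) -> None:
        self.job_name = job_name
        self.tasks = tasks
        self.rm = rm
        self.placements: Dict[str, str] = {}

    def run(self) -> None:
        for task in self.tasks:
            tm = self.rm.request_slot()
            tm.run_task(f"{self.job_name}/{task}")
            self.placements[task] = tm.tm_id


class Dispatcher:
    """Entry point for clients: creates a JobManager for each submitted job."""

    def __init__(self, rm: ResourceManager) -> None:
        self.rm = rm
        self.job_managers: Dict[str, JobManager] = {}

    def submit(self, job_name: str, tasks: List[str]) -> JobManager:
        jm = JobManager(job_name, tasks, self.rm)
        self.job_managers[job_name] = jm
        jm.run()
        return jm


if __name__ == "__main__":
    dispatcher = Dispatcher(ResourceManager(slots_per_tm=2))
    jm = dispatcher.submit("wordcount", ["source", "map", "sink"])
    print(jm.placements)
```

In this sketch the number of TaskManagers is never configured up front. It grows only when a JobManager asks for a slot that no existing TaskManager can provide, which is the behavior the two-layer split is meant to enable.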
Native Integration of Apache Flink and YARN
Based on the above architecture, Flink on YARN implements two different deployment and running modes: Per-Job and Session (for the usage documents, see Flink on YARN).
Per-Job: a Flink job is bound to the lifecycle of its YARN application (app). The execution process is shown in Figure 4. When the YARN app is submitted, the files/jars of the Flink job are distributed through the YARN Distributed Cache, and submission is completed in a single step. Moreover, the JM requests slots from the RM based on the actual resource demand of the tasks generated from the JobGraph, and the Flink RM dynamically requests and releases YARN containers. This solves all of the previous problems: YARN's isolation is used, and resource utilization is efficient.
Is Per-Job perfect? No, limitations still exist. When the YARN app is submitted, it takes a long time (several seconds) to apply for resources and start the TMs. In scenarios such as interactive analysis with short queries, the job's computing logic executes very quickly, so app startup makes up a large share of the total time and seriously affects the end-to-end user experience; the fast job submission of Standalone mode is lost. However, the FLIP-6 architecture can still easily resolve this issue. As shown in Figure 5, a Flink session is run through a pre-started YARN app (the Master and multiple TMs are started, similar to Standalone, and can run multiple jobs). Jobs are then submitted and executed, and they can start computing quickly on the existing resources. The implementation on the Blink branch differs slightly from that on the master branch (in whether the TMs are pre-started). In the future, the two will be merged and unified, and the resource elasticity of the Session will be developed further: the number of TMs can be scaled automatically as needed, which Standalone cannot do.
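The effect of app startup time on short queries can be illustrated with simple arithmetic. The sketch below is a simplified model with hypothetical function names and made-up timings: Per-Job pays the startup cost on every submission, while a Session is assumed to have paid it once before any job arrives.

```python
def per_job_latency(app_startup_s: float, execution_s: float) -> float:
    """End-to-end latency when every job first has to start its own YARN app."""
    return app_startup_s + execution_s


def session_latency(execution_s: float) -> float:
    """End-to-end latency when the job runs on an already started session."""
    return execution_s


def startup_share(app_startup_s: float, execution_s: float) -> float:
    """Fraction of the Per-Job end-to-end latency spent on app startup."""
    return app_startup_s / per_job_latency(app_startup_s, execution_s)


if __name__ == "__main__":
    # Assumed numbers: 5 s app startup, a 0.5 s interactive query, a 1 h batch job.
    print(per_job_latency(5.0, 0.5), session_latency(0.5))
    print(f"short query: {startup_share(5.0, 0.5):.0%} of the time is startup")
    print(f"long job:    {startup_share(5.0, 3600.0):.2%} of the time is startup")
```

Under these assumptions startup dominates the short query but is negligible for the long-running job, which is why Session mode matters mostly for interactive workloads.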
The above describes the changes in the architecture, but in order to apply for resources on demand, a protocol API is also required, which is the Resource Profile. It can describe the resource usage of CPU & Memory of a single Operator, and the RM applies to the underlying resource management system for the Container to execute the TM based on these resource requests. For detailed usage documents, see Task slots and resources.
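To make the idea of a resource profile more tangible, here is a minimal Python analog. It is a sketch under stated assumptions, not Flink's actual API: the `ResourceProfile` dataclass, `slot_profile`, and `container_request` are hypothetical, operators sharing a slot are assumed to have their requirements summed, and the rounding to whole cores and to a memory increment is an assumed stand-in for whatever granularity the underlying resource manager enforces.

```python
import math
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ResourceProfile:
    """CPU and memory demand of a single operator (hypothetical analog)."""
    cpu_cores: float
    memory_mb: int


def slot_profile(operators: Iterable[ResourceProfile]) -> ResourceProfile:
    """Combine the profiles of operators that run together in one slot."""
    ops = list(operators)
    return ResourceProfile(
        cpu_cores=sum(op.cpu_cores for op in ops),
        memory_mb=sum(op.memory_mb for op in ops),
    )


def container_request(slot: ResourceProfile, memory_increment_mb: int = 512) -> ResourceProfile:
    """Round a slot's demand up to the granularity the cluster can allocate.

    The 512 MB default increment is an assumption made for this example.
    """
    vcores = math.ceil(slot.cpu_cores)
    memory = math.ceil(slot.memory_mb / memory_increment_mb) * memory_increment_mb
    return ResourceProfile(cpu_cores=float(vcores), memory_mb=memory)


if __name__ == "__main__":
    source = ResourceProfile(cpu_cores=0.5, memory_mb=300)
    aggregate = ResourceProfile(cpu_cores=1.0, memory_mb=900)
    slot = slot_profile([source, aggregate])
    print(slot)
    print(container_request(slot))
```

The point of the sketch is the direction of the data flow: per-operator requirements are declared first, and the size of the container that the RM asks for is derived from them rather than fixed in advance.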
Native Integration of Flink and Kubernetes
In recent years, Kubernetes has developed rapidly, and has become the major native operating system of the cloud era. If the deployment of Apache Flink, the next-generation big data computing engine, is integrated with it, can a new era of big data computing be opened up? Possibly, right?
Apache Flink Standalone Clusters on Kubernetes
With the powerful capabilities of Kubernetes in supporting Service deployment, the Flink Standalone cluster can be easily deployed to a Kubernetes cluster using simple K8S Deployment and Service resources or the Flink Helm chart. However, like Standalone on YARN, this approach suffers from problems such as low resource utilization, so the so-called “native integration” is still very much needed.
Native Integration of Apache Flink and Kubernetes
The “native integration” of Flink and K8S mainly consists of implementing a K8SResourceManager on the FLIP-6 architecture to interface with the resource scheduling protocol of Kubernetes. Currently, the architecture of the Blink branch implementation is shown in the following figure (for the usage documents, see Flink on K8S), and work on merging it into the master branch is in progress.
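The pluggable design can be sketched as one cluster-agnostic base class with a small cluster-specific subclass per scheduling system. This is a hypothetical Python illustration only: the real K8SResourceManager lives in Flink's Java codebase, and the method names and returned strings here are invented for the example.

```python
from abc import ABC, abstractmethod
from typing import List


class ResourceManager(ABC):
    """Cluster-agnostic part: tracks workers and decides when more are needed."""

    def __init__(self) -> None:
        self.workers: List[str] = []

    def ensure_workers(self, needed: int) -> List[str]:
        """Start workers until at least `needed` of them exist."""
        while len(self.workers) < needed:
            self.workers.append(self.start_worker(len(self.workers)))
        return list(self.workers)

    @abstractmethod
    def start_worker(self, index: int) -> str:
        """Cluster-specific part: ask the underlying system for one more worker."""


class YarnResourceManager(ResourceManager):
    def start_worker(self, index: int) -> str:
        # Stand-in for requesting a YARN container that runs a TaskManager.
        return f"yarn-container-{index}"


class K8SResourceManager(ResourceManager):
    def start_worker(self, index: int) -> str:
        # Stand-in for creating a Kubernetes pod that runs a TaskManager.
        return f"k8s-pod-{index}"


if __name__ == "__main__":
    for rm in (YarnResourceManager(), K8SResourceManager()):
        print(type(rm).__name__, rm.ensure_workers(2))
```

Everything above `start_worker` is shared, so supporting a new scheduling system in this sketch means writing only the small subclass.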
Deployment management and resource scheduling are the underlying foundations of big data processing systems. Through the abstraction, layering, and refactoring of FLIP-6, Apache Flink has built a solid foundation for future development. Flink can now run natively on various resource scheduling systems (YARN, Kubernetes, and Mesos) to support larger-scale and higher-concurrency computing, use cluster resources efficiently, and provide a reliable guarantee for continued development.
However, with all of that said, the optimization and improvement of related functions are still very much an ongoing process. Some developers find configuring resources through the Resource Profile daunting, and this difficulty significantly reduces the overall usability of Flink. We are trying to implement Auto Config, Scaling, and other functions for resource and concurrency configuration to solve these problems. Meanwhile, the “Serverless” architecture is developing rapidly, and it is expected that Flink and Kubernetes will be integrated into a powerful cloud-native computing engine (like FaaS) to save resources and bring greater value to users. We are excited for this future.