Principles and Practices of Flink on YARN and Kubernetes: Flink Advanced Tutorials
By Zhou Kaibo (Baoniu) and compiled by Maohe
Flink Architecture Overview
This article provides an overview of Apache Flink architecture and introduces the principles and practices of how Flink runs on YARN and Kubernetes, respectively.
Flink Architecture Overview — Jobs
A Flink job is written using the DataStream API, DataSet API, SQL statements, or the Table API, and is compiled into a JobGraph. The JobGraph is composed of operators such as source, map(), keyBy(), window(), apply(), and sink. After the JobGraph is submitted to a Flink cluster, it can be executed in Local, Standalone, YARN, or Kubernetes mode.
Flink Architecture Overview — JobManager
A JobManager provides the following functions:
- It transforms a JobGraph into an ExecutionGraph for eventual execution.
- It provides a Scheduler to schedule tasks.
- It provides a checkpoint coordinator that coordinates the checkpointing of each task, including when checkpoints start and end.
- It communicates with the TaskManager through the Actor System.
- It stores recovery metadata, which is read while recovering from a fault.
Flink Architecture Overview — TaskManager
TaskManager is responsible for executing tasks. It is started after the JobManager applies for resources. It has the following components:
- A memory and I/O manager, which manages memory and I/O
- A Network Manager, which manages network connections and data exchange
- Actor System used to implement network communication
TaskManager is divided into multiple task slots. Each task runs within a task slot. A task slot is the smallest unit for resource scheduling.
Let’s have a look at Flink’s Standalone mode to better understand the architectures of YARN and Kubernetes.
- In Standalone mode, the master node and TaskManager may run on the same machine or on different machines.
- In the master process, the Standalone ResourceManager manages resources. When a JobGraph is submitted to the master node through a Flink cluster client, it first passes through the Dispatcher.
- After receiving a request from the client, the Dispatcher generates a JobManager. The JobManager applies for resources from the Standalone ResourceManager and then starts the TaskManager.
- The TaskManager initiates registration after startup. After registration is completed, the JobManager allocates tasks to the TaskManager for execution. This completes the job execution process in Standalone mode.
Flink Runtime-related Components
This section summarizes Flink’s basic architecture and the components of Flink runtime.
- A client allows submitting jobs through SQL statements or APIs. A JobGraph is generated after a job is submitted.
- After receiving a request, the JobManager schedules the job and applies for resources to start a TaskManager.
- The TaskManager is responsible for task execution. It registers with the JobManager and executes the tasks that are allocated by the JobManager.
Principles and Practices of Flink on YARN
YARN Architecture Overview
YARN is widely used in the production environments of Chinese companies. This section introduces the YARN architecture to help you better understand how Flink runs on YARN.
The preceding figure shows the YARN architecture. The ResourceManager assumes the core role and is responsible for resource management. The clients submit jobs to the ResourceManager.
After a client submits a job to the ResourceManager, the ResourceManager starts a container and then an ApplicationMaster, the two of which form a master node. After startup, the master node applies for resources from the ResourceManager, which then allocates resources to the ApplicationMaster. The ApplicationMaster schedules tasks for execution.
YARN Architecture — Components
A YARN cluster consists of the following components:
- The ResourceManager processes client requests, starts and monitors the ApplicationMaster, monitors the NodeManager, and allocates and schedules resources. The ResourceManager includes the Scheduler and Applications Manager.
- The ApplicationMaster runs on a worker node and is responsible for data splitting, resource application and allocation, task monitoring, and fault tolerance.
- The NodeManager runs on a worker node and is responsible for single-node resource management, communication between the ApplicationMaster and ResourceManager, and status reporting.
- Containers are used to abstract resources, such as memory, CPUs, disks, and network resources.
YARN Architecture — Interaction
This section describes the interaction process in the YARN architecture using an example of running MapReduce tasks on YARN.
- A user submits a job through a client after writing MapReduce code.
- After receiving the request from the client, the ResourceManager allocates a container used to start the ApplicationMaster and instructs the NodeManager to start the ApplicationMaster in this container.
- After startup, the ApplicationMaster initiates a registration request to the ResourceManager. The ApplicationMaster applies for resources from the ResourceManager. Based on the obtained resources, the ApplicationMaster communicates with the corresponding NodeManager, requiring it to start the program.
- One or more NodeManagers start MapReduce tasks.
- The NodeManager continuously reports the status and execution progress of the MapReduce tasks to the ApplicationMaster.
- When all MapReduce tasks are completed, the ApplicationMaster reports task completion to the ResourceManager and deregisters itself.
Flink on YARN — Per Job
Flink on YARN supports the Per Job mode in which one job is submitted at a time and resources are released after the job is completed. The Per Job process is as follows:
- A client submits a YARN application, such as a JobGraph or a JAR package.
- The YARN ResourceManager allocates the first container. The ApplicationMaster process is started in this container and runs the Flink master components, namely, the Flink YARN ResourceManager and the JobManager.
- The Flink YARN ResourceManager applies for resources from the YARN ResourceManager. The TaskManager is started after resources are allocated. After startup, the TaskManager registers with the Flink YARN ResourceManager. After registration, the JobManager allocates tasks to the TaskManager for execution.
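The Per Job submission described above can be sketched with Flink's CLI. This is a hedged example for a Flink 1.7-era distribution, assuming a configured YARN environment; the JAR path and resource flags are illustrative:

```shell
# Submit a single job to YARN in Per Job mode (-m yarn-cluster).
# -yn: number of TaskManager containers, -yjm/-ytm: JobManager/TaskManager memory,
# -p: job parallelism. All values here are assumptions, not requirements.
./bin/flink run -m yarn-cluster \
  -yn 2 -yjm 1024m -ytm 4096m -p 4 \
  ./examples/streaming/TopSpeedWindowing.jar
```

When the job finishes, the YARN application terminates and all containers are released, matching the Per Job lifecycle described above.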
Flink on YARN — Session
In Per Job mode, all resources, including the JobManager and TaskManager, are released after the job completes. In Session mode, the Dispatcher and ResourceManager are shared by different jobs. After receiving a request, the Dispatcher starts JobManager (A), which in turn starts a TaskManager; the Dispatcher then starts JobManager (B) and its corresponding TaskManager. Resources are not released after Job A and Job B are completed. The Session mode is also called the multithreading mode, in which resources are kept alive and multiple JobManagers share the same Dispatcher and Flink YARN ResourceManager.
The Session mode is used in different scenarios than the Per Job mode. The Per Job mode is suitable for time-consuming jobs that are insensitive to startup time. The Session mode is suitable for jobs that take a short time to complete, especially batch jobs. Executing short-running jobs in Per Job mode results in frequent resource application: resources must be released after a job completes, and new resources must be requested to run the next job. Obviously, the Session mode is more applicable to scenarios where jobs are frequently started and complete within a short time.
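The Session workflow can be sketched as follows. This is a hedged example for a Flink 1.7-era distribution; the flag values are assumptions:

```shell
# Start a long-running YARN session (-d detaches the client).
# -n: number of TaskManager containers, -jm/-tm: memory per component.
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -d

# Submit multiple jobs to the same session; the JobManager and
# TaskManagers stay alive between submissions.
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
./bin/flink run ./examples/batch/WordCount.jar
```

Both jobs reuse the session's resources, which is what makes this mode attractive for frequent, short-running jobs.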
Features of YARN
YARN has the following advantages:
- Centralized Management and Scheduling of Resources: The resources of all nodes in a YARN cluster are abstracted as containers. These resources include memory, CPUs, disks, and networks. The computing framework applies for containers from the ResourceManager to execute computing tasks. YARN schedules resources and allocates containers based on specified policies. YARN provides multiple task scheduling policies to improve the utilization of cluster resources. These policies include the FIFO Scheduler, Capacity Scheduler, and Fair Scheduler to allow setting task priorities.
- Resource Isolation: YARN uses cgroups for resource isolation to prevent interference between resources. Once the amount of resources used by a container exceeds the predefined threshold, the container is killed.
- Automatic Failover: YARN supports NodeManager monitoring and the recovery of the ApplicationMaster from exceptions.
Despite these advantages, YARN also has disadvantages, such as inflexible operations and expensive O&M and deployment.
Principles of Flink on Kubernetes
Kubernetes is an open-source container cluster management system developed by Google. It supports application deployment, maintenance, and scaling. Kubernetes makes it easy to manage containerized applications running on different machines. Compared with YARN, Kubernetes is essentially a next-generation resource management system, but its capabilities go far beyond resource management.
Kubernetes — Basic Concepts
In Kubernetes, a master node is used to manage the cluster. It serves as the access portal for cluster resource data and runs etcd, a highly available key-value store. The master node runs the API server, Controller Manager, and Scheduler.
A node is an operating unit of a cluster and also a host on which a pod runs. A node contains an agent process, which maintains all containers on the node and manages how these containers are created, started, and stopped. A node also provides kube-proxy, which is a server for service discovery, reverse proxy, and load balancing. A node provides the underlying Docker engine used to create and manage containers on the local machine.
A pod is the combination of several containers that run on a node. In Kubernetes, a pod is the smallest unit for creating, scheduling, and managing resources.
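The pod concept above can be illustrated with a minimal resource description. This is a hedged, generic sketch (the image and names are illustrative, not from the article):

```yaml
# A minimal pod description: one container running in one pod.
# The pod is the smallest unit Kubernetes creates and schedules.
apiVersion: v1
kind: Pod
metadata:
  name: nginx-pod        # hypothetical name
spec:
  containers:
  - name: nginx
    image: nginx:1.14    # any container image works here
    ports:
    - containerPort: 80
```

Submitting this file with `kubectl create -f` asks the master to schedule the pod onto a node, where the kubelet starts its container.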
Kubernetes — Architecture Diagram
The preceding figure shows the architecture of Kubernetes and its entire running process.
- The API server is the portal that receives user requests and persists them to etcd.
- etcd is a key-value store that holds the cluster state. The Scheduler assigns tasks to specific machines, and the kubelet on each node finds the corresponding container and runs the task on the local machine.
- Users can submit a resource description for the Replication Controller, which monitors and maintains the number of containers in the cluster, and a Service description file, which enables kube-proxy to forward traffic.
Kubernetes — Core Concepts
Kubernetes involves the following core concepts:
- The Replication Controller is used to manage pod replicas. It ensures that a specified number of pod replicas are running in a Kubernetes cluster at any given time. If the number of pod replicas is smaller than the specified number, the Replication Controller starts new containers. Otherwise, it kills the extra containers to maintain the specified number of pod replicas.
- A Service provides a central service access portal and implements service proxy and discovery.
- Persistent Volumes (PVs) and Persistent Volume Claims (PVCs) are used for persistent data storage.
- ConfigMap stores the configuration files of user programs and uses etcd as its backend storage.
Flink on Kubernetes — Architecture
The preceding figure shows the architecture of Flink on Kubernetes. The process of running a Flink job on Kubernetes is as follows:
- After a resource description file is submitted to the Kubernetes cluster, the master container and worker containers are started.
- The master container starts the Flink master process, which consists of the Flink-Container ResourceManager, JobManager, and Program Runner.
- The worker containers start TaskManagers, which register with the ResourceManager. After registration, the JobManager allocates tasks to the containers for execution.
- In Flink, the master and worker containers are essentially images but have different script commands. You can choose whether to start the master or worker containers by setting parameters.
Flink on Kubernetes — JobManager
The execution process of a JobManager is divided into two steps:
1) The JobManager is described by a Deployment, which ensures that one replica of its container is always running. The JobManager pod is labeled flink-jobmanager.
2) A JobManager Service is defined and exposed using a service name and port number. It selects pods based on the JobManager label.
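Step 2 can be sketched as a Service description. This is a hedged sketch modeled on the Flink example mentioned later in this article; the port numbers follow Flink's conventional defaults (6123 for RPC, 8081 for the web UI) and the label keys are assumptions:

```yaml
# jobmanager-service.yaml (sketch): exposes the JobManager by name and port,
# selecting pods via the flink-jobmanager label.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  ports:
  - name: rpc
    port: 6123
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```

TaskManagers then reach the JobManager through the stable service name flink-jobmanager rather than a pod IP.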
Flink on Kubernetes — TaskManager
A TaskManager is also described by a Deployment, which ensures that n replicas of its container are running. A label, such as flink-taskmanager, is defined for the TaskManager pods.
The runtimes of the JobManager and TaskManager require configuration files, such as flink-conf.yaml, hdfs-site.xml, and core-site.xml. Define them as ConfigMaps in order to transfer and read configurations.
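A ConfigMap holding such configuration files might be sketched as follows. This is a hedged example: the ConfigMap name and the flink-conf.yaml keys shown are illustrative assumptions, not the article's exact configuration:

```yaml
# flink-configuration-configmap.yaml (sketch): stores configuration files
# so that both JobManager and TaskManager pods can read the same settings.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config   # hypothetical name
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
```

Each pod then mounts this ConfigMap as files, so a configuration change needs only one update rather than an image rebuild.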
Flink on Kubernetes — Interaction
The entire interaction process is simple. You only need to submit defined resource description files, such as Deployment, ConfigMap, and Service description files, to the Kubernetes cluster. The Kubernetes cluster automatically completes the subsequent steps. The Kubernetes cluster starts pods and runs user programs based on the defined description files. The following components take part in the interaction process within the Kubernetes cluster:
- The Service uses a label selector to find the JobManager’s pod for service exposure.
- The Deployment ensures that the containers of n replicas run the JobManager and TaskManager and applies the upgrade policy.
- The ConfigMap mounts the /etc/flink directory, which contains the flink-conf.yaml file, to each pod.
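The ConfigMap mount in the last bullet corresponds to a volume definition in the Deployment's pod template. A hedged fragment, assuming a ConfigMap named flink-config (a hypothetical name):

```yaml
# Fragment of a Deployment pod template (sketch): mounts the ConfigMap's
# files (e.g., flink-conf.yaml) under /etc/flink inside each container.
spec:
  containers:
  - name: taskmanager
    image: flink:latest
    volumeMounts:
    - name: flink-config-volume
      mountPath: /etc/flink
  volumes:
  - name: flink-config-volume
    configMap:
      name: flink-config
```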
Flink on Kubernetes — Practices
This section describes how to run a job in Flink on Kubernetes.
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
kubectl port-forward service/flink-jobmanager 8081:8081
bin/flink run -d -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
Start the session cluster by running the first three commands, which create Flink's JobManager Service, JobManager Deployment, and TaskManager Deployment. Then use kubectl port-forward to access the JobManager's web interface and submit a job through the forwarded port. To delete the cluster, run the kubectl delete commands.
The preceding figure shows an example provided by Flink. The left part is the jobmanager-deployment.yaml configuration, and the right part is the taskmanager-deployment.yaml configuration.
In the jobmanager-deployment.yaml configuration, the first line of the code is apiVersion, which is set to the API version extensions/v1beta1. The resource type is Deployment, and the metadata name is flink-jobmanager. Under spec, the number of replicas is 1, and labels are used for pod selection. The container is named jobmanager. Containers may use an image downloaded from the public Docker repository or an image from a proprietary repository. The args startup parameter determines whether to start the JobManager or TaskManager. The ports parameter specifies the service ports to use; port 8081 is a commonly used service port. The env parameter specifies environment variables, which are passed to the startup script.
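The description above can be condensed into a sketch of jobmanager-deployment.yaml. This is hedged and modeled on the Flink example the article discusses; label keys and the environment variable name are assumptions consistent with that description:

```yaml
# jobmanager-deployment.yaml (sketch): one JobManager replica;
# args selects which Flink role the shared image starts.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest          # public Docker repository image
        args: ["jobmanager"]         # same image starts "taskmanager" for workers
        ports:
        - containerPort: 6123        # RPC
        - containerPort: 8081        # web UI
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager
```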
The taskmanager-deployment.yaml configuration is similar to the jobmanager-deployment.yaml configuration. However, in the former, the number of replicas is 2.
In the jobmanager-service.yaml configuration, the resource type is Service, which contains fewer configurations. Under spec, the service ports to expose are configured. The selector parameter specifies the pod of the JobManager based on a label.
In addition to the Session mode, the Per Job mode is also supported. In Per Job mode, the user code is built into the image, so a new image must be generated each time the business logic changes and the JAR package is modified. This process is complex, so the Per Job mode is rarely used in production environments.
The following uses the public Docker repository as an example to describe the execution process of the job cluster.
- Build an Image: In the flink/flink-container/docker directory, run the build.sh script to specify the version used to build an image. If the script is executed successfully, the “Successfully tagged topspeed:latest” message appears.
sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8 --scala-version 2.11 --job-jar ~/flink/flink-1.7.1/examples/streaming/TopSpeedWindowing.jar --image-name topspeed
- Upload the Image: Visit hub.docker.com to register an account and create a repository for image uploading.
docker tag topspeed zkb555/topspeedwindowing
docker push zkb555/topspeedwindowing
- Start a Job after the image is uploaded.
kubectl create -f job-cluster-service.yaml
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-cluster-job.yaml.template | kubectl create -f -
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB_PARALLELISM=4 envsubst < task-manager-deployment.yaml.template | kubectl create -f -
Flink on YARN and Flink on Kubernetes FAQ
Q) Can I submit jobs to Flink on Kubernetes using operators?
Currently, Flink does not provide an official operator implementation. Lyft provides an open-source operator implementation. For more information, check out this page.
Q) Can I use a high-availability (HA) solution other than ZooKeeper in a Kubernetes cluster?
Q) In Flink on Kubernetes, the number of TaskManagers must be specified upon task startup. Does Flink on Kubernetes support a dynamic application for resources as YARN does?
In Flink on Kubernetes, if the number of specified TaskManagers is insufficient, tasks cannot be started; if too many TaskManagers are specified, resources are wasted. The Flink community is working on enabling dynamic resource application upon task startup, just as YARN does. The Active mode implements a Kubernetes-native integration with Flink, in which the ResourceManager can directly apply for resources from the Kubernetes cluster. For more information, see this link.