Principles and Practices of Flink on YARN and Kubernetes: Flink Advanced Tutorials

Flink Architecture Overview

Flink Architecture Overview — Jobs

Flink Architecture Overview — JobManager

  • It transforms a JobGraph into an ExecutionGraph for eventual execution.
  • It provides a Scheduler to schedule tasks.
  • It provides a checkpoint coordinator that coordinates the checkpointing of each task, including when each checkpoint starts and ends.
  • It communicates with the TaskManager through the Actor System.
  • It maintains recovery metadata, which is read during recovery from a fault.

Flink Architecture Overview — TaskManager

  • A memory and I/O manager that manages memory and I/O
  • A network manager that manages network connections and data exchange between tasks
  • An Actor System that implements network communication
  • In Standalone mode, the master node and TaskManager may run on the same machine or on different machines.
  • In the master process, the Standalone ResourceManager manages resources. When a JobGraph is submitted to the master node through a Flink cluster client, it is first forwarded through the Dispatcher.
  • After receiving the request from the client, the Dispatcher generates a JobManager. The JobManager applies for resources from the Standalone ResourceManager and then starts the TaskManager.
  • The TaskManager initiates registration after startup. After registration is completed, the JobManager allocates tasks to the TaskManager for execution. This completes the job execution process in Standalone mode.
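Assuming a stock Flink binary distribution, the Standalone flow above can be exercised with the standard scripts shipped with Flink (the example jar is illustrative):

```shell
# Start a standalone cluster: the master (Dispatcher/JobManager) plus the
# TaskManagers listed in conf/slaves.
./bin/start-cluster.sh

# Submit a job: the client builds the JobGraph and sends it to the Dispatcher.
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# Shut the cluster down when done.
./bin/stop-cluster.sh
```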

Flink Runtime-related Components

  • A client allows submitting jobs through SQL statements or APIs. A JobGraph is generated after a job is submitted.
  • After receiving a request, the JobManager schedules the job and applies for resources to start a TaskManager.
  • The TaskManager is responsible for task execution. It registers with the JobManager and executes the tasks that are allocated by the JobManager.

Principles and Practices of Flink on YARN

YARN Architecture Overview

YARN Architecture — Components

  • The ResourceManager processes client requests, starts and monitors the ApplicationMaster, monitors the NodeManager, and allocates and schedules resources. The ResourceManager includes the Scheduler and Applications Manager.
  • The ApplicationMaster runs on a worker node and is responsible for data splitting, resource application and allocation, task monitoring, and fault tolerance.
  • The NodeManager runs on a worker node and is responsible for single-node resource management, communication between the ApplicationMaster and ResourceManager, and status reporting.
  • Containers are used to abstract resources, such as memory, CPUs, disks, and network resources.

YARN Architecture — Interaction

  • A user submits a job through a client after writing MapReduce code.
  • After receiving the request from the client, the ResourceManager allocates a container used to start the ApplicationMaster and instructs the NodeManager to start the ApplicationMaster in this container.
  • After startup, the ApplicationMaster registers with the ResourceManager and applies to it for resources. Based on the resources it obtains, the ApplicationMaster communicates with the corresponding NodeManagers and asks them to start the program.
  • One or more NodeManagers start MapReduce tasks.
  • The NodeManager continuously reports the status and execution progress of the MapReduce tasks to the ApplicationMaster.
  • When all MapReduce tasks are completed, the ApplicationMaster reports task completion to the ResourceManager and deregisters itself.
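For reference, the whole interaction above is triggered by a single job submission; a sketch using the stock Hadoop example jar (the jar path and version are illustrative):

```shell
# Submitting the job makes the ResourceManager start an ApplicationMaster in
# a container; the ApplicationMaster then requests containers from the
# NodeManagers to run the map and reduce tasks.
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar \
  wordcount /input /output
```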

Flink on YARN — Per Job

  • A client submits a YARN application, which includes the JobGraph and the JAR files.
  • The YARN ResourceManager applies for the first container. In this container it starts the ApplicationMaster process, which runs the Flink master components, namely the Flink YARN ResourceManager and the JobManager.
  • The Flink YARN ResourceManager applies for resources from the YARN ResourceManager. TaskManagers are started after resources are allocated. After startup, each TaskManager registers with the Flink YARN ResourceManager. Once registration completes, the JobManager allocates tasks to the TaskManagers for execution.
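A Per Job cluster as described above is started directly from the Flink client; a sketch using the Flink 1.7-era CLI flags (memory sizes and container counts are illustrative):

```shell
# -m yarn-cluster starts a dedicated YARN application for this single job.
# -yn: number of TaskManager containers; -yjm/-ytm: JobManager/TaskManager
# container memory in MB (1.7-era flags).
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 4096 \
  ./examples/streaming/TopSpeedWindowing.jar
```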

Flink on YARN — Session
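In Session mode, a long-running Flink cluster is started on YARN first, and jobs are then submitted to it; a sketch using the 1.7-era yarn-session.sh flags (all values illustrative):

```shell
# Start a detached session with 4 TaskManager containers.
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -d

# Later job submissions attach to the running session instead of creating a
# new YARN application.
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
```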

Features of YARN

  • Centralized Management and Scheduling of Resources: The resources of all nodes in a YARN cluster are abstracted as containers. These resources include memory, CPUs, disks, and networks. The computing framework applies for containers from the ResourceManager to execute computing tasks. YARN schedules resources and allocates containers based on specified policies. YARN provides multiple task scheduling policies to improve the utilization of cluster resources. These policies include the FIFO Scheduler, Capacity Scheduler, and Fair Scheduler to allow setting task priorities.
  • Resource Isolation: YARN uses cgroups for resource isolation to prevent interference between resources. Once the amount of resources used by a container exceeds the predefined threshold, the container is killed.
  • Automatic Failover: YARN supports NodeManager monitoring and recovers the ApplicationMaster from failures.

Principles of Flink on Kubernetes

Kubernetes — Basic Concepts

Kubernetes — Architecture Diagram

  • The API server is the portal of the cluster: it receives user requests and writes the submitted resource descriptions to etcd.
  • etcd is a key-value store that holds the cluster state. The Scheduler assigns tasks to specific machines, and the kubelet on each node finds the corresponding containers and runs the tasks on the local machine.
  • You can submit a resource description for the Replication Controller, which then monitors and maintains the number of containers in the cluster, and a Service description file, which enables the kube-proxy to forward traffic.

Kubernetes — Core Concepts

  • The Replication Controller is used to manage pod replicas. It ensures that a specified number of pod replicas are running in a Kubernetes cluster at any given time. If the number of pod replicas is smaller than the specified number, the Replication Controller starts new containers. Otherwise, it kills the extra containers to maintain the specified number of pod replicas.
  • A Service provides a central service access portal and implements service proxy and discovery.
  • Persistent Volumes (PVs) and Persistent Volume Claims (PVCs) are used for persistent data storage.
  • ConfigMap stores the configuration files of user programs and uses etcd as its backend storage.
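The replica-maintenance behavior described above can be expressed in a short manifest; the following Deployment (the modern successor to the Replication Controller; all names and the image are placeholders) asks Kubernetes to keep exactly three pods running:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: demo
spec:
  replicas: 3              # Kubernetes starts or kills pods to hold this count
  selector:
    matchLabels:
      app: demo
  template:
    metadata:
      labels:
        app: demo
    spec:
      containers:
        - name: demo
          image: nginx:stable   # placeholder image
```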

Flink on Kubernetes — Architecture

  • After a resource description file is submitted to the Kubernetes cluster, the master container and worker containers are started.
  • The master container starts the Flink master process, which consists of the Flink-Container ResourceManager, JobManager, and Program Runner.
  • The worker containers start TaskManagers, which register with the ResourceManager. After registration, the JobManager allocates tasks to the containers for execution.
  • In Flink, the master and worker containers are built from essentially the same image but use different startup commands. You choose whether a container starts as a master or a worker by setting the corresponding parameters.

Flink on Kubernetes — JobManager

Flink on Kubernetes — TaskManager

Flink on Kubernetes — Interaction

  • The Service uses a label selector to find the JobManager’s pod for service exposure.
  • The Deployment ensures that the specified number of replicas of the JobManager and TaskManager containers keeps running, and it applies the upgrade policy.
  • The ConfigMap mounts the /etc/flink directory, which contains the flink-conf.yaml file, to each pod.
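A minimal sketch of the Service and ConfigMap described above (field values are illustrative; the ConfigMap would be mounted at /etc/flink through a volume in the JobManager and TaskManager Deployments):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  selector:
    app: flink             # label selector matching the JobManager pod
    component: jobmanager
  ports:
    - name: ui
      port: 8081           # REST/web UI port used for job submission
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
```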

Flink on Kubernetes — Practices

  • Session Cluster
  • Start:
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
  • Submit a job:
kubectl port-forward service/flink-jobmanager 8081:8081
bin/flink run -d -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
  • Stop:
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
  • Build an Image: In the flink/flink-container/docker directory, run the build.sh script, specifying the versions used to build the image. If the script executes successfully, the “Successfully tagged topspeed:latest” message appears.
sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8 --scala-version 2.11 --job-jar ~/flink/flink-1.7.1/examples/streaming/TopSpeedWindowing.jar --image-name topspeed
  • Upload the Image: Visit Docker Hub to register an account and create a repository for image uploading.
docker tag topspeed zkb555/topspeedwindowing 
docker push zkb555/topspeedwindowing
  • Start a Job after the image is uploaded.
kubectl create -f job-cluster-service.yaml
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-cluster-job.yaml.template | kubectl create -f -
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB_PARALLELISM=4 envsubst < task-manager-deployment.yaml.template | kubectl create -f -

Flink on YARN and Flink on Kubernetes FAQ

Original Source:



Alibaba Cloud
