Apache Flink Fundamentals: Building a Development Environment and Configure, Deploy and Run Applications

Preface

  • How to deploy and configure the Flink development environment.
  • How to run Flink applications (in local Flink cluster mode, Standalone cluster mode, and Yarn cluster mode).

Deploy and Configure Flink Development Environment

Compile Flink Code

Run one of the following commands in the root directory of the Flink source code:

mvn clean install -DskipTests
# Or
mvn clean package -DskipTests

Commonly used options:
  • -Dfast: skips the QA plugins and JavaDocs to speed up the build.
  • -Dhadoop.version=2.6.1: specifies the Hadoop version.
  • --settings=${maven_file_path}: explicitly specifies the Maven settings.xml configuration file.
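The options above can be combined into one build command. Below is a minimal shell sketch. The helper name flink_build_cmd is hypothetical. It only prints the command so you can review it before running it from the root of the Flink source tree.

```shell
# Hypothetical helper: print the Maven command used to build Flink,
# combining the optional flags described above.
#   $1: Hadoop version (optional, e.g. 2.6.1)
#   $2: path to a Maven settings.xml (optional)
flink_build_cmd() {
  local hadoop_version="${1:-}" settings_file="${2:-}"
  # -Dfast skips the QA plugins and JavaDocs to shorten the build.
  local cmd="mvn clean install -DskipTests -Dfast"
  if [ -n "$hadoop_version" ]; then
    cmd="$cmd -Dhadoop.version=$hadoop_version"
  fi
  if [ -n "$settings_file" ]; then
    cmd="$cmd --settings=$settings_file"
  fi
  echo "$cmd"
}
```

For example, flink_build_cmd 2.6.1 prints mvn clean install -DskipTests -Dfast -Dhadoop.version=2.6.1.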

Prepare Development Environment

Run Flink Application

Basic Concepts

Figure 1. Parallel Dataflows
  • JobManager (also known as JobMaster) coordinates the distributed execution of tasks. This includes scheduling tasks, coordinating checkpoints, and coordinating the recovery of each task from the checkpoint when a job fails over.
  • TaskManager (also known as Worker) executes the tasks of the dataflow graph. This includes allocating memory buffers and transferring data streams.
Figure 2. Flink Runtime Architecture Diagram
Figure 3. Process

Prepare Runtime Environment

  • Prepare the Flink binary: download the Flink binary package from the Flink official website, or compile the Flink binary from the Flink source code.
  • Install Java and configure the JAVA_HOME environment variable.
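Before you start Flink, it helps to confirm that JAVA_HOME points to a usable Java installation. Below is a minimal shell sketch. check_java_home is a hypothetical helper and is not part of the Flink scripts.

```shell
# Hypothetical helper: check that a Java installation is usable.
#   $1: Java home directory (defaults to $JAVA_HOME)
# Returns 0 if <java_home>/bin/java exists and is executable, 1 otherwise.
check_java_home() {
  local java_home="${1:-${JAVA_HOME:-}}"
  if [ -z "$java_home" ]; then
    echo "JAVA_HOME is not set" >&2
    return 1
  fi
  if [ ! -x "$java_home/bin/java" ]; then
    echo "No executable java found under $java_home/bin" >&2
    return 1
  fi
  echo "Using Java at $java_home/bin/java"
}
```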

Run Flink in Local Flink Cluster Mode

Basic Startup Process

./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}
./bin/stop-cluster.sh
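The four commands above can be wrapped so that the cluster is always stopped, even when the job fails. Below is a minimal shell sketch. run_local_wordcount is a hypothetical helper, and its first argument is assumed to be the directory of the unpacked Flink binary package.

```shell
# Hypothetical helper: start a local Flink cluster, run the bundled streaming
# WordCount example, and always stop the cluster afterwards.
#   $1: Flink home directory (the unpacked binary package)
#   $2: input file (optional; without it WordCount uses its built-in data)
run_local_wordcount() {
  local flink_home="$1" input="${2:-}" rc=0
  "$flink_home/bin/start-cluster.sh" || return 1

  # Remember the job's exit code instead of returning early.
  if [ -n "$input" ]; then
    "$flink_home/bin/flink" run "$flink_home/examples/streaming/WordCount.jar" \
      --input "$input" || rc=$?
  else
    "$flink_home/bin/flink" run "$flink_home/examples/streaming/WordCount.jar" || rc=$?
  fi

  # Stop the cluster whether or not the job succeeded.
  "$flink_home/bin/stop-cluster.sh"
  return "$rc"
}
```

Keeping the exit code in rc means a script can still detect a failed job while the local cluster is cleaned up.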

Common Configurations

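Use the following script to start or stop TaskManager processes on the local host:

./bin/taskmanager.sh start|start-foreground|stop|stop-all

The common configuration items are set in conf/flink-conf.yaml: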
  • The heap size for the JobManager JVM
jobmanager.heap.mb: 1024
  • The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024
  • The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4
  • The managed memory size for each TaskManager
taskmanager.managed.memory.size: 256
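TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb (the default value is 128 MB)
In newer Flink versions, jobmanager.heap.mb and taskmanager.heap.mb are replaced by the following keys, which accept a unit suffix:
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m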
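With the sample values from this section, the TotalHeapMemory formula gives 1024 + 256 + 128 = 1408 MB. Below is a small shell sketch of the same arithmetic. It assumes all three values are in MB and takes the formula as stated in this article. The function name is hypothetical.

```shell
# Hypothetical helper: apply the TaskManager memory formula given above.
#   $1: taskmanager.heap.mb
#   $2: taskmanager.managed.memory.size
#   $3: taskmanager.process.heap.memory.mb (defaults to 128, as stated above)
tm_total_memory_mb() {
  local heap_mb="$1" managed_mb="$2" process_mb="${3:-128}"
  echo $(( heap_mb + managed_mb + process_mb ))
}
```

For example, tm_total_memory_mb 1024 256 prints 1408.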

View and Configure Logs

The log files are written to the log directory under the Flink home directory:
  • flink-${user}-standalonesession-${id}-${hostname}.log: the log output written by the code
  • flink-${user}-standalonesession-${id}-${hostname}.out: the stdout output during process execution
  • flink-${user}-standalonesession-${id}-${hostname}-gc.log: the JVM GC log
The log configuration files are in the conf directory:
  • log4j-cli.properties: the log configuration used by the Flink command-line client (for example, when executing the flink run command).
  • log4j-yarn-session.properties: the log configuration used by the Flink command-line client when starting a YARN session (yarn-session.sh).
  • log4j.properties: the log configuration used by the JobManager and TaskManager, in both Standalone and Yarn mode.
If Logback is used instead of Log4j, the corresponding configuration files are:
  • log4j-cli.properties -> logback-console.xml
  • log4j-yarn-session.properties -> logback-yarn.xml
  • log4j.properties -> logback.xml
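To find one kind of JobManager log quickly, you can match files against the naming pattern above by suffix. Below is a minimal shell sketch. flink_session_logs is a hypothetical helper, and the log directory is passed in explicitly.

```shell
# Hypothetical helper: list standalonesession log files of one kind, following
# the flink-${user}-standalonesession-${id}-${hostname}.* naming pattern.
#   $1: log directory
#   $2: kind of file: log | out | gc
flink_session_logs() {
  local log_dir="$1" kind="$2" f
  for f in "$log_dir"/flink-*-standalonesession-*; do
    [ -e "$f" ] || continue          # no match: the glob stays unexpanded
    case "$kind:$f" in
      gc:*-gc.log) echo "$f" ;;
      log:*-gc.log) ;;               # GC logs also end in .log; skip them here
      log:*.log) echo "$f" ;;
      out:*.out) echo "$f" ;;
    esac
  done
}
```

For example, tail -f "$(flink_session_logs log log)" follows the JobManager log when there is exactly one such file.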

Moving Ahead

Deploy the Flink Standalone Cluster on Multiple Hosts

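Before deploying, make sure that:
  • Java is installed and the JAVA_HOME environment variable is configured on each host.
  • The Flink binary directory is deployed at the same path on each host.
  • If you need to use HDFS, the HADOOP_CONF_DIR environment variable is configured.
Set the JobManager address in conf/flink-conf.yaml:
jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net
Edit the following configuration files and keep them identical on all hosts:
conf/masters
conf/slaves
conf/flink-conf.yaml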
./bin/start-cluster.sh
./bin/flink run examples/streaming/WordCount.jar
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20
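The last command requests a parallelism of 20. With the default slot sharing, the job therefore needs at least 20 free task slots in the cluster. For example, three TaskManager hosts with 4 slots each offer only 12 slots.

Below is a rough capacity check. It assumes one TaskManager per line in conf/slaves and reads taskmanager.numberOfTaskSlots from conf/flink-conf.yaml, where Flink's default is 1 slot. Both helper names are hypothetical.

```shell
# Hypothetical helper: estimate the task slots offered by a standalone cluster.
# Assumes one TaskManager per non-empty, non-comment line of conf/slaves and
# reads taskmanager.numberOfTaskSlots from conf/flink-conf.yaml (default 1).
#   $1: Flink conf directory
total_slots() {
  local conf_dir="$1" hosts slots
  # grep -c prints 0 (and exits 1) when no line matches; keep the 0.
  hosts=$(grep -cvE '^[[:space:]]*(#|$)' "$conf_dir/slaves" || true)
  # If the key appears several times, the last occurrence wins.
  slots=$(awk -F': *' '$1 == "taskmanager.numberOfTaskSlots" { v = $2 } END { print v }' \
    "$conf_dir/flink-conf.yaml")
  echo $(( hosts * ${slots:-1} ))
}

# Hypothetical helper: succeed if a job with the given parallelism fits.
#   $1: Flink conf directory, $2: requested parallelism (--parallelism)
parallelism_fits() {
  [ "$2" -le "$(total_slots "$1")" ]
}
```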

Deploy and Configure High Availability (HA) in Standalone Mode

Figure 4. Flink JobManager HA diagram
  • If you want to use the Flink standalone HA mode, use Flink release 1.6.1 or later, because a known community bug in earlier versions may prevent the leading JobManager from working properly in this mode.
  • HDFS is needed in the following experiment, so download the Flink binary package with Hadoop support.

Use Standard Flink Script to Deploy ZooKeeper (Optional)

Configure conf/zoo.cfg:
# The port at which the clients will connect
clientPort=3181
server.1=z05f06378.sqa.zth.tbsite.net:4888:5888
server.2=z05c19426.sqa.zth.tbsite.net:4888:5888
server.3=z05f10219.sqa.zth.tbsite.net:4888:5888
./bin/start-zookeeper-quorum.sh
./bin/stop-zookeeper-quorum.sh
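Each server.N line maps a unique ZooKeeper server ID to host:peerPort:leaderElectionPort. The quorum script starts a ZooKeeper server on each listed host.

For more hosts, you can generate these lines instead of writing them by hand. Below is a minimal shell sketch. It reuses the ports 4888 and 5888 from the example above, and zk_server_lines is a hypothetical helper.

```shell
# Hypothetical helper: print the server.N entries of conf/zoo.cfg for a list
# of ZooKeeper hosts, numbering them from 1 and using the peer and leader
# election ports from the example above (4888 and 5888).
#   $@: ZooKeeper host names
zk_server_lines() {
  local id=1 host
  for host in "$@"; do
    echo "server.${id}=${host}:4888:5888"
    id=$((id + 1))
  done
}
```

For example, zk_server_lines z05f06378.sqa.zth.tbsite.net z05c19426.sqa.zth.tbsite.net z05f10219.sqa.zth.tbsite.net reproduces the three server lines shown above.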

Modify Configuration of Flink Standalone Cluster

$ cat conf/masters
z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081
$ cat conf/slaves
z05f06378.sqa.zth.tbsite.net
z05c19426.sqa.zth.tbsite.net
z05f10219.sqa.zth.tbsite.net
Add the HA settings to conf/flink-conf.yaml:
high-availability: zookeeper
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
high-availability.cluster-id: /test_dir/test_standalone2
high-availability.storageDir: hdfs:///test_dir/recovery2/
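In this mode, the following two settings are not used to locate the JobManager, because the leading JobManager is elected and discovered through ZooKeeper:
jobmanager.rpc.address
jobmanager.rpc.port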
./bin/start-zookeeper-quorum.sh
./bin/start-cluster.sh
./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081
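A key written without its colon (for example, key value instead of key: value) is easy to miss in conf/flink-conf.yaml. Below is a minimal sketch of a pre-flight check for the HA keys used above. check_ha_conf is a hypothetical helper. It only checks each line as text and does not parse YAML.

```shell
# Hypothetical helper: check that flink-conf.yaml contains the ZooKeeper HA
# keys used above, each written as "key: value" with a non-empty value.
#   $1: path to flink-conf.yaml
# Prints each missing or malformed key and returns 1 if any is found.
check_ha_conf() {
  local conf="$1" key missing=0
  for key in high-availability high-availability.zookeeper.quorum \
             high-availability.storageDir; do
    # The line must start with "<key>:" and carry a non-empty value.
    if ! awk -v k="$key" '
        substr($0, 1, length(k) + 1) == k ":" {
          v = substr($0, length(k) + 2)
          gsub(/[ \t]/, "", v)
          if (v != "") found = 1
        }
        END { exit (found ? 0 : 1) }' "$conf"; then
      echo "missing or malformed: $key"
      missing=1
    fi
  done
  return "$missing"
}
```

Running this check on every host before ./bin/start-cluster.sh catches the problem before a JobManager starts with the wrong settings.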

Run the Flink Job in Yarn Mode

Figure 5. Flink Yarn Deployment Flowchart
  • Resources are used on-demand to improve the resource utilization of the cluster.
  • Tasks have priorities, and jobs are run according to those priorities.
  • Based on the Yarn scheduling system, the failover of each role is handled automatically.
  • The JobManager process and the TaskManager process are both monitored by the Yarn NodeManager.
  • If the JobManager process exits unexpectedly, the Yarn ResourceManager reschedules the JobManager on another host.
  • If the TaskManager process exits unexpectedly, the JobManager receives the notification and requests resources from the Yarn ResourceManager again to restart the TaskManager process.

Start the Long-Running Flink Cluster on Yarn (Session Cluster Mode)

./bin/yarn-session.sh -h
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
  • -n,--container ~ Number of TaskManagers
  • -jm,--jobManagerMemory ~ Memory for JobManager Container with an optional unit (default: MB)
  • -tm,--taskManagerMemory ~ Memory per TaskManager Container with optional unit (default: MB)
  • -qu,--queue ~ Specify YARN queue.
  • -s,--slots ~ Number of slots per TaskManager
  • -t,--ship ~ Ship files in the specified directory (t for transfer)
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
./bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
Idle TaskManagers are released after a timeout in milliseconds, set in conf/flink-conf.yaml:
slotmanager.taskmanager-timeout: 30000         # deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000
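The -yid option attaches a job to a specific YARN session by that session's application ID, which yarn application -list also shows. YARN application IDs have the form application_<clusterTimestamp>_<sequence>.

Below is a minimal shell sketch that validates the ID before building the submit command. session_submit_cmd is a hypothetical helper, and it only prints the command.

```shell
# Hypothetical helper: print the command that submits a jar to an existing
# Flink YARN session, given that session's YARN application ID.
#   $1: YARN application ID, e.g. application_1548056325049_0048
#   $2...: jar path followed by program arguments
session_submit_cmd() {
  if [ "$#" -lt 2 ]; then
    echo "usage: session_submit_cmd <application_id> <jar> [args...]" >&2
    return 1
  fi
  local app_id="$1"
  shift
  # YARN application IDs look like application_<clusterTimestamp>_<sequence>.
  if ! printf '%s\n' "$app_id" | grep -Eq '^application_[0-9]+_[0-9]+$'; then
    echo "not a YARN application ID: $app_id" >&2
    return 1
  fi
  echo "./bin/flink run -yid $app_id $*"
}
```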

Run a Single Flink Job on Yarn (Job Cluster Mode)

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
  • -yn,--yarncontainer ~ Number of Task Managers
  • -yqu,--yarnqueue ~ Specify YARN queue.
  • -ys,--yarnslots ~ Number of slots per TaskManager
./bin/flink run -h
  • In the community version (Release 1.5 to Release 1.7), the -n and -yn parameters have no actual effect. Resources are requested based on the -p parameter, and TaskManagers release their resources once they are no longer used.
  • In the open-source version of Blink, the -n parameter (in Yarn Session mode) starts the specified number of TaskManagers at the beginning. After that, no new TaskManagers are requested even if the job needs more slots.
  • In the open-source version of Blink, the -yn parameter (in Yarn Single Job mode) sets the initial number of TaskManagers without limiting the maximum number of TaskManagers. Note that Single Job mode is only used if the -yd parameter is added (for example, ./bin/flink run -yd -m yarn-cluster xxx).
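In the community versions, resources follow -p rather than -yn. The number of TaskManagers requested is therefore roughly ceil(p / slots per TaskManager). For example, -p 20 with -ys 4 leads to about 5 TaskManagers.

Below is a shell sketch of this arithmetic. It assumes the default slot sharing and that every TaskManager offers the -ys slots. The result is an estimate, not Flink's exact allocation logic, and the function name is hypothetical.

```shell
# Hypothetical helper: estimate how many TaskManagers a job cluster needs when
# resources follow the -p parameter, assuming default slot sharing (the job
# needs as many slots as its highest operator parallelism).
#   $1: parallelism (-p), $2: slots per TaskManager (-ys)
estimate_taskmanagers() {
  local parallelism="$1" slots_per_tm="$2"
  # Integer ceiling division: ceil(p / s) = (p + s - 1) / s
  echo $(( (parallelism + slots_per_tm - 1) / slots_per_tm ))
}
```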

Configure High Availability in Yarn Mode

In yarn-site.xml, raise the maximum number of ApplicationMaster attempts:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>100</value>
</property>
In conf/flink-conf.yaml:
yarn.application-attempts: 10     # 1 + 9 retries
high-availability: zookeeper
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
high-availability.cluster-id: /test_dir/test_standalone2
high-availability.storageDir: hdfs:///test_dir/recovery2/
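Flink's yarn.application-attempts only takes effect up to YARN's yarn.resourcemanager.am.max-attempts. This is why the YARN limit is raised to 100 in yarn-site.xml.

The YARN ResourceManager caps a per-application request at the cluster-wide maximum. It also falls back to that maximum when the request is not positive. Below is a sketch of this rule, with a hypothetical function name. With the values above, it gives 10 attempts. If YARN were left at its default maximum of 2, it would give only 2.

```shell
# Hypothetical helper: the number of application attempts YARN will actually
# allow. A per-application request is capped at the cluster-wide
# yarn.resourcemanager.am.max-attempts; non-positive requests use that maximum.
#   $1: yarn.application-attempts (Flink)
#   $2: yarn.resourcemanager.am.max-attempts (YARN)
effective_am_attempts() {
  local requested="$1" yarn_max="$2"
  if [ "$requested" -le 0 ] || [ "$requested" -gt "$yarn_max" ]; then
    echo "$yarn_max"
  else
    echo "$requested"
  fi
}
```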

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com
