Apache Flink Fundamentals: Building a Development Environment and Configuring, Deploying, and Running Applications

By Sha Shengyang


Flink is an open-source big data project developed in Java and Scala. Its source code is available on GitHub, and it uses Maven to compile and build the project. Java, Maven, and Git are therefore essential tools for most Flink users. In addition, a powerful integrated development environment (IDE) helps you read code, develop new features, and fix bugs faster. While this article doesn't cover the installation details of each tool, it provides the necessary installation suggestions.

This article explains:

  • How to run a Flink application (including local Flink cluster mode, Standalone cluster mode, and Yarn cluster mode).

Deploy and Configure Flink Development Environment

Use macOS, Linux, or Windows in the development and testing environment. If you are using Windows 10, we recommend compiling and running Flink in the Windows Subsystem for Linux (WSL).

Compile Flink Code

After installing and configuring the tools mentioned above, execute the following commands to compile Flink:

mvn clean install -DskipTests
# Or
mvn clean package -DskipTests
The following options are commonly used:

  • -Dfast: skips the QA plugins and JavaDoc generation to speed up compilation.
  • -Dhadoop.version=2.6.1: specifies the Hadoop version to build against.
  • --settings=${maven_file_path}: explicitly specifies the Maven settings.xml configuration file.
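As a concrete illustration, the options above can be combined into a single build command. This sketch only assembles and prints the command line so it can be reviewed first; the settings.xml path is a placeholder, not a file this article assumes you have:

```shell
# Assemble (but do not execute) a combined Maven build command.
# MAVEN_SETTINGS is a placeholder path; adjust it to your environment.
MAVEN_SETTINGS="${HOME}/.m2/settings.xml"
BUILD_CMD="mvn clean install -DskipTests -Dfast -Dhadoop.version=2.6.1 --settings=${MAVEN_SETTINGS}"
echo "${BUILD_CMD}"
```

Combining -DskipTests and -Dfast gives the fastest full build when you do not need the tests or QA checks.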

Prepare Development Environment

IntelliJ IDEA is the recommended IDE for Flink. Eclipse is not officially recommended, mainly because the Scala IDE in Eclipse is incompatible with the Scala version used by Flink.

Run Flink Application

Basic Concepts

It is simple to run a Flink application. However, before doing so, it is worth understanding the components of the Flink runtime, because they are involved in configuring a Flink application. Figure 1 shows a data processing program written with the DataStream API: the operators that cannot be chained are separated into different tasks in the DAG graph. Tasks are the smallest unit of resource scheduling in Flink.

Figure 1. Parallel Dataflows

The Flink runtime consists of two types of processes:

  • JobManager (also known as Master) coordinates the distributed execution: it schedules tasks and coordinates checkpoints and failure recovery.
  • TaskManager (also known as Worker) executes the tasks in the dataflow graph. This includes allocating memory buffers and transferring data streams.

Figure 2. Flink Runtime Architecture Diagram
Figure 3. Process

Prepare Runtime Environment

Preparing a runtime environment includes the following:

  • Download the Flink binary package from the Flink official website, or compile the binary from the Flink source code yourself.
  • Install Java and configure the JAVA_HOME environment variable.

Run Flink in Local Flink Cluster Mode

Basic Startup Process

The simplest way to run a Flink application is in the local Flink cluster mode. First, start the cluster:

./bin/start-cluster.sh

Then submit the bundled WordCount example, optionally specifying an input file:

./bin/flink run examples/streaming/WordCount.jar
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}

Common Configurations

The conf/slaves file configures the TaskManager deployment. By default, only one TaskManager process starts. To add a TaskManager process, add another "localhost" line to the file. You can also start and stop TaskManager processes individually:

./bin/taskmanager.sh start|start-foreground|stop|stop-all

The conf/flink-conf.yaml file configures the runtime parameters of the JobManager and TaskManagers, for example:

jobmanager.heap.mb: 1024
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.managed.memory.size: 256

With this configuration, the total heap memory of a TaskManager process is:

TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb (the default value is 128 MB)

In later Flink versions, the heap-size keys are specified with a unit instead:

jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
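To make the formula concrete, here is a small shell sketch that plugs in the sample values from the configuration above (1024 MB of heap, 256 MB of managed memory, and the 128 MB default process overhead):

```shell
# Compute a TaskManager's total heap memory from the formula above.
heap_mb=1024              # taskmanager.heap.mb
managed_mb=256            # taskmanager.managed.memory.size
process_overhead_mb=128   # taskmanager.process.heap.memory.mb (default)
total_mb=$((heap_mb + managed_mb + process_overhead_mb))
echo "TotalHeapMemory = ${total_mb} MB"   # prints: TotalHeapMemory = 1408 MB
```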

View and Configure Logs

The startup logs of the JobManager and TaskManagers are available in the log subdirectory under the Flink binary directory. The files prefixed with flink-${user}-standalonesession-${id}-${hostname} correspond to the output of the JobManager. They include the following three files:

  • flink-${user}-standalonesession-${id}-${hostname}.log: the log output of the code running in the process
  • flink-${user}-standalonesession-${id}-${hostname}.out: the stdout output during process execution
  • flink-${user}-standalonesession-${id}-${hostname}-gc.log: the GC log of the JVM

The log configuration files are in the conf subdirectory:

  • log4j-yarn-session.properties: the log configuration used by the Flink command-line client when starting a YARN session (yarn-session.sh).
  • log4j.properties: whether in Standalone or Yarn mode, the log configuration used by the JobManager and TaskManagers.

Flink also supports Logback; the corresponding configuration files are:

  • log4j-yarn-session.properties → logback-yarn.xml
  • log4j.properties → logback.xml
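For orientation, a log4j.properties along these lines writes the process log to the file named above. The file that actually ships with the Flink distribution is more complete, so treat this as an illustrative sketch only:

```
# Minimal sketch of conf/log4j.properties (the real file in the Flink
# distribution has more appenders and settings).
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```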

Moving Ahead

Run the ./bin/start-cluster.sh command repeatedly, and check the Web page (or execute the jps command) after each run to see what happens.
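To interpret what jps shows after each start, you can count the Flink processes in its output. This sketch runs on a canned sample of jps output rather than a live cluster; the class names are what Flink's JobManager and TaskManager processes appear as in jps:

```shell
# Sample jps output (illustrative, not captured from a real run).
# StandaloneSessionClusterEntrypoint = JobManager, TaskManagerRunner = TaskManager.
jps_output="12345 StandaloneSessionClusterEntrypoint
12388 TaskManagerRunner
12444 Jps"
jm_count=$(printf '%s\n' "$jps_output" | grep -c StandaloneSessionClusterEntrypoint)
tm_count=$(printf '%s\n' "$jps_output" | grep -c TaskManagerRunner)
echo "JobManagers: ${jm_count}, TaskManagers: ${tm_count}"   # JobManagers: 1, TaskManagers: 1
```

Each extra run of start-cluster.sh or taskmanager.sh start shows up as an additional process in this count.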

Deploy the Flink Standalone Cluster on Multiple Hosts

Note the following key points before deployment.

  • The Flink binary directory deployed on each host must be in the same path.
  • If you need to use HDFS, configure the HADOOP_CONF_DIR environment variable.

Modify conf/flink-conf.yaml on each host so that all hosts point to the same JobManager:

jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net

After starting the cluster, submit a job as before:

./bin/flink run examples/streaming/WordCount.jar

To read input from and write output to HDFS, first copy a test file to HDFS:

hdfs dfs -copyFromLocal story /test_dir/input_dir/story

Then run WordCount against it, optionally increasing the parallelism:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20
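The first requirement above — an identical Flink directory on every host — can be scripted. In this sketch the hostnames and the Flink path are placeholders, and the rsync commands are only collected and printed so they can be reviewed before being run:

```shell
# Build (but do not execute) one rsync command per worker host.
# FLINK_DIR and the hostnames are placeholders for your environment.
FLINK_DIR=/opt/flink
CMDS=""
for host in worker1 worker2 worker3; do
  CMDS="${CMDS}rsync -a ${FLINK_DIR}/ ${host}:${FLINK_DIR}/
"
done
printf '%s' "$CMDS"
```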

Deploy and Configure High Availability (HA) in Standalone Mode

As the Flink runtime architecture in Figure 2 shows, the JobManager is the role whose failure is most likely to make the entire system unavailable. If a TaskManager fails and there are enough resources, the related tasks are simply dispatched to other idle task slots, and the job is then recovered from the checkpoint.

Figure 4. Flink JobManager HA diagram
  • HDFS is needed in the following experiment, so download the Flink binary package with Hadoop support.

Use Standard Flink Script to Deploy ZooKeeper (Optional)

Flink currently supports ZooKeeper-based HA. If ZooKeeper is not deployed in your cluster, Flink provides a script to start a ZooKeeper cluster. First, modify the configuration file conf/zoo.cfg and add one server.X=addressX:peerPort:leaderPort line for each ZooKeeper server host that you want to deploy, where "X" is the unique, numeric ID of the ZooKeeper server.

# The port at which the clients will connect
clientPort=2181
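Putting the pieces together, a conf/zoo.cfg for a hypothetical three-node quorum might look like the following; the hostnames are placeholders, and 2888/3888 are the conventional ZooKeeper peer and leader-election ports:

```
# Sketch of conf/zoo.cfg for three ZooKeeper servers (hostnames are placeholders).
clientPort=2181
server.1=zk-host-1:2888:3888
server.2=zk-host-2:2888:3888
server.3=zk-host-3:2888:3888
```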

Modify Configuration of Flink Standalone Cluster

Modify the conf/masters file and add a JobManager:

$cat conf/masters

The conf/slaves file lists the TaskManager hosts:

$cat conf/slaves

Then modify conf/flink-conf.yaml to enable ZooKeeper-based HA:

high-availability: zookeeper
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
high-availability.cluster-id: /test_dir/test_standalone2
high-availability.storageDir: hdfs:///test_dir/recovery2/

You can also start an additional JobManager manually:

./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081

Run the Flink Job in Yarn Mode

Figure 5. Flink Yarn Deployment Flowchart
Compared with the Standalone mode, running Flink in Yarn mode has the following advantages:

  • Tasks have priorities, and jobs are run according to those priorities.
  • Based on the Yarn dispatching system, the failover of each role is automatically processed.
  • The JobManager process and the TaskManager process are both monitored by the Yarn NodeManager.
  • If the JobManager process exits unexpectedly, the Yarn ResourceManager re-dispatches the JobManager to other hosts.
  • If the TaskManager process exits unexpectedly, the JobManager receives the message and requests resources from the Yarn ResourceManager again to restart the TaskManager process.

Start the Long-Running Flink Cluster on Yarn (Session Cluster Mode)

View the command parameters:

./bin/yarn-session.sh -h

Start a session cluster, for example with 4 TaskManagers, 1024 MB of memory for the JobManager, and 4096 MB per TaskManager:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

The main parameters are:

  • -jm,--jobManagerMemory: memory for the JobManager container, with an optional unit (default: MB)
  • -tm,--taskManagerMemory: memory per TaskManager container, with an optional unit (default: MB)
  • -qu,--queue: specify the YARN queue
  • -s,--slots: number of slots per TaskManager
  • -t,--ship: ship files in the specified directory ("t" for transfer)

After the session cluster starts, submit a job to it as usual:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

To submit a job to a specific session cluster from another client, specify the Yarn application ID with the -yid parameter:

./bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

An idle TaskManager is released after a timeout, which is controlled by the following configuration:

slotmanager.taskmanager-timeout: 30000L         # deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000L
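To estimate the Yarn memory such a session will request, multiply the per-TaskManager memory by the TaskManager count and add the JobManager memory. A small sketch with the example values above (this is a rough estimate that ignores any container overhead Yarn adds on top):

```shell
# Rough Yarn memory footprint of the example session above.
num_tm=4      # -n
jm_mb=1024    # -jm
tm_mb=4096    # -tm
total_mb=$((num_tm * tm_mb + jm_mb))
echo "Approximate total memory requested: ${total_mb} MB"   # 17408 MB
```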

Run a Single Flink Job on Yarn (Job Cluster Mode)

Run the following command if you only want to run a single Flink job and then exit:

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

In this mode, the Yarn-related parameters are prefixed with "y", for example:

  • -yqu,--yarnqueue: specify the YARN queue
  • -ys,--yarnslots: number of slots per TaskManager

Execute ./bin/flink run -h to view all parameters. Note the semantics of the -n and -yn parameters:

  • In the open-source version of Blink, the -n parameter (in Yarn Session mode) starts a specified number of TaskManagers at the beginning; later, no new TaskManagers are requested even if the job requires more slots.
  • In the open-source version of Blink, the -yn parameter (in Yarn Single Job mode) indicates the initial number of TaskManagers, without setting a maximum number of TaskManagers. Note that the Single Job mode only applies when the -yd parameter is added (for example, ./bin/flink run -yd -m yarn-cluster xxx).

Configure High Availability in Yarn Mode

First, make sure that the yarn-site.xml file used to start the Yarn cluster configures yarn.resourcemanager.am.max-attempts, the cluster-level upper limit on ApplicationMaster restarts. Then, set the number of attempts for the Flink job in conf/flink-conf.yaml:

yarn.application-attempts: 10     # 1 initial attempt + 9 retries

Finally, configure the ZooKeeper-based HA parameters in conf/flink-conf.yaml, just as in Standalone mode:

high-availability: zookeeper
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
high-availability.cluster-id: /test_dir/test_standalone2
high-availability.storageDir: hdfs:///test_dir/recovery2/

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com