Apache Flink Fundamentals: Five Modes of Client Operations

By Zhou Kaibo

Environment

The previous tutorials in the series describe how to build a Flink development environment and how to deploy and run Flink applications. This tutorial talks about Flink client operations and focuses on actual operations. For this tutorial, we’re using the Flink 1.7.2 community version, the Mac operating system, and the Google Chrome browser.

Overview

Flink provides a variety of client operations for submitting and interacting with tasks: the Flink command line, the Scala Shell, the SQL Client, the Restful API, and the Web. Among them, the command line is the most important, followed by the SQL Client for submitting SQL tasks and the Scala Shell for submitting Table API tasks. Flink also provides Restful services that can be called over HTTP, and tasks can be submitted through the Web as well.


Flink Client Operations

Flink Command Line

The Flink command line has many parameters. Enter flink -h to see the complete description.

flink-1.7.2 bin/flink -h
flink-1.7.2 bin/flink run -h

Standalone

Start a standalone cluster.

flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

Run

The run command runs a task. Take TopSpeedWindowing as an example.

flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify the output path.
Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de

List

The list command lists the running and scheduled jobs.

flink-1.7.2 bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop

The stop command stops a task. Use the -m parameter to specify the host and port of the JobManager. As the output below shows, stopping fails for this example job because its source is not stoppable.

flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
Stopping job d67420e52bd051fae2fddbaa79e046bb.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop the job d67420e52bd051fae2fddbaa79e046bb.
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552)
... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The stop call only succeeds for jobs whose sources implement the StoppableFunction interface:

/**
 * Functions that must be stoppable have to implement this interface,
 * for example, the sources of streaming jobs. The stop() method is called
 * when the job receives a STOP signal. Upon receiving the signal, the
 * source must stop emitting new data and shut down gracefully.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. In contrast to cancel(), this is a request for the
     * source to shut down gracefully. Pending data can still be emitted;
     * the source does not have to stop immediately.
     */
    void stop();
}
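
For illustration, here is a minimal sketch in Scala of a source that supports STOP. The class name StoppableCounterSource is hypothetical; the interfaces are the Flink 1.7 SourceFunction and the StoppableFunction shown above.

import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

class StoppableCounterSource extends SourceFunction[Long] with StoppableFunction {
  @volatile private var isRunning = true
  private var counter = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      // Hold the checkpoint lock while emitting so checkpoints stay consistent.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(counter)
        counter += 1
      }
      Thread.sleep(100)
    }
  }

  // cancel() is the hard path: stop emitting immediately.
  override def cancel(): Unit = isRunning = false

  // stop() is the graceful path: exit the run() loop after any pending
  // records have been emitted, letting the job terminate normally.
  override def stop(): Unit = isRunning = false
}

A job whose sources all follow this pattern can be stopped with bin/flink stop instead of failing as above.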

Cancel

The cancel command cancels a task. A plain cancel takes no savepoint; cancel -s triggers a savepoint before cancellation, written to the directory passed to -s or, if none is given, to the state.savepoints.dir configured in conf/flink-conf.yaml.

flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de

Cancelling job 5e20cb6b0f357591171dfcca2eea09de.
Cancelled job 5e20cb6b0f357591171dfcca2eea09de.
flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759

Cancelling job 29da945b99dea6547c3fbafd57ed8759 with savepoint to /tmp/savepoint.
Cancelled job 29da945b99dea6547c3fbafd57ed8759. Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7.

flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7
total 32K
-rw-r--r-- 1 baoniu 29K Mar 24 10:33 _metadata
  • stop() call: A more graceful way to stop a running streaming job. stop() only applies to jobs whose sources implement the StoppableFunction interface. When a user requests to stop a job, all sources of the job receive the stop() method call. The job keeps running until all sources close properly, which lets the job finish processing all in-flight data.

Savepoint

Trigger the savepoint.

flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command.
  • A savepoint saves the full state of the job, so each savepoint takes longer and produces more data than a regular checkpoint, and the user must trigger it manually. Use savepoints for scenarios such as application version upgrades, bug fixes, and A/B tests.
flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify the output path.
2019-03-28 10:30:53,957 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     
- Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ()
2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2.
2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.

Modify

The modify command changes the parallelism of a task. Internally it takes a savepoint, stops the job, and restarts it with the new parallelism (as the savepoint directory listings and logs below show), so first configure slots and a savepoint directory in conf/flink-conf.yaml and restart the cluster:

taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint
flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 53139) on host zkb-MBP.local.
Stopping standalonesession daemon (pid: 52723) on host zkb-MBP.local.
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify the output path.
Job has been submitted with JobID 7752ea7b0e7303c780de9d86a5ded3fa
flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4.

flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:05 savepoint-7752ea-00c05b015836/

flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3.

flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/
2019-06-17 09:05:11,179 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()
2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3.
2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa.
2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING.
org.apache.flink.util.FlinkException: Job is being rescaled.

Info

Use the info command to view the execution plan (StreamGraph) of a Flink task.

flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

Yarn Per-Job

Attach Mode for a Single Task

The default mode is Attach, which means that the client will not exit until the application completes.

  • On Yarn, the application name is displayed as “Flink session cluster”, and its state changes to FINISHED when the batch WordCount job completes.
  • The result output is printed on the client.
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$echo $HADOOP_CONF_DIR
/etc/hadoop/conf/

[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

2019-06-17 09:15:24,511 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,907 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-17 09:15:25,430 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:15:25,438 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:15:36,239 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0724
2019-06-17 09:15:36,276 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0724
2019-06-17 09:15:36,276 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:15:36,281 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:15:40,426 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
... ...
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 8bfe7568cb5c3254af30cbbd9cd5971e has finished.
Job Runtime: 9371 ms
Accumulator Results:
- 2bed2c5506e9237fb85625416a1bc508 (java.util.ArrayList) [170 elements]
./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

Detach Mode for a Single Task

  • In Detached mode, the client exits right after submitting the task.
  • The name “Flink per-job cluster” is displayed on Yarn.
$./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

2019-06-18 09:21:59,247 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,940 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-18 09:22:00,427 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-18 09:22:00,436 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
^@2019-06-18 09:22:12,113 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0729
2019-06-18 09:22:12,151 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0729
2019-06-18 09:22:12,151 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-18 09:22:12,155 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1532332183347_0729
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID e61b9945c33c300906ad50a9a11f36df

Yarn Session

Start a Session

./bin/yarn-session.sh -tm 2048 -s 3
flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3
2019-06-17 09:21:50,177 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-06-17 09:21:50,644 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-06-17 09:21:50,746 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to baoniu (auth:SIMPLE)
2019-06-17 09:21:50,848 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:21:51,148 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, numberTaskManagers=1, slotsPerTaskManager=3}
2019-06-17 09:21:51,588 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:21:51,596 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
^@2019-06-17 09:22:03,304 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0726
2019-06-17 09:22:03,336 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0726
2019-06-17 09:22:03,336 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:22:03,340 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:22:07,722 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-17 09:22:08,050 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on z07.sqa.net:37109 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z07.sqa.net:37109
  • Alternatively, use the -d parameter at startup to run the session in Detached mode. The name “Flink session cluster” is displayed on Yarn.
flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu
#Generated YARN properties file
#Mon Jun 17 09:22:08 CST 2019
parallelism=3
dynamicPropertiesString=
applicationID=application_1532332183347_0726

Submit the Task

./bin/flink run ./examples/batch/WordCount.jar
flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar
2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
YARN properties set default parallelism to 3
2019-06-17 09:26:43,097 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,327 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c07216.sqa.zth.tbsite.net' and port '37109' from supplied application id 'application_1532332183347_0726'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
^@(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
... ...
(wrong,1)
(you,1)
Program execution finished
Job with JobID ad9b0f1feed6d0bf6ba4e0f18b1e65ef has finished.
Job Runtime: 9152 ms
Accumulator Results:
- fd07c75d503d0d9a99e4f27dd153114c (java.util.ArrayList) [170 elements]

Submit the Task to the Specified Session

Use the -yid parameter to submit the task to a specified session.

$./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar

2019-03-24 12:36:33,668 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,837 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c05218.sqa.zth.tbsite.net' and port '60783' from supplied application id 'application_1532332183347_0708'
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 58d5049ebbf28d515159f2f88563f5fd
  • For the Blink session, use the -n parameter to specify the number of TMs to be started, and these TMs will start in advance.

Scala Shell

For detailed information, refer to the official documentation.

Deploy

Local

$bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
... ...
scala>
  • The senv variable is built in for streaming tasks. A task is submitted through senv.execute("job name"), and in Local mode the output of DataStream print() is written only to the console; see the sketch below.
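
For instance, a trivial streaming job can be assembled and submitted entirely inside the shell (a minimal sketch; the job name is arbitrary):

scala> senv.fromElements(1, 2, 3).map(_ * 2).print()
scala> senv.execute("streaming shell demo")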

Remote

Start a Yarn session cluster first.

$./bin/yarn-session.sh -tm 2048 -s 3
2019-03-25 09:52:16,341 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration
... ...
Flink JobManager is now running on z054.sqa.net:28665 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z054.sqa.net:28665
$bin/start-scala-shell.sh remote z054.sqa.net 28665
Starting Flink Shell:
Connecting to Flink cluster (host: z054.sqa.net, port: 28665).
... ...
scala>

Yarn

$./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn
Starting Flink Shell:
2019-03-25 09:47:44,695 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-03-25 09:47:44,717 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-admin.
2019-03-25 09:47:45,041 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-25 09:47:45,098 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-25 09:47:45,266 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-25 09:47:45,275 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-03-25 09:47:45,357 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2019-03-25 09:47:45,711 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-25 09:47:45,718 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/admin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-25 09:47:46,514 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0710
2019-03-25 09:47:46,534 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0710
2019-03-25 09:47:46,534 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-03-25 09:47:46,535 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-03-25 09:47:51,051 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-03-25 09:47:51,222 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Connecting to Flink cluster (host: 10.10.10.10, port: 56942).

Execute

DataSet

flink-1.7.2 bin/stop-cluster.sh
No taskexecutor daemon to stop on host zkb-MBP.local.
No standalonesession daemon to stop on host zkb-MBP.local.
flink-1.7.2 bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
scala> val text = benv.fromElements("To be, or not to be,--that is the question:--")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@5b407336
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@6ee34fe4
scala> counts.print()
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)
scala> counts.writeAsText("/tmp/out1")
res1: org.apache.flink.api.java.operators.DataSink[(String, Int)] = DataSink '<unnamed>' (TextOutputFormat (/tmp/out1) - UTF-8)
scala> benv.execute("batch test")
res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@737652a9
flink-1.7.2 cat /tmp/out1
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)

DataStream

scala> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--")
textStreaming: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@4970b93d
scala> val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1)
countsStreaming: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@6a478680
scala> countsStreaming.print()
res3: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@42bfc11f
scala> senv.execute("Streaming Wordcount")
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
(is,1)
(the,1)
(question,1)
res4: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@1878815a

Table API

The open-source version of Blink supports submitting Table API tasks in the Scala shell (you can also submit SQL queries by using btenv.sqlQuery). The community version of Flink 1.8 supports the Table API in the shell as well; a short session is sketched below.
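
The following is a minimal sketch of such a session, assuming a shell (Blink or Flink 1.8) in which btenv, the batch TableEnvironment, and the Table API implicits are predefined; the table name words and its field word are illustrative:

scala> val words = benv.fromElements("hello", "world", "hello")
scala> btenv.registerDataSet("words", words, 'word)
scala> val result = btenv.sqlQuery("SELECT word, COUNT(word) AS cnt FROM words GROUP BY word")
scala> btenv.toDataSet[(String, Long)](result).print()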

SQL Client Beta

SQL Client is currently available only as a beta version and is still under development. Use it only for prototype verification of SQL queries, and don't use it in production environments.

Basic Usage

flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
flink-1.7.2 ./bin/sql-client.sh embedded
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
No session environment specified.
Validating current environment...done.
... ...
Flink SQL> help;
The following commands are available:
QUIT Quits the SQL CLI client.
CLEAR Clears the current terminal.
HELP Prints the available commands.
SHOW TABLES Shows all registered tables.
SHOW FUNCTIONS Shows all registered user-defined functions.
DESCRIBE Describes the schema of a table with the given name.
EXPLAIN Describes the execution plan of a query or table with the given name.
SELECT Executes a SQL SELECT query on the Flink cluster.
INSERT INTO Inserts the results of a SQL SELECT query into a declared table sink.
CREATE VIEW Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DROP VIEW Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
SOURCE Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
RESET Resets all session configuration properties.
Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

SELECT Query

Flink SQL> SELECT 'Hello World';

Explain

View the SQL execution plan with the EXPLAIN command.

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], cnt=[COUNT()])
LogicalValues(tuples=[[{ _UTF-16LE'Bob ' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg ' }, { _UTF-16LE'Bob ' }]])
== Optimized Logical Plan ==
DataStreamGroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
DataStreamValues(tuples=[[{ _UTF-16LE'Bob ' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg ' }, { _UTF-16LE'Bob ' }]])
== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : groupBy: (name), select: (name, COUNT(*) AS cnt)
ship_strategy : HASH

Results

SQL Client supports two modes to maintain and display query results.

SET execution.result-mode=table
SET execution.result-mode=changelog

Table Mode

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

Changelog Mode

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

Environment Files

Currently, the SQL Client does not support DDL statements. Use a YAML file to define information, such as tables, UDFs, and runtime parameters required for SQL queries.

flink-1.7.2 cat /tmp/env.yaml
tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/tmp/input.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      type: filesystem
      path: "/tmp/output.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR

execution:
  type: streaming                    # required: execution mode, either 'batch' or 'streaming'
  result-mode: table                 # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000     # optional: maximum number of maintained rows in 'table' mode
                                     # (1000000 by default, a value smaller than 1 means unlimited)
  time-characteristic: event-time    # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                     # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200  # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16                # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0        # optional: table program's minimum idle state time
  max-idle-state-retention: 0        # optional: table program's maximum idle state time
  restart-strategy:                  # optional: restart strategy
    type: fallback                   # "fallback" to global restart strategy by default

deployment:
  response-timeout: 5000
flink-1.7.2 cat /tmp/input.csv
1,hello
2,world
3,hello world
1,ok
3,bye bye
4,yes
flink-1.7.2 ./bin/sql-client.sh embedded -e /tmp/env.yaml
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/tmp/env.yaml
Validating current environment...done.
Flink SQL> show tables;
MyCustomView
MyTableSink
MyTableSource
Flink SQL> describe MyTableSource;
root
|-- MyField1: Integer
|-- MyField2: String
Flink SQL> describe MyCustomView;
root
|-- MyField2: String
Flink SQL> create view MyView1 as select MyField1 from MyTableSource;
[INFO] View has been created.
Flink SQL> show tables;
MyCustomView
MyTableSource
MyView1
Flink SQL> describe MyView1;
root
|-- MyField1: Integer
Flink SQL> select * from MyTableSource;
Flink SQL> insert into MyTableSink select * from MyTableSource;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 3fac2be1fd891e3e07595c684bb7b7a0
Web interface: http://localhost:8081
flink-1.7.2 cat /tmp/output.csv
1,hello
2,world
3,hello world
1,ok
3,bye bye
4,yes

Restful API

The following describes how to use the Rest API to upload Jar packages and execute tasks. For more detailed operations, see the Flink Restful API documentation.

flink-1.7.2 curl http://127.0.0.1:8081/overview
{"taskmanagers":1,"slots-total":4,"slots-available":0,"jobs-running":3,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"flink-version":"1.7.2","flink-commit":"ceba8af"}
flink-1.7.2 curl -X POST -H "Expect:" -F "jarfile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
{"filename":"/var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/flink-web-124c4895-cf08-4eec-8e15-8263d347efc2/flink-web-upload/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","status":"success"}
flink-1.7.2 curl http://127.0.0.1:8081/jars
{"address":"http://localhost:8081","files":[{"id":"6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","name":"TopSpeedWindowing.jar","uploaded":1553743438000,"entry":[{"name":"org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","description":null}]}]}

flink-1.7.2 curl http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/plan
{"plan":{"jid":"41029eb3feb9132619e454ec9b2a89fb","name":"CarTopSpeedWindowingExample","nodes":[{"id":"90bea66de1c231edf33913ecd54406c1","parallelism":1,"operator":"","operator_strategy":"","description":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out","inputs":[{"num":0,"id":"cbc357ccb763df2852fee8c4fc7d55f2","ship_strategy":"HASH","exchange":"pipelined_bounded"}],"optimizer_properties":{}},{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: Custom Source -> Timestamps/Watermarks","optimizer_properties":{}}]}}
flink-1.7.2 curl -X POST http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/run
{"jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad"}

Web

On the left side of the Flink Dashboard, click “Submit new Job” to upload Jar packages, display execution plans, and submit tasks. Beginners often use the Web submission function for a quick demonstration.


Conclusion

This tutorial comprehensively covers the five ways of submitting tasks in Flink and illustrates the client operations in detail.
