Apache Flink Fundamentals: Five Modes of Client Operations

Alibaba Cloud
23 min readJan 14, 2020

--

By Zhou Kaibo

Environment

The previous tutorials in the series describe how to build a Flink development environment and how to deploy and run Flink applications. This tutorial talks about Flink client operations and focuses on actual operations. For this tutorial, we’re using the Flink 1.7.2 community version, the Mac operating system, and the Google Chrome browser.

Overview

The following figure shows how Flink provides a variety of client operations to submit and interact with tasks, including the Flink command line, Scala Shell, SQL Client, Restful API, and Web. Among them, the most important is the command line, followed by the SQL Client for submitting SQL tasks and the Scala Shell for submitting Table API tasks. Flink also provides Restful services that can be called over HTTP. In addition, you can submit tasks through the Web.

Files, such as Flink, start-scala-shell.sh, and sql-client.sh, are displayed under the bin directory of the Flink installation directory. These files are portals for client operations.

Flink Client Operations

Flink Command Line

The Flink command line has many parameters. Enter flink -h to see the complete description.

flink-1.7.2 bin/flink -h

To view the parameters of a command, such as the Run command, enter run -h.

flink-1.7.2 bin/flink run -h

This article describes common operations. For more information, see Official Flink command-line document.

Standalone

Start a standalone cluster.

flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

To see the Web interface, visit: http://127.0.0.1:8081

Run

It helps to run a task. Take TopSpeedWindowing, as an example.

flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of a program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify the output path.
Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de

After it runs, the parallelism defaults to 1.

Click Task Manager on the left and then Stdout to view the output log.

You can also view the *.out file in the local log directory.

List

It is used to view the task list.

flink-1.7.2 bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop

It helps to stop a task. Use the -m parameter to specify the host address and port of the job manager to be stopped.

flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
Stopping job d67420e52bd051fae2fddbaa79e046bb.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop the job d67420e52bd051fae2fddbaa79e046bb.
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552)
... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

The log shows that the Stop command fails to execute. When a Job needs to be stopped, all sources must be stoppable. Hence, the StoppableFunction interface is implemented.

/**
* 需要能 stoppable 的函数必须实现这个接口,例如流式任务的 source。
* stop() 方法在任务收到 STOP 信号的时候调用。
* source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。
*/
@PublicEvolving
public interface StoppableFunction {
/**
* 停止 source。与 cancel() 不同的是,这是一个让 source 优雅停止的请求。
* 等待中的数据可以继续发送出去,不需要立即停止。
*/
void stop();
}

Cancel

It is used to cancel a task. If state.savepoints.dir is configured in conf/flink-conf.yaml, savepoints will be saved; otherwise, it will not be saved.

flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de

Cancelling job 5e20cb6b0f357591171dfcca2eea09de.
Cancelled job 5e20cb6b0f357591171dfcca2eea09de.

Also explicitly specify the savepoint directory while stopping the task.

flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759

Cancelling job 29da945b99dea6547c3fbafd57ed8759 with savepoint to /tmp/savepoint.
Cancelled job 29da945b99dea6547c3fbafd57ed8759. Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7.

flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7
total 32K
-rw-r--r-- 1 baoniu 29K Mar 24 10:33 _metadata

The difference between canceling and stopping a streaming job.

  • cancel() call: To immediately call the cancel() method for the job operators to cancel them as soon as possible. If the operators do not stop after receiving the cancel () call, Flink periodically interrupts the execution of operator threads until all operators are stopped.
  • stop() call: A more elegant way to stop running streaming jobs. stop() is only applicable to jobs whose sources implement the StoppableFunction interface. When a user requests to stop a job, all the sources of the job will receive the stop() method call. The job will not end normally until all sources are properly closed. This way, all jobs are processed normally.

Savepoint

Trigger the savepoint.

flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for your response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command.

Note: To know the difference between the savepoint and the checkpoint, refer to the document for details.

  • Flink performs the checkpoint process and saves the incremental state of a job, which makes each checkpoint take a shorter time and produces fewer data. It triggers automatically as long as it is enabled in the application and does not need to be perceived by users. The checkpoint is used automatically when a job fails over and does not need to be specified by users.
  • Flink performs the savepoint process and saves the full state of the job, which makes each savepoint takes a longer time and produces more data. The user must trigger it manually. Use the savepoint for application version updates. For more details, refer to the document, bug fixes, and A/B Test.

Use the -s parameter to start from the specified savepoint.

flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify the output path.

When you view the log of the job manager, you can see a log like below:

2019-03-28 10:30:53,957 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     
- Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ()
2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2.
2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.

Modify

Modify the parallelism of a task.

To facilitate the demonstration, we modify the conf/flink-conf.yaml to change the number of task slots from the default of 1 to 4 and configure the savepoint directory. The Modify parameter is followed by -s to specify the savepoint path. (The current version may have bugs, indicating that the savepoint path cannot be identified.)

taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint

After modifying the parameters, restart the cluster to take effect before starting the task.

flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 53139) on host zkb-MBP.local.
Stopping standalonesession daemon (pid: 52723) on host zkb-MBP.local.
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify the output path.
Job has been submitted with JobID 7752ea7b0e7303c780de9d86a5ded3fa

On following the page, you can see that the number of task slots has changed to 4, and the default parallelism of the task is 1.

Run the Modify command to change the parallelism to 4 and 3, respectively. Note that each Modify command triggers a savepoint.

flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4.

flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:05 savepoint-7752ea-00c05b015836/

flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3.

flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/

When viewing the log of the job manager, you will see the following:

2019-06-17 09:05:11,179 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()
2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3.
2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa.
2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING.
org.apache.flink.util.FlinkException: Job is being rescaled.

Info

Use the Info command to view the execution plan (StreamGraph) of Flink tasks.

flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

Copy the output JSON content and paste it to the following website. http://flink.apache.org/visualizer/

Compare it to the actual physical execution plan.

Yarn Per-Job

Attach Mode for a Single Task

The default mode is Attach, which means that the client will not exit until the application completes.

  • Specify the Yarn mode through the -m yarn-cluster parameter.
  • The name “Flink session cluster” is displayed on Yarn, and FINISHED will be displayed when the WordCount task of this batch is finished.
  • View the resulting output on the client.
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$echo $HADOOP_CONF_DIR
/etc/hadoop/conf/

[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

2019-06-17 09:15:24,511 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,907 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-17 09:15:25,430 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:15:25,438 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:15:36,239 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0724
2019-06-17 09:15:36,276 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0724
2019-06-17 09:15:36,276 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:15:36,281 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:15:40,426 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
... ...
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 8bfe7568cb5c3254af30cbbd9cd5971e has finished.
Job Runtime: 9371 ms
Accumulator Results:
- 2bed2c5506e9237fb85625416a1bc508 (java.util.ArrayList) [170 elements]

Run a streaming task in Attach mode, and note that the client will always wait and not exit. You can run the following example to test this:

./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

Detach Mode for a Single Task

  • In the Detached mode, the client exits after submitting the task.
  • The name “Flink per-job cluster” is displayed on Yarn.
$./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

2019-06-18 09:21:59,247 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,940 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-18 09:22:00,427 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-18 09:22:00,436 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
^@2019-06-18 09:22:12,113 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0729
2019-06-18 09:22:12,151 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0729
2019-06-18 09:22:12,151 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-18 09:22:12,155 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1532332183347_0729
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID e61b9945c33c300906ad50a9a11f36df

Yarn Session

Start a Session

./bin/yarn-session.sh -tm 2048 -s 3

This indicates the initiation of a Yarn session cluster, where each TM has 2 GB memory, and three slots.

Note: The -n parameter does not take effect.

flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3
2019-06-17 09:21:50,177 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-06-17 09:21:50,644 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-06-17 09:21:50,746 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to baoniu (auth:SIMPLE)
2019-06-17 09:21:50,848 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:21:51,148 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, numberTaskManagers=1, slotsPerTaskManager=3}
2019-06-17 09:21:51,588 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:21:51,596 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
^@2019-06-17 09:22:03,304 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0726
2019-06-17 09:22:03,336 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0726
2019-06-17 09:22:03,336 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:22:03,340 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:22:07,722 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-17 09:22:08,050 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on z07.sqa.net:37109 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z07.sqa.net:37109

The client uses the Attach mode by default and does not exit.

  • Press Ctrl+C to exit, and then attach a session through ./bin/yarn-session.sh-id application_1532332183347_0726.
  • Or use the -d parameter to set it to the Detached mode at startup. The name “Flink session cluster” is displayed on Yarn.
  • A file is generated in the local temporary directory, /tmp directory for some hosts.
flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu
#Generated YARN properties file
#Mon Jun 17 09:22:08 CST 2019
parallelism=3
dynamicPropertiesString=
applicationID=application_1532332183347_0726

Submit the Task

./bin/flink run ./examples/batch/WordCount.jar

The task is submitted to the session based on the content of the /tmp/.yarn-properties-admin file.

flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar
2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
YARN properties set default parallelism to 3
2019-06-17 09:26:43,097 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,327 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c07216.sqa.zth.tbsite.net' and port '37109' from supplied application id 'application_1532332183347_0726'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
^@(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
... ...
(wrong,1)
(you,1)
Program execution finished
Job with JobID ad9b0f1feed6d0bf6ba4e0f18b1e65ef has finished.
Job Runtime: 9152 ms
Accumulator Results:
- fd07c75d503d0d9a99e4f27dd153114c (java.util.ArrayList) [170 elements]

The TM resources are released after the operation.

Submit the Task to the Specified Session

Use the -yid parameter to submit the task to the specific session.

$./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar

2019-03-24 12:36:33,668 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,837 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c05218.sqa.zth.tbsite.net' and port '60783' from supplied application id 'application_1532332183347_0708'
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 58d5049ebbf28d515159f2f88563f5fd

Note: The difference between the Blink session and Flink session.

  • For the Flink session, the -n parameter does not take effect, and the TM does not start in advance.
  • For the Blink session, use the -n parameter to specify the number of TMs to be started, and these TMs will start in advance.

Scala Shell

For detailed information, refer to the Official document

Deploy

Local

$bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
... ...
scala>

Running Task Description

  • The benv variable is built into the batch task. Results are output to the console through print().
  • The senv variable is built into the streaming task. The task is submitted through senv.execute("job name"), and the output of the DataStream is only printed to the console in Local mode.

Remote

Start a Yarn session cluster first.

$./bin/yarn-session.sh  -tm 2048 -s 32019-03-25 09:52:16,341 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration
⋯ ...
Flink JobManager is now running on z054.sqa.net:28665 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z054.sqa.net:28665

Start the Scala Shell and connect to the JM:

$bin/start-scala-shell.sh remote z054.sqa.net 28665
Starting Flink Shell:
Connecting to Flink cluster (host: z054.sqa.net, port: 28665).
... ...
scala>

Yarn

$./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarnStarting Flink Shell:
2019-03-25 09:47:44,695 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-03-25 09:47:44,717 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-admin.
2019-03-25 09:47:45,041 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-25 09:47:45,098 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-25 09:47:45,266 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-25 09:47:45,275 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-03-25 09:47:45,357 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2019-03-25 09:47:45,711 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-25 09:47:45,718 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/admin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-25 09:47:46,514 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0710
2019-03-25 09:47:46,534 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0710
2019-03-25 09:47:46,534 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-03-25 09:47:46,535 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-03-25 09:47:51,051 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-03-25 09:47:51,222 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Connecting to Flink cluster (host: 10.10.10.10, port: 56942).

After you press Ctrl+C to exit the Shell, the Flink cluster continues to run and does not exit.

Execute

DataSet

flink-1.7.2 bin/stop-cluster.sh
No taskexecutor daemon to stop on host zkb-MBP.local.
No standalonesession daemon to stop on host zkb-MBP.local.
flink-1.7.2 bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
scala> val text = benv.fromElements("To be, or not to be,--that is the question:--")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@5b407336
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@6ee34fe4
scala> counts.print()
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)

For a DataSet task, print() triggers the execution of the task.

You can also output the results to a file (delete /tmp/out1 first; otherwise an error will be reported indicating that a file with the same name already exists), and continue to execute the following command:

scala> counts.writeAsText("/tmp/out1")
res1: org.apache.flink.api.java.operators.DataSink[(String, Int)] = DataSink '<unnamed>' (TextOutputFormat (/tmp/out1) - UTF-8)
scala> benv.execute("batch test")
res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@737652a9

View the output results in the /tmp/out1 file.

flink-1.7.2 cat /tmp/out1
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)

DataSteam

scala> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--")
textStreaming: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@4970b93d
scala> val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1)
countsStreaming: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@6a478680
scala> countsStreaming.print()
res3: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@42bfc11f
scala> senv.execute("Streaming Wordcount")
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
(is,1)
(the,1)
(question,1)
res4: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@1878815a

For a DataStream task, print() does not trigger the execution of the task. You must explicitly call the execute(“job name”) function to execute the task.

Table API

The open-source version of Blink supports submitting tasks through the Table API (you can submit SQL queries by using btenv.sqlQuery). The community version of Flink 1.8 supports the Table API

SQL Client Beta

SQL Client is currently available only as a beta version, so it’s still under development. Use it only for prototype verification of SQL, and don’t use it for production environments.

Basic Usage

flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
flink-1.7.2 ./bin/sql-client.sh embedded
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
No session environment specified.
Validating current environment...done.
⋯ ⋯
Flink SQL> help;
The following commands are available:
QUIT Quits the SQL CLI client.
CLEAR Clears the current terminal.
HELP Prints the available commands.
SHOW TABLES Shows all registered tables.
SHOW FUNCTIONS Shows all registered user-defined functions.
DESCRIBE Describes the schema of a table with the given name.
EXPLAIN Describes the execution plan of a query or table with the given name.
SELECT Executes a SQL SELECT query on the Flink cluster.
INSERT INTO Inserts the results of a SQL SELECT query into a declared table sink.
CREATE VIEW Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DROP VIEW Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
SOURCE Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
RESET Resets all session configuration properties.
Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

SELECT Query

Flink SQL> SELECT 'Hello World';

Press Q to exit the interface.

Visit http://127.0.0.1:8081 to see that the generated query task. With this query, the SELECT statement ends. This query uses a custom source to read a fixed dataset and uses the stream collect sink to output the result. Only one result is output.

Note: If a file similar to .yarn-properties-baoniu exists in the local temporary directory, the task is submitted to Yarn.

Explain

View the SQL execution plan via Explain command.

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;== Abstract Syntax Tree ==        // 抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
LogicalValues(tuples=[[{ _UTF-16LE'Bob ' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg ' }, { _UTF-16LE'Bob ' }]])
== Optimized Logical Plan == // 优化后的逻辑执行计划
DataStreamGroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
DataStreamValues(tuples=[[{ _UTF-16LE'Bob ' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg ' }, { _UTF-16LE'Bob ' }]])
== Physical Execution Plan == // 物理执行计划
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : groupBy: (name), select: (name, COUNT(*) AS cnt)
ship_strategy : HASH

Results

SQL Client supports two modes to maintain and display query results.

  • Table Mode: The query results are materialized in memory, and displayed in a paging table. You can use the following command to enable the Table mode.
SET execution.result-mode=table
  • ChangeLog Mode: The query results are not materialized, but the addition and retraction results generated by continuous queries are directly displayed.
SET execution.result-mode=changelog

Next, let’s demonstrate them with real examples.

Table Mode

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

View the execution results in the following figure.

Changlog mode

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.
Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

View the execution results in the following figure.

The ‘-’ indicates the retraction of the result.

Environment Files

Currently, the SQL Client does not support DDL statements. Use a YAML file to define information, such as tables, UDFs, and runtime parameters required for SQL queries.

First, prepare env.yaml and input.csv files.

flink-1.7.2 cat /tmp/env.yaml
tables:
- name: MyTableSource
type: source-table
update-mode: append
connector:
type: filesystem
path: "/tmp/input.csv"
format:
type: csv
fields:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR
line-delimiter: "\n"
comment-prefix: "#"
schema:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR
- name: MyCustomView
type: view
query: "SELECT MyField2 FROM MyTableSource"
- name: MyTableSink
type: sink-table
update-mode: append
connector:
type: filesystem
path: "/tmp/output.csv"
format:
type: csv
fields:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR
schema:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR

Execution properties allow for changing the behavior of a table program.

execution:
type: streaming # required: execution mode either 'batch' or 'streaming'
result-mode: table # required: either 'table' or 'changelog'
max-table-result-rows: 1000000 # optional: maximum number of maintained rows in
# 'table' mode (1000000 by default, smaller 1 means unlimited)
time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default)
parallelism: 1 # optional: Flink's parallelism (1 by default)
periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
min-idle-state-retention: 0 # optional: table program's minimum idle state time
max-idle-state-retention: 0 # optional: table program's maximum idle state time
restart-strategy: # optional: restart strategy
type: fallback # "fallback" to global restart strategy by default

Deployment properties allow for describing the cluster to which table programs are submitted to.

deployment:
response-timeout: 5000
flink-1.7.2 cat /tmp/input.csv
1,hello
2,world
3,hello world
1,ok
3,bye bye
4,yes

Start the SQL Client.

flink-1.7.2 ./bin/sql-client.sh embedded -e /tmp/env.yaml
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/tmp/env.yaml
Validating current environment...done.
Flink SQL> show tables;
MyCustomView
MyTableSink
MyTableSource
Flink SQL> describe MyTableSource;
root
|-- MyField1: Integer
|-- MyField2: String
Flink SQL> describe MyCustomView;
root
|-- MyField2: String
Flink SQL> create view MyView1 as select MyField1 from MyTableSource;
[INFO] View has been created.
Flink SQL> show tables;
MyCustomView
MyTableSource
MyView1
Flink SQL> describe MyView1;
root
|-- MyField1: Integer
Flink SQL> select * from MyTableSource;

Use “insert into” to write the results to the result table.

Flink SQL> insert into MyTableSink select * from MyTableSource;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 3fac2be1fd891e3e07595c684bb7b7a0
Web interface: http://localhost:8081

Query the generated result data file.

flink-1.7.2 cat /tmp/output.csv
1,hello
2,world
3,hello world
1,ok
3,bye bye
4,yes

You can also define UDFs in the Environment file, query, and use it in the SQL Client through “HOW FUNCTIONS.” This article does not explain it in detail.

The SQL Client function community is still under development. For more information, visit FLIP-24

Restful API

The following describes how to use the Rest API to submit Jar packages and execute tasks. For more detailed operations, see the Flink Restful API document.

flink-1.7.2 curl http://127.0.0.1:8081/overview
{"taskmanagers":1,"slots-total":4,"slots-available":0,"jobs-running":3,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"flink-version":"1.7.2","flink-commit":"ceba8af"}%
flink-1.7.2 curl -X POST -H "Expect:" -F "jarfile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
{"filename":"/var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/flink-web-124c4895-cf08-4eec-8e15-8263d347efc2/flink-web-upload/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","status":"success"}%
flink-1.7.2 curl http://127.0.0.1:8081/jars
{"address":"http://localhost:8081","files":[{"id":"6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","name":"TopSpeedWindowing.jar","uploaded":1553743438000,"entry":[{"name":"org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","description":null}]}]}%

flink-1.7.2 curl http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/plan
{"plan":{"jid":"41029eb3feb9132619e454ec9b2a89fb","name":"CarTopSpeedWindowingExample","nodes":[{"id":"90bea66de1c231edf33913ecd54406c1","parallelism":1,"operator":"","operator_strategy":"","description":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out","inputs":[{"num":0,"id":"cbc357ccb763df2852fee8c4fc7d55f2","ship_strategy":"HASH","exchange":"pipelined_bounded"}],"optimizer_properties":{}},{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: Custom Source -> Timestamps/Watermarks","optimizer_properties":{}}]}}% flink-1.7.2 curl -X POST http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/run
{"jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad"}%

The Restful API also provides a variety of monitoring and metrics-related functions and supports comprehensive task submission operations.

Web

On the left side of the Flink Dashboard page, click ‘Submit’ new Job. Upload Jar packages, display execution plans and submit tasks. beginners usually use the Web submission function to get started with a quick demonstration.

Conclusion

This tutorial comprehensively covers five methods of submitting tasks in Flink. It illustrates several Flink client operations with great detail.

We hope it helps users to improve proficiency in submitting various tasks and improve their development skills and operations and maintenance (O&M) efficiency.

Original Source:

--

--

Alibaba Cloud
Alibaba Cloud

Written by Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com

No responses yet