Build an Apache Flink Application from Scratch in 5 Minutes

Alibaba Cloud
5 min readAug 29, 2019

--

By Wu Chong

By completing the steps given in this tutorial you can build your own Apache Flink Application from scratch in around 5 minutes or so.

Prepare the Development Dnvironment

Apache Flink can be run on and is compatible with Linux, Max OS X, or Windows. To develop a Flink application, you need to run either the Java version 8.0 or later or Maven environment on your computer. If you are using the Java environment, running the $java Cversion command will output the following version information that you are using:

java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

If you are using the Maven environment, running the $ mvn -version command will output the following version information:

Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

In addition, we recommend that you use IntelliJ IDEA (the Community Free Version is sufficient for this tutorial) as the IDE for your Flink application. Eclipse also works for this purpose, but because Eclipse had some issues with Scala and Java hybrid projects in the past, we don’t recommend that you choose Eclipse.

Create a Maven Project

You can follow the steps in this section to create a Flink project and then import it to IntelliJ IDEA. We use Flink Maven Archetype to create the project structure and some initial default dependencies. In the working directory, run the mvn archetype:generate command to create the project:

-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false

The above groupId, artifactId, and package can be edited into your desired paths. Using the above parameters, Maven automatically creates the following project structure:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
└── main
├── java
│ └── myflink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties

The pom.xml file already contains the required Flink dependencies, and several sample program frameworks are provided in src/main/java.

Compile the Flink Program

Now, follow these steps to write your own Flink program. To do so, start IntelliJ IDEA, select Import Project, and select pom.xml under the root directory of my-flink-project. Then, import the project as instructed.

Create a SocketWindowWordCount.java file under src/main/java/myflink:

package myflink;public class SocketWindowWordCount {    public static void main(String[] args) throws Exception {    }
}

As of now, this program is just a basic framework, and we will fill in code step by step. Note that import statements will not be written in the following, because IDE will automatically add them. At the end of this section, I will display the complete code. If you want to skip the following steps, you can directly paste the final complete code into the editor.

The first step of the Flink program is to create a StreamExecutionEnvironment. This is an entry class that can be used to set parameters, create data sources, and submit tasks. So let's add it to the main function:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Next, create a data source that reads data from the socket on the local port 9000:

DataStream text = env.socketTextStream("localhost", 9000, "\n");

This creates a DataStream of string type. DataStream is the core API for stream processing in Flink. It defines many common operations (such as filtering, conversion, aggregation, window, and association). In this example, we are interested in the number of times each word appears in a specific time window, for example, a five-second window. For this purpose, the string data is first parsed into the word and the number of times it appears (represented by Tuple2<String, Integer>), where the first field is the word, the second field is the occurrences of the word. The initial value of occurrences is set to 1. A flatmap is implemented to perform the parsing, because a row of data may have multiple words.

DataStream> wordCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});

Next, group the data stream based on the Word field (that is, index field 0). Here, we can use the keyBy(int index) method to obtain a Tuple2<String, Integer> data stream with the word as the key. Then, we can specify the desired window on the stream, and compute the result based on the data in the window. In the example, we want to aggregate the occurrences of a word every five seconds, and each window is counted from zero.

DataStream> windowCounts = wordCounts
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);

The second called .timeWindow() specifies that we want a 5-second Tumble window. The third call specifies a sum aggregate function for each key and each window. In this example, this is added by the occurrences field (that is, index field 1). The resulting data stream outputs the number of occurrences of each word every five seconds.

Finally, print the data stream to the console, and start the execution:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

The last env.execute call is required to start the actual Flink job. All Operator operations (such as source creation, aggregation, and printing) only build the graphs of internal operator operations. Only when execute() is called will they be submitted to the cluster or the local computer for execution.

package myflink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // Create the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Obtain the input data by connecting to the socket. Here you want to connect to the local 9000 port. If 9000 port is not available, you'll need to change the port.
DataStream text = env.socketTextStream("localhost", 9000, "\n");
// Parse the data, group by word, and perform the window and aggregation operations.
DataStream> windowCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// Print the results to the console. Note that here single-threaded printed is used, rather than multi-threading.
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}

Run the Program

To run the sample program, start NetCat on the terminal to obtain the input stream:

nc -lk 9000

For Windows, Ncat can be installed through NMAP, and then run:

ncat -lk 9000

Then, run the main method of SocketWindowWordCount directly.

You only need to enter words in the NetCat console to see the statistics for the occurrence frequency of each word in the output console of SocketWindowWordCount. If you want to see the count greater than one, type the same word repeatedly within five seconds.

Original Source

--

--

Alibaba Cloud
Alibaba Cloud

Written by Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com

No responses yet