Using Flink Connectors Correctly

By Dong Tingting (Kuaishou)

Section 1: Flink Streaming Connectors

Apache Flink is a new-generation stream computing engine with unified stream and batch data processing capabilities. It reads data from different third-party storage engines, processes the data, and writes the output to another storage engine. Flink connectors connect the Flink computing engine to external storage systems. Flink provides many data exchange methods, including but not limited to:

  • Predefined sources and sinks
  • Built-in bundled connectors
  • Connectors provided by third-party project Apache Bahir
  • Async I/O

Now, I will briefly introduce those four methods.

Method 1: Predefined Sources and Sinks

Flink provides some predefined sources and sinks. They are divided into the following types:

  • File-based sources and sinks

If you need to read data from text files, use the text-file source of the StreamExecutionEnvironment class to read the content of a file as text.

You can also use the following to read the content in the specified fileInputFormat:

env.readFile(fileInputFormat, path)

If your Flink job computes results that you want to write to a file, you can use the predefined sinks. For example, you can write the results to a text or CSV file by calling the writeAsText(path) or writeAsCsv(path) method of the DataStream class.

  • Socket-based sources and sinks

Provide the hostname and port of the socket, and create a socket-based source by calling the socketTextStream API predefined by the StreamExecutionEnvironment class. The source then reads data from the socket in text form. If you want to write the results to another socket, call the writeToSocket API of the DataStream class.

  • Sources based on collections and iterators in memory

You can create sources based on in-memory collections and iterators by calling the fromCollection API or fromElements API of the StreamExecutionEnvironment class. You can also write the result data to standard output or standard error by using the print or printToErr method.

For more information on the application of predefined sources and sinks, refer to the corresponding examples in the Flink source code, such as WordCount and SocketWindowWordCount.

Method 2: Bundled Connectors

Flink provides some bundled connectors, such as Kafka sources, Kafka sinks, and Elasticsearch (ES) sinks. When you read data from or write data to Kafka, ES, or RabbitMQ, you can directly call the APIs of the corresponding connectors. The Kafka connectors, which are the most commonly used in production, are described in detail in the next section.

These connectors are part of the Flink source code, but they are not part of the core logic of the Flink engine. Therefore, they are not included in the Flink binary release package. When you submit a job, be sure to include the connector-related classes in the JAR package of your job. Otherwise, the job submission fails with an error indicating that a class was not found or that an exception was thrown while initializing some classes.

Method 3: Connectors Provided by Apache Bahir

Apache Bahir was derived from the Apache Spark project to provide Spark-related extensions/plug-ins, connectors, and other pluggable components. Apache Bahir extends the coverage of analytic platforms by providing a wide variety of streaming connectors and SQL data sources. If you need to write data to Flume and Redis, use connectors provided by this project.


Method 4: Async I/O

Stream computing often requires interaction with external storage systems. For example, you may need to join the stream against a MySQL table. With synchronous I/O, each request blocks until its response arrives, so the wait time adds up and hurts system throughput and latency. Asynchronous I/O solves this problem by processing multiple requests concurrently, which increases throughput and reduces latency.
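To make the idea concrete, here is a minimal sketch in plain Java. It does not use Flink's Async I/O API; the lookup function, the user IDs, and the 20 ms delay are hypothetical stand-ins for a remote query. All requests are issued before any response is awaited, so the wait times overlap instead of adding up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncLookupSketch {
    // Hypothetical stand-in for a remote lookup such as a MySQL query.
    static String lookup(int userId) {
        try {
            Thread.sleep(20); // simulated network round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "user-" + userId;
    }

    // Issues every lookup without waiting for earlier ones to finish,
    // then collects the results in the original input order.
    static List<String> enrichAsync(List<Integer> ids, ExecutorService pool) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int id : ids) {
            pending.add(CompletableFuture.supplyAsync(() -> lookup(id), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(enrichAsync(List.of(1, 2, 3, 4), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With a pool of four threads, the four simulated lookups run concurrently rather than one after another, which is the effect asynchronous I/O aims for.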

Tip: For details about how Async I/O works, refer to the official documentation.

Section 2: Flink Kafka Connectors

This section focuses on the Flink Kafka connectors commonly used in production. If you use Flink, you are probably familiar with Kafka, a distributed, partitioned, multi-replica, high-throughput publish/subscribe messaging system. We frequently exchange data between Flink and Kafka in production. For example, we may use a Kafka consumer to read data, use Flink to process it, and write the results back to Kafka. This process involves two connectors: Flink Kafka Consumer and Flink Kafka Producer.

Let’s look at an example of how Flink Kafka connectors work. The logic of the code is simple. It first reads data from Kafka, then does some simple computation, and writes the results back to Kafka.

The code in the red frame creates the source and sink functions. Flink provides ready-to-use Flink Kafka Consumer and Producer APIs for this purpose.

Note: Kafka has many versions, and different versions may use different interface protocols. Flink provides different consumers and producers for different Kafka versions. Specifically, Flink provides Flink Kafka Consumer 08, 09, 010, and 011 for Kafka 0.8, 0.9, 0.10, and 0.11, respectively. The same applies to Flink Kafka producers.

1 Flink Kafka Consumer

  • Data Deserialization

Kafka stores data as binary bytes. Flink needs to convert the binary data read from Kafka into specific Java or Scala objects, so you must implement a schema class that defines how to serialize and deserialize the data. Data deserialization requires implementing the DeserializationSchema interface and overriding the deserialize(byte[] message) function. To deserialize key-value (KV) data read from Kafka, implement the KeyedDeserializationSchema interface and override this function: deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset).

Flink also provides some commonly used serialization and deserialization schema classes. For example, SimpleStringSchema implements serialization and deserialization for strings. TypeInformationSerializationSchema determines the schema to be used based on the TypeInformation of Flink. JsonDeserializationSchema uses Jackson to deserialize a JSON-formatted message and returns the ObjectNode. Use the .get("property") method to access the corresponding field.
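The following sketch shows the kind of logic a deserialize(byte[] message) function typically contains. To keep it runnable without Flink on the classpath, it uses a hypothetical BytesDeserializer interface with the same method shape rather than Flink's own DeserializationSchema. The ClickEvent type and its "userId,count" text encoding are also assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;

final class DeserializationSketch {
    // Hypothetical stand-in with the same shape as deserialize(byte[] message);
    // this is not Flink's DeserializationSchema interface.
    interface BytesDeserializer<T> {
        T deserialize(byte[] message);
    }

    // Hypothetical event type, encoded on the wire as UTF-8 text "<userId>,<count>".
    static final class ClickEvent {
        final String userId;
        final long count;

        ClickEvent(String userId, long count) {
            this.userId = userId;
            this.count = count;
        }
    }

    // Decodes the raw bytes into text, splits on the first comma, and builds the object.
    static final BytesDeserializer<ClickEvent> CLICK_EVENT_SCHEMA = message -> {
        String[] fields = new String(message, StandardCharsets.UTF_8).split(",", 2);
        return new ClickEvent(fields[0], Long.parseLong(fields[1].trim()));
    };

    public static void main(String[] args) {
        byte[] raw = "alice,3".getBytes(StandardCharsets.UTF_8);
        ClickEvent event = CLICK_EVENT_SCHEMA.deserialize(raw);
        System.out.println(event.userId + " -> " + event.count);
    }
}
```

In a real job, the same body would go into a class that implements Flink's schema interface, which also asks for type information about the produced objects.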

  • Consumer Start Offset Setup

I will show you how to set the Kafka consumer start offset for a job. Flink also encapsulates many useful functions for this part. Call the corresponding functions of the FlinkKafkaConsumer class to set the appropriate start offset.

  • setStartFromGroupOffsets is the default policy. It reads data from the group offset, which refers to the last consumer offset of a group recorded by a Kafka broker. However, if the Kafka broker has no offset information for the group, the start offset is determined by the Kafka parameter auto.offset.reset.
  • setStartFromEarliest reads data starting from the earliest offset of Kafka.
  • setStartFromLatest reads data starting from the latest offset of Kafka.
  • setStartFromTimestamp(long) reads data starting from a particular offset, the timestamp of which is greater than or equal to a specified timestamp. A Kafka timestamp means the timestamp added by Kafka to each message. This timestamp could mean the time when the message was generated at the producer, or when it enters the Kafka broker.
  • setStartFromSpecificOffsets reads data starting from the offsets specified for individual partitions, which we pass in as a collection of partitions and offsets. If the consumer needs to read a partition that does not have a specified offset within the provided offsets collection, it falls back to the default group offsets behavior (setStartFromGroupOffsets()) for that particular partition and reads data starting from the group offset.

For details of the usage, refer to the following image.

The Flink framework has a fault tolerance mechanism. If a job fails while checkpointing is enabled, the job recovers from the state saved at the last checkpoint. We can also manually trigger a savepoint when stopping a job, and the job recovers from that savepoint when it is started again. In both cases, the recovering consumer reads data starting from the offsets in the saved state, regardless of the start offset settings described above.
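The order of precedence described in this section can be summarized in a small sketch. This is not Flink code: the method, the map parameters, and the EARLIEST and LATEST sentinel values are hypothetical, and auto.offset.reset is simplified to two outcomes (any value other than "earliest" is treated as "latest").

```java
import java.util.Map;

final class StartOffsetSketch {
    // Hypothetical sentinels for "earliest" and "latest"; real offsets are >= 0.
    static final long EARLIEST = -2L;
    static final long LATEST = -1L;

    // Resolves where one partition starts reading, in order of precedence:
    // restored checkpoint/savepoint state, then a specific offset,
    // then the committed group offset, then auto.offset.reset.
    static long resolve(int partition,
                        Map<Integer, Long> restoredState,
                        Map<Integer, Long> specificOffsets,
                        Map<Integer, Long> groupOffsets,
                        String autoOffsetReset) {
        if (restoredState.containsKey(partition)) {
            return restoredState.get(partition);
        }
        if (specificOffsets.containsKey(partition)) {
            return specificOffsets.get(partition);
        }
        if (groupOffsets.containsKey(partition)) {
            return groupOffsets.get(partition);
        }
        return "earliest".equals(autoOffsetReset) ? EARLIEST : LATEST;
    }

    public static void main(String[] args) {
        Map<Integer, Long> none = Map.of();
        // Partition 1 has no specific offset, so the group offset is used.
        System.out.println(resolve(1, none, Map.of(0, 23L), Map.of(1, 70L), "latest"));
    }
}
```

Passing empty maps for the specific offsets and the restored state reproduces the default setStartFromGroupOffsets behavior on a fresh start.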

  • Dynamic Discovery of Topics and Partitions

In actual production, we may encounter the following requirements.

Scenario 1: A Flink job needs to aggregate data from five Kafka topics. As the business grows, the job needs to aggregate data from one more Kafka topic. How can we make the job automatically detect the new topic without restarting it?

Scenario 2: A job reads data from a fixed Kafka topic, which has 10 partitions. As the business grows, the data volume increases, and we need to scale out the Kafka partitions from 10 to 20. How can we make the job dynamically detect the new partitions without restarting it?

In the preceding two scenarios, we must first set the flink.partition-discovery.interval-millis parameter to a non-negative value in properties when building FlinkKafkaConsumer to enable dynamic discovery. The value indicates the interval of dynamic discovery in milliseconds. Then FlinkKafkaConsumer starts a separate internal thread to get the latest meta information from Kafka on a regular basis.

For Scenario 1, we also need to pass a topic pattern, described by a regular expression, when we build FlinkKafkaConsumer. Each time Flink gets the latest Kafka meta information, it also gets the latest list of topics that match the regular expression.

For Scenario 2, after we set the dynamic discovery parameter, Flink will automatically match the latest partitions when regularly obtaining the latest meta information from Kafka. To ensure the data is correct, the consumer reads data in new partitions starting from the earliest offset.
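As a rough illustration of Scenario 1, the sketch below sets the discovery parameter in a java.util.Properties object and shows the matching step a periodic discovery pass performs. Only the parameter name comes from the text above. The 30000 ms interval, the topic names, the orders-[0-9]+ pattern, and the newlyMatchedTopics helper are hypothetical, and the code is not how FlinkKafkaConsumer is implemented internally.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

final class TopicDiscoverySketch {
    // Returns topics that match the pattern and are not yet known to the job.
    static List<String> newlyMatchedTopics(Pattern topicPattern,
                                           Collection<String> clusterTopics,
                                           Set<String> knownTopics) {
        List<String> found = new ArrayList<>();
        for (String topic : clusterTopics) {
            if (topicPattern.matcher(topic).matches() && !knownTopics.contains(topic)) {
                found.add(topic);
            }
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Parameter name from the text above; the interval value is an arbitrary example.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        Pattern pattern = Pattern.compile("orders-[0-9]+");
        List<String> cluster = List.of("orders-1", "orders-2", "orders-3", "payments");
        Set<String> known = Set.of("orders-1", "orders-2");
        System.out.println(newlyMatchedTopics(pattern, cluster, known));
    }
}
```

The same comparison against the known set applies to Scenario 2, with partitions in place of topics.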

  • Commit Offsets

How the Flink Kafka consumer commits offsets depends on whether checkpointing is enabled. If checkpointing is disabled, the Flink Kafka consumer relies on the auto-commit function of the Kafka client to commit offsets. Pass the Kafka client's auto-commit parameters in the consumer properties when building FlinkKafkaConsumer. The Flink Kafka consumer then automatically commits offsets to Kafka at fixed intervals.

If checkpointing is enabled, Flink manages offsets in the checkpoint state, which provides fault tolerance. Offsets committed to Kafka, in this case, are generally used only for external progress monitoring, so that we can track the consumer offsets and the job lag. To enable this, call setCommitOffsetsOnCheckpoints with the value true. The offsets are then committed to Kafka each time a checkpoint succeeds, so the commit interval depends on the checkpoint interval. Therefore, the job lag viewed on Kafka may not be exactly real-time. If the checkpoint interval is relatively long, the lag curve may zigzag.
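The zigzag can be reproduced with a little arithmetic. The sketch below assumes a constant production rate, a consumer that is fully caught up, and offsets that are committed exactly at multiples of the checkpoint interval. All of these are simplifying assumptions, and the method name is hypothetical.

```java
final class CommittedLagSketch {
    // Lag as seen from Kafka: latest offset minus the last committed offset.
    // The committed offset only moves forward when a checkpoint completes.
    static long observedLag(long secondsSinceStart,
                            long checkpointIntervalSeconds,
                            long recordsPerSecond) {
        long latestOffset = secondsSinceStart * recordsPerSecond;
        long lastCheckpointTime =
                (secondsSinceStart / checkpointIntervalSeconds) * checkpointIntervalSeconds;
        long committedOffset = lastCheckpointTime * recordsPerSecond;
        return latestOffset - committedOffset;
    }

    public static void main(String[] args) {
        // The lag grows between checkpoints and drops back after each commit.
        for (long t = 0; t <= 25; t += 5) {
            System.out.println("t=" + t + "s lag=" + observedLag(t, 10, 100));
        }
    }
}
```

Even though the job in this model never falls behind, the lag reported by Kafka climbs until the next checkpoint commits the offsets.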

  • Timestamp Extraction/Watermark Generation

When we use the EventTime attribute in a Flink job, we must specify a function to extract the timestamp from each message and generate the watermark. I recommend that you call the assignTimestampsAndWatermarks function on the FlinkKafkaConsumer itself. This ensures that each consumed partition has its own watermark assigner, as shown in the following image. The watermark emitted by a source is the smallest of the watermarks of all partitions consumed by that source. This ensures that when a source reads data from multiple partitions whose timestamps progress at different rates, no data is dropped as late because a faster partition pushed the watermark ahead.
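The "smallest timestamp wins" rule can be sketched as follows. This is a simplified model, not Flink's implementation: each partition's watermark is assumed to be the largest timestamp seen so far on that partition, with no allowance for out-of-order events, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionWatermarkSketch {
    // Largest event timestamp seen so far, tracked separately per partition.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    PartitionWatermarkSketch(int... partitions) {
        for (int partition : partitions) {
            maxTimestampPerPartition.put(partition, Long.MIN_VALUE);
        }
    }

    void onRecord(int partition, long eventTimestamp) {
        maxTimestampPerPartition.merge(partition, eventTimestamp, Long::max);
    }

    // The source watermark is the minimum over all partitions, so a slow
    // partition holds the watermark back until its data has been read.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long timestamp : maxTimestampPerPartition.values()) {
            min = Math.min(min, timestamp);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarkSketch source = new PartitionWatermarkSketch(0, 1);
        source.onRecord(0, 200L);
        source.onRecord(1, 50L);
        System.out.println(source.currentWatermark());
    }
}
```

If the watermark followed the fastest partition instead, records still waiting in the slower partition would arrive behind the watermark and be treated as late.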

2 Flink Kafka Producer

  • Producer Partitioners

When we use FlinkKafkaProducer to write data to Kafka, FlinkFixedPartitioner is used by default if we do not set a separate partitioner. This partitioner maps each sink task to a partition by taking the parallel task ID modulo the number of partitions: parallelInstanceId % partitions.length.

For example, if we have four sink tasks and one partition, all four tasks write data to the same partition. However, if the number of sink tasks is less than the number of partitions, some partitions end up with no data. If we have two sink tasks and four partitions, the tasks only write data to the first two partitions.

If we set the partitioner to null when we build the FlinkKafkaProducer, the default round-robin Kafka partitioner is used. In this case, each sink task writes data to all downstream partitions in a round-robin manner. The advantage is that data is evenly written to all downstream partitions. The drawback is that when there are a lot of partitions, many network connections must be maintained because each task must connect to the broker of each partition.
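The fixed partitioner's formula, parallelInstanceId % partitions.length, is easy to check with a few lines of plain Java. The class and helper names below are hypothetical; only the modulo expression comes from the text.

```java
import java.util.SortedSet;
import java.util.TreeSet;

final class FixedPartitionerSketch {
    // Same expression as in the text: task ID modulo the number of partitions.
    static int targetPartition(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    // Collects every partition that at least one sink task writes to.
    static SortedSet<Integer> partitionsReceivingData(int sinkTasks, int[] partitions) {
        SortedSet<Integer> used = new TreeSet<>();
        for (int taskId = 0; taskId < sinkTasks; taskId++) {
            used.add(targetPartition(taskId, partitions));
        }
        return used;
    }

    public static void main(String[] args) {
        int[] fourPartitions = {0, 1, 2, 3};
        System.out.println(partitionsReceivingData(2, fourPartitions));
    }
}
```

Because each task always maps to the same partition, every task needs only one broker connection, at the cost of leaving partitions empty when there are fewer tasks than partitions.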

  • Fault Tolerance

With the Flink Kafka 09 and 010 producers, we can set setLogFailuresOnly to false and setFlushOnCheckpoint to true to achieve at-least-once semantics. The value of setLogFailuresOnly is false by default. This parameter specifies whether a failed write to Kafka is only logged, without throwing an exception and terminating the job.

The value of setFlushOnCheckpoint is true by default. This parameter specifies whether to flush data to Kafka during a checkpoint to ensure that the data has been written to Kafka. Otherwise, the data may still be cached in the buffer of the Kafka client instead of being written to Kafka. That data is lost when the job fails, and the at-least-once semantics cannot be achieved.
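The effect of flushing on checkpoints can be modeled with a toy sink. This is a deliberately pessimistic sketch, not the producer's real behavior: it assumes the client sends nothing in the background, so records leave the buffer only when a checkpoint flushes them. All class, field, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class FlushOnCheckpointSketch {
    private final boolean flushOnCheckpoint;
    private final List<String> clientBuffer = new ArrayList<>(); // not yet sent
    private final List<String> broker = new ArrayList<>();       // durably written
    private int recordsWritten = 0;
    private int recordsCoveredByCheckpoint = 0;

    FlushOnCheckpointSketch(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    void write(String record) {
        clientBuffer.add(record);
        recordsWritten++;
    }

    // A completed checkpoint means upstream will not replay earlier records.
    void checkpoint() {
        if (flushOnCheckpoint) {
            broker.addAll(clientBuffer);
            clientBuffer.clear();
        }
        recordsCoveredByCheckpoint = recordsWritten;
    }

    // Simulates a crash: the buffer is gone, and only records written after
    // the last checkpoint are replayed. Returns how many records are lost.
    int recordsLostAfterCrash() {
        clientBuffer.clear();
        return recordsCoveredByCheckpoint - broker.size();
    }

    public static void main(String[] args) {
        FlushOnCheckpointSketch sink = new FlushOnCheckpointSketch(false);
        sink.write("a");
        sink.write("b");
        sink.checkpoint();
        System.out.println("lost: " + sink.recordsLostAfterCrash());
    }
}
```

With flushing enabled, every record covered by a checkpoint has reached the broker, so a crash can cause duplicates on replay but not loss.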

Flink Kafka 011 provides end-to-end exactly-once guarantees by supporting Kafka transactions, which are integrated through the TwoPhaseCommitSinkFunction.

Section 3: Frequently Asked Questions

Q: Should the number of parallel tasks of the Flink consumer be equal to the total number of partitions? Should parallelism be set to the total number of partitions when there are multiple topics and data sources?

A: The number of parallel tasks does not have to equal the number of partitions. The right parallelism also depends on the data volume of a topic. If the data volume is small, you can set the number of parallel tasks to be less than the number of partitions. Do not set it to be greater than the total number of partitions. Otherwise, some tasks are assigned no partitions and end up with no data to process.
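A quick way to see the idle tasks is to distribute partitions over subtasks and count the empty ones. The sketch assumes a simple round-robin assignment (partition p goes to subtask p % parallelism). Flink's actual assignment may differ in which subtask gets which partition, but the number of idle subtasks comes out the same whenever partitions are spread evenly.

```java
final class PartitionAssignmentSketch {
    // Simplified round-robin: partition p is read by subtask p % parallelism.
    static int[] partitionsPerSubtask(int partitionCount, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitionCount; p++) {
            counts[p % parallelism]++;
        }
        return counts;
    }

    // Counts the subtasks that received no partition at all.
    static int idleSubtasks(int partitionCount, int parallelism) {
        int idle = 0;
        for (int count : partitionsPerSubtask(partitionCount, parallelism)) {
            if (count == 0) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println("idle subtasks: " + idleSubtasks(3, 5));
    }
}
```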

Q: Is data sent to every partition round-robin, if partitioner is set to null? Will keyed data be distributed to the corresponding partitions?

A: If you do not specify a separate partitioner when building FlinkKafkaProducer, the FlinkFixedPartitioner is used by default, and all data is written in the same manner, regardless of whether any of it is keyed. If you manually set the partitioner to null, keyed data will be written by keys, with data having the same key written to the same partition. Keyed data with null keys are written round-robin. Non-keyed data will be written to all partitions round-robin.
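The behavior with a null partitioner can be pictured as follows. This is only a sketch of the rule stated in the answer: String.hashCode stands in for the hash function that Kafka really uses, and the class and method names are hypothetical.

```java
final class KafkaDefaultPartitioningSketch {
    private int roundRobinCounter = 0;

    // Records with a key go to a partition derived from the key's hash;
    // records without a key are spread over the partitions in turn.
    int partitionFor(String key, int partitionCount) {
        if (key == null) {
            int partition = Math.floorMod(roundRobinCounter, partitionCount);
            roundRobinCounter++;
            return partition;
        }
        // Stand-in hash; Kafka uses a different hash function internally.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        KafkaDefaultPartitioningSketch partitioner = new KafkaDefaultPartitioningSketch();
        System.out.println(partitioner.partitionFor("user-42", 4));
        System.out.println(partitioner.partitionFor(null, 4));
    }
}
```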

Q: Let’s assume that the checkpoint interval is long, and a node fails before an offset is committed. How do we ensure all data is consumed after the node is restarted?

A: When checkpointing is enabled, offsets are managed by Flink, and the job recovers from the saved state rather than from the offsets committed to Kafka. The job restarts from the state of the latest checkpoint and re-reads the data that arrived after that checkpoint, so some data is consumed repeatedly but none is skipped. The Flink engine only guarantees exactly-once semantics for the computation state. To achieve end-to-end exactly-once, you need an idempotent storage system or transactional writes.
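One common way to make a sink idempotent is to derive the storage key from the source position, so that a replayed record overwrites its earlier copy instead of being added again. The sketch below uses an in-memory map as a hypothetical stand-in for the external store; the key format is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class IdempotentSinkSketch {
    // Stand-in for an external key-value store with upsert semantics.
    private final Map<String, String> store = new HashMap<>();

    // The key is built from topic, partition, and offset, so writing the
    // same source record twice leaves the store unchanged.
    void write(String topic, int partition, long offset, String value) {
        store.put(topic + "-" + partition + "-" + offset, value);
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.write("orders", 0, 0L, "a");
        sink.write("orders", 0, 1L, "b");
        sink.write("orders", 0, 1L, "b"); // replayed after recovery
        System.out.println("stored records: " + sink.size());
    }
}
```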
