Using Flink Connectors Correctly

Section 1: Flink Streaming Connectors

Apache Flink is a new-generation stream computing engine with unified stream and batch data processing capabilities. It reads data from different third-party storage engines, processes the data, and writes the output to another storage engine. Flink connectors connect the Flink computing engine to external storage systems. Flink provides many data exchange methods, including but not limited to:

  • Predefined sources and sinks
  • Built-in bundled connectors
  • Connectors provided by third-party project Apache Bahir
  • Async I/O

Method 1: Predefined Sources and Sinks

Flink provides some predefined sources and sinks. They are divided into the following types:

  • File-based sources and sinks, for example env.readFile(fileInputFormat, path)
  • Socket-based sources and sinks
  • Sources based on collections and iterators in memory

Method 2: Bundled Connectors

Flink provides some bundled connectors, such as Kafka sources, Kafka sinks, and Elasticsearch (ES) sinks. When you read data from or write data to Kafka, ES, or RabbitMQ, you can directly call the APIs of the corresponding connectors. The Kafka connectors, which are the most commonly used in production, are described in detail in the next section.

Method 3: Connectors Provided by Apache Bahir

Apache Bahir was derived from the Apache Spark project to provide Spark-related extensions/plug-ins, connectors, and other pluggable components. Apache Bahir extends the coverage of analytic platforms by providing a wide variety of streaming connectors and SQL data sources. If you need to write data to Flume or Redis, use the connectors provided by this project.

Method 4: Async I/O

Stream computing often requires interaction with external storage systems. For example, you may need to join a stream with a MySQL table. With synchronous I/O, each request blocks until its response arrives, so the job spends much of its time waiting, which hurts throughput and latency. Asynchronous I/O solves this problem by handling multiple requests concurrently, which increases throughput and reduces latency.
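
The difference between the two styles can be sketched in plain Java. This is only an illustration of the idea and does not use Flink's Async I/O API. The class name AsyncLookupSketch, the lookup method, and the 100 ms delay are hypothetical stand-ins for a remote query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncLookupSketch {
    // Hypothetical stand-in for a remote query (for example, a MySQL lookup).
    static String lookup(int key) {
        try {
            Thread.sleep(100); // simulated round trip to the external store
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row-" + key;
    }

    // Synchronous style: each request waits for the previous one to finish.
    static List<String> enrichSync(List<Integer> keys) {
        List<String> results = new ArrayList<>();
        for (int key : keys) {
            results.add(lookup(key));
        }
        return results;
    }

    // Asynchronous style: issue all requests first, then collect the
    // results in input order.
    static List<String> enrichAsync(List<Integer> keys, ExecutorService pool) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int key : keys) {
            pending.add(CompletableFuture.supplyAsync(() -> lookup(key), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        ExecutorService pool = Executors.newFixedThreadPool(keys.size());
        try {
            long t0 = System.nanoTime();
            List<String> sync = enrichSync(keys);
            long t1 = System.nanoTime();
            List<String> async = enrichAsync(keys, pool);
            long t2 = System.nanoTime();
            System.out.println("sync:  " + sync + " in " + (t1 - t0) / 1_000_000 + " ms");
            System.out.println("async: " + async + " in " + (t2 - t1) / 1_000_000 + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Both methods return the same rows. The synchronous version pays the simulated round trip once per key, while the asynchronous version overlaps the waits, so its total time should be closer to a single round trip when the pool has enough threads.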

Section 2: Flink Kafka Connectors

This section focuses on the Flink Kafka connectors commonly used in production. If you use Flink, you are probably familiar with Kafka, a distributed, partitioned, multi-replica, high-throughput message publishing/subscription system. In production, we frequently exchange data between Flink and Kafka. For example, we may use a Kafka consumer to read data, use Flink to process the data, and then write the results back to Kafka. This process involves two connectors: Flink Kafka Consumer and Flink Kafka Producer.

1 Flink Kafka Consumer

  • Data Deserialization
  • Consumer Start Offset Setup
    • setStartFromGroupOffsets is the default policy. It reads data starting from the group offset, which is the last offset consumed by the group as recorded by the Kafka broker. If the Kafka broker has no offset information for the group, the start offset is determined by the Kafka parameter auto.offset.reset.
    • setStartFromEarliest reads data starting from the earliest offset in Kafka.
    • setStartFromLatest reads data starting from the latest offset in Kafka.
    • setStartFromTimestamp(long) reads data starting from the first offset whose timestamp is greater than or equal to the specified timestamp. A Kafka timestamp is the timestamp that Kafka attaches to each message. It can be either the time when the message was generated at the producer or the time when the message entered the Kafka broker.
    • setStartFromSpecificOffsets reads data starting from the offsets specified for individual partitions. We need to provide a collection that maps partitions to offsets. If the consumer needs to read a partition that has no entry in the provided collection, it falls back to the default group offsets behavior (setStartFromGroupOffsets()) for that partition and reads data starting from the group offset.
  • Dynamic Discovery of Topics and Partitions
  • Commit Offsets
  • Timestamp Extraction/Watermark Generation
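
The start offset policies listed under Consumer Start Offset Setup can be modeled in a few lines of plain Java. This is a minimal sketch of the selection rules as described above, not the connector's implementation. The class StartOffsetSketch, its method names, and the sample offsets are hypothetical, and the auto.offset.reset value "none" is not modeled.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StartOffsetSketch {
    // Group-offsets policy: use the recorded group offset if there is one,
    // otherwise fall back according to auto.offset.reset.
    static long fromGroupOffsets(Long committed, String autoOffsetReset,
                                 long earliest, long latest) {
        if (committed != null) {
            return committed;
        }
        return "earliest".equals(autoOffsetReset) ? earliest : latest;
    }

    // Timestamp policy: the first offset whose timestamp is >= startTimestamp.
    // Returning the log end when no record qualifies is an assumption of this sketch.
    static long fromTimestamp(long[] recordTimestamps, long startTimestamp) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= startTimestamp) {
                return offset;
            }
        }
        return recordTimestamps.length;
    }

    // Specific-offsets policy: use the provided offset for a partition,
    // or the group offset when the partition has no entry.
    static Map<Integer, Long> fromSpecificOffsets(List<Integer> partitions,
                                                  Map<Integer, Long> specific,
                                                  Map<Integer, Long> groupOffsets) {
        Map<Integer, Long> start = new LinkedHashMap<>();
        for (int partition : partitions) {
            Long chosen = specific.containsKey(partition)
                    ? specific.get(partition)
                    : groupOffsets.get(partition);
            start.put(partition, chosen);
        }
        return start;
    }

    public static void main(String[] args) {
        Map<Integer, Long> specific = new HashMap<>();
        specific.put(0, 23L);
        specific.put(1, 31L);
        Map<Integer, Long> group = new HashMap<>();
        group.put(0, 5L);
        group.put(1, 7L);
        group.put(2, 43L);

        // Partition 2 has no specific offset, so it starts from its group offset.
        System.out.println(fromSpecificOffsets(Arrays.asList(0, 1, 2), specific, group));
        System.out.println(fromTimestamp(new long[] {1000L, 1500L, 2000L, 2500L}, 1600L));
        System.out.println(fromGroupOffsets(null, "earliest", 0L, 100L));
    }
}
```

In the example run, partitions 0 and 1 start from their specified offsets and partition 2 falls back to its group offset. The timestamp lookup skips the two records older than the requested timestamp.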

2 Flink Kafka Producer

  • Producer Partitioners
    • When we use FlinkKafkaProducer to write data to Kafka and do not set a partitioner, FlinkFixedPartitioner is used by default. This partitioner selects the target partition by taking the parallel instance ID of the sink task modulo the number of partitions: parallelInstanceId % partitions.length.
    • With this partitioner, each sink task writes to exactly one partition. If we have four sink tasks and one partition, all four tasks write data to the same partition. However, if there are fewer sink tasks than partitions, some partitions receive no data. For example, if we have two sink tasks and four partitions, the tasks write data only to the first two partitions.
    • If we set the partitioner to null when we build the FlinkKafkaProducer, the default round-robin Kafka partitioner is used. In this case, each sink task writes data to all downstream partitions in a round-robin manner. The advantage is that data is evenly written to all downstream partitions. The drawback is that, when there are many partitions, many network connections must be maintained because each task must connect to the broker of each partition.
  • Fault Tolerance
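
The FlinkFixedPartitioner arithmetic described under Producer Partitioners can be checked with a small plain-Java sketch. It reproduces only the parallelInstanceId % partitions.length formula from the text. The class FixedPartitionerSketch and its method names are hypothetical and not part of the connector.

```java
import java.util.Arrays;

class FixedPartitionerSketch {
    // The formula from the text: parallelInstanceId % partitions.length.
    static int partitionFor(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    // For each sink task (indexed from 0), the partition that it writes to.
    static int[] assignments(int sinkParallelism, int[] partitions) {
        int[] targets = new int[sinkParallelism];
        for (int task = 0; task < sinkParallelism; task++) {
            targets[task] = partitionFor(task, partitions);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Four sink tasks, one partition: every task writes to partition 0.
        System.out.println(Arrays.toString(assignments(4, new int[] {0})));
        // prints [0, 0, 0, 0]

        // Two sink tasks, four partitions: partitions 2 and 3 receive no data.
        System.out.println(Arrays.toString(assignments(2, new int[] {0, 1, 2, 3})));
        // prints [0, 1]
    }
}
```

The second case shows why sink parallelism below the partition count leaves partitions empty under the fixed partitioner: the modulo never reaches the higher partition indexes.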

Section 3: Frequently Asked Questions

Q: Should the number of parallel tasks of the Flink consumer be equal to the total number of partitions? Should parallelism be set to the total number of partitions when there are multiple topics and data sources?

Alibaba Cloud