I originally intended to name this article “Setting up a Kafka Message Queue Cluster.” However, unlike RabbitMQ, Kafka does not implement a message queue protocol such as the Advanced Message Queuing Protocol (AMQP). AMQP is an open standard application layer protocol designed for message-oriented middleware, and it provides unified messaging services. Therefore, although Kafka’s usage mode is similar to that of a queue, it is not strictly a message queue. So I decided to give this article a more generic name: “An Overview of Kafka Distributed Message System.”
Introduction to Kafka
Kafka was originally developed at LinkedIn in Java and Scala. Its source code was opened in 2011, and it became a top-level project of the Apache Software Foundation in 2012. In 2014, several of Kafka’s creators founded a new company named Confluent, which specializes in Kafka.
The purpose of the Kafka project is to provide a unified, high-throughput, and low-delay system platform for real-time data processing. Kafka delivers the following three functions:
- Publishing and Subscription: Kafka lets applications publish and subscribe to streaming data, similar to other message systems.
- Processing: Kafka lets you write stream processing applications that respond to events in real time.
- Storage: Kafka securely stores streaming data in a distributed and fault-tolerant cluster.
Kafka is a message system. Let us understand more about the message system and the problems it solves. Take the currently popular micro-service as an example. Let’s assume that there are three terminal-oriented (WeChat official account, mobile app, and browser) web services (HTTP protocols) at the web end, namely Web1, Web2, and Web3, and three internal application services App1, App2, and App3 (Remote Procedure Call, for example, WCF and gRPC). If there is no message system and the direct connected mode is adopted, the communication mode between them may be as follows:
Figure 1: Structure of the System that Communicates in the Direct Connected Mode
The following issues exist while adopting this mode:
- Services are tightly coupled. If an external interface of application service 2 is modified, every component that invokes the interface must be modified too. In the extreme case shown above (where all components invoke the interface, which is rare in practice), all other web and application services would require modification.
- Modifying an interface is difficult when services are tightly coupled. If a third-party application outside your control invokes the interface, modifying the interface makes that application unavailable. Modifying one of WeChat’s official interfaces, for example, would cause thousands of applications to fail.
- Launching different versions of an interface is one resolution to this problem. Various access paths are provided, such as web/v1/interface, web/v1.1/interface, and app/v2.0/interface. Interfaces are compatible among minor versions; however, they are not compatible among major versions that differ significantly.
- Although the above interface planning method relieves tight coupling to some extent, it has its own challenges. Firstly, an update may have to be applied to multiple versions, and secondly, a large number of versions need maintenance.
- Operation also becomes difficult because it is complex to add or remove services. If application service 4 is added to provide a function necessary for the web services, web service 1, web service 2, and web service 3 must all be modified. Similarly, if application service 2 is no longer required, the code that invokes it must be removed from the web services.
- Performance is limited, and expansion becomes difficult. For example, to achieve load balancing you must either use a third-party tool such as ZooKeeper or Consul, or rewrite the code or add specific configurations.
However, after the introduction of the message system, the structure changes as below:
Figure 2: Structure of the System After the Introduction of the Message System
After the introduction of the message system, all the issues mentioned previously get resolved.
- Components, web services, and application services no longer need to be concerned about each other’s interface definitions. Instead, they only need to be concerned about the data structure (JSON structure).
- There isn’t a need to worry about the structure of Kafka. It is mature, highly standard and relatively stable. However, the protocol used to communicate with Kafka needs attention.
- Kafka improves performance. It is designed to transmit big data, and its throughput meets the requirements of most enterprises.
- Kafka makes expansion easy through clustering. Moreover, it has a unique model that provisions common needs such as load balancing.
Two Message System Models
A producer is an application that produces messages at one end of a data pipeline. A consumer is an application that consumes messages at the other end of the data pipeline.
When the producer sends messages to a queue, there are two scenarios:
- If no consumer is connected to the queue or consuming messages at this time, messages are saved in the queue until it is full or a consumer comes online.
- If multiple consumers are connected to the queue at this time, each message is received by only one consumer. Therefore, load balancing is naturally achieved when there are multiple consumers.
Publisher: an application that generates events at one end of a data pipeline.
Subscriber: an application that responds to events at the other end of a data pipeline.
In the publisher/subscriber model, the data sent to a queue takes the form of events instead of messages. Accordingly, data is processed by subscribing to events, not by consuming messages.
If no subscriber is connected to the queue when the publisher publishes an event, the event is lost, i.e., no application responds to it. A subscriber that comes online later will not receive the event.
If multiple subscribers are connected to the queue when the publisher publishes an event, the event is broadcast to all of them, and each subscriber receives the same event. Therefore, load balancing does not exist.
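The difference between the two models can be sketched in a few lines of Python. This is a toy, in-memory illustration only; the class names `WorkQueue` and `EventBus` are invented for this sketch and are not part of Kafka or any other message system.

```python
from collections import deque


class WorkQueue:
    """Producer/consumer model: each message goes to exactly one consumer."""

    def __init__(self):
        self.pending = deque()   # messages wait here until a consumer takes them

    def send(self, message):
        self.pending.append(message)

    def drain(self, consumers):
        """Hand out the pending messages to the consumers in turn."""
        received = {name: [] for name in consumers}
        i = 0
        while self.pending:
            name = consumers[i % len(consumers)]
            received[name].append(self.pending.popleft())
            i += 1
        return received


class EventBus:
    """Publisher/subscriber model: each event goes to every online subscriber."""

    def __init__(self):
        self.inboxes = {}        # subscriber name -> events received

    def subscribe(self, name):
        self.inboxes[name] = []

    def publish(self, event):
        # Nothing is stored: with no subscriber online, the event is lost.
        for inbox in self.inboxes.values():
            inbox.append(event)


queue = WorkQueue()
for n in range(4):
    queue.send(f"msg-{n}")            # sent before any consumer is online
queue_result = queue.drain(["c1", "c2"])

bus = EventBus()
bus.publish("evt-0")                  # no subscriber yet, so evt-0 is lost
bus.subscribe("s1")
bus.subscribe("s2")
bus.publish("evt-1")

print(queue_result)   # every message appears under exactly one consumer
print(bus.inboxes)    # both subscribers hold evt-1, nobody holds evt-0
```

The queue keeps messages until a consumer arrives and splits them between consumers, while the bus stores nothing and copies each event to everyone who is listening at that moment.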
Stream Processing Application
Batch processing applications and stream processing applications differ. The most significant difference is whether the data has a visible boundary. If a boundary exists, the processing is called batch processing. For example, a client collects data once every hour and sends it to the server for statistics, and the statistical results are then saved in the statistical database.
If no boundary exists, the data is called streaming data and the processing is called stream processing. For example, logs and orders are generated continuously on a large website, just like a flow of data. If each log and order is processed within a few hundred milliseconds or a few seconds of being generated, the application is a stream processing application. If the logs and orders are instead collected once every hour and then transmitted together, the original stream data has been converted into batch data.
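As a rough illustration of the boundary idea, the following Python sketch computes sales totals in both styles. The function names and the sample amounts are made up for this example; a fixed-size window stands in for the hourly collection.

```python
def batch_totals(orders, window):
    """Batch: wait until a window of orders is complete, then emit one total per window."""
    totals = []
    for start in range(0, len(orders), window):
        totals.append(sum(orders[start:start + window]))
    return totals


def stream_totals(orders):
    """Stream: update and emit the running total as soon as each order arrives."""
    running = 0
    for amount in orders:
        running += amount
        yield running


orders = [10, 20, 5, 40, 25, 15]          # hypothetical order amounts
batch_result = batch_totals(orders, window=3)
stream_result = list(stream_totals(orders))

print(batch_result)    # [35, 80]
print(stream_result)   # [10, 30, 35, 75, 100, 115]
```

The batch version produces a result only at each boundary, whereas the stream version has an up-to-date result after every single order.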
Occasionally, stream processing becomes imperative. For example, Jack Ma wanted to display the orders and sales on Tmall for November 11 on a large screen. If the data center works in T+1 mode and can obtain data for November 11 on November 12, Jack Ma would not agree.
The method for processing stream data is different from the method for processing batch data. Kafka provides a dedicated component, Kafka Streams, to process stream data. Other projects in the big data ecosystem provide their own components. For example, Spark uses Spark Streaming to process stream data, and Storm was one of the first systems built exclusively to process stream data.
Apart from data boundaries, the processing cycle can be used to differentiate stream processing from batch processing. The processing cycle for batch processing is generally hours or days, while the processing cycle for stream processing is usually seconds. Correspondingly, batch processing is referred to as offline data processing, whereas stream processing is referred to as real-time data processing. Processing with a cycle measured in minutes is referred to as near-line data processing. However, near-line processing is seldom discussed separately; it is generally regarded as offline processing with a shortened cycle.
Kafka safely stores data in a distributed and fault-tolerant cluster. By default, data is retained for one week. Additionally, Kafka natively supports clustering. You can conveniently add or remove machines and specify the number of copies of the data. This ensures that the cluster keeps providing uninterrupted service even when individual servers in the cluster break down.
Kafka is primarily used to transmit data in our data center project. Let me first introduce the background of this project and then provide an understanding of issues that Kafka solves:
At present, 10 applications are running in the front end, and the number may increase with time. The front-end applications send data to the back-end data center (an application called the data collector, or collector for short). One collector serves multiple applications. It is idle most of the time, but when numerous applications send data at the same time, it cannot keep up. A buffer mechanism is therefore required so that the collector is neither too idle nor too busy. Kafka serves as a buffer pool for the data in such situations.
In this example, I selected Kafka instead of a traditional message queue component such as RabbitMQ, because Kafka was designed from the start to cope with large volumes of data and to provide better performance.
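To make the buffering idea concrete, here is a minimal simulation in Python. It is only a sketch: an in-memory `deque` stands in for Kafka, and the function name, the tick-based timing, and the numbers are assumptions chosen for illustration.

```python
from collections import deque


def run_collector(arrivals, capacity_per_tick):
    """Simulate a collector that drains a buffer at a steady rate.

    arrivals[i] is the number of records the front-end applications send
    during tick i. Returns the backlog left in the buffer after each tick.
    """
    buffer = deque()
    backlog = []
    for tick, count in enumerate(arrivals):
        # Producers only append to the buffer; they never wait for the collector.
        buffer.extend((tick, n) for n in range(count))
        # The collector handles at most capacity_per_tick records per tick.
        for _ in range(min(capacity_per_tick, len(buffer))):
            buffer.popleft()
        backlog.append(len(buffer))
    return backlog


# A burst of 12 records arrives in a single tick; the collector handles 4 per tick.
backlog = run_collector([0, 0, 12, 0, 0, 0], capacity_per_tick=4)
print(backlog)   # [0, 0, 8, 4, 0, 0]
```

The burst is absorbed by the buffer and worked off over the following ticks, so the collector never has to handle more than its steady capacity.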
Kafka serves the function of “smooth upgrading” in a data center in addition to data buffering. Outlined below is a quick diagrammatic representation:
Figure 3: Smooth Upgrading
In the previous system, we used .NET to develop the frontend, data collection, and data cleaning applications, and MS SQL to store the data. In the new system, we use big data technology: large amounts of data are stored on HDFS, and Spark computes the statistics.
After introducing Kafka, there was no need to change the previous versions of the frontend, data collection, or data cleaning applications. New versions of the collection or cleaning applications can be connected alongside them, because Kafka allows data to be read from any point in time within the retention period.
Once the new versions pass testing, switching to the new system is as simple as stopping the previous versions of the applications.
Challenges After a Message Queue Is Introduced
Every coin has two sides. After introducing Kafka, the following changes take place:
- Although the applications in the system no longer depend on each other, they depend heavily on Kafka. The stability of Kafka therefore becomes very important (as with infrastructure such as Microsoft SQL Server).
- This central dependency is at odds with micro-services in practice. Micro-services favor decentralization, where each service works independently without depending on the others. It is critical to determine when to use each of these two modes.
- A message queue is naturally asynchronous. Although this improves performance, it increases code complexity. Previously, it was simple to invoke a service synchronously over RPC and use the returned result. After adopting an asynchronous message queue, writing and debugging code become more complicated.
Broker, Topic, and Partition
In a simplified sense, a broker is a Kafka server. It is a service process that runs Kafka.
- When a client connects to any one broker in a cluster, it can access the whole cluster.
- Brokers in a cluster are differentiated based on the ID which is typically a unique number.
Topic, Partition, and Offset
A topic is a data pipeline. A data pipeline produces messages and publishes events at the one end and consumes messages and responds to events at the other end of the pipeline. The data pipeline stores, routes, and sends messages or events.
The topic stores the data for a default period of one week.
A topic can be divided into multiple partitions.
- Data is ordered within a partition. If a topic has only one partition, its messages are therefore ordered across the whole topic. If a topic has multiple partitions, messages are not ordered across the topic.
- More partitions allow more parallel processing. Generally, the recommended number of partitions is twice the number of hosts. For example, if there are three servers in a cluster, six partitions can be created for each topic.
- Messages written to a partition cannot be modified. To change data, the corrected data must be sent again, unless the topic is recreated.
- If a message has no key, it is sent to any one of the partitions. If a key is present, all messages with the same key are sent to the same partition.
- An offset is an incremental ID. Each message sent to a partition is assigned an ID that is unique and increasing within that partition. The following figure shows the overall structure:
Figure 4: Kafka Topic, Partition, Offset
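The relationship between a topic, its partitions, and offsets can also be sketched as a small data structure. This is a toy model, not Kafka code: the `Topic` class is invented here, `zlib.crc32` is only a stand-in for the hash used by Kafka’s own partitioner, and spreading keyless messages round-robin is an assumption made for simplicity.

```python
import zlib
from itertools import count


class Topic:
    """Toy topic: a fixed number of append-only partitions."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]
        self._keyless = count()   # used to spread messages that have no key

    def send(self, value, key=None):
        """Append a message and return (partition index, offset)."""
        if key is None:
            index = next(self._keyless) % len(self.partitions)
        else:
            # The same key always hashes to the same partition.
            index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        partition = self.partitions[index]
        partition.append(value)            # append only, never modified in place
        return index, len(partition) - 1   # offsets count up within one partition


topic = Topic("orders", num_partitions=3)
first = topic.send("order 001", key="Apple")
second = topic.send("order 002", key="Apple")
keyless = topic.send("heartbeat")
print(first, second, keyless)
```

Both keyed messages land in the same partition with consecutive offsets, which is what gives per-key ordering even though the topic as a whole is not ordered.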
Distribution of Brokers, Topics, and Partitions
Different topics can have different numbers of partitions. If a cluster has multiple nodes, partitions are randomly distributed across the nodes. As shown in the following figure, topic 1 has three partitions, and topic 2 has two partitions:
Figure 5: Distribution of Brokers, Topics, and Partitions
Number of Copies for a Topic (Replication Factor)
Generally, a topic has two or three copies. This way, if a node fails and goes offline, the topic remains available, and the other nodes in the cluster continue to provide the service.
Figure 6: Copies of Topic 2
It is advisable to keep the number of copies small. As the number of copies increases, synchronizing the data takes longer, and disk utilization drops.
Note: For both Kafka and Hadoop clusters, more nodes do not mean higher fault tolerance. Adding nodes reduces the probability that the specific nodes holding a given piece of data fail, but the fault tolerance itself stays the same. Suppose there are 100 nodes in a Hadoop cluster and the number of copies is set to 2. If the two nodes that store those copies both fail, the data cannot be accessed. With 100 nodes, the probability that the failed nodes are exactly the two that hold the copies is simply lower than in a cluster of three or five nodes.
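The distinction in the note can be checked with a little arithmetic. The sketch below assumes that failures hit nodes uniformly at random and looks at a single piece of data; `loss_probability` is a helper defined only for this illustration.

```python
from math import comb


def loss_probability(nodes, copies, failed):
    """Probability that `failed` randomly failing nodes include every node
    that holds a copy of one particular piece of data."""
    if failed < copies:
        return 0.0   # fewer failures than copies: at least one copy survives
    return comb(nodes - copies, failed - copies) / comb(nodes, failed)


copies = 2
for nodes in (3, 5, 100):
    print(nodes, loss_probability(nodes, copies, failed=1),
          loss_probability(nodes, copies, failed=2))
```

With two copies, a single failure never loses data in any cluster size, and two failures can always lose it. Only the probability changes: about 1 in 3 for a 3-node cluster versus 1 in 4950 for a 100-node cluster under these assumptions.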
Partition Leader and ISR
Each partition with multiple copies has exactly one leader and one or more in-sync replicas (ISRs). The leader handles reads and writes, while the ISRs serve as backups.
Figure 7: Copies of Topic 2 (Actual Diagram)
Producer Writes Data to a Topic
After the producer specifies the topic name and connects to any node in the cluster, Kafka automatically performs load balancing and routes each write to the correct partition (the partitions of a topic are on different nodes in the cluster).
Figure 8: Producer is Used to Write Data
In the above figure, I haven’t added any ISR partition to simplify the drawing.
The producer chooses how it is notified of a successful write by selecting one of the following modes:
- Set acks to 0. The producer does not wait for any write notification, which may result in data loss. This mode is the fastest.
- Set acks to 1. This mode is slower than the previous one. The producer waits for a notification from the leader but not from the ISRs. Data that has not yet been copied to the ISRs may be lost if the leader fails.
- Set acks to all. This mode is the slowest. The producer waits until the leader and the ISRs have all received the data, so no data is lost on the ISRs.
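The trade-off between the three modes can be illustrated with a small model. The Python sketch below is not a Kafka client; it models one leader and one in-sync replica, and the function names and the two failure flags are assumptions made for this example.

```python
def send(acks, leader_reachable=True, leader_fails_before_replication=False):
    """Return (producer_sees_success, message_survives) for a single send."""
    if acks == 0:
        # Fire and forget: the producer reports success without waiting.
        stored = leader_reachable and not leader_fails_before_replication
        return True, stored
    if not leader_reachable:
        return False, False       # no acknowledgement, so the producer can retry
    if acks == 1:
        # The leader acknowledges its own write before the replica has copied it.
        return True, not leader_fails_before_replication
    if acks == "all":
        if leader_fails_before_replication:
            return False, False   # no acknowledgement is sent, the producer can retry
        return True, True         # acknowledged only after the replica has the data
    raise ValueError(f"unknown acks value: {acks!r}")


def silent_loss(acks, **failure):
    """True if the producer believes the write succeeded but the data is gone."""
    acknowledged, stored = send(acks, **failure)
    return acknowledged and not stored


for acks in (0, 1, "all"):
    print(acks,
          silent_loss(acks, leader_reachable=False),
          silent_loss(acks, leader_fails_before_replication=True))
```

In this model, acks set to 0 can lose data silently in both failure cases, acks set to 1 only when the leader fails before replication, and acks set to all in neither case, at the cost of waiting longer for each write.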
A key can be specified when the producer transmits data. For example, an e-commerce order contains data such as Order Number, Retailer, and Customer. How would you set the key in this case?
Set the key as:
- An order number, or empty, if the receiver (consumer) of the data is not concerned about the sequence in which orders are transmitted
- A retailer, if the data receiver needs the orders of each retailer to arrive in sequence
- A customer, if the data receiver needs the orders of each customer to arrive in sequence
Setting the key to Retailer does not mean that the string “Retailer” is specified as the key of each message. Instead, the specific value of Retailer is used. Take the following orders as an example:
For orders 001 to 007, when we set the key to Retailer, the key values are Apple (001), Apple (002), Xiaomi (003), Xiaomi (004), Xiaomi (005), Xiaomi (006), Apple (007).
In this use case, all the Apple orders go to one partition in sequence, and all the Xiaomi orders go to one partition in sequence. The two partitions may be the same or different. See the figure below:
Figure 9: Producer Key is Used to Route Data
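The routing of the seven orders can be reproduced with a short script. This is a sketch under stated assumptions: two partitions are assumed, and `zlib.crc32` is used as a stand-in hash, so the actual partition numbers will differ from those of a real Kafka cluster.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 2   # assumed for illustration


def partition_for(key):
    """Map a key to a partition; the same key always maps to the same one."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


# (order number, retailer) pairs in the order they are produced.
orders = [("001", "Apple"), ("002", "Apple"), ("003", "Xiaomi"), ("004", "Xiaomi"),
          ("005", "Xiaomi"), ("006", "Xiaomi"), ("007", "Apple")]

partitions = defaultdict(list)
for order_no, retailer in orders:
    partitions[partition_for(retailer)].append((retailer, order_no))


def orders_of(retailer):
    """Order numbers of one retailer, in the sequence a consumer would read them."""
    return [no for r, no in partitions[partition_for(retailer)] if r == retailer]


print(orders_of("Apple"))    # ['001', '002', '007']
print(orders_of("Xiaomi"))   # ['003', '004', '005', '006']
```

Whether the two retailers share a partition depends on the hash, but in either case the orders of each retailer keep their original sequence.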
Basic Concept of Consumer
Consumers read data from topics. As with the producer, once a consumer connects to any node in the cluster and specifies the topic name, Kafka automatically fetches data from the correct brokers and partitions and sends it to the consumer.
Data is ordered within each partition, as shown in the figure below:
Figure 10: Consumer is Used to Read Data
Kafka uses the group concept to integrate the producer/consumer and publisher/subscriber models.
One topic may have multiple groups, and one group may include multiple consumers. Within a group, each message is consumed by only one consumer, as in the producer/consumer model. Across groups, consumers follow the publisher/subscriber model: every group receives each message. The following figure describes the relationship:
Figure 11: Consumer Groups
Note: Within a group, each partition is allocated to only one consumer. If there are three partitions and four consumers in a group, one consumer is redundant and cannot receive any data.
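The allocation rule in the note can be sketched as follows. The `assign` function and the round-robin strategy are assumptions made for illustration; Kafka’s real assignment strategies are configurable and may distribute partitions differently.

```python
def assign(partitions, consumers):
    """Give every partition to exactly one consumer of a group, in turn."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = ["p0", "p1", "p2"]

# Each group is assigned all partitions independently of the other groups.
group_a = assign(partitions, ["a1", "a2"])
group_b = assign(partitions, ["b1", "b2", "b3", "b4"])

print(group_a)   # {'a1': ['p0', 'p2'], 'a2': ['p1']}
print(group_b)   # {'b1': ['p0'], 'b2': ['p1'], 'b3': ['p2'], 'b4': []}
```

Both groups cover all three partitions, so both receive every message, while inside group B the fourth consumer is left without a partition.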
Please note that the consumer offsets mentioned here are different from the offsets mentioned in the previous section. There, offsets belong to topics (specifically, to partitions), whereas here they belong to consumers. Here are some more points to consider:
- An offset records the read location of each consumer in each group.
- Kafka uses a special topic, __consumer_offsets, to save consumer offsets.
- When a consumer comes online again after going offline, it resumes fetching data from the location previously recorded by its offset.
Timing of offset submission:
- At Most Once: The consumer submits the offset as soon as it receives a message. This ensures maximum efficiency. However, if processing of the message then fails (for example, the application crashes), the message can no longer be obtained.
- At Least Once: The consumer submits the offset after it has processed a message. This may cause repeated reads: if processing of a message fails, the message is read again. This is the default.
- Exactly Once: Still under trial.
Generally, “At Least Once” is selected, and the application is written so that processing the same message repeatedly does not affect the result.
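The effect of the submission timing can be seen in a small simulation. This is a toy model in Python, not consumer code for Kafka; the function names, the mode strings, and the crash point are assumptions chosen to show the two behaviors.

```python
def run(messages, mode, crash_at=None, start=0, handled=None):
    """Consume from offset `start`; return (submitted offset, handled messages).

    The consumer crashes at offset `crash_at`, between its two steps.
    """
    handled = [] if handled is None else handled
    submitted = start
    for offset in range(start, len(messages)):
        if mode == "at_most_once":
            submitted = offset + 1               # submit the offset first ...
            if offset == crash_at:
                return submitted, handled        # ... crash before processing
            handled.append(messages[offset])
        else:  # "at_least_once"
            handled.append(messages[offset])     # process first ...
            if offset == crash_at:
                return submitted, handled        # ... crash before submitting
            submitted = offset + 1
    return submitted, handled


def run_with_restart(messages, mode, crash_at):
    """Crash once, then restart from the last submitted offset."""
    submitted, handled = run(messages, mode, crash_at=crash_at)
    _, handled = run(messages, mode, start=submitted, handled=handled)
    return handled


messages = ["m0", "m1", "m2"]
at_most = run_with_restart(messages, "at_most_once", crash_at=1)
at_least = run_with_restart(messages, "at_least_once", crash_at=1)

print(at_most)                         # ['m0', 'm2']
print(at_least)                        # ['m0', 'm1', 'm1', 'm2']
print(list(dict.fromkeys(at_least)))   # ['m0', 'm1', 'm2']
```

With “at most once” the message m1 is lost, while with “at least once” it is handled twice. The last line shows one simple way for an application to tolerate repeats, by ignoring messages it has already handled.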
Note: CAP theory: A distributed system can satisfy at most two of the three properties of Consistency, Availability, and Partition Tolerance at the same time.
ZooKeeper is a distributed service registration, discovery, and governance component. Many components in the big data ecosystem, HDFS for example, use ZooKeeper. Kafka depends on ZooKeeper. The Kafka installation package directly includes its compatible ZooKeeper version.
Kafka uses ZooKeeper for the following:
- Manage nodes in a cluster and maintain the list of nodes.
- Manage all topics and keep the list of topics.
- Elect the leader for partitions.
- Notify Kafka when there is any change detected in a cluster. The changes include topic creation, broker online/offline, and topic deletion.
This document discusses Kafka’s main concepts and mechanisms. I believe that you will have a preliminary understanding of Kafka after reading this article. In the following chapters, we will perform actual operations to see how Kafka works. If you feel that Kafka is stable and robust during use, I hope you will like it.
To learn more about message queueing on Alibaba Cloud, visit www.alibabacloud.com/product/mq.