Learning Kafka from Scratch: A Guide to Kafka (Part 1)

By Lv Renqi.

After several intense years working on a remote procedure call (RPC) framework, I recently switched to message queues (MQ) within the middleware field as part of bringing Alibaba Cloud's Kafka product solution to market, and I'm more than ready for a break from back-end coding. I'd like to take advantage of this short hiatus to discuss Kafka with all of you and share everything I know, most of which comes from Kafka: The Definitive Guide, co-authored by Neha Narkhede, one of the co-creators of Kafka, and from other references.

According to Jay Kreps, another co-creator of Kafka, its founders and developers intended it to solve all of their data writing requirements. Kafka was named after the well-known author Franz Kafka: Jay Kreps took many literature courses in college, was very fond of Franz Kafka, and thought Kafka would be a cool name for an open-source project.

So, with all that said, let's get right into it and discuss the major things to know about Kafka. First, let's cover the major concepts you'll find in Kafka.

Major Concepts in Kafka

  • Message and batch: A message is equivalent to a data record. Batching involves a trade-off between latency and throughput: the larger a batch, the more messages are processed per unit of time, but the longer it takes to transmit a single message. Batch data can be compressed to improve transmission and storage efficiency, at the cost of extra computation.
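
As a rough illustration of this trade-off, the sketch below configures a Java producer's batching and compression settings. The broker address and topic name are assumptions, and the values are only examples to tune against your own latency and throughput targets.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class BatchingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Larger batches raise throughput but add latency for individual messages.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // example: 64 KB per batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);         // example: wait up to 20 ms to fill a batch
        // Compression shrinks batches on the wire and on disk at the cost of extra CPU.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // hypothetical topic
        }
    }
}
```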

Technical Details of Kafka

Now, let’s go over all the technical stuff you’ll need to know about Kafka.

Deployment and O&M

How to Determine the Number of Partitions?

  • What is the maximum write throughput of a topic? For example, do you want to write 100 KB or 1 GB of data per second?

Therefore, if you estimate the producer throughput and the single-consumer throughput of a topic, you can calculate the number of partitions by dividing the topic throughput by the single-consumer throughput. That is, if 1 GB of data is written to and read from the topic every second and each consumer can process 50 MB per second, then at least 20 partitions are required, so that 20 consumers can read the partitions simultaneously and together achieve 1 GB/s of throughput. If you lack the information needed for this calculation, limiting the maximum partition size to 25 GB gives a reasonably good result.
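
A back-of-envelope version of that calculation, using the figures from the example above (and treating 1 GB/s as 1,000 MB/s), might look like the following sketch. The numbers are illustrative only.

```java
public class PartitionCountEstimate {
    public static void main(String[] args) {
        double targetWriteMBps = 1000; // target topic throughput: ~1 GB/s
        double perConsumerMBps = 50;   // measured single-consumer throughput: 50 MB/s

        // Each partition is read by at most one consumer in a group,
        // so the partition count must cover the target throughput.
        int partitions = (int) Math.ceil(targetWriteMBps / perConsumerMBps);
        System.out.println("Partitions needed: " + partitions); // prints 20 with these figures
    }
}
```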

Message File Configuration

  • log.retention.ms: The retention period is time-based and is measured relative to the last modification time of a log segment, which normally corresponds to the time the segment was closed, that is, the timestamp of the last message in the file. Moving or modifying log files with Linux commands can distort this timestamp and may cause the deletion of rotated log files to fail.
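
log.retention.ms is a broker-level setting; its per-topic counterpart is retention.ms. As a minimal sketch, the snippet below sets a seven-day retention on a single topic through the AdminClient; the broker address and topic name are assumptions.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicRetentionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic"); // hypothetical topic
            ConfigEntry sevenDays = new ConfigEntry("retention.ms", String.valueOf(7L * 24 * 60 * 60 * 1000));
            AlterConfigOp set = new AlterConfigOp(sevenDays, AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(set))).all().get();
        }
    }
}
```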

Hardware Selection with Kafka

If you care about overall performance, you need to select the best hardware configuration within your budget across disk throughput, disk capacity, memory, network, and CPU.

  • Disk: You can choose HDD or SSD depending on how you weigh cost against storage capacity; the choice also affects producer response time.

You can select an appropriate instance type based on Kafka's performance priorities. First consider the amount of data to be retained, and then consider producer performance. If you require low latency, choose I/O-optimized instances that use SSDs; otherwise, choose instances with temporary (ephemeral) storage. After selecting a storage type, choosing CPU and memory is much easier. On AWS, M4 or R3 instances are typically selected. M4 instances can retain data longer than R3 instances but have lower disk throughput because they use Elastic Block Store; R3 instances have relatively high throughput because they use SSDs but can retain only a limited volume of data. To strike a balance between throughput and data retention, you would need to move up to an I2 or D2 instance, at much higher cost. Alibaba Cloud should have a recommended configuration for Kafka users. Kafka-based Confluent Cloud offers integrated deployment for AWS, Azure, and Google Cloud, but not for Alibaba Cloud.

When planning Kafka cluster capacity, you can estimate the broker cluster size from the total message volume, including backups and replicas, and from the network traffic.
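
The sketch below shows one way to turn that into a rough number. All of the inputs (throughput, retention window, replication factor, usable disk per broker) are hypothetical, and the formula deliberately ignores overheads such as indexes and free-space headroom.

```java
public class BrokerCountEstimate {
    public static void main(String[] args) {
        double writeMBps = 50;               // average write throughput into the cluster
        int retentionDays = 7;               // how long data is kept
        int replicationFactor = 3;           // total copies, including the leader
        double usableDiskGBPerBroker = 2000; // usable disk per broker

        // Total storage = throughput * retention window * number of copies.
        double totalGB = writeMBps * 86_400 * retentionDays * replicationFactor / 1024;
        int brokersForStorage = (int) Math.ceil(totalGB / usableDiskGBPerBroker);

        System.out.println("Total storage (GB): " + Math.round(totalGB));
        System.out.println("Brokers needed for storage alone: " + brokersForStorage);
        // Network traffic (writes * replication + consumer reads) should be checked the same way.
    }
}
```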

OS+JVM Configuration

  • Virtual memory: Avoid memory swapping whenever possible; swapping memory pages to disk has a significant impact on Kafka's performance. I recommend setting vm.swappiness to a small value, for example 1. This parameter controls how aggressively the virtual memory subsystem uses swap partitions instead of simply dropping pages from the page cache; it is better to shrink the page cache than to swap memory. Raise vm.dirty_ratio to increase the number of dirty pages allowed before the kernel flushes them to disk synchronously. You can set it to a value greater than 20; the value is a percentage of system memory, the acceptable range is wide, and 60 to 80 is reasonable. Raising the value does carry risk: more dirty pages remain unflushed to disk, and a synchronous flush can cause long I/O waits. If you set it to a higher value, I recommend enabling Kafka replication to avoid data loss from a system crash.

Fundamental Design

Consumer Rebalance

The client APIs of versions earlier than Kafka 0.9 required consumers to depend on ZooKeeper to listen to and fetch cluster data and to perform rebalance operations independently. This caused the herd effect and split brain problems and increased the load on ZooKeeper, affecting performance and stability.

Herd effect: When any broker or consumer is added or removed, a rebalance is triggered among all consumers.

Split brain: Each consumer determines which broker or consumer is down through ZooKeeper. Because of how ZooKeeper propagates state, different consumers may see different views of the cluster at the same time, which can cause incorrect rebalance attempts. Since version 0.9, ZooKeeper is no longer used for this; the Kafka cluster itself handles inter-consumer synchronization. The mechanism is described as follows.

Consumers need to select a coordinator from among the brokers to allocate partitions, much as a controller is elected among brokers.

  1. The selection result depends on which partition stores the group's offsets.

Rebalance process

  1. A consumer sends a JoinGroupRequest to the coordinator.

Partition assignment is performed on the consumer side to meet the flexibility requirements of services. Logically, such flexible allocation is something consumers need rather than brokers; it would be strange and inflexible to satisfy new consumer requirements by adjusting brokers. The design reflects a clear division of responsibilities.
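
Because assignment happens on the consumer side, a consumer can observe it directly. The sketch below subscribes with a rebalance listener that logs which partitions were revoked and assigned; the broker address, group ID, and topic name are assumptions.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

public class RebalanceObserverSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"), new ConsumerRebalanceListener() { // hypothetical topic
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Revoked: " + partitions); // commit offsets here if committing manually
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Assigned: " + partitions);
                }
            });
            consumer.poll(Duration.ofSeconds(1)); // joining the group happens inside poll()
        }
    }
}
```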

A rebalance is triggered when the number of partitions or consumers changes. Specifically, a rebalance is triggered in the following situations:

  1. A partition is added.
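
For instance, increasing a topic's partition count, the first case in the list above, can be done with the AdminClient as in this sketch; the broker address, topic name, and new partition count are assumptions. Consumers in the group rebalance once they pick up the new metadata.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Map;
import java.util.Properties;

public class AddPartitionsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(props)) {
            // Grow the topic to 12 partitions; this triggers a rebalance in its consumer groups.
            admin.createPartitions(Map.of("demo-topic", NewPartitions.increaseTo(12))) // hypothetical topic
                 .all()
                 .get();
        }
    }
}
```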

Consumer Offset Maintenance

When a consumer group consumes partitions, an offset is stored for each partition to record the consumption position. Offsets used to be stored in ZooKeeper, which forced consumers to report offsets only once a minute because of ZooKeeper's poor write performance. ZooKeeper's performance severely limited consumption speed and easily caused repeated consumption.

In version 0.10 and later, Kafka moves offset storage from ZooKeeper to a topic named __consumer_offsets. The key of each message written to it is composed of the group ID, topic, and partition, and its value is the offset. The cleanup policy "compact" is configured for the topic, so only the latest message for each key is retained and older ones are deleted. The offset of each key is cached in memory, so partitions are not traversed during a query; if no cache is available, the partitions are traversed during the first query and then the results are returned. The partition of __consumer_offsets to which a consumer group's offset information is written is determined by the hash of the group ID and the number of partitions.
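
As a rough sketch of that mapping, assuming the default of 50 partitions for __consumer_offsets and that the broker takes the non-negative hash of the group ID modulo the partition count (which matches the behavior described above), the target partition for a group can be estimated as follows. The group ID is hypothetical.

```java
public class OffsetsPartitionSketch {
    public static void main(String[] args) {
        String groupId = "demo-group";   // hypothetical consumer group
        int offsetsTopicPartitions = 50; // default offsets.topic.num.partitions

        // Non-negative hash of the group ID, modulo the partition count of __consumer_offsets.
        int partition = (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
        System.out.println("Group '" + groupId + "' maps to __consumer_offsets partition " + partition);
    }
}
```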

Producer

A producer can implement custom partitioners and manage message schemas.
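
On the partitioner side, a producer plugs in its own routing logic by implementing the Partitioner interface and registering it via partitioner.class. The sketch below routes a hypothetical "vip" key prefix to partition 0 and hashes everything else; the class name and routing rule are assumptions for illustration only.

```java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

// Register with: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipFirstPartitioner.class.getName());
public class VipFirstPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // simplistic fallback for keyless records in this sketch
        }
        if (key instanceof String && ((String) key).startsWith("vip")) {
            return 0; // hypothetical rule: VIP traffic always lands on partition 0
        }
        // Otherwise hash the serialized key, similar to the default behavior.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```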

Broker File Organization

Kafka data is stored as files in a file system. Topics contain partitions, which contain segments in the form of individual files. Topic and partition are abstract concepts; what actually exists on disk are the log files (that is, segments) and their corresponding index files in the /${topicName}-${partitionId}/ directory.

Segment files have the same maximum size and are named after the smallest offset they contain, with the file name extension .log. The index file corresponding to each segment has the same name, with the extension .index. There are actually two index files: the offset index file is used to look up messages by offset, and the time index file is used to look up messages by time. The two can be combined for optimization. The following describes only the offset index file, whose overall organization is as follows:

A sparse index is used to keep the index file small, reduce space usage, and allow the index file to be loaded directly into memory. An index entry is created for every several kilobytes of data rather than recording the exact location of every message. Each index entry contains two parts: baseOffset and position.

  • baseOffset indicates the specific message in the segment file that the index entry corresponds to. baseOffset helps save space because it can be stored with a numeric compression algorithm; Kafka uses varint, for example.
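
To make the lookup concrete, the sketch below models a sparse offset index as a sorted map from baseOffset to file position: a query finds the closest entry at or below the target offset and then scans forward in the .log file from that position. The entries are invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseIndexLookupSketch {
    public static void main(String[] args) {
        // Sparse index: one entry every few kilobytes of log data, not one per message.
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(0L, 0L);         // baseOffset 0   -> file position 0
        index.put(350L, 65_536L);  // baseOffset 350 -> file position 64 KB (illustrative values)
        index.put(720L, 131_072L); // baseOffset 720 -> file position 128 KB

        long targetOffset = 500L;
        Map.Entry<Long, Long> entry = index.floorEntry(targetOffset);
        // Start reading the .log file at this position and scan forward until offset 500 is found.
        System.out.println("Scan from position " + entry.getValue()
                + " (index entry at offset " + entry.getKey() + ")");
    }
}
```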

Reliability Assurance

The definition of reliability varies from person to person. For example, RocketMQ reduces and stabilizes producer response time through a secondary asynchronous commit, which may increase the risk of data loss or compromise availability as defined in the CAP theorem (consistency, availability, and partition tolerance). Kafka ensures data consistency (consistency in the CAP sense) through multi-replica replication and ensures reliability (durability in the ACID sense) through replication across multiple partition replicas. Reliability is a system behavior that must be defined for a specific service scenario; it is a trade-off between the reliability and consistency of message storage on the one hand, and availability, high throughput, low latency, and hardware cost on the other.

  • Broker reliability: Each partition can have multiple replicas, one of which is the leader. All events are written to and read from the leader; the other replicas only need to stay synchronized with the leader and promptly replicate the latest events. When the leader becomes unavailable, one of the in-sync replicas becomes the new leader. A replica is considered out of sync if it fails to exchange heartbeats with ZooKeeper within 6 seconds, stops fetching new messages, or falls more than 10 seconds behind when fetching messages.
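
On the producer and topic side, these guarantees are usually dialed in with a few settings. The sketch below creates a topic with three replicas and min.insync.replicas=2, then sends with acks=all; the broker address, topic name, and values are assumptions, not a recommendation for every workload.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ReliabilitySketch {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (Admin admin = Admin.create(adminProps)) {
            // 6 partitions, 3 replicas; writes need at least 2 in-sync replicas to succeed.
            NewTopic topic = new NewTopic("reliable-topic", 6, (short) 3) // hypothetical topic
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(List.of(topic)).all().get();
        }

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("reliable-topic", "key", "value")).get();
        }
    }
}
```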
