Learning Kafka from Scratch: A Guide to Kafka (Part 1)

Major Concepts in Kafka

  • Message and batch: A message is equivalent to a data record. Batching trades latency for throughput: the larger a batch, the more messages are processed per unit time, but the longer it takes to deliver any single message. Batch data is compressed to improve transmission and storage efficiency, at the cost of extra computation.
  • Schema: Simple formats such as JSON and XML are easy to use and readable, but they lack strong typing and have poor compatibility between versions. Kafka recommends Avro, which supports strong typing and schema evolution; Avro versions are forward- and backward-compatible, so applications need not be rewritten when the schema changes. A consistent data format matters because it removes the operational coupling between message producers and consumers. Defining proper schemas and storing them in a shared repository makes the structure of Kafka messages easier to understand.
  • Topic and partition: Messages in Kafka are classified by topic. A topic is like a data source in a database or a folder in a file system. A topic can be divided into several partitions, each of which is a commit log. Compare this with database and table sharding: just as a data source may be spread across 10 databases of 10 tables each, a topic can be distributed across 10 brokers with 10 partitions each. A producer continually writes data using a message key and a partitioner; the partitioner hashes the message key and maps the result to a specific partition. A consumer continually reads data, using a load-balancing policy to select partitions for consumption. This analogy with familiar distributed table-sharding components helps in understanding message design in the modular and geo-redundant logical data center (LDC) architecture, and in accepting the "turning the database inside out" idea promoted by Confluent (the highly valued company commercializing Kafka). OceanBase hides its database and table sharding logic from users, much like the Taobao Distributed Data Layer (TDDL), allowing them to access OceanBase from MySQL clients. The same applies to Kafka and OSS. All distributed systems are similar; only their scenarios differ. Kafka focuses on queue-like data pipelines, OceanBase on key-value storage with atomicity, consistency, isolation, and durability (ACID), and OSS on the persistence of massive files, or large objects.
  • Broker and cluster: Based on specific hardware and its performance characteristics, a single broker can easily process thousands of partitions and millions of messages per second. In Kafka, messages can be replicated only within a single cluster but not across clusters. Clusters should be converged and independently deployed within data centers. Kafka provides MirrorMaker to replicate messages across clusters.
  • Controller: Each Kafka cluster has a controller that manages state changes of topic partitions and replicas and executes management tasks such as partition reassignment. The controller is a thread that runs on one broker in the cluster. It monitors cluster operations and may migrate from one broker to another in some cases. The current controller registers itself in an ephemeral ZooKeeper node named /controller at the top of the cluster path. Manually deleting this node releases the current controller, and the cluster elects a new one. The controller acts on change notifications sent by ZooKeeper, such as the creation or deletion of topics.
  • Coordinator: The coordinator handles inter-consumer partition routing, offset management, and Kafka's transaction semantics for producing across partitions, topics, and consumers. It may be the most complex concept in Kafka.
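The key-to-partition mapping described under "Topic and partition" can be sketched in a few lines. Kafka's default partitioner applies a murmur2 hash to the key bytes; this minimal Python stand-in uses FNV-1a instead, and `partition_for` is a hypothetical helper name, not a Kafka API:

```python
def partition_for(key: bytes, num_partitions: int) -> int:
    # FNV-1a stand-in for Kafka's murmur2; any stable hash preserves
    # the property that equal keys map to the same partition.
    h = 2166136261
    for b in key:
        h = ((h ^ b) * 16777619) & 0xFFFFFFFF
    return h % num_partitions

# Equal keys always land in the same partition, preserving per-key order.
assert partition_for(b"order-42", 10) == partition_for(b"order-42", 10)
```

A real producer only falls back to hashing when no explicit partition is given; messages with a null key are spread across partitions instead.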

The Technical Detail Record of Kafka

The Deployment and O&M

How to Determine the Number of Partitions?

  • What is the maximum write throughput of a topic? For example, do you want to write 100 KB or 1 GB of data per second?
  • What is the maximum throughput for reading data from a single partition? Each partition is consumed by at most one consumer in a group. If that consumer writes data to a database at no more than 50 Mbit/s, the partition can be drained at no more than 50 Mbit/s.
  • You can estimate the throughput at which a producer writes to a single partition in a similar way. We recommend budgeting a higher throughput for the producer, because producers are usually much faster than consumers.
  • Consider the number of partitions, available disk space, and network bandwidth on each broker.
  • If messages are written to partitions by key, adding partitions to an existing topic is complicated, because the key-to-partition mapping changes.
  • A single broker limits how many partitions it can host: the more partitions, the more memory is occupied and the longer leader elections take.
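The questions above reduce to a back-of-the-envelope calculation: divide the target topic throughput by the per-partition throughput of the slower side. A sketch (`estimate_partitions` is a hypothetical helper; real sizing must also respect the broker, disk, and bandwidth limits listed above):

```python
import math

def estimate_partitions(target_mb_s, producer_mb_s_per_partition,
                        consumer_mb_s_per_partition):
    # Enough partitions are needed for both the write path and the
    # read path; the slower per-partition side dominates.
    return max(math.ceil(target_mb_s / producer_mb_s_per_partition),
               math.ceil(target_mb_s / consumer_mb_s_per_partition))

# 1 GB/s target; producers sustain 100 MB/s and consumers 50 MB/s
# per partition, so the consumer side calls for 20 partitions.
print(estimate_partitions(1000, 100, 50))  # 20
```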

Message File Configuration

  • log.retention.ms: Retention is time-based, relative to the last modification time, which normally corresponds to the log segment's close time, that is, the timestamp of the last message in the file. Moving or touching log files with Linux commands resets this modification time and may prevent rotated log files from being deleted.
  • log.retention.bytes: Expiration is based on the number of retained message bytes, applied per partition. If both log.retention.bytes and log.retention.ms (or another time-based variant) are specified, messages are deleted as soon as either condition is met; the two parameters are ORed.
  • log.segment.bytes: When a log segment reaches the maximum size specified by log.segment.bytes (1 GB by default), it is closed and a new segment is opened; closed segments then wait to expire. The smaller this value, the more frequently files are closed and new ones allocated, which reduces overall disk write efficiency. On the other hand, smaller segments give more accurate results when fetching offsets by timestamp.
  • log.segment.ms: specifies how long before a log segment is closed. log.segment.bytes and log.segment.ms are not mutually exclusive: a segment is closed when either the maximum size or the time limit is reached, whichever comes first. By default, log.segment.ms is not set, so segments are closed only when they reach the maximum size. Beware of many segments closing in parallel when a time limit applies to many partitions at once.
  • message.max.bytes: specifies the maximum size of a single message; the default is 1,000,000 bytes, roughly 1 MB. If a producer sends a larger message, the broker rejects it and returns an error. This value significantly affects performance: the larger it is, the more time threads spend handling network connections and requests, and the larger the disk write block size, which affects I/O throughput. The consumer-side fetch.message.max.bytes must be coordinated with the broker-side message size: if the broker accepts messages larger than consumers can fetch, consumption may be blocked. I am not entirely clear on this point, since the consumer should be able to pull the next message.
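Taken together, the retention and segment settings might appear in a broker's server.properties like this (values are illustrative, not recommendations):

```properties
# Delete data older than 7 days OR beyond 1 GiB per partition,
# whichever condition is met first (the two settings are ORed).
log.retention.ms=604800000
log.retention.bytes=1073741824

# Close a segment at 1 GiB or after 24 hours, whichever comes first;
# only closed segments become eligible for deletion.
log.segment.bytes=1073741824
log.segment.ms=86400000

# Reject single messages larger than about 1 MB (the default).
message.max.bytes=1000000
```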

Hardware Selection with Kafka

  • Disk: Choose between HDD and SSD by weighing cost against storage capacity; disk throughput affects producer response time.
  • Memory: Size memory according to the desired page cache, which affects consumer response time.
  • Network: Network traffic is unevenly distributed because consumption typically exceeds production (the ratio is usually greater than 1, so outbound traffic exceeds inbound), and because of replication and the use of MirrorMaker.
  • CPU: CPU demand is low; cycles are consumed mainly by security features and message compression and decompression.

OS+JVM Configuration

  • Virtual memory: Avoid memory swapping whenever possible; swapping pages between memory and disk severely hurts Kafka's performance. I recommend setting vm.swappiness to a small value such as 1. This parameter controls how aggressively the virtual memory (VM) subsystem uses swap space rather than simply dropping pages from the page cache; prefer shrinking the page cache over swapping. Raise vm.dirty_ratio to let more dirty pages accumulate before the kernel flushes them to disk. Values above 20 are possible (the parameter is a percentage of system memory); the range is wide, and 60–80 is reasonable. Raising it carries risks: more dirty pages remain unflushed, and I/O waits grow longer when synchronous flushing does kick in. If you set a high value, I recommend enabling Kafka replication to avoid data loss from a system crash.
  • File system: XFS performs better for Kafka than Ext4, and apart from the file system's automatic optimization no extra tuning is required; batching disk writes improves overall I/O throughput. Whatever file system stores your log segments, I recommend setting the noatime mount option. File metadata carries three timestamps: inode change time (ctime), last modification time (mtime), and last access time (atime). By default atime is updated on every read, which generates many disk writes; since atime is of little use to Kafka, it can safely be disabled.
  • Network: Set the TCP socket read and write buffers via net.ipv4.tcp_rmem and net.ipv4.tcp_wmem. Set net.ipv4.tcp_window_scaling to 1 to enable TCP window scaling, which improves transfer efficiency for consumers and lets more in-flight data be buffered on the broker. Raise net.ipv4.tcp_max_syn_backlog above the default of 1024 to accept more concurrent connections, and raise net.core.netdev_max_backlog above the default of 1000 to queue more packets for kernel processing during traffic bursts on gigabit networks. You can apply similar settings in Java applications if you are familiar with Java network programming.
  • GC: G1 is preferred. If a broker has 64 GB of memory and runs Kafka with a 5 GB heap, set MaxGCPauseMillis to 20 ms and InitiatingHeapOccupancyPercent to 35 so that garbage collection starts earlier than it would by default.
  • Other production suggestions: Deploy Kafka brokers across racks, which aids availability and service scheduling. In versions later than 0.9, store offsets on brokers to reduce the load on, and the dependence on, ZooKeeper. Sharing one ZooKeeper ensemble across multiple clusters is not recommended.
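Collected into /etc/sysctl.conf, the kernel settings above might look like this (illustrative starting points; tune against your own workload):

```properties
# Prefer shrinking the page cache over swapping process memory.
vm.swappiness=1
# Let more dirty pages accumulate before synchronous flushes kick in.
vm.dirty_ratio=60

# TCP buffers: min, default, and max bytes per socket.
net.ipv4.tcp_wmem=4096 65536 2048000
net.ipv4.tcp_rmem=4096 65536 2048000
# Enable TCP window scaling for high-throughput links.
net.ipv4.tcp_window_scaling=1
# Accept more concurrent connection attempts and queued packets.
net.ipv4.tcp_max_syn_backlog=4096
net.core.netdev_max_backlog=5000
```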

Fundamental Design

Consumer Rebalance

Coordinator selection:

  1. Which broker is selected depends on which partition of the offsets topic stores the group's offsets.
  2. The broker hosting that partition's leader becomes the group's coordinator.

Rebalance process

  1. A consumer sends a JoinGroupRequest to the coordinator.
  2. Upon receiving heartbeat requests from other consumers, the coordinator notifies the consumers that a rebalance is required.
  3. Other consumers send JoinGroupRequests.
  4. After receiving JoinGroupRequests from all recorded consumers, the coordinator randomly selects a leader among these consumers. The coordinator returns a JoinGroupResponse to each consumer to indicate whether the consumer is a follower or a leader. The coordinator also returns follower information to the leader consumer to allow it to allocate partitions.
  5. Each consumer sends a SyncGroupRequest to the coordinator. The SyncGroupRequest sent by the leader includes allocation details.
  6. The coordinator returns a response to notify the consumers, including the leader, of the allocation details.
A rebalance is triggered when:

  1. A partition is added.
  2. A consumer is added, shuts down, or crashes.
  3. The coordinator goes down.
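In step 4 of the process above, the elected leader consumer computes the partition assignment itself. Its logic resembles Kafka's RangeAssignor, simplified here to a single topic (`range_assign` is a hypothetical name, not a client API):

```python
def range_assign(partitions, consumers):
    # Sort consumers so every member computes the same view, then hand
    # out contiguous ranges; the first few consumers absorb the
    # remainder when partitions do not divide evenly.
    consumers = sorted(consumers)
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        assignment[c] = partitions[start:start + count]
        start += count
    return assignment

# 5 partitions over 2 consumers: the first (sorted) consumer gets 3.
print(range_assign([0, 1, 2, 3, 4], ["c2", "c1"]))
# {'c1': [0, 1, 2], 'c2': [3, 4]}
```

The leader then returns this mapping in its SyncGroupRequest, and the coordinator distributes it to the followers.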

Consumer Offset Maintenance


Broker File Organization

  • baseOffset identifies which message in the segment file an index entry refers to, expressed relative to the segment's base offset. Relative offsets save space through numeric compression; Kafka uses varint encoding, for example.
  • position indicates the absolute byte position of the message in the segment file.
  • Binary search is used to find the record for a given offset: first locate the segment that contains the offset, then use the index to find its approximate position within the segment, and finally scan the messages sequentially from there.
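Both ideas can be sketched in a few lines, assuming an in-memory index of (relative offset, file position) pairs; note that Kafka's real varint also zigzag-encodes signed values, which this sketch omits:

```python
import bisect

def encode_varint(n):
    # 7 data bits per byte; the high bit marks continuation, so small
    # relative offsets (the common case) fit in a single byte.
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def locate(index, target_offset):
    # index: sorted (relative_offset, file_position) pairs. Binary-search
    # the last entry <= target, then scan forward from that position.
    i = bisect.bisect_right([off for off, _ in index], target_offset) - 1
    return index[i][1]

assert len(encode_varint(100)) == 1       # small gap: one byte
assert len(encode_varint(100_000)) == 3   # larger gaps need more bytes
assert locate([(0, 0), (50, 4096), (100, 8192)], 75) == 4096
```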

Reliability Assurance

  • Broker reliability: Each partition can have multiple replicas, one of which is the leader. All events are written to and read from the leader; the other replicas only need to stay in sync with the leader and replicate the latest events promptly. When the leader becomes unavailable, one of the in-sync replicas becomes the new leader. A replica is considered out of sync if it fails to exchange heartbeats with ZooKeeper within 6 s, stops fetching new messages, or falls more than 10 s behind when fetching.
  • replication.factor: specifies the number of replicas at the topic level. With a replication factor of N, topics remain readable and writable even when N-1 brokers fail, so a larger factor means higher availability, higher reliability, and fewer outages. It also requires at least N brokers and stores N copies of the data, occupying N times the disk space. For financial data, a replication factor of 5 is recommended; for log monitoring data, 1 is enough. You can set broker.rack to name the rack each broker is in, so replicas are spread across racks for higher availability.
  • unclean.leader.election: a broker-level (cluster-wide) setting, true by default, that allows an out-of-sync replica to become leader. This is called an unclean election and may lose messages. If set to false, the system must wait for the original leader to come back online, which reduces availability. Set it to false in financial scenarios.
  • min.insync.replicas: specifies the minimum number of in-sync replicas. It is a topic-level parameter.
  • Producer reliability: Producers without reliability settings can still cause system-wide data loss, even when the broker settings are reliable.
  • acks=n: With 0, the producer sends one-way and never waits, which is the fastest mode but offers no guarantee. With 1, the leader returns an acknowledgement or error after receiving the message and writing it to the partition's data file, which is not necessarily flushed to disk. With all, the leader waits until every in-sync replica has received the message before responding.
  • Exception handling: errors divide into retriable ones (such as network exceptions caused by jitter) and non-retriable ones (such as messages that cannot be serialized or violate constraints). If a producer receives a LeaderNotAvailableException during a normal leader election and handles it properly, retrying will deliver the message successfully. However, retries may produce duplicate messages, which makes idempotent appends harder to guarantee.
  • Consumer reliability: Consumers only need to track which messages have been read and which have not; this is the key to avoiding message loss while reading. A committed message is one that has been written to all in-sync replicas and is visible to consumers. Think about how the committed progress is synchronized among replicas.
  • The consumption offset can be committed automatically or explicitly. Automatic commit is simple, but it may commit an offset for messages that were never successfully processed, or cause repeated consumption. Explicit commit requires choosing a frequency: commit after every message, or asynchronously with a delay. It is a trade-off between performance and the number of duplicate messages.
  • Monitoring reliability:
  • For producers, the two major reliability metrics are the message error rate and retry rate (aggregated). Rising values of either may indicate a system fault.
  • For consumers, the most important metric is consumer lag: the gap between the consumer's position and the most recently committed offset in the partition. Under ideal conditions lag is always 0, meaning consumers always read the latest messages. Lag alone cannot warn users when a consumer falls out of the consumable range (an out-of-range error) because the partition's retention period is too short. To solve this, Kafka 2.0 added the lead metric, which measures the distance between a partition's log start offset and the consumer's position in that partition. As lead approaches 0, the consumer nears the edge of the consumable range and may lose data.
  • To ensure monitoring data streams are read in time, you need to know when the data was generated. The Notify middleware, like RocketMQ, records borntime (when the producer generates a message) and gmtCreate (when the message arrives at the broker), combined with EagleEye tracing for fine-grained trace analysis. Kafka supports producing and consuming messages internally: a monitoring consumer subscribes to a special topic and simply counts messages, comparing the count with the number produced. This enables accurate monitoring of producers even when no real consumers exist.
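Two of the consumer-side ideas above can be illustrated without a broker: committing every N messages bounds how much is replayed (as duplicates) after a crash, and lag/lead are plain offset arithmetic. This is a self-contained simulation with hypothetical helper names, not Kafka client code:

```python
def consume_with_periodic_commit(messages, commit_every, crash_at=None):
    # At-least-once sketch: offsets are committed every `commit_every`
    # messages, so a crash replays everything since the last commit
    # (duplicates are possible; loss is not).
    processed, committed = [], 0
    for i, msg in enumerate(messages):
        if i == crash_at:
            break  # crash before processing message i
        processed.append(msg)
        if (i + 1) % commit_every == 0:
            committed = i + 1
    return processed, committed

def lag_and_lead(log_start, log_end, consumer_pos):
    # lag: distance behind the newest message; lead: distance ahead of
    # the oldest retained one. A lead near 0 means retention is about
    # to delete messages the consumer has not yet read.
    return log_end - consumer_pos, consumer_pos - log_start

done, committed = consume_with_periodic_commit(list(range(10)), 3, crash_at=5)
assert (len(done), committed) == (5, 3)  # messages 3-4 replay on restart
assert lag_and_lead(log_start=100, log_end=1000, consumer_pos=120) == (880, 20)
```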

Original Source



Alibaba Cloud

Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com