How to Synchronize Data from Message Queue for Apache Kafka to MaxCompute?
By Geng Jiangtao, an Engineer in the Intelligent Customer Services Team at Alibaba Cloud
The following content is based on the video by Geng Jiangtao and the accompanying PowerPoint slides.
For daily operations, many enterprises use Message Queue for Apache Kafka to collect the behavior logs and business data generated by apps or websites and then process them offline or in real-time. Generally, the logs and data are delivered to MaxCompute for modeling and business processing to obtain user features, sales rankings, and regional order distributions, and the data is displayed in data reports.
There are two ways to synchronize data from Message Queue for Apache Kafka to MaxCompute. In the first process, business data and behavior logs are uploaded through Message Queue for Apache Kafka and Flume to DataHub, then transferred to MaxCompute, and finally displayed in Quick BI. In the second process, business data and behavior logs are transferred through Message Queue for Apache Kafka, DataWorks, and MaxCompute, and finally displayed in Quick BI.
This article uses the second process. Data is synchronized through DataWorks to MaxCompute using one of two solutions: custom resource groups or exclusive resource groups. Custom resource groups are used to migrate data to the cloud over complex networks. Exclusive resource groups are used when the default integration resources are insufficient.
2.1 How Message Queue for Apache Kafka Works
- Overview: Message Queue for Apache Kafka is a distributed, high-throughput, and scalable message queue service provided by Alibaba Cloud. It is generally used in big data fields, such as log collection, monitoring data aggregation, streaming data processing, and online and offline analysis. Message Queue for Apache Kafka provides a fully managed service for open-source Apache Kafka, solving the long-standing pain points of the open-source product. Message Queue for Apache Kafka is cost-effective, elastic, and reliable. When using it, you only need to focus on business development, without worrying about deployment or O&M.
- Architecture: As shown in the following figure, a typical Message Queue for Apache Kafka cluster consists of four parts. A producer generates data and sends messages to a Message Queue for Apache Kafka broker in push mode. The messages can be page views (PVs), server logs, and information related to system resources, such as CPU utilization and memory usage. A Message Queue for Apache Kafka broker is a server that stores messages. You can add brokers as needed: the more brokers, the higher the throughput of the cluster. Brokers divide topics into partitions, and each partition is assigned the leader or follower role. A consumer group subscribes to topics and consumes messages from the leader partitions in pull mode, with a consumer offset maintained for each partition. ZooKeeper manages the cluster configuration, elects the leader partition, and rebalances the partition leaders when the consumer group changes.
- Purchase and deploy Message Queue for Apache Kafka: Log on to the Message Queue for Apache Kafka console, set Consumption Mode, Region, Instance Type, Disk, Traffic, and Message Storage Period, and purchase Message Queue for Apache Kafka, as shown in the following figure. It is important to select the correct region. If the region of your MaxCompute is China (Qingdao), China (Beijing), China (Zhangjiakou-Beijing Winter Olympics), or China (Hohhot), select one of these regions for Message Queue for Apache Kafka if possible. You need to deploy the product after activating it. Select a proper Virtual Private Cloud (VPC) and a VSwitch and click Deploy.
Click Topics in the left-side navigation pane. On the page that appears, click Create Topic. On the Create Topic page, enter the topic information and click OK. Note the three naming tips displayed below the topic name field, and set a topic name in line with your business. For example, financial businesses and commercial businesses should have separate topics. Then, click Consumer Groups in the left-side navigation pane and click Create Consumer Group to create the required consumer group. Name the consumer group to match its topic; for example, financial businesses and commercial businesses should each have their own consumer groups.
- Configure the whitelist: After installing and deploying Message Queue for Apache Kafka, configure a whitelist of IP addresses that are allowed to access the Message Queue for Apache Kafka brokers. The default endpoint shown in the following figure is the access interface.
2.2 Introduction and Configuration of Resource Groups
- Background of custom resource groups: Custom resource groups are used to address network issues between data centers (IDCs), because local networks differ from cloud networks. DataWorks can migrate massive data to the cloud free of charge with the default job resource group. However, the default resource group does not guarantee high transmission speeds and cannot synchronize data sources in complex network environments. In these cases, use custom resource groups to connect DataWorks to your data sources or to achieve a higher transmission speed. In short, custom resource groups enable cloud migration, data transmission, and synchronization between any network environments.
- Configure a custom resource group: To configure a custom resource group, perform the following steps:
1) Log on to the DataWorks console, click Workspace List, select the required project, and click Data Integration to determine the project where data is to be integrated.
2) Go to the data source page and click Create Custom Resource Group in the upper-right corner of the page to create a custom resource group. Only the project administrator can add a custom resource group.
3) Check whether Message Queue for Apache Kafka and the custom resource group to be added are in the same VPC. In this experiment, an Elastic Compute Service (ECS) instance sends messages to Message Queue for Apache Kafka, and they are in the same VPC.
4) Log on to the ECS console to configure the custom resource group. Run the `dmidecode | grep UUID` command to obtain the UUID of the ECS instance.
5) Enter the UUID of the instance, the IP address of the custom resource group, and the CPU and memory of the machine.
6) Run the relevant commands on the ECS instance. The agent is installed in five steps. After the fourth step, click Refresh and check whether the service is available. Run a connectivity test to check whether the resource group has been added.
- Background of exclusive resource groups: Some customers reported a problem with insufficient resources when synchronizing data from Message Queue for Apache Kafka to MaxCompute. In this case, add an exclusive resource group for data synchronization. Physical resources, such as the network, disk, CPU, and memory of the ECS instances of an exclusive resource group are completely exclusive to users. This helps to isolate the resources of different users or different workspaces. In addition, exclusive resources can be flexibly scaled in or out, which meets the requirements of resource exclusiveness and flexible configuration.
An exclusive resource group can access data sources in VPCs in the same region, as well as the public endpoints of Relational Database Service (RDS) instances in other regions.
- Configure an exclusive resource group: To configure an exclusive resource group, you need to perform the following steps:
1) Log on to the DataWorks console, click Resource List, and click Create Exclusive Resource Groups to create an exclusive resource group for integration or scheduling. Here, we will add an exclusive resource group for integration. Before clicking Purchase, set the Purchase Mode, Region, Resources, Memory, Validity Period, and Quantity.
2) Bind the purchased exclusive resource group to the VPC corresponding to Message Queue for Apache Kafka. Click Bind VPC, and select the VSwitch (note the zone) and security group corresponding to Message Queue for Apache Kafka.
2.3 Synchronization Process and Precautions
Note the following when configuring parameters for synchronizing data from Message Queue for Apache Kafka to MaxCompute:
- Integrate data in DataWorks: Log on to the DataWorks console, click Create Workflow to create a workflow, add a data synchronization node to the created workflow, and name the node.
Open the data synchronization node and choose Message Queue for Apache Kafka as the reader and MaxCompute as the writer, as shown in the following figure. Then, switch to script mode. A link to the help documentation appears in the upper-right corner of the following figure; refer to it for the synchronization parameters of the reader and writer to facilitate configuration and understanding.
- Main parameters of Message Queue for Apache Kafka Reader: The main parameter of Message Queue for Apache Kafka Reader is server, in the format `ip:port`. The default endpoint of Message Queue for Apache Kafka serves as the server address. The server parameter is required. The topic parameter indicates the topic of the Message Queue for Apache Kafka data source and is also required. The column parameter can contain constant columns, data columns, and attribute columns. Constant columns and data columns are less important; the complete message is stored in the value attribute column. If you need other information, such as the partition, offset, or timestamp, obtain it from the corresponding attribute columns. The column parameter is required.
The keyType and valueType parameters each support six values. Select the value that matches the type of the synchronized data. Also decide whether data is synchronized by message time or by consumer offset, which involves the beginDateTime, endDateTime, beginOffset, and endOffset parameters. Select either beginDateTime or beginOffset as the start point for data consumption, and either endDateTime or endOffset as the end point. When using beginDateTime and endDateTime, note that only Message Queue for Apache Kafka V0.10.2 or later supports data synchronization by message time. In addition, beginOffset has three special values: seekToBeginning indicates that data is consumed from the first offset. seekToLast indicates that data is consumed from the last consumer offset; with beginOffset, data can be consumed only once from the last offset, whereas with beginDateTime, data can be consumed multiple times, depending on the message storage period. seekToEnd indicates that consumption starts from the latest offset, so empty data is read.
The skipExceedRecord parameter is optional. The partition parameter is optional because data is read and consumed from all partitions of a topic. The kafkaConfig parameter is also optional; other relevant Kafka consumer parameters can be extended through it.
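Putting these reader parameters together, a script-mode configuration might look like the following sketch. The endpoint, topic, and consumer group names are placeholders, and the exact field names may differ between DataWorks versions; the double-underscore attribute columns carry the message key, payload, partition, offset, and timestamp.

```json
{
    "stepType": "kafka",
    "parameter": {
        "server": "192.168.0.1:9092",
        "topic": "testkafka",
        "column": [
            "__key__",
            "__value__",
            "__partition__",
            "__offset__",
            "__timestamp__"
        ],
        "keyType": "ByteArray",
        "valueType": "ByteArray",
        "beginOffset": "seekToLast",
        "endOffset": "seekToEnd",
        "skipExceedRecord": "true",
        "kafkaConfig": {
            "group.id": "console_consumer_group"
        }
    }
}
```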
- Main parameters of MaxCompute Writer: The datasource parameter specifies the name of a MaxCompute data source. The table parameter specifies the name of the destination table, that is, the table to which data is synchronized from Message Queue for Apache Kafka.
If the table is a partitioned table, the partition parameter must be set to the last-level partition to determine the synchronization point. If the table is a non-partitioned table, the partition parameter is optional. The column parameter must be consistent with the corresponding fields in the Message Queue for Apache Kafka reader's column configuration; data is synchronized only when the fields match. The truncate parameter specifies whether data is written in append mode or overwrite mode. Where possible, avoid running multiple Data Definition Language (DDL) operations on one partition at the same time, or create the partitions before starting multiple concurrent jobs.
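As a sketch of the corresponding writer side (the data source name, partition expression, and column names are illustrative assumptions, and field names may differ between DataWorks versions), a MaxCompute writer configuration could look like this:

```json
{
    "stepType": "odps",
    "parameter": {
        "datasource": "odps_first",
        "table": "testkafka3",
        "partition": "pt=${bizdate}",
        "column": [
            "key",
            "value",
            "partition1",
            "timestamp1",
            "offset"
        ],
        "truncate": true,
        "compress": false
    }
}
```

Note that the five columns here line up one-to-one with the attribute columns configured in the reader, which is the consistency requirement described above.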
- Synchronize data from Message Queue for Apache Kafka to MaxCompute: The following figure is divided into three parts: Message Queue for Apache Kafka Reader, MaxCompute Writer, and restriction parameters. The reader includes fields such as server, endOffset, kafkaConfig (including group.id), valueType (ByteArray), column, topic, beginOffset, and seekToLast. MaxCompute Writer includes the truncate, compress, and datasource fields; its columns must be consistent with those of Message Queue for Apache Kafka Reader so that the value data is synchronized. The restriction parameters are as follows: errorLimit specifies the number of data errors allowed before an error is reported, and speed restricts the traffic rate and concurrency.
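The restriction parameters live in the setting section of the same script. The following fragment is a sketch; the exact keys (for example, whether throttling and concurrency are expressed as throttle, concurrent, or mbps) vary by DataWorks version, so treat the names as assumptions to verify against your console:

```json
"setting": {
    "errorLimit": {
        "record": "0"
    },
    "speed": {
        "throttle": true,
        "concurrent": 1,
        "mbps": "10"
    }
}
```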
- Write data according to the instructions provided in the Message Queue for Apache Kafka producer SDK: The produced data is sent to Message Queue for Apache Kafka and can be viewed by using appropriate code. The following code provides an example of the configured reading, protocol, and serialization modes, the request wait time, topic to be sent, and the type of messages to be sent. After the data is sent, a message is returned. For more information about the code, see the configuration file, message source, and producer and consumer code template.
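As a sketch of the producer-side configuration described above, the following Java snippet assembles typical producer properties. The endpoint, topic, and timeout values are placeholder assumptions, not values from this article; the commented-out lines show where the actual kafka-clients producer would be created and used.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds the configuration a Kafka producer would use.
    // All values are placeholders for illustration only.
    static Properties buildProducerProps() {
        Properties props = new Properties();
        // Default endpoint of the Message Queue for Apache Kafka instance (placeholder)
        props.put("bootstrap.servers", "192.168.0.1:9092");
        // Serialize both key and value as strings
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write
        props.put("acks", "all");
        // Maximum time to wait for a broker response (placeholder)
        props.put("request.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProps();
        System.out.println(props.getProperty("acks")); // prints "all"
        // With the kafka-clients dependency on the classpath, the producer
        // would then send a message and receive acknowledgement metadata:
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>("testkafka", "behavior-log-line"));
        //   producer.close();
    }
}
```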
- Package and run code on the ECS instance (in the same zone as Message Queue for Apache Kafka): As shown in the following figure, a scheduled task set up with the `crontab -e` command runs at 17:00 every day. The following figure shows the message record after the log is sent.
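For example, the daily run described above could be registered with a crontab entry like the following; the jar path and log file are hypothetical:

```shell
# Edit the schedule with: crontab -e
# Run the packaged producer at 17:00 every day and append its output to a log
0 17 * * * java -jar /home/admin/kafka-producer.jar >> /home/admin/kafka-producer.log 2>&1
```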
- Create a table in MaxCompute: Log on to the DataWorks console, access the workflow page, create a target table, and run a DDL statement to create a synchronization table, or create different table fields based on individual businesses.
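Assuming the five attribute columns from the reader are kept, the DDL statement for the destination table might look like the following sketch; the column names are illustrative, so adjust them to your business:

```sql
CREATE TABLE IF NOT EXISTS testkafka3 (
    key        STRING,
    value      STRING,
    partition1 STRING,
    timestamp1 STRING,
    offset     STRING
) PARTITIONED BY (pt STRING);
```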
2.4 Development Testing and Production Deployment
- Select a custom resource group (or an exclusive resource group for integration) for synchronization: As shown in the following figure, click Configure Task Resource Group in the upper-right corner, select the needed resource group, and click Run. A success message is displayed, including the data synchronization record and result. With this, the synchronization process is complete.
- Query the synchronization result: View the synchronization result on a temporary DataWorks query page. Run the `select * from testkafka3;` statement on a temporary node to view the data synchronization result. If the data appears, the synchronization test is successful.
- Set scheduling parameters: After the data synchronization function is developed for the workflow, relevant models are processed, and some SQL nodes and synchronization nodes are designed and deployed. As shown in the following figure, click Scheduling Configuration on the right and enter the scheduling time. For more information, see the DataWorks documentation.
- Submit, package, and release a workflow node: Click Workflow, select a node to submit, and click Submit. Some submitted workflow nodes do not need to be placed in the production environment. Then, go to the job release page and add the nodes to the list of nodes to be released to perform job deployment.
- Verify whether the workflow was released: On the O&M center page, verify that the released nodes are in the production environment. This completes the process of synchronizing data from Message Queue for Apache Kafka to MaxCompute. At the scheduling time, a node log is displayed on each node or in the upper-right corner. Check the log to see whether the node is running properly and whether any subsequent operations, supplementary data, or commands are required.
In this article, you gained an overall understanding of Message Queue for Apache Kafka and learned how to synchronize data from Message Queue for Apache Kafka to MaxCompute on Alibaba Cloud. We also covered the configuration methods and deployment operations involved from development to production.