Best Practices for Migrating Data from Kafka to MaxCompute

Prerequisites

Build a Kafka Cluster

Create a MaxCompute Project

Background

Procedure

[root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create
Created topic "testkafka".
[root@emr-header-1 ~]# kafka-topics.sh --list --zookeeper emr-header-1:2181/kafka-1.0.1
__consumer_offsets
_emr-client-metrics
_schemas
connect-configs
connect-offsets
connect-status
testkafka
[root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
123
abc
[root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
123
abc

Create a MaxCompute Table

CREATE TABLE testkafka (
    `key`        string,
    `value`      string,
    `partition1` string,
    `timestamp1` string,
    `offset`     string,
    `t123`       string,
    `event_id`   string,
    `tag`        string
);

Data Sync

Create a Custom Resource Group

Create and Run a Synchronization Task

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "47.xxx.xxx.xxx:9092",
                "kafkaConfig": {
                    "group.id": "console-consumer-83505"
                },
                "valueType": "ByteArray",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__timestamp__",
                    "__offset__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "topic": "testkafka",
                "keyType": "ByteArray",
                "waitTime": "10",
                "beginOffset": "0",
                "endOffset": "3"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": true,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "key",
                    "value",
                    "partition1",
                    "timestamp1",
                    "offset",
                    "t123",
                    "event_id",
                    "tag"
                ],
                "emptyAsNull": false,
                "table": "testkafka"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 1,
            "dmu": 1
        }
    }
}
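The Reader and Writer steps in the script above each list eight columns. The following is a minimal Python sketch for checking a script before running it, assuming the two lists are paired by position (the script itself does not say so; treat this as an assumption). The helper name `pair_columns` and the trimmed `JOB` copy are hypothetical, introduced only for illustration.

```python
import json

# Trimmed copy of the job script above: only the two "column" lists are kept.
JOB = json.loads("""
{
  "steps": [
    {"category": "reader", "parameter": {"column": [
      "__key__", "__value__", "__partition__", "__timestamp__",
      "__offset__", "'123'", "event_id", "tag.desc"]}},
    {"category": "writer", "parameter": {"column": [
      "key", "value", "partition1", "timestamp1",
      "offset", "t123", "event_id", "tag"]}}
  ]
}
""")


def pair_columns(job):
    """Pair reader and writer columns by position (hypothetical helper)."""
    columns = {step["category"]: step["parameter"]["column"] for step in job["steps"]}
    reader, writer = columns["reader"], columns["writer"]
    # A length mismatch means some source field has no target column, or vice versa.
    if len(reader) != len(writer):
        raise ValueError(f"reader has {len(reader)} columns, writer has {len(writer)}")
    return list(zip(reader, writer))


if __name__ == "__main__":
    for source, target in pair_columns(JOB):
        print(f"{source:>15} -> {target}")
```

Under that positional assumption, the constant `'123'` would land in `t123` and `tag.desc` in `tag`, which matches the column names chosen for the MaxCompute table.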
[root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list

_emr-client-metrics-handler-group

[root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
testkafka  6          0               0               0    -            -     -
test       6          3               3               0    -            -     -
testkafka  0          0               0               0    -            -     -
testkafka  1          1               1               0    -            -     -
testkafka  5          0               0               0    -            -     -
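To check the consumer group state from a script rather than by eye, the `--describe` output can be parsed. The sketch below is one possible approach in Python, assuming the output is whitespace-separated with a single header row, as in the listing above. The helper names `parse_describe` and `total_lag` are hypothetical, and `SAMPLE` simply repeats the rows shown above.

```python
SAMPLE = """\
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testkafka 6 0 0 0 - - -
test 6 3 3 0 - - -
testkafka 0 0 0 0 - - -
testkafka 1 1 1 0 - - -
testkafka 5 0 0 0 - - -
"""

# Columns that hold numbers; "-" is treated as "no value" (None).
INT_FIELDS = ("PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG")


def parse_describe(text):
    """Turn whitespace-separated describe rows into dicts (hypothetical helper)."""
    lines = [line.split() for line in text.splitlines() if line.strip()]
    header, rows = lines[0], lines[1:]
    parsed = []
    for row in rows:
        record = dict(zip(header, row))
        for name in INT_FIELDS:
            value = record[name]
            record[name] = None if value == "-" else int(value)
        parsed.append(record)
    return parsed


def total_lag(rows):
    """Sum the LAG column, skipping rows where it is not reported."""
    return sum(r["LAG"] for r in rows if r["LAG"] is not None)


if __name__ == "__main__":
    rows = parse_describe(SAMPLE)
    for r in rows:
        if r["CURRENT-OFFSET"]:
            print(r["TOPIC"], r["PARTITION"], r["CURRENT-OFFSET"])
    print("total lag:", total_lag(rows))
```

For the listing above, every row reports a lag of 0, and only two rows have a non-zero committed offset.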

Result Verification

Original Source


Follow me to keep abreast of the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com
