Real-Time Data Synchronization Based on Flink SQL CDC
By Wu Chong (yunxie), Apache Flink PMC and Technical Expert
Edited by Chen Zhengyu, a Volunteer from the Flink Community
Flink 1.11 introduces Flink SQL CDC. What changes can CDC bring to data and business? This article covers traditional data synchronization solutions, synchronization solutions based on Flink CDC, more application scenarios, and future development plans for CDC.
Traditional Data Synchronization Solution and Flink SQL CDC Solution
In business systems, data usually needs to be kept up to date in multiple storage components. For example, an order system may initially complete its business by writing data only to a database. One day, the BI team wants full-text indexing on that data, so a second copy has to be written to Elasticsearch. Later, after another round of changes, the data also has to be written to a Redis cache.
This mode is not sustainable. Writing the same data to several systems leads to maintenance, scalability, and data consistency problems. Keeping the copies consistent requires distributed transactions, which raise cost and complexity. Instead, you can use a Change Data Capture (CDC) tool to decouple the systems: the application writes only to the database, and the changes are synchronized to the downstream storage systems. This makes the systems more robust and easier to maintain.
Analysis on Flink SQL CDC Data Synchronization
CDC is short for Change Data Capture. It is a broad concept: any technique that captures changed data can be called CDC. The industry distinguishes query-based CDC from log-based CDC. The differences in their features are listed below:
Based on the comparison above, the log-based CDC has the following advantages:
- It captures every data change together with the complete change record, which is why it is widely used in remote disaster recovery and data backup scenarios. With query-based CDC, changes that happen between two queries may be lost.
- Each DML operation produces one record, so no full table scan is needed to filter out changes, unlike query-based CDC. This brings higher efficiency, lower latency, and no extra load on the database.
- It does not intrude on the business: the business stays decoupled, and the business model does not need to change.
- It captures deletion events and the old state of records. With query-based CDC, periodic queries cannot tell whether a row was deleted between two polls.
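To make the contrast concrete, a query-based CDC job typically polls the table with a query like the sketch below. The `orders` table, the `updated_at` column, and the `:last_poll_time` bind parameter are hypothetical names chosen for illustration; they do not come from the original comparison.

```sql
-- Hypothetical polling query for query-based CDC.
-- Assumes the business table carries an updated_at column that the
-- application maintains on every write (an intrusion into the business model).
SELECT *
FROM orders
WHERE updated_at > :last_poll_time   -- bind parameter: time of the previous poll
ORDER BY updated_at;

-- What this query cannot see:
--   1. A row updated twice between two polls: only its latest state is returned.
--   2. A row deleted between two polls: it simply no longer matches.
-- Log-based CDC reads both cases from the database log as separate events.
```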
Introduction to Log-Based CDC
From the ETL perspective, the first step is usually collecting data from the business database. Take MySQL as an example: the MySQL Binlog is collected through Debezium and sent to a Kafka message queue. Real-time computing engines or applications then consume it, and the results are delivered to an OLAP system or other storage media.
Flink aims to connect to more data sources and make full use of its computing capability. In production, the main sources are business logs and database logs. Business logs have long been well supported by Flink, but database logs were not supported before Flink 1.11. This is one of the reasons why CDC was integrated.
Flink SQL supports a complete changelog mechanism internally, so to connect to CDC data, Flink only needs to convert that data into a form it recognizes. For this reason, the TableSource API was restructured in Flink 1.11 to better support and integrate CDC.
The restructured TableSource outputs the RowData structure, which represents a row of data. Each RowData carries a piece of metadata called RowKind. The RowKind values are INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE, which are similar to the events in a database binlog. The JSON collected by Debezium contains the old and new data rows together with the original metadata. An "op" value of "u" identifies an update, and "ts_ms" is the synchronization timestamp. Therefore, connecting to Debezium JSON data means converting the original JSON into RowData that Flink recognizes.
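As a minimal sketch of this conversion, the DDL below declares a Kafka topic of Debezium messages as a Flink table. The table name, columns, topic, broker address, and consumer group are assumptions made for illustration; the option keys follow the Kafka SQL connector and the debezium-json format shipped with Flink 1.11.

```sql
-- A Debezium update event on the topic looks roughly like this (abridged):
--   {"before": {"id": 1, "price": 10.0}, "after": {"id": 1, "price": 12.5},
--    "op": "u", "ts_ms": 1589362330904}
-- The debezium-json format turns it into two rows:
--   UPDATE_BEFORE (1, 10.0) and UPDATE_AFTER (1, 12.5).

CREATE TABLE orders_binlog (
  id BIGINT,
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_binlog',                          -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'properties.group.id' = 'flink-etl',                -- assumed consumer group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- Downstream queries see inserts, updates, and deletes as a changelog,
-- so this aggregate is revised whenever a row changes or disappears.
SELECT COUNT(*) AS order_cnt, SUM(price) AS total_price
FROM orders_binlog;
```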
Use Flink as the ETL Tool
When Flink is selected as the ETL tool, the data synchronization process is shown in the following figure:
Debezium subscribes to the MySQL Binlog and transmits it to Kafka. In Flink, a Kafka table is created with the format set to debezium-json. After Flink has processed the data, the results are written directly to other external storage systems, such as Elasticsearch and PostgreSQL, as shown in the following figure:
However, this architecture has a disadvantage: too many collection components make maintenance complicated. Can Flink SQL connect directly to the MySQL Binlog instead? The answer is yes. The improved structure is shown in the following figure:
The community has developed the flink-cdc-connectors component. It is a source component that can read full data and incremental change data directly from MySQL, PostgreSQL, and other databases. It has been open-sourced. Please see this link for more information.
flink-cdc-connectors can replace the data collection module of Debezium + Kafka. Thus, Flink SQL collection, computing, and ETL can be unified, which has the following advantages:
- Out-of-the-box, simple, and easy-to-use
- Fewer maintenance components, simplified real-time procedures, and lower deployment costs
- Small end-to-end latency
- Exactly-once guarantee for read and compute
- No intermediate data storage, so storage costs are lower
- Stream reading of full and incremental data supported
- The Binlog collection position can be rewound to re-read earlier changes
Practices of Data Synchronization Based on Flink SQL CDC
Here are three examples of Flink SQL + CDC that are often used in real-world scenarios. If you want to practice, you will need components, such as Docker, MySQL, and Elasticsearch. For more information about the three examples, please see the documents of each example.
Example 1: Flink SQL CDC + JDBC Connector
In this example, data from the order table (the fact table) is subscribed: the MySQL Binlog is sent to Kafka through Debezium. The stream is then joined with the dimension table, processed by ETL operations, and the results are written to the downstream PostgreSQL database.
Example 2: CDC Streaming ETL
This example simulates the order table and logistics table of an e-commerce company. To analyze the order data, the related information has to be joined into one wide order table, which is then delivered through Elasticsearch to the downstream business team for analysis. The example demonstrates how Flink's computing capability alone, without any other components, joins the Binlog data streams and synchronizes the result to Elasticsearch in real time.
For example, the following Flink SQL code can complete the real-time synchronization of full and incremental data in the orders table in MySQL:
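-- Declare the MySQL table as a Flink table; add further columns as needed.
CREATE TABLE orders (
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql-cdc',  -- reads the full data first, then the incremental changes
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Continuously returns the current content of the MySQL table.
SELECT * FROM orders;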
The test environment of docker-compose is also provided to help readers understand this example. For more detailed tutorials, please refer to the following document: Reference Document
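The wide-table job described in this example could look roughly like the sketch below. The column lists, the `shipments` table, the Elasticsearch address, and the index name are assumptions for illustration, and the sketch assumes at most one shipment per order so that `order_id` can serve as the document key. The option keys are those of the mysql-cdc and elasticsearch-7 connectors.

```sql
-- Source 1: the order table, read from MySQL as snapshot plus binlog.
CREATE TABLE orders (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Source 2: the logistics table (hypothetical schema).
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'shipments'
);

-- Sink: one Elasticsearch document per order, keyed by the primary key.
CREATE TABLE enriched_orders (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  shipment_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',   -- assumed address
  'index' = 'enriched_orders'          -- assumed index name
);

-- Join both changelog streams and keep the wide table in sync.
INSERT INTO enriched_orders
SELECT o.order_id, o.customer_name, o.price,
       s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```

Because both inputs are changelogs, an update or delete in either MySQL table updates the matching Elasticsearch document.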
Example 3: Streaming Changes to Kafka
The following example shows how to compute the daily GMV of the whole platform. The statements include INSERT, UPDATE, and DELETE. Only paid orders count toward the GMV, so you can observe how the GMV value changes as the statements run.
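A sketch of such a GMV job is shown below. The column names, the meaning of `order_status` (true is assumed to mean paid), the topic name, and the broker address are assumptions; the changelog-json format is the one provided by flink-cdc-connectors.

```sql
-- Source: orders captured from MySQL.
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  price DECIMAL(10, 5),
  order_status BOOLEAN          -- assumed: true means the order is paid
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Sink: a Kafka topic that stores the changelog of the aggregate.
CREATE TABLE kafka_gmv (
  day_str STRING,
  gmv DECIMAL(38, 5)            -- matches the result type of SUM over DECIMAL(10, 5)
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_gmv',                              -- assumed topic name
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'format' = 'changelog-json'
);

-- Daily GMV over paid orders only. An INSERT, UPDATE, or DELETE in MySQL
-- changes the sum, and each change is written to Kafka.
INSERT INTO kafka_gmv
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') AS day_str, SUM(price) AS gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
```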
More Application Scenarios of Flink SQL CDC
Flink SQL CDC can be applied flexibly in real-time data synchronization, and it also opens up further scenarios for users.
Flexible Positioning of Flink in Data Synchronization Scenarios
- If you already have the collection layer (E) of Debezium/Canal + Kafka, you can use Flink as the computing layer (T) and transport layer (L).
- Flink can also replace Debezium/Canal. Change data can be synchronized directly to Kafka by Flink. The ETL process is unified by Flink.
- If Kafka data caching is not required, change data can be synchronized directly by Flink to the destination. The ETL process is unified by Flink.
Flink SQL CDC: Connecting to More Scenarios
- Real-time data synchronization, data backup, data migration, and data warehouse construction
- Benefits: Various upstream and downstream components (E&L), great computing capabilities (T), easy-to-use API (SQL), and low-latency streaming computing
- Real-time materialized view and streaming data analysis on databases
- Indexing and real-time maintenance
- Business cache refreshing
- Audit trail
- Microservice decoupling and read-write separation
- CDC-based dimension table join
The following section explains why the CDC-based dimension table join is faster than the query-based dimension table join.
Query-Based Dimension Table Join
Today, dimension tables are typically joined by querying the database. For each record that arrives from the message queue, the join operator issues an I/O request to the database, merges the returned row with the record, and emits the result downstream. This process inevitably spends time on I/O and network communication, which limits throughput. Caching helps, but the results may then be inaccurate because the cache is not updated in time.
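In Flink SQL, this query-based pattern corresponds to a lookup join against a JDBC table. The sketch below is illustrative only: the table names, columns, topic, and connection settings are assumptions, while the `lookup.cache.*` keys are options of the JDBC connector.

```sql
-- Fact stream: append-only order events from Kafka.
CREATE TABLE orders (
  order_id BIGINT,
  product_id BIGINT,
  price DECIMAL(10, 5),
  proc_time AS PROCTIME()       -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                                 -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'properties.group.id' = 'dim-join-demo',            -- assumed consumer group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Dimension table: queried in the database for each incoming record.
CREATE TABLE products (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'products',
  'username' = 'root',
  'password' = '123456',
  'lookup.cache.max-rows' = '5000',  -- cache saves I/O ...
  'lookup.cache.ttl' = '10min'       -- ... but rows can be stale for up to 10 minutes
);

SELECT o.order_id, o.price, p.name AS product_name
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.product_id = p.id;
```

The two cache options show the trade-off described above: a larger cache or longer TTL reduces database I/O, at the price of serving outdated dimension rows.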
CDC-Based Dimension Table Join
With CDC, the data of the dimension table can be imported into the State of the dimension table join. The State is distributed and holds a real-time mirror of the dimension table in the database. When data arrives from the message queue, there is no need to query the remote database; a lookup in the State on local disk is enough. This saves I/O operations and yields low latency, high throughput, and more accurate results.
Tips: This feature is currently under development for version 1.12. Please pay attention to FLIP-132 for the specific progress.
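Until the temporal table DDL is available, the idea can be approximated with a regular join against a mysql-cdc table, as in the sketch below. This is not the FLIP-132 syntax, and all table names, columns, and connection settings are assumptions. A regular join keeps both inputs in state, so state for the order stream grows without bound, and a later change to a dimension row also revises results that were already emitted.

```sql
-- Fact stream: append-only order events from Kafka.
CREATE TABLE orders (
  order_id BIGINT,
  product_id BIGINT,
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                                 -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'properties.group.id' = 'cdc-dim-join-demo',        -- assumed consumer group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Dimension table: mirrored into Flink state through CDC.
CREATE TABLE products (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

-- The join is answered from state; no request is sent to MySQL per record.
SELECT o.order_id, o.price, p.name AS product_name
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;
```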
Future Plans
- FLIP-132: Temporal Table DDL (CDC-based dimension table join)
- Upsert data output to Kafka
- More CDC formats supported (debezium-avro, OGG, and Maxwell)
- CDC data processing supported in batch mode
- More databases supported by flink-cdc-connectors
This article shares the advantages of Flink CDC by comparing Flink SQL CDC with traditional data synchronization solutions, and it introduces the principles of log-based and query-based CDC. It gives examples of subscribing to the MySQL Binlog through Debezium and of replacing the subscription components with flink-cdc-connectors. It also describes the application scenarios of Flink CDC in detail, such as data synchronization, materialized views, and multi-data center backup, and introduces the advantages of the CDC-based dimension table join and the planned work on CDC components.
After reading this article, we hope you will have a new understanding of Flink SQL CDC. We hope Flink SQL CDC will bring more convenience for development and be applied in more scenarios.
1. How do I write GROUP BY results to Kafka?
GROUP BY results are updating results, so currently they cannot be written to an append-only message queue. Writing updating results to Kafka will be supported natively in Flink 1.12. In version 1.11, the changelog-json format provided by flink-cdc-connectors can do this. For more information, please see this document.
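For reference, the native support arrived in Flink 1.12 as the upsert-kafka connector. The sketch below shows how a GROUP BY result could be written with it; the table names, columns, topic, and connection settings are assumptions for illustration.

```sql
-- Source: orders captured from MySQL (hypothetical schema).
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  price DECIMAL(10, 5)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Sink: the primary key becomes the Kafka message key, so every update
-- for the same day overwrites the previous value for that key.
CREATE TABLE daily_totals (
  day_str STRING,
  total DECIMAL(38, 5),
  PRIMARY KEY (day_str) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'daily_totals',                           -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed broker address
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO daily_totals
SELECT DATE_FORMAT(order_date, 'yyyy-MM-dd') AS day_str, SUM(price) AS total
FROM orders
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');
```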
2. Does CDC need to ensure sequential consumption?
Yes, it does. When change data is synchronized to Kafka, order must be preserved within each Kafka partition, and data changes with the same key must be synchronized to the same partition. This way, the sequence can be guaranteed when Flink reads the data.