Integrating Apache Hudi and Apache Flink for New Data Lake Solutions

1. Reasons for Decoupling

Since its inception, Hudi has used Spark as its data processing engine. Users who want to adopt Hudi as their data lake framework must therefore bring Spark into their platform's technology stack. A few years ago, using Spark as the processing engine for big data was the norm: it handles batch processing, micro-batch stream processing that simulates streaming, and thereby unifies stream and batch workloads. With the development of big data technology in recent years, however, Flink has emerged as another big data processing engine and captured a significant share of the compute-engine market, so Spark's dominance is no longer a given. In big data communities, forums, and other channels, requests for Flink support in Hudi have emerged and grown. It is therefore valuable for Hudi to support Flink, and decoupling Hudi from Spark is the prerequisite for integrating Flink.

2. Difficulties in Decoupling

Spark APIs are used in Hudi as routinely as a List is used in everyday development. Spark RDD appears everywhere as the main data structure, whether data is read from the source or written to the table, and even common utility classes are implemented with Spark APIs. In essence, Hudi is a general-purpose data lake framework implemented on Spark, with a deep-rooted binding to it.

3. Decoupling Ideas

In principle, Hudi uses Spark as its computing engine to benefit from Spark's distributed computing capability and the rich operators of RDD. Distributed computing aside, RDD serves in Hudi mostly as a data structure abstraction and is essentially a bounded dataset. It would therefore be feasible to replace RDD with a List, though likely at some cost in performance. To preserve the performance and stability of Hudi on Spark, we keep the bounded dataset as the basic operating unit and leave the main operation APIs of Hudi unchanged: RDD is extracted as a generic type parameter, the Spark engine implementation continues to use RDD, and other engines use List or another bounded dataset as appropriate.
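As a minimal, self-contained sketch of this idea (with illustrative names only, not Hudi's actual classes), the table abstraction can be made generic over the bounded dataset type, so the Spark engine can bind it to an RDD while another engine binds it to a plain List:

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GenericDatasetSketch {

  // The table only knows it receives some bounded input I and produces some output O;
  // a Spark implementation would bind I and O to RDDs, other engines to Lists.
  abstract static class TableSketch<I, O> implements Serializable {
    abstract O upsert(String instantTime, I records);
  }

  // A non-Spark engine binds the generics to plain Java lists and processes them with a
  // parallel stream, trading some of Spark's distributed performance for independence.
  static class ListTableSketch extends TableSketch<List<String>, List<String>> {
    @Override
    List<String> upsert(String instantTime, List<String> records) {
      Function<String, String> write = record -> "wrote " + record + " at " + instantTime;
      return records.stream().parallel().map(write).collect(Collectors.toList());
    }
  }

  public static void main(String[] args) {
    TableSketch<List<String>, List<String>> table = new ListTableSketch();
    System.out.println(table.upsert("20200401000000", Arrays.asList("r1", "r2")));
  }
}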

4. Flink Integration Design

In essence, Hudi write operations are batch operations, and the continuous mode of DeltaStreamer is implemented by looping over batches. To keep the APIs unified, the Flink integration accumulates a batch of data, processes it, and then commits it; here a List is used to accumulate the data for Flink. The write pipeline consists of the operators below (a sketch follows the list).

  • InstantGeneratorOperator generates a globally unique instant. It does not create a new instant when the previous instant has not completed or when the current batch contains no data.
  • keyBy(partitionPath) partitions the stream by partitionPath so that no partition is written by more than one subtask.
  • WriteProcessOperator performs the write operation. If its partition has no data, it sends an empty result downstream.
  • CommitSink receives the computing results from the upstream tasks. Once it has received results from all parallel subtasks, it considers the upstream work complete and executes the commit operation.
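
A minimal, hypothetical sketch of this four-stage topology is shown below, written with plain Flink DataStream primitives. The record type, operator bodies, and buffering logic are placeholders for illustration only; they are not Hudi's actual InstantGeneratorOperator, WriteProcessOperator, or CommitSink implementations.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

public class HudiFlinkPipelineSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Incoming records tagged with their partition path: (partitionPath, payload).
    DataStream<Tuple2<String, String>> records = env.fromElements(
        Tuple2.of("2020/04/01", "r1"),
        Tuple2.of("2020/04/02", "r2"));

    records
        // Stand-in for InstantGeneratorOperator: attach an instant to every record.
        .map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {
          @Override
          public Tuple2<String, String> map(Tuple2<String, String> record) {
            return Tuple2.of(record.f0, "20200401000000/" + record.f1);
          }
        })
        // keyBy partitionPath so that each partition is written by a single subtask.
        .keyBy(record -> record.f0)
        // Stand-in for WriteProcessOperator: buffer the batch per partition and emit a write status.
        .process(new KeyedProcessFunction<String, Tuple2<String, String>, String>() {
          private transient List<String> buffer;

          @Override
          public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) {
            if (buffer == null) {
              buffer = new ArrayList<>();
            }
            buffer.add(value.f1);
            // The real operator flushes the buffered batch per instant; this sketch simply
            // emits one write status per record to stay short.
            out.collect("written " + value.f1 + " to partition " + value.f0);
          }
        })
        // Stand-in for CommitSink: collect write statuses and commit once all subtasks report.
        .addSink(new SinkFunction<String>() {
          @Override
          public void invoke(String status, Context context) {
            System.out.println("received: " + status);
          }
        });

    env.execute("hudi-flink-pipeline-sketch");
  }
}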

5. Examples

1) HoodieTable

/**
 * Abstract implementation of a HoodieTable.
 *
 * @param <T> Sub type of HoodieRecordPayload
 * @param <I> Type of inputs
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  protected final HoodieWriteConfig config;
  protected final HoodieTableMetaClient metaClient;
  protected final HoodieIndex<T, I, K, O> index;

  public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
      I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

  ......
}

2) HoodieEngineContext

/**
 * Base class containing the context information needed by the engine at runtime. It will be extended by
 * different engine implementations if needed.
 */
public abstract class HoodieEngineContext {

  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

  ......
}
// Spark engine implementation of map(): delegates the bounded dataset to the Spark context.
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
  return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
}

// Non-Spark (e.g. Flink) engine implementation of map(): operates on the List directly with a parallel Java stream.
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
  return data.stream().parallel().map(func::apply).collect(Collectors.toList());
}

// The function passed to map() must be serializable so that each engine can ship it to its workers.
@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {
  O apply(I v1) throws Exception;
}

6. Status Quo and Follow-Up Plans

6.1 Work Timelines

In April 2020, Yang Hua (@vinoyang) and Wang Xianghu (@wangxianghu) from T3 Travel designed and finalized the decoupling scheme together with Li Shaofeng (@leesf) from Alibaba and other partners.

6.2 Follow-Up Plans

1) Promotion of the Integration of Hudi and Flink
