Integrating Apache Hudi and Apache Flink for New Data Lake Solutions

1. Reasons for Decoupling

2. Difficulties in Decoupling

3. Decoupling Ideas

4. Flink Integration Designs

  • Source receives Kafka data and converts it into List data.
  • InstantGeneratorOperator generates a globally unique instant. If the previous instant has not completed, or the current batch contains no data, it does not create a new instant.
  • KeyBy partitionPath shuffles records by partitionPath so that multiple subtasks never write to the same partition.
  • WriteProcessOperator performs the write operations. If the current partition has no data, it still sends an empty result downstream.
  • CommitSink receives the write results from the upstream tasks. Once it has received as many results as the upstream parallelism, it considers all upstream subtasks complete and executes the commit operation.
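
The coordination rules in the list above can be sketched in plain Java without any Flink dependency. This is a minimal illustration under stated assumptions, not Hudi's actual operator code: the class names `WriteResult`, `InstantGate`, and `CommitCollector` are hypothetical, and the counter-based instant id only stands in for whatever globally unique instant the real operator produces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the result one write subtask sends downstream. */
final class WriteResult {
    final int subtaskIndex;
    final List<String> writtenFiles; // empty when the subtask had no data to write

    WriteResult(int subtaskIndex, List<String> writtenFiles) {
        this.subtaskIndex = subtaskIndex;
        this.writtenFiles = writtenFiles;
    }
}

/** Mirrors the rule described for InstantGeneratorOperator (assumed logic). */
final class InstantGate {
    private boolean previousInstantCompleted = true;
    private long counter = 0; // assumption: a counter replaces the real instant time

    /** Returns a new instant id, or null when no instant should be created. */
    String tryStartInstant(int recordsInBatch) {
        if (!previousInstantCompleted || recordsInBatch == 0) {
            return null; // previous instant still pending, or nothing to write
        }
        previousInstantCompleted = false;
        counter++;
        return "instant-" + counter;
    }

    void markCompleted() {
        previousInstantCompleted = true;
    }
}

/** Mirrors the rule described for CommitSink: commit once every subtask has reported. */
final class CommitCollector {
    private final int writeParallelism;
    private final List<WriteResult> buffer = new ArrayList<>();
    private final List<List<String>> commits = new ArrayList<>();

    CommitCollector(int writeParallelism) {
        this.writeParallelism = writeParallelism;
    }

    /** Returns true when this result completed the set and triggered a commit. */
    boolean onResult(WriteResult result) {
        buffer.add(result);
        if (buffer.size() < writeParallelism) {
            return false; // still waiting for other subtasks
        }
        List<String> files = new ArrayList<>();
        for (WriteResult r : buffer) {
            files.addAll(r.writtenFiles); // empty results contribute no files
        }
        commits.add(files); // stands in for the real commit call
        buffer.clear();
        return true;
    }

    List<List<String>> commits() {
        return commits;
    }
}
```

The sketch shows why a subtask with no data still has to emit an empty result: the collector counts results rather than files, so a missing result would leave the commit waiting.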

5. Examples

/**
 * Abstract implementation of a HoodieTable.
 *
 * @param <T> Sub type of HoodieRecordPayload
 * @param <I> Type of inputs
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  protected final HoodieWriteConfig config;
  protected final HoodieTableMetaClient metaClient;
  protected final HoodieIndex<T, I, K, O> index;

  public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
      I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

  ......
}

/**
 * Base class contains the context information needed by the engine at runtime. It will be extended by different
 * engine implementation if needed.
 */
public abstract class HoodieEngineContext {

  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

  ......
}

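// Spark-based implementation of map: distributes the list through the JavaSparkContext.
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
  return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
}

// Flink-side implementation of map: runs on a Java parallel stream.
// Note: SerializableFunction.apply declares a checked Exception, which Stream.map does not
// accept directly, so a compilable version has to wrap func before passing it in.
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
  return data.stream().parallel().map(func::apply).collect(Collectors.toList());
}

// Function type shared by both engines; apply may throw a checked exception.
@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {
  O apply(I v1) throws Exception;
}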
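
To make the stream-based variant above concrete, here is a minimal, self-contained sketch of how a function that throws checked exceptions can be adapted for `Stream.map`. The names `FunctionWrappers.unchecked` and `LocalEngineContext` are hypothetical and chosen only for this illustration; they are not claimed to be Hudi's own classes. `SerializableFunction` is redeclared with the same shape as in the snippet above so that the block compiles on its own.

```java
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Same shape as the SerializableFunction shown in the article. */
@FunctionalInterface
interface SerializableFunction<I, O> extends Serializable {
    O apply(I v1) throws Exception;
}

/** Hypothetical helper: adapts a throwing function to java.util.function.Function. */
final class FunctionWrappers {
    private FunctionWrappers() {
    }

    static <I, O> Function<I, O> unchecked(SerializableFunction<I, O> func) {
        return input -> {
            try {
                return func.apply(input);
            } catch (RuntimeException e) {
                throw e; // already unchecked, pass through unchanged
            } catch (Exception e) {
                // Checked exceptions are rethrown as unchecked, keeping the cause.
                throw new RuntimeException("map function failed", e);
            }
        };
    }
}

/** Hypothetical engine-free context that runs map() on a local parallel stream. */
final class LocalEngineContext {
    <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
        // The parallelism argument is ignored in this sketch; the common
        // fork-join pool decides how the work is split.
        return data.stream()
                .parallel()
                .map(FunctionWrappers.unchecked(func))
                .collect(Collectors.toList());
    }
}
```

A call such as `new LocalEngineContext().map(Arrays.asList("a", "bb"), String::length, 2)` then behaves like an ordinary list transformation, while the caller still writes its logic against the engine-neutral `SerializableFunction` type.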

6. Status Quo and Follow-Up Plans

6.1 Work Timelines

6.2 Follow-Up Plans

Alibaba Cloud
