Apache Flink Fundamentals: Using Table API for Programming

What Is Table API?

Overview of Flink APIs

Table API Features

Table API Programming

WordCount Example

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class JavaBatchWordCount { // line:10 public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
tEnv.connect(new FileSystem().path(path))
.withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
.withSchema(new Schema().field("word", Types.STRING))
.registerTableSource("fileSource"); // line:20
Table result = tEnv.scan("fileSource")
.groupBy("word")
.select("word, count(1) as count");
tEnv.toDataSet(result, Row.class).print();
}
}
tEnv.connect(new FileSystem().path(path))
.withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
.withSchema(new Schema().field("word", Types.STRING))
.registerTableSource("fileSource");
Table result = tEnv.scan("fileSource")
.groupBy("word")
.select("word, count(1) as count");
tEnv.toDataSet(result, Row.class).print();

Obtain a Table

How to Output a Table

How to Operate on a Table

Table Operation Overview

Operations to Improve the Usability

Operations to Enhance Functions

Table API Developments

Original Source:

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store