Deep Insights into Flink SQL: Flink Advanced Tutorials

By He Xiaoling (Xiaoling) and compiled by Zheng Zhongni

This article is based on the advanced live courses on Apache Flink given by He Xiaoling, who is a technical expert at Alibaba. It explains the SQL principles and feature changes in Flink 1.9 from the users’ perspective to help you understand and use the new features of Flink 1.9 effortlessly. This article consists of the following parts:

1) Design and scenarios of the new TableEnvironment
2) Design of the new catalog type and related data definition language (DDL) practices
3) Key improvements and optimizations to the Blink planner

The New TableEnvironment

As proposed in FLIP-32, Blink has been fully open-sourced and merged into the Flink master branch. After the merger, Flink 1.9 provides two planners: the Flink planner and the Blink planner.

In Flink versions earlier than v1.9, Flink Table played only a minor role. However, it has become increasingly important as more users recognize and adopt the user-friendly Flink SQL. Blink was designed with a unified approach to batch and stream processing, treating batch processing as a special case of stream processing. The community took this design into account when merging Blink into the Flink master branch.

Figure 1 Overall design of the new TableEnvironment

As shown in Figure 1, TableEnvironment consists of the following parts:

  • flink-table-common: This package contains the code shared by the Flink planner and Blink planner.
  • flink-table-api-java: This package contains most of the programming APIs.
  • flink-table-api-scala: This package contains only the Scala expressions of the Table API and its domain-specific language (DSL).
  • Two planners: flink-table-planner and flink-table-planner-blink.
  • Two bridges: flink-table-api-scala-bridge and flink-table-api-java-bridge. As shown in the figure, the Flink planner and the Blink planner depend on the Java API and on these bridges. The bridges convert Table API operations into DataStream and DataSet programs in Scala or Java.

In Flink versions earlier than v1.9, the Flink Table module had seven environments, which made it complex to use and maintain. There was a StreamTableEnvironment and a BatchTableEnvironment for each of Java and Scala (four in total), plus three superclass environments.

Under the new framework, the community simplified the original design to unify batch and stream processing. Therefore, a unified TableEnvironment is provided. It is stored in the flink-table-api-java package. The bridges provide two StreamTableEnvironments used to connect Scala DataStream and Java DataStream. The BatchTableEnvironment is retained because the Flink planner still supports operations similar to toDataSet(). Currently, a total of five TableEnvironments are used.

The BatchTableEnvironment will be deprecated after the future removal of the Flink planner, leaving only three environments. This will make the TableEnvironment design more concise.

This section describes the scenarios and limitations of the new TableEnvironment.

The scenarios of the new TableEnvironment are shown in the following figure.

Figure 2 Scenarios of the new TableEnvironment

The first row of the figure shows the new TableEnvironment, which is referred to as UnifyTableEnvironment in the rest of this article. In Blink, batch processing is a special case of stream processing, so UnifyTableEnvironment also applies to Blink batch processing.

Flink 1.9 places some limitations on UnifyTableEnvironment. For example, it does not support the registration of user-defined aggregate functions (UDAFs) and user-defined table functions (UDTFs). This is because type inference in the new type system is not yet complete, and type inference for Java and Scala functions is not yet unified. These features are planned for Flink 1.10. In addition, tables in UnifyTableEnvironment cannot be converted to a DataStream or DataSet.

The second row shows StreamTableEnvironment, which can convert a table to a DataStream and supports UDAF and UDTF registration. Functions written in Java are registered with the Java StreamTableEnvironment, and functions written in Scala are registered with the Scala StreamTableEnvironment.

Blink batch jobs cannot use StreamTableEnvironment because toAppendStream() is currently unavailable for them. As a result, converting a batch table to a DataStream is not supported for the moment. As shown in the figure, only TableEnvironment can be used for Blink batch jobs at present.

The last row shows BatchTableEnvironment, which can convert a table to a DataSet by using toDataSet().

Figure 2 shows the scenarios and limitations of each TableEnvironment.

Let’s take a look at each scenario using an example.

Example 1: Blink Batch

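import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Register sources and sinks and define the job (for example, with tEnv.sqlUpdate(...)) before calling execute().
tEnv.execute("job name");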

As shown in Figure 2, Blink batch processing uses only TableEnvironment, that is, UnifyTableEnvironment. In the code, you first create an EnvironmentSettings object that specifies the Blink planner and Batch mode. In Flink 1.9, you must specify the planner explicitly because the JAR packages of both the Flink planner and the Blink planner are placed in Flink's lib directory. If you do not specify a planner, the framework cannot tell which one to use. If the lib directory contains the JAR package of only one planner, you do not need to specify it.

Note that you cannot retrieve an ExecutionEnvironment from UnifyTableEnvironment. This means you cannot call executionEnvironment.execute() to start the job after defining it. Instead, you must explicitly call tableEnvironment.execute(), which differs from how jobs were started previously.

Example 2: Blink Stream

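import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment execEnv = ...
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings);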

Blink streaming jobs can use either UnifyTableEnvironment or StreamTableEnvironment. The settings are built as in Batch mode; you only need to replace inBatchMode() with inStreamingMode().

Example 3: Flink Batch

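import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment execEnv = ...
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);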

Flink batch processing is used in the same way as in Flink versions earlier than v1.9.

Example 4: Flink Stream

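import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Register sources and sinks and define the job before calling execute().
tEnv.execute("job name");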

Flink streaming jobs can use either UnifyTableEnvironment or StreamTableEnvironment. When specifying the planner, call useOldPlanner(), which selects the Flink planner. It is called the old planner because the Flink planner will be removed in the future. The old planner supports only inStreamingMode() here, not inBatchMode(). Flink batch jobs use BatchTableEnvironment, as shown in Example 3.

A New Catalog Type and DDL

FLIP-30 proposes a new catalog API. Going forward, ExternalCatalog will be deprecated. The Blink planner no longer supports it, although the Flink planner still does.

Figure 3 shows the overall design of the new catalog type.

Figure 3 New catalog design

The new catalog type has three layers (catalog_name.database_name.object_name). The top layer is the catalog name, the middle layer is a database, and the bottom layer contains all types of meta-objects, such as tables, partitions, and functions. Flink provides two built-in catalogs, MemoryCatalog and HiveCatalog, and you can implement custom catalogs as needed.

How is the catalog used? A catalog supports statements, such as Create, Drop, List, Alter, and Exists, as well as operations on databases, tables, partitions, functions, and statistics. It also supports common SQL syntax.

CatalogManager manages multiple catalogs at the same time, so a single SQL statement can query and combine data across catalogs. For example, one query can join a table in Hive catalog A with a table in Hive catalog B, which makes Flink queries more flexible.

CatalogManager supports the following operations:

  • Register catalogs (registerCatalog)
  • Get all catalogs (getCatalogs)
  • Get a specific catalog (getCatalog)
  • Get the current catalog (getCurrentCatalog)
  • Set the current catalog (setCurrentCatalog)
  • Get the current database (getCurrentDatabase)
  • Set the current database (setCurrentDatabase)

Though a catalog has three layers, you do not have to specify a value for each layer. Just write a table name, and the system uses getCurrentCatalog and getCurrentDatabase to obtain the default values to fill in the three layers. This simplifies the use of catalogs. To use a catalog other than the default one, call setCurrentCatalog.
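
To make the defaulting rule concrete, here is a minimal Java sketch of how a partial name could be expanded into the three layers. The class ObjectPathResolver and its methods are hypothetical illustrations, not Flink's CatalogManager API. The sketch also assumes that names are split on "." and contain no quoted dots.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not Flink's CatalogManager): expands a partial object name
// into [catalog, database, object] using the current catalog and database.
// Assumes names are split on '.' and contain no quoted dots.
final class ObjectPathResolver {
    private String currentCatalog;
    private String currentDatabase;

    ObjectPathResolver(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    void setCurrentCatalog(String catalog) {
        this.currentCatalog = catalog;
    }

    void setCurrentDatabase(String database) {
        this.currentDatabase = database;
    }

    List<String> resolve(String name) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1: // table_name: fill in both defaults
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2: // db_name.table_name: fill in the catalog only
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3: // catalog_name.db_name.table_name: already complete
                return Arrays.asList(parts[0], parts[1], parts[2]);
            default:
                throw new IllegalArgumentException("Too many name parts: " + name);
        }
    }

    public static void main(String[] args) {
        ObjectPathResolver resolver = new ObjectPathResolver("my_catalog", "my_db");
        System.out.println(resolver.resolve("orders"));
        resolver.setCurrentCatalog("other_catalog");
        System.out.println(resolver.resolve("sales.orders"));
    }
}
```

In the sketch, calling setCurrentCatalog changes how one-part and two-part names resolve, while fully qualified names are left untouched.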

The TableEnvironment layer supports the following catalog operations:

  • Register catalogs (registerCatalog)
  • List all catalogs (listCatalogs)
  • Get a specific catalog (getCatalog)
  • Use a specific catalog (useCatalog)

The SQL client layer also supports catalog operations, with some limitations. You cannot create a catalog directly with a Create statement. Instead, you describe the catalog in a YAML file and pass that file with -e <file_path> when starting the SQL client. Currently, the SQL client supports some catalog operations, such as listing the defined catalogs and using an existing catalog.

You can operate on catalog content using DDL and execute DDL statements through the SQL client or the sqlUpdate() method of TableEnvironment.

The sqlUpdate() method supports the Create Table, Create View, Drop Table, and Drop View commands. It also supports the INSERT INTO statement.

The preceding four commands are explained as follows:

  • Create Table: You can explicitly specify the catalog name and database name. If you omit them, they are filled in from the current catalog and current database. You can also define columns, add comments, and use the PARTITIONED BY syntax. Use the WITH clause to specify the connector, such as Kafka, CSV, or HBase. The WITH clause requires several properties, which are listed in the Factory definition of each connector. The Factory definition specifies which properties are required and which are optional.

Currently, DDL does not support the definition of computed columns and watermarks, but this will be improved in later versions of Flink.

Create Table [[catalog_name.]db_name.]table_name(
a int comment 'column comment',
b bigint,
c varchar
) comment 'table comment'
[partitioned by(b)]
with (key1 = 'value1', key2 = 'value2', ...)
  • Create View: You must specify a view name and then enter an SQL statement. The view is stored in the catalog.
  • Drop Table and Drop View: These two commands follow standard SQL syntax and support the IF EXISTS clause. If you drop a table that does not exist without adding IF EXISTS, an exception is thrown.
DROP TABLE [IF EXISTS] [[catalog_name.]db_name.]table_name
  • DDL statement execution in the SQL client: In Flink 1.9, the SQL client mostly supports read-only operations. The only supported DDL statements are Create View and Drop View. Catalogs, databases, tables, and functions can only be viewed. Through the SQL client, you can use an existing catalog, modify some of its properties, and run statements such as Describe and Explain.
SET xxx=yyy
DESCRIBE table_name
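
The create and drop semantics above can be modeled with a small in-memory catalog. This Java sketch is hypothetical: InMemoryCatalogSketch is not a Flink class, and it models only table existence, not schemas or connector properties.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory catalog (not Flink's implementation) that models only
// table existence and the DROP TABLE [IF EXISTS] behavior described above.
final class InMemoryCatalogSketch {
    // Maps "db_name.table_name" to a free-form table description.
    private final Map<String, String> tables = new HashMap<>();

    void createTable(String db, String table, String description) {
        String key = db + "." + table;
        if (tables.containsKey(key)) {
            throw new IllegalStateException("Table already exists: " + key);
        }
        tables.put(key, description);
    }

    void dropTable(String db, String table, boolean ifExists) {
        String key = db + "." + table;
        boolean existed = tables.containsKey(key);
        tables.remove(key);
        if (!existed && !ifExists) {
            // Without IF EXISTS, dropping a missing table is an error.
            throw new IllegalStateException("Table does not exist: " + key);
        }
    }

    boolean tableExists(String db, String table) {
        return tables.containsKey(db + "." + table);
    }

    public static void main(String[] args) {
        InMemoryCatalogSketch catalog = new InMemoryCatalogSketch();
        catalog.createTable("my_db", "orders", "orders table");
        catalog.dropTable("my_db", "orders", false);
        catalog.dropTable("my_db", "orders", true); // IF EXISTS: silently ignored
        System.out.println(catalog.tableExists("my_db", "orders"));
    }
}
```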

Flink 1.9 already provides fairly complete DDL support, and some features will be improved in later versions.

Blink Planner

This section explains how Table API and SQL programs are transformed into a JobGraph. The goal is to help you better understand the Blink planner, read its code, and use it. The section also describes the improvements and optimizations made to the Blink planner.

Figure 4 Main parsing process

Figure 4 shows the main parsing process, which consists of three layers: Table API and SQL, Blink planner, and runtime. The important phases of this process are explained as follows.

  • Parsing and validation through the Table API and SQL: In Flink 1.9, the Table API was extensively refactored, and a new set of Operations was introduced to describe the logical tree of a job. When an SQL statement is submitted, it is first parsed into a SqlNode tree, that is, an abstract syntax tree. The statement is then validated, and the validator accesses the FunctionManager and CatalogManager during this step. The FunctionManager looks up user-defined functions (UDFs) and checks whether they are valid. The CatalogManager checks whether the referenced table or database exists. If validation passes, an Operation directed acyclic graph (DAG) is generated.

The Table API and SQL eventually transform the SQL statement into a unified structure, namely, the operation DAG.

  • RelNode generation: The operation DAG is transformed into a RelNode (relational expression) DAG.
  • Optimization: The optimizer implements optimizations to the RelNode. The optimizer’s input includes various optimization rules and statistics. Currently, most of the optimization rules of the Blink planner are shared by batch and stream processing. Batch does not have the state concept, and streams do not support sorting. Therefore, the Blink planner runs two separate rule sets and defines two separate Physical Rels: BatchPhysical Rel and StreamPhysical Rel. As a result, the output of the optimizer is a Physical Rel DAG.
  • Transformation: The Physical Rel DAG is transformed into an ExecNode DAG at the Blink execution layer. At this stage, many operators are produced through code generation (CodeGen), while others are non-generated operators. Finally, the ExecNode DAG is transformed into a Transformation DAG.
  • Generation of an executable JobGraph: The Transformation DAG is transformed into a JobGraph. This completes parsing through the Table API or SQL.
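
The validation step in the first phase can be pictured with a minimal Java sketch. Here, plain sets stand in for the CatalogManager and FunctionManager. The class ValidatorSketch and its method are hypothetical, and they ignore everything else a real validator checks, such as types.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the validation step: every referenced table must exist
// (checked by the CatalogManager in Flink) and every called function must be
// registered (checked by the FunctionManager). Plain sets stand in for both.
final class ValidatorSketch {
    private final Set<String> knownTables;
    private final Set<String> knownFunctions;

    ValidatorSketch(Set<String> knownTables, Set<String> knownFunctions) {
        this.knownTables = knownTables;
        this.knownFunctions = knownFunctions;
    }

    /** Throws IllegalArgumentException on the first unknown table or function. */
    void validate(List<String> referencedTables, List<String> calledFunctions) {
        for (String table : referencedTables) {
            if (!knownTables.contains(table)) {
                throw new IllegalArgumentException("Table not found: " + table);
            }
        }
        for (String function : calledFunctions) {
            if (!knownFunctions.contains(function)) {
                throw new IllegalArgumentException("Function not found: " + function);
            }
        }
        // Only after validation passes would the Operation DAG be built.
    }

    public static void main(String[] args) {
        ValidatorSketch validator = new ValidatorSketch(
                new HashSet<>(Arrays.asList("my_catalog.my_db.orders")),
                new HashSet<>(Arrays.asList("my_udf")));
        validator.validate(Arrays.asList("my_catalog.my_db.orders"), Arrays.asList("my_udf"));
        System.out.println("validation passed");
    }
}
```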

The following improvements are made to the Blink planner:

  • Increased support for SQL syntax: Supported SQL features include IN, EXISTS, NOT EXISTS, subqueries, complete Over statements, and Grouping Sets. The Blink planner has passed the TPC Benchmark H (TPC-H) and TPC Benchmark DS (TPC-DS) tests with excellent performance.
  • A wide range of efficient operators.
  • A complete cost model: This model accesses catalog statistics to develop an optimal execution plan.
  • Support for join reorder.
  • Shuffle service: The Blink planner provides a batch-specific shuffle service. It significantly improves the stability of batch jobs and allows a failed batch job to be recovered quickly.

The performance optimization of the Blink planner includes the following:

  • Segmental optimization.
  • Sub-plan reuse.
  • A variety of optimized rules: More than 100 rules are provided, most of which are shared by batch and stream processing.
  • A more efficient BinaryRow data structure: reduces the need for serialization and deserialization.
  • Support for mini-batch (applicable only to streams): reduces the number of state accesses.
  • Removal of unnecessary shuffle and sort operations (applicable to Batch mode): Suppose a job has three operators, A, B, and C. The data must be shuffled on field f1 both between A and B and between B and C. In that case, the shuffle between B and C can be removed, because the data is already partitioned by f1 after the first shuffle. Sort operations are handled in a similar way. This optimization significantly improves performance, especially when a computation involves many sort and shuffle operations.
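
The shuffle-removal idea above can be illustrated with a simplified model. This Java sketch is not Blink's rule code. It describes a linear chain of operators by the partitioning key each one requires on its input and adds a shuffle only when the data is not already partitioned on that key. It also assumes that every operator preserves the partitioning of the data it receives.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of redundant-shuffle removal (not Blink's actual rule code).
// A linear chain of operators is described by the hash-partitioning key each
// operator requires on its input (null means no requirement). A shuffle is
// added only when the data is not already partitioned on that key. The model
// assumes every operator preserves the partitioning of the data it receives.
final class ShufflePlanner {
    static int countShuffles(List<String> requiredKeys) {
        String currentPartitioning = null; // source data has no known partitioning
        int shuffles = 0;
        for (String key : requiredKeys) {
            if (key != null && !key.equals(currentPartitioning)) {
                shuffles++;                // repartition on the required key
                currentPartitioning = key; // downstream data is now partitioned on it
            }
        }
        return shuffles;
    }

    public static void main(String[] args) {
        // Inputs of B and C both require partitioning on f1: A -> B -> C.
        System.out.println(countShuffles(Arrays.asList("f1", "f1")));
        // If C required f2 instead, a second shuffle would be needed.
        System.out.println(countShuffles(Arrays.asList("f1", "f2")));
    }
}
```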

This section lists examples that allow us to analyze the design of the Blink planner after performance optimization.

Segmental Optimization

Example 5

create view MyView as select word, count(1) as freq from SourceTable group by word;
insert into SinkTable1 select * from MyView where freq > 10;
insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;

The preceding SQL statements are transformed into the following RelNode DAG:

Figure 5 RelNode DAG for Example 5

If the Flink planner is used, the RelNode DAG can be optimized into the following execution DAG:

Figure 6 DAG using the Flink planner (Example 5)

The Flink planner traverses the plan backward, from each sink node to the source nodes, and forms two separate execution links. As shown in Figure 6, the scan node and the first-layer aggregate node are therefore computed twice.

If the Blink planner is used, the following execution DAG is generated after optimization:

Figure 7 DAG using the Blink planner (Example 5)

The Blink planner caches all INSERT INTO operations and optimizes them together just before execution. Figure 7 shows the complete execution DAG. Because the DAG contains repeated computations, the Blink planner finds the maximum common subgraph and optimizes it once. It then turns this subgraph into a temporary table, which the other parts of the plan use.

In this way, the preceding DAG is divided into three parts: (1) maximum common subgraph (temporary table); (2) temporary table and optimization of the filter node and Sink Table 1; and (3) temporary table and optimization of the aggregate node and Sink Table 2.

The Blink planner uses declared views to find the maximum common subgraph. Therefore, if you want to reuse a piece of logic during development, define it as a view. This lets you take full advantage of the Blink planner's segmental optimization and reduces repeated computations.

Problems may occur during the optimization process. For example, some optimizations may be lost because the DAG is segmented in advance. Related algorithms will be optimized in the future.

To sum up, the segmental optimization of the Blink planner is essentially a multi-sink optimization or DAG optimization. In the case of single-sink optimization, optimizations can be made on all nodes, without having to segment the DAG.
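
The saving from segmental optimization can be estimated with a deliberately simplified Java model. It assumes each sink's plan is a linear chain of operator names and treats the shared prefix as the common part, whereas the Blink planner works on a DAG and finds the maximum common subgraph. The operator names mirror Example 5, and the class CommonPrefixReuse is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of segmental optimization: each sink's plan is a linear chain
// of operator names, source first. The shared prefix (scan + first aggregate in
// Example 5) is computed once, like the temporary table, and reused by both sinks.
final class CommonPrefixReuse {
    static int sharedPrefixLength(List<String> a, List<String> b) {
        int limit = Math.min(a.size(), b.size());
        int i = 0;
        while (i < limit && a.get(i).equals(b.get(i))) {
            i++;
        }
        return i;
    }

    // Each sink planned separately: every operator runs once per sink.
    static int operatorsWithoutReuse(List<String> a, List<String> b) {
        return a.size() + b.size();
    }

    // Shared prefix runs once; only the remaining segments run per sink.
    static int operatorsWithReuse(List<String> a, List<String> b) {
        int shared = sharedPrefixLength(a, b);
        return shared + (a.size() - shared) + (b.size() - shared);
    }

    public static void main(String[] args) {
        List<String> sink1 = Arrays.asList("Scan", "Agg(word)", "Filter", "Sink1");
        List<String> sink2 = Arrays.asList("Scan", "Agg(word)", "Agg(freq)", "Sink2");
        System.out.println(operatorsWithoutReuse(sink1, sink2) + " vs " + operatorsWithReuse(sink1, sink2));
    }
}
```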

Sub-Plan Reuse

Example 6

insert into SinkTable
select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'
union all
select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;

The SQL statements in this example are similar to those used for segmental optimization. Here, however, the two results are combined with UNION ALL and written to a single result table through one sink node.

Figure 8 shows the RelNode DAG after transformation.

Figure 8 RelNode DAG for Example 6

As shown in Figure 8, repeated computations are performed on the scan node and the first-layer aggregate node. The Blink planner identifies these computations. The RelNode DAG is transformed as follows:

Figure 9 DAG using the Blink planner (Example 6)

Sub-plan reuse is controlled by the following two parameters:

  • table.optimizer.reuse-sub-plan-enabled (enabled by default)
  • table.optimizer.reuse-source-enabled (enabled by default)

Disable the two parameters as needed. The table.optimizer.reuse-source-enabled parameter is explained as follows.

In Batch mode, reusing a source in a join may cause a deadlock. For example, the hash-join and nested-loop-join operators read the build input and the probe input in sequence: they consume the probe input only after the build input is finished. Suppose reuse-source-enabled is on and both inputs come from the same data source. The source then sends its data to both the build input and the probe input. Because the probe input is not consumed yet, its data backs up and blocks the shared source. The build input can therefore never finish, and the join cannot proceed.

To avoid the deadlock and keep the build input flowing, the Blink planner writes the data of the probe input to disk. It then pulls data from the probe input only after all data has been read from the build input. Writing data to disk adds overhead, and reading the source twice may finish faster than the disk write. In that case, disable reuse-source for better performance. If reading the source twice costs much more than the disk write, keep reuse-source enabled. The deadlock does not occur in Stream mode, because a streaming join consumes both inputs as data arrives instead of reading one side first.
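
The spill-based approach described above can be sketched for a self-join over one reused source. This Java model is an assumption-laden simplification, not Blink's operator code. An in-memory list stands in for the on-disk spill of the probe input, and rows are {key, value} pairs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a batch self-join over a reused source (not Blink's code).
// The shared source is read once. Each record goes straight to the build side,
// while its probe-side copy is buffered; the list stands in for the disk spill.
// Probing starts only after the build side is complete, so neither input blocks
// the other. Rows are {key, value}.
final class ReusedSourceSelfJoin {
    static List<String> selfJoin(List<String[]> source) {
        Map<String, List<String>> buildTable = new HashMap<>();
        List<String[]> spilledProbe = new ArrayList<>(); // stands in for disk

        for (String[] row : source) { // single pass over the shared source
            buildTable.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
            spilledProbe.add(row);    // probe copy is spilled, not consumed yet
        }

        List<String> joined = new ArrayList<>();
        for (String[] row : spilledProbe) { // replay the spilled probe input
            for (String buildValue : buildTable.getOrDefault(row[0], Collections.emptyList())) {
                joined.add(buildValue + "|" + row[1]);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> source = new ArrayList<>();
        source.add(new String[] {"a", "1"});
        source.add(new String[] {"a", "2"});
        source.add(new String[] {"b", "3"});
        System.out.println(selfJoin(source));
    }
}
```

The buffer grows with the probe input, which is exactly the extra disk write the trade-off above weighs against reading the source twice.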

The sub-plan reuse feature solves the problem of subgraph reuse after optimization. It is similar to and complements segmental optimization.


Hash Join: To join two tables, t1 and t2, choose one of them and build a hash table on the columns used in the join clause. Then scan the other table row by row and look up matching rows in the hash table. This lookup is called a probe. The first table is called the build table, and the second table is called the probe table.
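
The build and probe phases can be shown with a minimal in-memory hash join in Java. This is a generic textbook sketch rather than Flink's hash-join operator. Rows are modeled as {joinKey, payload} string arrays, and HashJoinSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory hash join. Rows are {joinKey, payload}.
// Build phase: hash the build table (t1) on the join key.
// Probe phase: scan the probe table (t2) row by row and look up matching rows.
final class HashJoinSketch {
    static List<String> join(List<String[]> buildTable, List<String[]> probeTable) {
        Map<String, List<String[]>> hashTable = new HashMap<>();
        for (String[] row : buildTable) {
            hashTable.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        List<String> result = new ArrayList<>();
        for (String[] probeRow : probeTable) {
            // One joined row per matching build row; no match emits nothing (inner join).
            for (String[] buildRow : hashTable.getOrDefault(probeRow[0], Collections.emptyList())) {
                result.add(probeRow[0] + ":" + buildRow[1] + "," + probeRow[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> t1 = new ArrayList<>();
        t1.add(new String[] {"1", "x"});
        t1.add(new String[] {"2", "y"});
        List<String[]> t2 = new ArrayList<>();
        t2.add(new String[] {"1", "p"});
        t2.add(new String[] {"3", "q"});
        System.out.println(join(t1, t2));
    }
}
```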

Classification of Aggregate-based Optimizations

Blink provides a variety of aggregate operations:

  • Group Agg, such as select count(a) from t group by b
  • Over Agg, such as select count(a) over (partition by b order by c) from t
  • Window Agg, such as select count(a) from t group by tumble(ts, interval '10' second), b
  • Table Agg, such as tEnv.scan("t").groupBy('a).flatAggregate(flatAggFunc('b) as ('c, 'd)) (Scala Table API)

Group Agg supports the following two optimizations:

1) Optimization Through Local Agg and Global Agg

Local Agg and Global Agg are used to reduce network shuffles. The following conditions must be met for optimization through Local Agg and Global Agg:

  • All aggregate functions in the query are mergeable. That is, each one implements a merge method so that partial results computed in different phases can be combined. SUM, COUNT, and AVG are examples. However, some operations, such as finding the median or calculating a 95% confidence interval, cannot be split into multiple phases. In this case, optimization through Local Agg and Global Agg is not applicable.
  • table.optimizer.agg-phase-strategy is set to AUTO or TWO_PHASE.
  • In Stream mode, mini-batch is enabled. In Batch mode, the AUTO strategy decides whether to use Local Agg and Global Agg based on the cost model and statistics.
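
The mergeability condition can be illustrated with AVG. The following Java class is a standalone sketch. Its method names loosely follow the accumulate/merge/getValue pattern of Flink's AggregateFunction, but it is not that API. AVG keeps a small, fixed-size (sum, count) accumulator, so partial results merge exactly. An exact median has no such compact accumulator, because merging partial results would require keeping all the values.

```java
// Sketch of a mergeable aggregate. AVG keeps a (sum, count) accumulator, so
// partial accumulators built by Local Agg on different nodes can be merged
// exactly by Global Agg before the final value is computed.
final class AvgAccumulator {
    long sum;
    long count;

    void accumulate(long value) {
        sum += value;
        count++;
    }

    // Combines a partial accumulator from another node into this one.
    void merge(AvgAccumulator other) {
        sum += other.sum;
        count += other.count;
    }

    double getValue() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    public static void main(String[] args) {
        AvgAccumulator local1 = new AvgAccumulator(); // Local Agg on node 1
        local1.accumulate(1);
        local1.accumulate(2);
        AvgAccumulator local2 = new AvgAccumulator(); // Local Agg on node 2
        local2.accumulate(3);
        local2.accumulate(4);
        local2.accumulate(5);
        local1.merge(local2);                         // Global Agg
        System.out.println(local1.getValue());
    }
}
```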

Example 7

select count(*) from t group by color

Without optimization, the following aggregate operation shuffles 10 records:

Figure 10 Unoptimized Count operation (Example 7)

Figure 11 shows the operations after Local Agg and Global Agg are used. Local aggregation runs first and is followed by the shuffle, so only six records are shuffled. In Stream mode, Blink pre-aggregates the results through mini-batch and sends them to Global Agg for the final result.

Figure 11 Count operation after Local Agg and Global Agg are used (Example 7)
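
The effect of Local Agg can be reproduced with a small Java simulation of Example 7's query. The partitions and colors below are made-up sample data, not the data behind Figures 10 and 11. The class LocalGlobalCount is a hypothetical illustration that counts how many records would cross the network.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of "select count(*) from t group by color".
// Each inner list is one upstream partition. Without Local Agg every row is
// shuffled; with Local Agg each partition sends one (color, partialCount)
// record per distinct color, and Global Agg sums the partial counts.
final class LocalGlobalCount {
    int shuffledRecords; // records sent over the simulated network

    Map<String, Long> withoutLocalAgg(List<List<String>> partitions) {
        Map<String, Long> global = new HashMap<>();
        for (List<String> partition : partitions) {
            for (String color : partition) {
                shuffledRecords++; // every raw row is shuffled
                global.merge(color, 1L, Long::sum);
            }
        }
        return global;
    }

    Map<String, Long> withLocalAgg(List<List<String>> partitions) {
        Map<String, Long> global = new HashMap<>();
        for (List<String> partition : partitions) {
            Map<String, Long> local = new HashMap<>(); // Local Agg on this partition
            for (String color : partition) {
                local.merge(color, 1L, Long::sum);
            }
            for (Map.Entry<String, Long> e : local.entrySet()) {
                shuffledRecords++; // one partial result per color
                global.merge(e.getKey(), e.getValue(), Long::sum); // Global Agg
            }
        }
        return global;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("red", "red", "blue"),
                Arrays.asList("red", "blue", "blue", "green"));
        LocalGlobalCount naive = new LocalGlobalCount();
        LocalGlobalCount twoPhase = new LocalGlobalCount();
        System.out.println(naive.withoutLocalAgg(partitions) + " shuffled=" + naive.shuffledRecords);
        System.out.println(twoPhase.withLocalAgg(partitions) + " shuffled=" + twoPhase.shuffledRecords);
    }
}
```

Both strategies produce the same counts. Only the number of shuffled records changes, and the saving grows with the number of rows per color in each partition.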

2) Optimization Through Distinct Agg

Distinct Agg optimization works by rewriting the SQL statement. However, the rewrite differs between Batch mode and Stream mode.

  • In Batch mode, the query is rewritten so that deduplication (Distinct) runs first and aggregation (Agg) runs afterward. This costs far less than executing Distinct Agg directly.
  • In Stream mode, the rewrite mainly addresses the hot spot problem. Distinct Agg stores all input data in state, so if hot spot data exists, many state operations are concentrated on a few nodes, which affects performance.

Batch Mode

The distinct values and the non-distinct aggregates are computed first. The distinct aggregate is then computed from them.

Example 8

select color, count(distinct id), count(*) from t group by color

Rewritten by hand, the preceding query is equivalent to the following:

select color, count(id), max(cnt) from ( -- cnt is 0 in the $e=1 groups, so max(cnt) is count(*)
select color, id, count(*) filter (where $e=2) as cnt from (
select color, id, 1 as $e from t -- for distinct id
union all
select color, null as id, 2 as $e from t -- for count(*)
) group by color, id, $e
) group by color

Figure 12 shows the logical transformation process.

Figure 12 Distinct-based rewriting logic in Batch mode (Example 8)
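
To check the batch rewrite by hand, here is a hypothetical Java simulation of the rewritten query on rows of {color, id}. The inner step tags each row twice (e = 1 for the distinct id, e = 2 for count(*)) and groups by (color, id, e). The outer step groups by color, where only the e = 2 group has a non-zero cnt. DistinctExpandRewrite is an illustrative name, not a Blink class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the rewritten batch query for
// "select color, count(distinct id), count(*) from t group by color".
// Rows are {color, id}. The result maps color -> {count(distinct id), count(*)}.
final class DistinctExpandRewrite {
    static Map<String, long[]> evaluate(List<String[]> rows) {
        // Inner query: emit every row twice, tagged e = 1 (for distinct id) and
        // e = 2 (for count(*), with id set to null), then group by (color, id, e).
        Map<List<Object>, Long> groups = new HashMap<>();
        for (String[] r : rows) {
            groups.merge(Arrays.<Object>asList(r[0], r[1], 1), 1L, Long::sum);
            groups.merge(Arrays.<Object>asList(r[0], null, 2), 1L, Long::sum);
        }
        // Outer query: group by color.
        Map<String, long[]> result = new HashMap<>();
        for (Map.Entry<List<Object>, Long> g : groups.entrySet()) {
            String color = (String) g.getKey().get(0);
            Object id = g.getKey().get(1);
            int tag = (Integer) g.getKey().get(2);
            long cnt = (tag == 2) ? g.getValue() : 0L; // count(*) filter (where e = 2)
            long[] acc = result.computeIfAbsent(color, k -> new long[2]);
            if (id != null) {
                acc[0]++;                              // one e = 1 group per distinct id
            }
            acc[1] = Math.max(acc[1], cnt);            // only the e = 2 group has cnt > 0
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"red", "1"}, new String[] {"red", "1"},
                new String[] {"red", "2"}, new String[] {"blue", "3"});
        for (Map.Entry<String, long[]> e : evaluate(rows).entrySet()) {
            System.out.println(e.getKey() + " " + Arrays.toString(e.getValue()));
        }
    }
}
```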

Stream Mode

The following conditions must be met to enable this optimization in Stream mode:

  • Only the following aggregate functions are used: avg/count/min/max/sum/first_value/concat_agg/single_value
  • table.optimizer.distinct-agg.split.enabled is set to true (it is disabled by default)

Example 9

select color, count(distinct id), count(*) from t group by color

Rewritten by hand, the preceding query is equivalent to the following:

select color, sum(dcnt), sum(cnt) from (
select color, count(distinct id) as dcnt, count(*) as cnt from t
group by color, mod(hash_code(id), 1024)
) group by color

Figure 13 shows the logical graph before the code is rewritten.

Figure 13 Logical graph in Stream mode before Distinct-based optimization (Example 9)

Figure 14 shows the logical graph after the code is rewritten. Hot spot data is distributed to multiple intermediate nodes.

Figure 14 Logical graph in Stream mode after Distinct-based optimization (Example 9)

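In Example 9, the value 1024 in mod(hash_code(id), 1024) sets the number of buckets over which the data of each key is distributed. A larger value spreads hot spot data more evenly and is recommended for better optimization results. In Flink, this number is configured with table.optimizer.distinct-agg.split.bucket-num.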
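
The two-level rewrite can be verified with a hypothetical Java simulation. It assumes non-null ids and uses Math.floorMod(id.hashCode(), buckets) in place of mod(hash_code(id), 1024). The sketch first counts distinct ids per (color, bucket) and then sums those partial counts per color. Because each id falls into exactly one bucket, the sum equals count(distinct id) for any bucket number.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of the split distinct aggregation in Example 9.
// Rows are {color, id} with non-null ids. Level 1 counts distinct ids per
// (color, bucket); level 2 sums those partial counts per color. The state of
// a hot color is spread across up to `buckets` level-1 groups.
final class SplitDistinctCount {
    static Map<String, Long> count(List<String[]> rows, int buckets) {
        Map<List<Object>, Set<String>> level1 = new HashMap<>();
        for (String[] r : rows) {
            int bucket = Math.floorMod(r[1].hashCode(), buckets); // stands in for mod(hash_code(id), n)
            level1.computeIfAbsent(Arrays.<Object>asList(r[0], bucket), k -> new HashSet<>()).add(r[1]);
        }
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<List<Object>, Set<String>> e : level1.entrySet()) {
            String color = (String) e.getKey().get(0);
            result.merge(color, (long) e.getValue().size(), Long::sum); // sum(dcnt) per color
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"red", "a"}, new String[] {"red", "b"},
                new String[] {"red", "a"}, new String[] {"blue", "c"});
        System.out.println(count(rows, 1024));
    }
}
```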


This article introduces the overall design of the new TableEnvironment, lists the TableEnvironment choices in various modes, provides examples to explain how to write code in these modes, and points out some coding precautions.

It also provides examples to explain the overall design of the new catalog type and DDL usage. Finally, it describes the process of parsing through the Table API and SQL on the Blink planner as well as the improvements and optimizations made to the Blink planner. I hope that this article helps you understand and use Flink SQL.
