Deep Insights into Flink SQL: Flink Advanced Tutorials

The New TableEnvironment

Overall Design of the New TableEnvironment

Figure 1 Overall design of the new TableEnvironment
  • flink-table-common: This package contains the code shared by the Flink planner and Blink planner.
  • flink-table-api-java: This package contains most of the programming APIs.
  • flink-table-api-scala: This package relates only to the expressions and the domain-specific language (DSL) of the Table API.
  • Two planners: flink-table-planner and flink-table-planner-blink.
  • Two bridges: flink-table-api-scala-bridge and flink-table-api-java-bridge. As shown in the figure, the Flink planner and Blink planner depend on specific Java APIs and bridges. Bridges are used to convert API operations to DataStream and DataSet of Scala or Java.

Comparison between the Old and New TableEnvironment

Application of the New TableEnvironment

Figure 2 Scenarios of the new TableEnvironment
// Blink planner, batch mode, unified TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.execute("job name");

// Blink planner, streaming mode, bridged to the DataStream API
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment execEnv = ...
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings);

// Old (Flink) planner, batch mode, bridged to the DataSet API
ExecutionEnvironment execEnv = ...
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);

// Old (Flink) planner, streaming mode, unified TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.execute("job name");

A New Catalog Type and DDL

New Catalog Design

Figure 3 New catalog design
CatalogManager operations:
  • Register catalogs (registerCatalog)
  • Get all catalogs (getCatalogs)
  • Get a specific catalog (getCatalog)
  • Get the current catalog (getCurrentCatalog)
  • Set the current catalog (setCurrentCatalog)
  • Get the current database (getCurrentDatabase)
  • Set the current database (setCurrentDatabase)
TableEnvironment catalog operations:
  • Register catalogs (registerCatalog)
  • List all catalogs (listCatalogs)
  • Get a specific catalog (getCatalog)
  • Use a specific catalog (useCatalog)

Design and Usage of DDL

  • Create Table: You can explicitly specify a catalog name and a database name. If you omit them, they are filled in from the current catalog and the current database. You can also set parameters, add comments, and use the Partitioned By syntax. Use the WITH clause to specify the connector to use, such as Kafka, CSV, or HBase. The WITH clause takes several properties, which are listed in the Factory definition of each connector. The Factory definition specifies which properties are required and which are optional.
Create Table [[catalog_name.]db_name.]table_name(
a int comment 'column comment',
b bigint,
c varchar
)comment 'table comment'
[partitioned by(b)]
  • Create View: You must specify a view name and then enter an SQL statement. The view is stored in the catalog.
  • Drop Table and Drop View: These two commands follow the standard SQL syntax and support the IF EXISTS clause. If you drop a non-existent table without adding IF EXISTS, an exception is thrown.
DROP TABLE [IF EXISTS] [[catalog_name.]db_name.]table_name
  • DDL statement execution in the SQL client: In Flink 1.9, the SQL client supports mostly read-only DDL operations. Only views can be created and dropped (Create View and Drop View). Catalogs, databases, tables, and functions can only be viewed. Through the SQL client, you can also use an existing catalog, modify some of its properties, and perform operations such as Describe and Explain.
SET xxx=yyy
DESCRIBE table_name
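
The way a [[catalog_name.]db_name.]table_name reference is completed from the current catalog and database, and the effect of IF EXISTS on a drop, can be illustrated with a small model. This is a sketch only: the TableNameResolver class and its methods are hypothetical and do not reproduce Flink's CatalogManager.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of name resolution; not Flink's CatalogManager. */
final class TableNameResolver {
    private final String currentCatalog;
    private final String currentDatabase;
    private final Set<String> tables = new HashSet<>();

    TableNameResolver(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    /** Expands [[catalog.]db.]table into a full catalog.db.table path. */
    String qualify(String name) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1: // table only: fill in current catalog and database
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2: // db.table: fill in current catalog
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3: // already fully qualified
                return name;
            default:
                throw new IllegalArgumentException("Invalid table name: " + name);
        }
    }

    void createTable(String name) {
        tables.add(qualify(name));
    }

    /** Returns true if a table was removed; throws if it is missing and ifExists is false. */
    boolean dropTable(String name, boolean ifExists) {
        String path = qualify(name);
        if (tables.remove(path)) {
            return true;
        }
        if (ifExists) {
            return false; // DROP TABLE IF EXISTS on a missing table is a no-op
        }
        throw new IllegalStateException("Table does not exist: " + path);
    }
}
```

In this model, dropping a missing table with ifExists set to true simply returns false, while the same call with ifExists set to false throws.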

Blink Planner

Figure 4 Main parsing process
  • Parsing and validation through the Table API and SQL: In Flink 1.9, the Table API is extensively refactored, and a new set of operations is introduced to describe the task-related logic tree. Once an SQL statement is executed, it is parsed to obtain an SQLNode tree, namely, an abstract syntax tree. Then, the SQL statement is validated, during which process the validator accesses the FunctionManager and CatalogManager. The FunctionManager queries user-defined functions (UDFs) and checks whether the UDFs are valid. The CatalogManager checks whether the specified table or database exists. If validation is passed, an operation directed acyclic graph (DAG) is generated.
  • RelNode generation: The operation DAG is transformed into a RelNode (relational expression) DAG.
  • Optimization: The optimizer optimizes the RelNode DAG. The optimizer's input includes various optimization rules and statistics. Currently, most of the optimization rules of the Blink planner are shared by batch and stream processing. However, batch processing has no concept of state, and streams do not support sorting. Therefore, the Blink planner runs two separate rule sets and defines two separate Physical Rels: BatchPhysical Rel and StreamPhysical Rel. The output of the optimizer is a Physical Rel DAG.
  • Transformation: The Physical Rel DAG is transformed into an ExecNode at the Blink execution layer. The ExecNode performs many CodeGen operations and executes non-code operators. Finally, the ExecNode is transformed into a Transformation DAG.
  • Generation of an executable JobGraph: The Transformation DAG is transformed into a JobGraph. This completes parsing through the Table API or SQL.

Improvements and Optimizations to the Blink Planner

  • Increased support for SQL syntax: Supported SQL statements include IN, EXISTS, NOT EXISTS, subquery statements, complete Over statements, and Grouping Sets. The Blink planner has passed the TPC Benchmark H (TPC-H) and TPC Benchmark DS (TPC-DS) tests, with excellent performance.
  • A wide range of efficient operators.
  • A complete cost model: This model accesses catalog statistics to develop an optimal execution plan.
  • Support for join reorder.
  • Shuffle service: The Blink planner provides a batch-specific shuffle service, which significantly improves the stability of batch jobs. A failed batch job can be resumed quickly through the shuffle service.
  • Segmental optimization.
  • Sub-plan reuse.
  • A variety of optimized rules: More than 100 rules are provided, most of which are shared by batch and stream processing.
  • A more efficient BinaryRow data structure: reduces the need for serialization and deserialization.
  • Support for mini-batch (only applicable to streams): reduces the operations required by state access.
  • Removal of unnecessary shuffle and sort operations (applicable to the Batch mode): Consider three operators: Operator A, Operator B, and Operator C. If a shuffle on the f1 field is required both between Operator A and Operator B and between Operator B and Operator C, then the shuffle between Operator B and Operator C can be removed, because the data is already distributed by f1. A similar approach is used for sort operations. This optimization significantly improves performance, especially when many sort and shuffle operations are performed during computing.
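
The shuffle removal described for Operator A, Operator B, and Operator C can be sketched as a single pass that tracks how the data is currently distributed. This is an illustrative model, not the Blink planner's rule. The ShufflePruning class is hypothetical, and it assumes that every operator preserves the distribution of its input.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; assumes each operator preserves its input's distribution. */
final class ShufflePruning {

    /**
     * requiredKeys lists, in pipeline order, the field by which the input of each
     * downstream operator must be distributed. Returns the shuffles that are
     * actually needed after redundant ones are dropped.
     */
    static List<String> plan(List<String> requiredKeys) {
        List<String> shuffles = new ArrayList<>();
        String currentKey = null; // distribution of the data produced so far
        for (String key : requiredKeys) {
            if (!key.equals(currentKey)) {
                shuffles.add(key); // distribution differs: a shuffle is needed
                currentKey = key;
            }
            // otherwise the data is already distributed by this key: skip the shuffle
        }
        return shuffles;
    }
}
```

For the case in the text, the required keys are f1 (between A and B) and f1 again (between B and C), and only the first shuffle survives.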

Performance Optimization and Practices

create view MyView as select word, count(1) as freq from SourceTable group by word;
insert into SinkTable1 select * from MyView where freq > 10;
insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;
Figure 5 RelNode DAG for Example 5
Figure 6 DAG using the Flink planner (Example 5)
Figure 7 DAG using the Blink planner (Example 5)
insert into SinkTabl
select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'
union all
select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;
Figure 8 RelNode DAG for Example 6
Figure 9 DAG using the Blink planner (Example 6)
  • table.optimizer.reuse-sub-plan-enabled (enabled by default)
  • table.optimizer.reuse-source-enabled (enabled by default)
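
One way to picture sub-plan reuse is to give every plan node a digest, a string that describes the operator together with its inputs, and to let nodes with equal digests share a single instance. The sketch below applies that idea to a plan shaped like Example 6. The SubPlanReuse class and the operator labels are hypothetical and only illustrate the principle, not the Blink planner's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of digest-based sub-plan reuse. */
final class SubPlanReuse {

    /** A plan node whose digest encodes the operator and the digests of its inputs. */
    static final class Node {
        final String op;
        final Node[] inputs;
        final String digest;

        Node(String op, Node... inputs) {
            this.op = op;
            this.inputs = inputs;
            StringBuilder sb = new StringBuilder(op).append('(');
            for (int i = 0; i < inputs.length; i++) {
                if (i > 0) {
                    sb.append(',');
                }
                sb.append(inputs[i].digest);
            }
            this.digest = sb.append(')').toString();
        }
    }

    /** Rebuilds the plan so that sub-plans with equal digests become one shared node. */
    static Node reuse(Node node, Map<String, Node> seen) {
        Node cached = seen.get(node.digest);
        if (cached != null) {
            return cached; // an identical sub-plan was already built: share it
        }
        Node[] newInputs = new Node[node.inputs.length];
        for (int i = 0; i < newInputs.length; i++) {
            newInputs[i] = reuse(node.inputs[i], seen);
        }
        Node result = new Node(node.op, newInputs);
        seen.put(result.digest, result);
        return result;
    }
}
```

With a union whose two branches both start from the same scan and the same aggregation on word, the rebuilt plan contains five distinct nodes instead of seven, because the scan and the first aggregation are shared.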
  • Group Agg, such as select count(a) from t group by b
  • Over Agg, such as select count(a) over (partition by b order by c) from t
  • Window Agg, such as select count(a) from t group by tumble(ts, interval '10' second), b
  • Table Agg, such as tEnv.scan('t').groupBy('a').flatAggregate(flatAggFunc('b' as ('c', 'd')))
  • All the Agg functions used by the aggregate operation are mergeable. Each Agg function must implement a merge method. Functions such as SUM, COUNT, and AVG can be computed in separate phases and their partial results merged. However, some operations, such as finding the median and calculating the 95% confidence interval, cannot be split into multiple phases. In this case, optimization through Local Agg and Global Agg is not applicable.
  • table.optimizer.agg-phase-strategy is set to AUTO or TWO_PHASE.
  • In Stream mode, mini-batch is enabled. In Batch mode, the AUTO strategy decides whether to use Local Agg and Global Agg for optimization based on the cost model and statistics.
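
The configuration-related conditions above might be set as in the following sketch for a streaming job. It assumes the Blink planner on Flink 1.9 with the flink-table dependencies on the classpath. The mini-batch option keys are taken from the Flink configuration documentation rather than from this article, and the latency and size values are placeholders chosen only for illustration.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AggPhaseConfigExample {
    public static void main(String[] args) {
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Configuration conf = tEnv.getConfig().getConfiguration();
        // Enable mini-batch, which Local Agg and Global Agg require in Stream mode.
        conf.setString("table.exec.mini-batch.enabled", "true");
        // Placeholder values: how long and how many records to buffer per mini-batch.
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        conf.setString("table.exec.mini-batch.size", "5000");
        // Ask the planner for two-phase aggregation (AUTO is the other option named above).
        conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    }
}
```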
select count(*) from t group by color
Figure 10 Unoptimized Count operation (Example 7)
Figure 11 Count operation after Local Agg and Global Agg are used (Example 7)
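
The effect of Local Agg and Global Agg on the count in Example 7 can be imitated in plain Java. Each upstream task first counts its own records, and only the partial counts are sent on and merged. The TwoPhaseCount class is a hypothetical illustration, not Flink code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of two-phase (local then global) counting. */
final class TwoPhaseCount {

    /** Local phase: one upstream task pre-aggregates its own records by color. */
    static Map<String, Long> localCount(List<String> colors) {
        Map<String, Long> partial = new HashMap<>();
        for (String color : colors) {
            partial.merge(color, 1L, Long::sum);
        }
        return partial;
    }

    /** Global phase: merges the partial counts; this is the merge step COUNT must support. */
    static Map<String, Long> globalCount(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((color, count) -> result.merge(color, count, Long::sum));
        }
        return result;
    }
}
```

The global phase receives at most one record per color from each upstream task, rather than one record per input row, which is what relieves a hot key.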
  • In Batch mode, Distinct and Agg are executed in sequence, which results in overhead far less than that of directly executing Distinct Agg.
  • In Stream mode, the Distinct Agg optimization addresses the hot spot problem, which arises because all input data is stored in state. If hot spot data exists, many state operations are performed, which affects performance.
select color, count(distinct id), count(*) from t group by color
select color, count(id), min(cnt) from (
select color, id, count(*) filter (where $e=2) as cnt from (
select color, id, 1 as $e from t --for distinct id
union all
select color, null as id, 2 as $e from t -- for count(*)
) group by color, id, $e
) group by color
Figure 12 Distinct-based rewriting logic in Batch mode (Example 8)
  • Supported Agg functions: avg/count/min/max/sum/first_value/concat_agg/single_value
  • table.optimizer.distinct-agg.split.enabled (disabled by default)
select color, count(distinct id), count(*) from t group by color
select color, sum(dcnt), sum(cnt) from (
select color, count(distinct id) as dcnt, count(*) as cnt from t
group by color, mod(hash_code(id), 1024)
) group by color
Figure 13 Logical graph in Stream mode before Distinct-based optimization (Example 9)
Figure 14 Logical graph in Stream mode after Distinct-based optimization (Example 9)
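
The rewritten query in Example 9 works because every id falls into exactly one bucket, so the distinct counts per bucket can be summed without counting any id twice. The sketch below checks that reasoning in plain Java. The SplitDistinct class is hypothetical, and Java's String.hashCode with Math.floorMod stands in for the hash_code and mod functions of the SQL.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the two-level count(distinct) rewrite. */
final class SplitDistinct {
    static final int BUCKETS = 1024;

    /** Each row is {color, id}. Returns count(distinct id) per color. */
    static Map<String, Long> countDistinct(String[][] rows) {
        // Level 1: group by (color, bucket); each group holds only the ids of its bucket.
        Map<String, Set<String>> partial = new HashMap<>();
        for (String[] row : rows) {
            int bucket = Math.floorMod(row[1].hashCode(), BUCKETS);
            partial.computeIfAbsent(row[0] + "#" + bucket, k -> new HashSet<>()).add(row[1]);
        }
        // Level 2: group by color and sum the per-bucket distinct counts.
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : partial.entrySet()) {
            String key = entry.getKey();
            String color = key.substring(0, key.lastIndexOf('#'));
            result.merge(color, (long) entry.getValue().size(), Long::sum);
        }
        return result;
    }
}
```

Because the first level is keyed by color and bucket, the ids of a hot color are spread over up to 1024 groups instead of a single one.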





Alibaba Cloud
