Introduction to Dynamic Tables


Image 1.

Concept of Dynamic Tables

Conversion from Stream to Dynamic Table

Append Mode


Image 2.

Replace Mode


Image 3.
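
The two stream-to-table conversion modes can be sketched in a few lines of Python. This is a minimal illustration rather than Flink code: the function names `append_mode` and `replace_mode`, the `clicks` sample data, and the choice of `user` as the key are all assumptions made for the example. In append mode every stream record becomes a new row, while in replace mode a record overwrites the existing row that has the same key.

```python
def append_mode(stream):
    """Append mode: every stream record is added as a new table row."""
    table = []
    for record in stream:
        table.append(record)
    return table


def replace_mode(stream, key):
    """Replace mode: a record overwrites the row that has the same key."""
    table = {}
    for record in stream:
        table[record[key]] = record
    return list(table.values())


# Hypothetical click stream used only for illustration.
clicks = [
    {"user": "alice", "url": "/home"},
    {"user": "bob", "url": "/cart"},
    {"user": "alice", "url": "/checkout"},
]

appended = append_mode(clicks)               # three rows, one per record
replaced = replace_mode(clicks, key="user")  # two rows, one per user
print(len(appended), len(replaced))
```

The append-mode table grows with every record, whereas the replace-mode table holds at most one row per key.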

Conversion from Dynamic Table to Stream

Retraction Mode


Image 4.

Update Mode


Image 5.
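
The difference between the two table-to-stream modes is easiest to see on a small changelog. The sketch below is an illustration under stated assumptions, not Flink's implementation: the `(kind, old_row, new_row)` change format, the message tags, and the function names are invented for the example. In retraction mode an update is emitted as a delete of the old row followed by an insert of the new row. In update mode the table needs a unique key, and an update is emitted as a single upsert message.

```python
def retraction_stream(changes):
    """Retraction mode: an update becomes a delete of the old row
    followed by an insert of the new row."""
    out = []
    for kind, old, new in changes:
        if kind == "insert":
            out.append(("+", new))
        elif kind == "delete":
            out.append(("-", old))
        elif kind == "update":
            out.append(("-", old))
            out.append(("+", new))
    return out


def update_stream(changes, key):
    """Update mode: requires a unique key; inserts and updates are both
    emitted as one upsert message, deletes carry only the key."""
    out = []
    for kind, old, new in changes:
        if kind == "delete":
            out.append(("delete", old[key]))
        else:
            out.append(("upsert", new))
    return out


# Hypothetical changes to a per-user count table.
changes = [
    ("insert", None, {"user": "alice", "cnt": 1}),
    ("insert", None, {"user": "bob", "cnt": 1}),
    ("update", {"user": "alice", "cnt": 1}, {"user": "alice", "cnt": 2}),
]

retractions = retraction_stream(changes)
upserts = update_stream(changes, key="user")
print(len(retractions), len(upserts))
```

For the same three changes, retraction mode emits four messages and update mode emits three, at the cost of requiring a key.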

Querying Dynamic Tables

Table A is a dynamic table, and its snapshot at time t is denoted A[t]. A query q evaluated on that snapshot is denoted q(A[t]).


Image 6.
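
The notation q(A[t]) can be made concrete with a small Python sketch. It is only an illustration: the changelog format `(timestamp, operation, row)`, the helper `snapshot`, and the per-user count used as the query `q` are assumptions chosen for the example.

```python
def snapshot(changelog, t):
    """Return A[t]: the rows of table A after applying all changes up to time t.
    The changelog is assumed to be ordered by timestamp."""
    rows = []
    for ts, op, row in changelog:
        if ts > t:
            break
        if op == "insert":
            rows.append(row)
        elif op == "delete":
            rows.remove(row)
    return rows


def q(rows):
    """Example query: count the rows per user."""
    counts = {}
    for row in rows:
        counts[row["user"]] = counts.get(row["user"], 0) + 1
    return counts


# Hypothetical changelog of table A.
changelog = [
    (1, "insert", {"user": "alice", "url": "/home"}),
    (2, "insert", {"user": "bob", "url": "/cart"}),
    (3, "insert", {"user": "alice", "url": "/checkout"}),
    (4, "delete", {"user": "bob", "url": "/cart"}),
]

result_at_2 = q(snapshot(changelog, 2))
result_at_4 = q(snapshot(changelog, 4))
print(result_at_2, result_at_4)
```

The same query returns different results for different snapshots, which is why the result of a query on a dynamic table is itself a dynamic table.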

Query Limitations

1. In practice, Flink converts the query into a continuous streaming application, and the executed query applies only to the current logical time. Queries against the table at an arbitrary point in time (A[t]) are therefore not supported.

2. Intuitively, for Flink to compute a query incrementally, the state that the query and its computation may need must be bounded. Two kinds of queries satisfy this requirement:

  • Queries that continuously update their current result: such a query can generate insert, update, and delete changes.
    The query is expressed as Q(t+1) = q’(Q(t), c(T, t, t+1)), where Q(t) is the previous result of query q, c(T, t, t+1) is the change in Table T from t to t+1, and q’ is the incremental version of q.
  • Queries that generate an append-only table and calculate the new data directly from the end of the input table.
    The query is expressed as Q(t+1) = q’’(c(T, t-x, t+1)) ∪ Q(t), where q’’ is an incremental version of q that does not need the result of q at time t. Here c(T, t-x, t+1) represents the last x+1 data records of Table T, where x depends on the semantics of the query. For example, a window aggregation over the past hour needs at least all the data from the past hour as its state.

Other supported query types include:

  • SELECT and WHERE, which are evaluated on each row independently
  • GROUP BY clauses on the rowtime (such as the time-based window aggregate)
  • OVER windows (row windows) with ORDER BY rowtime
  • ORDER BY rowtime

3. When the input table is sufficiently small, every data record in the table can be accessed. For example, you can join two stream tables that each have a fixed size (i.e., a fixed number of keys).
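
The two incremental forms described in item 2 can be sketched as follows. This is a simplified illustration, not how Flink evaluates queries: the functions `q_incremental` and `q_append_only`, the per-user count, and the moving average over the last x+1 records are assumptions picked to match the formulas Q(t+1) = q’(Q(t), c(T, t, t+1)) and Q(t+1) = q’’(c(T, t-x, t+1)) ∪ Q(t).

```python
def q_incremental(prev_result, changes):
    """q': fold the changes c(T, t, t+1) into the previous result Q(t).
    Here the query is a per-user count, so existing rows get updated."""
    result = dict(prev_result)
    for user in changes:
        result[user] = result.get(user, 0) + 1
    return result


def q_append_only(prev_result, last_records):
    """q'': compute one new row from the last x+1 input records only
    and append it to Q(t). Earlier result rows are never modified."""
    new_row = sum(last_records) / len(last_records)
    return prev_result + [new_row]


# Updating query: each step sees only the previous result and the new changes.
counts = {}
counts = q_incremental(counts, ["alice", "bob"])
counts = q_incremental(counts, ["alice"])

# Append-only query: moving average over the last x+1 records, with x = 2.
temps = [10, 20, 30, 40]
x = 2
averages = []
for end in range(x + 1, len(temps) + 1):
    averages = q_append_only(averages, temps[end - (x + 1):end])

print(counts, averages)
```

In the first form the state is the previous result itself. In the second form the state is just the last x+1 input records, which is why x must be bounded.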

Bounded Intermediate State

Intermediate state can grow in two ways:

  • The intermediate state grows without being bounded by a time predicate (for example, because the set of aggregation keys keeps expanding)
  • The state is bounded in time, but historical data is needed (such as for window aggregation)

Although the “Last Result Offset” parameter, mentioned below, can resolve the second case, the first case must be detected by the optimizer. The optimizer should reject queries whose intermediate state grows without being bounded by time, explain how to fix the query, and require an appropriate time predicate. Take the following query for example:

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
GROUP BY user, page

As the number of users and pages increases, the intermediate state grows over time. Adding a time predicate bounds the storage the query requires:

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() -- only last hour
GROUP BY user, page
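
To see why the time predicate bounds the state, consider this small Python sketch of the query above. It is a simplification under explicit assumptions: the class `LastHourCounts` is hypothetical, events are assumed to arrive in rowtime order, the newest rowtime stands in for `now()`, and rowtime is measured in seconds.

```python
from collections import Counter, deque


class LastHourCounts:
    """Keeps per-(user, page) counts for events of the last hour only."""

    def __init__(self, horizon_seconds=3600):
        self.horizon = horizon_seconds
        self.events = deque()    # (rowtime, user, page), ordered by rowtime
        self.counts = Counter()  # (user, page) -> count within the horizon

    def add(self, rowtime, user, page):
        self.events.append((rowtime, user, page))
        self.counts[(user, page)] += 1
        self._evict(rowtime)

    def _evict(self, now):
        # Drop events older than now - horizon; BETWEEN is inclusive,
        # so an event exactly one hour old is still kept.
        while self.events and self.events[0][0] < now - self.horizon:
            _, user, page = self.events.popleft()
            self.counts[(user, page)] -= 1
            if self.counts[(user, page)] == 0:
                del self.counts[(user, page)]


state = LastHourCounts()
state.add(0, "alice", "/home")
state.add(1800, "bob", "/cart")
state.add(3600, "alice", "/home")
state.add(5000, "bob", "/cart")
after_four = dict(state.counts)

state.add(9000, "carol", "/x")
after_five = dict(state.counts)
print(after_four, after_five)
```

Keys that receive no events for an hour disappear from the state, so its size depends on the activity within the last hour rather than on the total number of users and pages ever seen.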

Not every attribute has a domain that keeps growing. If you tell the optimizer the size of an attribute's domain, it can infer that the intermediate state will not grow over time, and the query can be accepted even without a time predicate.

val sensorT: Table = sensors
  .toTable('id, 'loc, 'stime, 'temp)
  .attributeDomain('loc, Domain.constant) // domain of 'loc is not growing
env.registerTable("sensors", sensorT)

SELECT loc, AVG(temp) AS avgTemp
FROM sensors
GROUP BY loc

Calculation of Results and Refinement of Time Sequence

The following figure depicts how different configurations of parameters are used to control early results and refine the results.


Image 7

  • “First Result Offset” is the time at which the first early result is calculated. It is relative to the time at which the complete result is calculated (e.g., relative to the 10:30 end time of the window). If the setting is -10 minutes, for a window that ends at 10:30, the first result sent is calculated at the logical time of 10:20. The default value for this parameter is 0, which means the first result is calculated at the end of the window.
  • “Complete Result Offset” is the time at which the complete result is calculated. It is relative to the time at which the first complete result could be calculated, that is, the end of the window. If the setting is +5 minutes, for a window that ends at 10:30, the complete result is generated at 10:35. This parameter can mitigate the impact of delayed data. The default value is 0, which means the result calculated at the end of the window is the complete result.
  • “Update Rate” is the interval (either a time value or a count) at which the result is updated before the complete result is calculated. Consider a setting of 5 minutes. For a 30-minute tumbling window that ends at 10:30, with a “First Result Offset” of -15 minutes and a “Complete Result Offset” of +2 minutes, the first result is generated at 10:15, the result is updated at 10:20, 10:25, and 10:30, and the complete result is produced at 10:32.
  • “Last Updates Switch” determines whether delayed data still triggers updates after the complete result has been sent and until the calculation state is cleared.
  • “Last Result Offset” is the time of the last result calculation. At this time the internal state is cleared, and data that arrives afterward is discarded. Using a Last Result Offset means that the calculated result is an approximation and its accuracy is not guaranteed.
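
The interaction of “First Result Offset”, “Complete Result Offset”, and “Update Rate” can be checked with a short Python sketch. The function `result_schedule` is a hypothetical helper written for this article, not a Flink API. It models only a time-based update rate, and the calendar date is arbitrary.

```python
from datetime import datetime, timedelta


def result_schedule(window_end, first_result_offset,
                    complete_result_offset, update_rate):
    """Return the logical times at which results are emitted for one window.
    update_rate must be a positive timedelta."""
    first = window_end + first_result_offset
    complete = window_end + complete_result_offset
    times = []
    t = first
    while t < complete:
        times.append(t)      # early result or periodic update
        t += update_rate
    times.append(complete)   # the complete result
    return times


window_end = datetime(2020, 1, 1, 10, 30)  # arbitrary date, window ends at 10:30

schedule = result_schedule(
    window_end,
    first_result_offset=timedelta(minutes=-15),
    complete_result_offset=timedelta(minutes=2),
    update_rate=timedelta(minutes=5),
)
schedule_labels = [t.strftime("%H:%M") for t in schedule]

# With both offsets left at their default of 0, only one result is emitted.
default_schedule = result_schedule(
    window_end, timedelta(0), timedelta(0), timedelta(minutes=5)
)
default_labels = [t.strftime("%H:%M") for t in default_schedule]
print(schedule_labels, default_labels)
```

For the settings from the “Update Rate” example, the schedule is 10:15, 10:20, 10:25, 10:30, and 10:32. With the default offsets, the only result is the complete one at 10:30.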

