By Cui Xingcan and compiled by Sha Shengyang (Chengyang)
Apache Flink APIs are divided into three layers: ProcessFunction at the bottom layer, the DataStream API at the middle layer, and the SQL API and Table API at the top layer. All these APIs depend on the time attribute, which is an essential aspect of stream processing and one of the basic components of a stream processing system. The time attribute runs through all three layers. Because the DataStream API encapsulates the time attribute, you rarely work with it directly at that layer. This article therefore focuses on the time attribute as used by the underlying ProcessFunction and by the SQL API and Table API at the top layer.
Time Semantics in Flink
The time semantics vary depending on different scenarios. As an advanced distributed stream processing engine, Flink supports different time semantics. The processing time and event time (row time) are the core elements of the time semantics. These two types of time are different in the following aspects:
The processing time simulates real-world (wall-clock) time, although even the local time of a data processing node does not necessarily coincide with real-world time. The event time is the time recorded in the data being processed. The processing time is obtained by directly reading the system time of the local machine, whereas the event time is determined from the timestamp in each data record.
The two types of time also differ in how difficult they are to handle in Flink and to use in actual scenarios. The processing time is much easier to handle than the event time. However, results computed with the processing time, as well as the internal state of the stream processing application, are non-deterministic. Flink provides many guarantees for the event time, so results computed with it are deterministic and reproducible, no matter how many times data is replayed.
Therefore, you may choose to use the processing time or event time based on whether you want the same result from replaying your application from the last checkpoint or savepoint in case of an exception. If you want the same result from the replay, you must use the event time. If you are ready to tolerate different results, use the processing time. The processing time is often used to measure the system throughput based on the actual time. For example, if you want to count the number of data records that are processed within one hour, just use the processing time.
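The replay argument can be illustrated with a minimal sketch in plain Java, without any Flink dependency. The class HourlyBucket and its two methods are hypothetical names introduced only for this illustration. The clock is injected as a LongSupplier so that a replay at a later wall-clock time can be simulated.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of Flink: assigns records to one-hour buckets. */
final class HourlyBucket {
    static final long HOUR_MS = 60L * 60L * 1000L;

    /** Event time: the bucket depends only on the timestamp carried by the record. */
    static long byEventTime(long recordTimestampMs) {
        return Math.floorDiv(recordTimestampMs, HOUR_MS);
    }

    /** Processing time: the bucket depends on the local clock at the moment of processing. */
    static long byProcessingTime(LongSupplier clock) {
        return Math.floorDiv(clock.getAsLong(), HOUR_MS);
    }
}
```

Calling byEventTime twice with the same record always yields the same bucket, whereas byProcessingTime may place the same record in a different bucket when the application is replayed at a later time.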
An important characteristic of time is that it only increases progressively. It is impossible to travel backward in the time dimension. We must make full use of this characteristic while using the time attribute. Let’s look at an example of how to process some records using the processing time and event time, respectively.
- Using the Processing Time: Since we use the local node’s system time and assume that this node’s clock is properly synchronized, the processing time that is fetched each time increases progressively. Therefore, the returned data streams are ordered.
- Using the Event Time: As it is bound to each data record, the data time may be disordered due to various causes, such as network latency, internal program logic, and causes related to distributed systems. An example of this is shown in the preceding figure. When the event time is used, the time in each data record is called the record timestamp. If the sequence of record timestamps is disordered, this problem must be solved.
If the timestamps of data records are out of order, we must discretize the entire sequence of data records. A simple solution is to group a certain number of data records into small batches according to time. As shown in the following figure, after discretization, the time value in the rightmost box is less than that in the middle box, and the time value in the middle box is less than that in the leftmost box.
To mark the boundaries between these batches, insert watermarks, which are a special type of marker, into the time sequence. A watermark carries a timestamp value that must be less than the timestamp of every data record that arrives after it.
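The watermark rule can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink's implementation: the class BoundedDelayWatermark is hypothetical, and it assumes that records arrive at most maxDelayMs behind the largest timestamp seen so far. A record whose timestamp is not greater than the current watermark violates the rule and is reported as late.

```java
/** Hypothetical sketch, not Flink's implementation. */
final class BoundedDelayWatermark {
    private final long maxDelayMs;                   // assumed bound on out-of-orderness
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;

    BoundedDelayWatermark(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Observes one record; returns true if the record is late (timestamp <= watermark). */
    boolean observe(long timestampMs) {
        boolean late = timestampMs <= currentWatermark;
        if (timestampMs > maxSeenTimestamp) {
            maxSeenTimestamp = timestampMs;
            // The watermark trails the largest timestamp and never moves backward.
            currentWatermark = Math.max(currentWatermark, maxSeenTimestamp - maxDelayMs);
        }
        return late;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```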
Overview of Timestamp and Watermark Behavior
This section introduces the basic information about the record timestamp (timestamp for short) and the watermark of the event time. Most distributed stream computing engines abstract data by using a directed acyclic graph (DAG). A DAG has a data source, some processing operators, and a data sink. Data flows between different logical operators. Watermarks and timestamps have their own lifecycles. The following sections explain how watermarks and timestamps are generated, transmitted between different nodes, and processed on each node.
Timestamp Allocation and Watermark Generation
Flink supports two watermark generation methods. The first is to generate watermarks in SourceFunction, which is equivalent to implementing the logic of timestamp allocation and watermark generation at the source end of a stream processing application. The second is to assign timestamps and watermarks later, on a DataStream. The two methods work as follows:
- In SourceFunction, use the collectWithTimestamp method to send a data record, wherein the first parameter specifies the data to be sent and the second parameter specifies the data timestamp. You may also call the emitWatermark method to generate a watermark and specify a timestamp value, which must be less than the timestamp values of any subsequent incoming data records.
- If you don’t want to generate timestamps or watermarks in SourceFunction or SourceFunction does not support the generation of timestamps and watermarks, use the DataStream API to call the DataStream.assignTimestampsAndWatermarks method, which receives different timestamp generators and watermark generators.
Generators are divided into two types: periodic generators, which are driven by time, and generators that are driven by special records in the data stream.
The periodic generator is driven by the actual (wall-clock) time and periodically calls the generation logic to produce a watermark. It does not need to generate timestamps because a timestamp is already included in every data record. The other type of generator is driven by data: it generates a watermark when it encounters a special record that indicates no more qualifying data will be received. For this type, the custom watermark generation method is called each time a timestamp is allocated, and you must implement the watermark generation logic in that method.
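The difference between the two generator types can be sketched in plain Java. The classes MarkedRecord, PeriodicGenerator, and PunctuatedGenerator are hypothetical and only model the idea. They are not Flink interfaces. The periodic sketch additionally assumes ascending timestamps, so it emits the largest timestamp seen minus one.

```java
import java.util.OptionalLong;

/** Hypothetical record type: a timestamp plus a flag for "special" marker records. */
final class MarkedRecord {
    final long timestampMs;
    final boolean isMarker;

    MarkedRecord(long timestampMs, boolean isMarker) {
        this.timestampMs = timestampMs;
        this.isMarker = isMarker;
    }
}

/** Time-driven: records only update state; a wall-clock timer asks for the watermark. */
final class PeriodicGenerator {
    private long maxTimestamp = Long.MIN_VALUE;

    void onRecord(MarkedRecord r) {
        maxTimestamp = Math.max(maxTimestamp, r.timestampMs);
    }

    /** Assumed to be called at a fixed wall-clock interval by the runtime. */
    OptionalLong onPeriodicEmit() {
        return maxTimestamp == Long.MIN_VALUE
                ? OptionalLong.empty()
                : OptionalLong.of(maxTimestamp - 1);
    }
}

/** Data-driven: checked for every record; emits only when a marker record is seen. */
final class PunctuatedGenerator {
    OptionalLong onRecord(MarkedRecord r) {
        return r.isMarker ? OptionalLong.of(r.timestampMs) : OptionalLong.empty();
    }
}
```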
Although timestamps and watermarks can be generated either in SourceFunction or on a DataStream, we recommend generating watermarks as close to the data source as possible. This allows more operators in the program logic to determine whether some data is out of order. Flink provides an effective mechanism to ensure that timestamps and watermarks are correctly transmitted to downstream nodes.
Watermark transmission is based on the following three rules:
- Watermarks are broadcast between operators. For example, when an upstream operator is connected to three downstream tasks, it broadcasts the received watermark downstream.
- If a program receives a watermark with the value Long.MAX_VALUE, no more data will be sent from a portion of the corresponding data stream. This watermark is viewed as a termination flag.
- For an operator with multiple inputs, the watermark calculation is more complex than for a single data stream, because the operator must take the watermarks of all its inputs into account.
For example, assume that the blue block represents a task of an operator, which has three inputs, namely, W1, W2, and W3. These inputs are arbitrary and may belong to the same stream or different streams. Watermark calculation is divided into two scenarios: single input and multiple inputs. In the single-input scenario, the maximum watermark value is used because the watermark value increases monotonically. In the multi-input scenario, the minimum watermark value in the entire operator task is used. This means the watermarks of a multi-input task are limited by the slowest input stream. As a result, the performance of the entire process depends on the weakest link, or the slowest input stream.
Watermark transmission is idempotent. The maximum watermark value is always used in the single-input scenario, whereas the minimum watermark value is always used in the multi-input scenario. Therefore, the final value remains unchanged even when a node receives the same watermark multiple times or receives a watermark which is the same as a previous one.
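The calculation described above can be sketched in plain Java. The class InputWatermarkTracker is a hypothetical name and the code is a simplified model, not Flink's internal implementation. Each input keeps the maximum watermark it has received, and the operator watermark is the minimum across all inputs.

```java
import java.util.Arrays;

/** Hypothetical sketch of per-operator watermark tracking, not Flink's implementation. */
final class InputWatermarkTracker {
    private final long[] perInput;
    private long operatorWatermark = Long.MIN_VALUE;

    InputWatermarkTracker(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns true if the operator watermark advanced. */
    boolean onWatermark(int input, long watermark) {
        // Single input: keep the maximum, because watermarks only increase.
        perInput[input] = Math.max(perInput[input], watermark);
        // Multiple inputs: the operator is limited by the slowest input.
        long min = Long.MAX_VALUE;
        for (long w : perInput) {
            min = Math.min(min, w);
        }
        if (min > operatorWatermark) {
            operatorWatermark = min;
            return true;
        }
        return false; // duplicate or stale watermark: nothing changes (idempotent)
    }

    long operatorWatermark() {
        return operatorWatermark;
    }
}
```

In this sketch, the operator would broadcast its watermark downstream only when onWatermark returns true.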
However, this design does not differentiate between inputs that are multiple partitions of a single stream and inputs that are logically different streams, such as the two sides of a Join operation. The partitions of a single stream share the same clock, so clock synchronization between them is reasonable. If an operator task performs an operation similar to Join, however, it makes no sense to synchronize the two input clocks, because the operator task may join a data stream that is close to the current time with another data stream that is far from the current time. To wait for the slow data stream, the fast data stream may have to cache a large amount of data in states. This results in significant performance overhead for the entire cluster.
The following section explains how ProcessFunction works. The logic of processing watermarks inside tasks is divided into internal logic and external logic. The external logic is implemented by ProcessFunction. You must write this logic in ProcessFunction while using Flink APIs that involve time.
ProcessFunction provides three time-related functions:
- Obtain the timestamp of the data record that is being processed or the current processing time based on the system’s time semantics.
- Obtain the current operator’s time, which can be viewed as the current watermark.
- Register timers to implement complex functions in ProcessFunction. For example, trigger a timer when the watermark value reaches a certain time value. To do so, you must provide all the required callback logic, which involves three methods: registerEventTimeTimer, registerProcessingTimeTimer, and onTimer. The callback logic must be implemented by the onTimer method and is triggered when the relevant conditions are met.
A simple scenario is limiting the data caching duration for time-related processing. Configure a timer for certain data so that this data will expire at a certain time point and then be deleted from states. All the time-related logic is implemented by Flink’s time service.
When an operator instance receives a watermark, it first updates its current time, so that any ProcessFunction method that queries the operator time obtains the latest value. Flink then traverses a priority queue of registered timers, which is sorted by trigger time, and triggers the callback logic of each timer whose trigger time has been reached, one by one. Finally, the Flink task sends the current watermark to its downstream task instances. This completes the closed-loop transmission of watermarks.
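The watermark handling steps above can be sketched in plain Java with a priority queue. The class SimpleTimerService is hypothetical. It borrows the method name registerEventTimeTimer from the text for readability, but it is a simplified model and not Flink's time service. The onTimer callback is passed in as a LongConsumer.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical sketch of an event-time timer service, not Flink's implementation. */
final class SimpleTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // sorted by trigger time
    private final LongConsumer onTimer;                               // user callback logic
    private long currentWatermark = Long.MIN_VALUE;

    SimpleTimerService(LongConsumer onTimer) {
        this.onTimer = onTimer;
    }

    /** Registers a timer; registering the same time twice has no additional effect. */
    void registerEventTimeTimer(long time) {
        if (!timers.contains(time)) {
            timers.add(time);
        }
    }

    /** Updates the operator time, then fires every timer whose time has been reached. */
    void advanceWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // watermarks never move backward
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            onTimer.accept(timers.poll());
        }
        // A real task would now forward the watermark to downstream tasks.
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

For the data expiration scenario, the callback would delete the state entry that belongs to the fired timer.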
Time in the Table API
This section explains the time attribute used by the Table API and SQL API. To apply the time attribute to the computations performed by the Table API and SQL API, we must put the time attribute in the table schema in advance. Then, use the time attribute in SQL statements or Table API-related logical expressions as needed.
How to Specify a Time Column in a Table
The Flink community has discussed how to use the time attribute in the Table API and SQL API. Two solutions were proposed. One solution is to use the method for obtaining the current processing time as a special user-defined function (UDF). The other solution is to materialize the time column into the overall schema. Ultimately, the second solution was adopted. The following section explains how to specify the processing time and event time in a table.
While using the processing time, obtain a table object, or register a table by using the following two methods:
1) Transform a data stream into a table
2) Create a table directly based on the table source
If you use the first method, declare the fields as f1, f2, f3.proctime, in which f1 and f2 indicate the existing columns. The last column, f3, is registered as the processing time column and can be used directly while writing queries. If you use the second method, implement the DefinedProctimeAttribute interface in the table source so that the processing time column is generated automatically based on your logic.
Unlike the processing time, the event time cannot be used in an on-demand manner. If you want to transform a data stream into a table, ensure that this data stream contains timestamps and watermarks. If you want to create a table based on the table source, ensure that the target data includes a time field of the long or timestamp type.
If you want to register a table from a data stream, declare the event time column as column_name.rowtime. When using the processing time, ensure that the new field is the last of all fields of the schema. When using the event time, the row time attribute replaces an existing column, which Flink then automatically converts into the row time type. If you want to create a table based on the table source, implement the DefinedRowtimeAttributes interface. The DataStream API does not allow the existence of multiple event time (row time) entries, but the Table API does, at least in theory. This is because the DefinedRowtimeAttributes interface returns the row time description in list form, which allows for the existence of multiple row time columns. This feature will be improved and optimized in the future.
Time Columns and Table Operations
After a time column is specified, it is used by certain operations during a query. The operations listed here are all closely related to the time column and must be performed based on this column. For example, the OVER and GROUP BY operations for window aggregation are only performed based on the time column when you set parameters in an SQL statement. Similarly, the time window aggregation operation is only performed based on the time column when you write conditions. The sort operation is also subject to the time column. It is almost impossible to sort the records of an endless data stream by an arbitrary column, because the stream never ends. The records are, however, already ordered by time. Therefore, if you want to sort the data of a table that is transformed from a data stream, you must perform the sort operation based on the time column. You may also add other columns for sorting, but the time column must take precedence.
Why is it essential to perform these operations based on the time column? One reason is that an incoming data stream is considered as a table in which data records are sorted by time. Table operations are performed only after these data records are scanned in sequence. Data streams are transient, which makes it difficult to access previously processed data records. This problem can be mitigated by using Flink’s internal states, but the restrictions on state size limit the data that can be cached. Another reason why operations are performed based on the time column is that the time column prevents internally generated states from increasing infinitely. This is the primary prerequisite for these operations.
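The second reason can be illustrated with a minimal plain-Java sketch of a windowed count grouped by the time column. The class TumblingCount is hypothetical and is not how Flink implements window aggregation. It only shows that, once the watermark passes the end of a window, the result can be emitted and the state for that window can be dropped, so the state stays bounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a tumbling-window count keyed by the time column. */
final class TumblingCount {
    private final long sizeMs;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> count

    TumblingCount(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Adds one row to the window that contains its row time. */
    void add(long rowtimeMs) {
        long start = rowtimeMs - Math.floorMod(rowtimeMs, sizeMs);
        openWindows.merge(start, 1L, Long::sum);
    }

    /** Emits {windowStart, count} for each finished window and removes its state. */
    List<long[]> onWatermark(long watermark) {
        List<long[]> results = new ArrayList<>();
        // A window [start, start + size) is finished once the watermark reaches its last timestamp.
        while (!openWindows.isEmpty() && openWindows.firstKey() + sizeMs - 1 <= watermark) {
            Map.Entry<Long, Long> e = openWindows.pollFirstEntry();
            results.add(new long[] {e.getKey(), e.getValue()});
        }
        return results;
    }

    int openWindowCount() {
        return openWindows.size();
    }
}
```

Without a time column there would be no point at which a group is known to be complete, so the map in this sketch could only grow.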