Metrics Principles and Practices: Flink Advanced Tutorials

By Liu Biao and compiled by Maohe

This article is based on a presentation given by Liu Biao, an Apache Flink contributor. It consists of three parts: (1) definition of metrics; (2) how to use metrics; and (3) metrics monitoring practices.

What Are Metrics?

Metric Types

1) Counter: If you have ever written MapReduce jobs, you should be familiar with counters. A counter counts the number of data records or megabytes of data, and its value continually increases over time.
2) Gauge: This is the simplest metric, which reflects a value. For example, if you want to monitor the Java heap memory usage, expose one gauge at a time. The current value of the gauge indicates the heap memory usage.
3) Meter: This metric measures throughput and counts the number of events in a unit of time. It is equivalent to calculating the ratio of the number of events to the elapsed time.
4) Histogram: This metric is complex and not commonly used. It is used to calculate the distribution of certain data, such as Quantile, Mean, StdDev, Max, and Min.

Metric Group

A metric group is divided into TaskManagerMetricGroup and TaskManagerJobMetricGroup. Each job belongs to a group under a task. Tasks are divided into TaskIOMetricGroup and OperatorMetricGroup. I/O statistics and metrics also exist under OperatorMetricGroup. The following figure shows the organizational structure of metrics. Metrics do not affect the system and belong to different groups. Flink allows adding groups to create a layered structure.

•TaskManagerMetricGroup
•TaskManagerJobMetricGroup
•TaskMetricGroup
•TaskIOMetricGroup
•OperatorMetricGroup
•${User-defined Group} / ${User-defined Metrics}
•OperatorIOMetricGroup
•JobManagerMetricGroup
•JobManagerJobMetricGroup

The JobManagerMetricGroup is simple with fewer layers and assumes the role of master.

Metrics are automatically collected and used to compile statistics. View metrics from an external system and aggregate these metrics.

How to Use Metrics?

System Metrics

  • Java virtual machine (JVM) parameters at the master level and worker level, such as load and time. Memory metrics reflect the heap memory usage, non-heap memory usage, direct buffer pool utilization, and mapped buffer pool utilization. The thread-related metric counts the number of live threads. Garbage collection metrics provide practical information.
  • Network metrics are widely used and practical for solving some performance issues. Flink is more than a network transmission framework. It also represents a directed acyclic graph (DAG). A simple producer-consumer model is formed between the upstream and downstream nodes in Flink. The network transmission model in Flink is equivalent to the standard fixed-length queue model between the producer and consumer. You can use intermediate queues to quickly narrow the scope of performance evaluation and locate bottlenecks.
•CPU
•Memory
•Threads
•Garbage Collection
•Network
•Classloader
•Cluster
•Availability
•Checkpointing
•StateBackend
•IO
For more information, see: https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics
  • Cluster monitoring information helps facilitate effective cluster O&M. If jobs are large, pay special attention to checkpointing metrics to identify potential problems that may not be reflected by common metrics. For example, jobs may appear to be normal and data streams show no delays even when checkpointing has been inactive for a long time. After failover and restart are complete, jobs may be rolled back to their statuses from long ago due to the long-term inactivity of checkpointing. This makes the jobs useless.
  • RocksDB is a common state backend implementation in production environments. While dealing with large data volumes, pay special attention to RocksDB metrics because its performance may decrease with the increase of the data volume.

User-defined Metrics

User-defined metrics are related to the DataStream API. The context may be required to write user-defined metrics for tables and SQL statements. However, user-defined metrics are written in a way similar to user-defined functions (UDFs).

The DataStream API is inherited from the RichFunction, providing a metrics interface. The getRuntimeContext().getMetricGroup().addGroup(…) method is derived from the RichFunction and provides the entry to user-defined metrics. This method allows you to create user-defined metric groups. To define specific metrics, you must likewise use the getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…) method, which provides a constructor to define custom metric types.

Inherited from RichFunction
•Register user-defined Metric Group: getRuntimeContext().getMetricGroup().addGroup(…)
•Register user-defined Metric: getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…)

User-defined Metrics Example

For the meter metrics, Flink provides a built-in implementation called Meterview. A meter requires a time window to measure the interval at which events occur. The occurrence of an event can be registered with the markEvent() method. Then, the getrate() method is used to divide the number of events by the elapsed time.

The gauge metrics are simple, with the current time as the output and System::currentTimeMillis (passed in by using the Lambda expression) as the input. Upon each call, the gauge calls the current system time to perform a calculation.

The histogram metrics are relatively complex and have two code implementations in Flink, one of which is given below. A histogram needs a window size, which is assigned a value upon update.

These metrics are not thread-safe. If you want to use multithreading, add synchronization. For more information, visit the following link.

•Counter processedCount = getRuntimeContext().getMetricGroup().counter("processed_count"); processedCount.inc();
•Meter processRate = getRuntimeContext().getMetricGroup().meter("rate", new MeterView(60));
processRate.markEvent();
•getRuntimeContext().getMetricGroup().gauge("current_timestamp", System::currentTimeMillis);
•Histogram histogram = getRuntimeContext().getMetricGroup().histogram("histogram", new DescriptiveStatisticsHistogram(1000)); histogram.update(1024);
•https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#metric-types

Ways to Obtain Metrics

How are metrics obtained in physical architecture?

With a better understanding of metrics and how they work, we can use them more effectively. The WebUI and RESTful APIs pull metrics from each component by periodically querying a central node. The fetched data is not necessarily updated in real-time. The default data update interval is 10s. Therefore, the data updated by the WebUI and RESTful APIs may not be real-time data. In addition, data may be fetched asynchronously. For example, if there are two components, data may be fetched from one component, whereas no data is fetched from the other component due to a timeout error. Fetching is a try-best operation, so the values of metrics may lag, but will be updated eventually.

The red paths in the following figure pass through MetricFetcher and are aggregated by a central node. The metric reporter collects metrics from individual nodes, and these metrics are not aggregated by a central node. These metrics are aggregated by a third-party system, such as the Time Series Database (TSDB). A non-centralized architecture has its own merits and is free of the problems produced by the central node, such as insufficient memory. The metric reporter directly reports raw data, which can be processed with better performance.

Metric Reporter

•Flink offers many built-in reporters, which can serve as references for the selection of an external system model. For more information, see- https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#reporter. 
•Metric Reporter Configuration Example
metrics.reporters: your_monitor,jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 1025-10000
metrics.reporter.your_monitor.class: com.your_company.YourMonitorClass
metrics.reporter.your_monitor.interval: 10 SECONDS
metrics.reporter.your_monitor.config.a: your_a_value
metrics.reporter.your_monitor.config.b: your_b_value

The preceding code shows how to configure metric reporters. Separate the name of the metric reporter with a comma (,). Find the reporter through classname reflection of metrics.reporter.jmx.class. Obtain the configuration of metrics.reporter.jmx.port, which is typically sent through a network by a third-party system. Determine the destination based on the IP address and port number. metrics.reporter.your_monitor.class is required. Define an interval, which is parsed by Flink and define config. as well.

Practice: Monitoring with Metrics

Automated O&M

The automated O&M process is as follows:

  • Use a metric reporter or RESTful API to collect key metrics for decision making. Store the collected metrics in a storage or analysis system, such as TSDB.
  • Define custom monitoring rules and pay attention to key metrics to obtain information about failover, checkpoints, and processing delays. Custom monitoring rules are often used to trigger alerts, which helps to eliminate a large amount of manual work. For example, define the maximum number of failovers that might occur before manual intervention is required.
  • Send exception notifications to the specified contacts through DingTalk, email, SMS, or phone calls.
  • Automated O&M allows viewing data clearly through dashboards and reports. See the overall job status on a dashboard and perform analysis and optimization based on report data.

Performance Analysis

Promptly locate performance issues through a metrics system with monitoring and alert functions and conveniently analyze performance issues on a dashboard. For example, analyze system metrics to narrow down the scope of the issue, verify assumptions, and identify bottlenecks. Then, analyze possible causes from multiple perspectives, such as business logic, JVM, operating system, states, and data distribution. If the root cause still cannot be identified, resort to the profiling tool.

Practice: How to Solve a Slow Task Problem

If you are unfamiliar with how Flink works and do not understand the system runtime and status, analyze metrics to look into the system’s internal workings. Some examples are provided below.

Locate the Problem

The following figure shows the metric that measures the task failover rate. Only one value is not 0, which indicates a problem.

The following figure shows the metric that measures the task input TPS. The metric value suddenly drops from 4 or 5 million to 0, which indicates a problem.

The following figure shows the metric that measures processing delay. Compare the generation time of the processed data with the current time to find that the processed data was actually generated 1 hour ago. This indicates a problem because the processed data should usually have been generated about 1s ago.

Narrow Down the Scope and Identify the Bottleneck

As shown by the red box in the following figure, the task processing speed slows down, but we do not know which task has caused the problem. The OUT_Q value rises to 100%, but other metrics are within their normal ranges or even indicate good performance. We can attribute the slow processing problem to the producer-consumer model. The producer’s input queue (IN_Q) is full, and so is the consumer’s output queue (OUT_Q). The processing speed of Node 4 is very slow and this node cannot promptly process the data generated by Node 1. However, Node 5 is running properly. This indicates the queue between Node 1 and Node 4 is blocked. Now, we can troubleshoot the problem by checking Node 1 and Node 4.

Each of the 500 InBps items has 256 PARALLEL operations, so it is impossible to view all the operations one by one. Therefore, during the aggregation, it is necessary to label the number of concurrent operations to which each index belongs. The aggregate operation divides the concurrent operations by label, allowing us to determine which concurrent operation has an OUT_Q value of 100%. As shown in the following figure, Line 324 and Line 115 show the maximum value. This further narrows down the scope of the problem.

The following figure shows how to narrow down the scope of the problem by using metrics combined with checkpoint alignment. However, this method is rarely used.

Analyze the Problem from Multiple Perspectives

This section explains how to analyze the problem of slow task processing.

In the case of slow task processing, we need to analyze the problem in a top-down manner, that is, from the business layer to the underlying system. Many problems can be analyzed from a business perspective. For example, analyzing whether concurrency is within the normal range, the data peaks and valleys, and the data skew. In terms of performance, it’s possible to analyze garbage collection, checkpoint alignment, state backend, and system performance, which is measured by CPU utilization, memory usage, swap, disk I/O, throughput, capacity, network I/O, and bandwidth utilization.

Q&A

Yes, but this is unnecessary since Flink processes the logs of other systems. Sink nodes directly export log analysis results or trigger alerts. Use metrics to collect statistics on the internal system statuses. Thus, the processing results of input data can be a direct output.

Q) Does each reporter have a dedicated thread?

Yes. Flink provides many threads. If a job is directly executed on the TaskManager, you can view the thread details by using the jstack command.

Original Source:

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com