PostgreSQL Asynchronous Message Practice: Real-Time Feed Monitoring and Response for Proactive E-Commerce Services, from Minutes to Milliseconds


To locate problems and meet operational, analytical, and other requirements, many business systems use event tracking to record logs of user behavior. These logs are also called feed logs.

For example, an order system is connected to many business systems, such as shopping cart, order placement, payment, receipt of goods, disputes, and refunds, so a single order often has many associated records.

Each of these systems may contribute different properties: it may add new properties or change the values of existing ones.

To simplify analysis, all records (properties) generated for an order throughout the transaction process are usually merged into one record, producing a large wide order table.

Solution 1: Use RDS for PostgreSQL + OSS + HybridDB for PostgreSQL to Implement Data Cleaning and Proactive Detection in Minutes

Data consumed from message queues is written into RDS for PostgreSQL in real time. Order feeds are merged in RDS for PostgreSQL and written into external tables in OSS. (Compressed data is supported; each session can write about 100 MB of raw data per second to OSS.)

HybridDB for PostgreSQL reads the data from the OSS external tables (compressed data is supported; each data node can read about 100 MB of raw data per second from OSS) and merges the order feed data into a full order table.

Once the data is in HybridDB for PostgreSQL, SQL rules are used to mine (analyze) the full order table for exception data.

This solution analyzes massive amounts of order feed data with a latency of only minutes. Thanks to its very high throughput and low latency, it has provided considerable support to transactions during Double 11 events.

Solution 2: Millisecond-Level Feed Monitoring and Real-Time Feedback

Technology should always serve business needs. Minute-level latency is already fairly low, but some extreme scenarios require much lower latency.

RDS for PostgreSQL also offers a way to detect and report exception feed data within milliseconds: combining stream processing with asynchronous messages. There are two ways to implement this:

1. Combine asynchronous message channels by using the trigger mechanism.

2. Combine asynchronous message channels by using pipelines and stream SQL.

The application listens on message channels, and the database writes exception data into those channels (NOTIFY channel, message), so exception data is pushed to the application actively and asynchronously.

Design of Millisecond-Level Feed Monitoring and Feedback Architecture

Another reason millisecond-level feed monitoring was previously not on the agenda is that merging data in HBase has relatively high latency, so fields may still be missing when StreamCompute tries to complete them, which causes exceptions. Implementing exception monitoring in RDS for PostgreSQL avoids the field completion problem entirely: RDS for PostgreSQL already holds all the fields, so no completion is needed.

RDS for PostgreSQL Design

1. Use multiple instances to improve system throughput. (For example, if a single instance can process 150,000 rows per second, 100 instances can process 15 million rows per second.)


Mapping relationship:

2. Split a table on an instance into multiple tables to increase concurrent processing throughput on that instance. When there are many rules, splitting the table into many tables raises the rule-processing throughput of a single instance.


Mapping relationship:
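As a minimal sketch, assuming a hypothetical modulo-based scheme (the article does not specify the routing function), the following Python maps an order ID first to an instance and then to a shard table on that instance. The property that matters is that every feed record of one order lands on the same instance and shard, so its properties can be merged there. `N_INSTANCES`, `N_SHARDS`, `route`, and the `feed_<n>` table names are all assumptions.

```python
N_INSTANCES = 100  # hypothetical number of RDS for PostgreSQL instances
N_SHARDS = 64      # hypothetical number of shard tables per instance


def route(order_id: int, n_instances: int = N_INSTANCES, n_shards: int = N_SHARDS) -> tuple[int, str]:
    """Map an order ID to (instance index, shard table name).

    All feed records of the same order get the same result, so they are
    merged on one instance, in one table.
    """
    instance = order_id % n_instances
    # Use the remaining bits of the ID so shards are spread evenly per instance.
    shard = (order_id // n_instances) % n_shards
    return instance, f"feed_{shard}"


print(route(12345))  # (45, 'feed_59')
```

With this scheme, consecutive order IDs spread evenly over all instance and shard pairs; a real deployment might prefer a hash of the order ID if IDs are not uniformly distributed.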

HybridDB for PostgreSQL Design

HybridDB for PostgreSQL is retained for real-time analysis of massive (PB-scale) data.

Data is still batch-imported through OSS.


1. Create a full wide table for order feed data. (Alternatively, all properties can be stored in a single jsonb column, since PostgreSQL supports JSONB. PostgreSQL also supports other multi-value types such as hstore and xml.)

2. Write order feed data. For example, business system A writes order fields c1 and c2, and business system B writes order fields c3 and c4…
Use INSERT ... ON CONFLICT ... DO UPDATE to merge the properties of each order into one row.
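A minimal sketch of steps 1 and 2, using Python's built-in sqlite3 module as a stand-in for PostgreSQL so that it runs without a server. The `INSERT ... ON CONFLICT (id) DO UPDATE` statement has the same shape in PostgreSQL 9.5 and later; the table `feed`, its columns, and the `write_feed` helper are hypothetical. `COALESCE(excluded.cN, feed.cN)` keeps the existing value whenever the incoming record does not carry that column.

```python
import sqlite3

# SQLite stands in for PostgreSQL here; the upsert syntax is the same shape.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE feed (
        id INTEGER PRIMARY KEY,    -- order ID
        c1 INTEGER, c2 INTEGER,    -- written by business system A
        c3 INTEGER, c4 INTEGER     -- written by business system B
    )""")

UPSERT = """
    INSERT INTO feed (id, c1, c2, c3, c4) VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET
        c1 = COALESCE(excluded.c1, feed.c1),
        c2 = COALESCE(excluded.c2, feed.c2),
        c3 = COALESCE(excluded.c3, feed.c3),
        c4 = COALESCE(excluded.c4, feed.c4)
"""


def write_feed(order_id, **props):
    """Merge one system's properties into the order's wide row; omitted columns are kept."""
    values = [props.get(col) for col in ("c1", "c2", "c3", "c4")]
    conn.execute(UPSERT, [order_id, *values])


write_feed(1, c1=10, c2=20)  # business system A
write_feed(1, c3=30, c4=40)  # business system B
print(conn.execute("SELECT * FROM feed WHERE id = 1").fetchone())  # (1, 10, 20, 30, 40)
```

One consequence of this merge expression is that a business system cannot clear a column by writing NULL. If that is needed, the merge has to distinguish "absent" from "NULL", for example by merging jsonb documents with the `||` operator instead.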

3. Create real-time monitoring rules for order feed data that send a PostgreSQL asynchronous message when their conditions are met. The app listening on the channel fetches messages in a loop and consumes them in real time.
Rules can be stored in tables or written in trigger code or UDF code.

3.1. If data is written in bulk, statement-level triggers can be used to reduce the number of trigger function invocations and improve write throughput.

3.2. If a single piece of data is written each time, row-level triggers can be used. (The stress testing in the following section uses a row-level trigger.)

3.3. As noted above, rules can be defined in many places.
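Keeping rules in a table, rather than hard-coding them, lets you change them without rewriting the trigger. The plain-Python sketch below models that idea: each row of a hypothetical rules table names a column, a comparison operator, and a threshold, and a feed row is checked against all of them. The rule contents and the `matched_rules` helper are illustrative assumptions; in PostgreSQL the trigger or UDF would read the rules table instead of a Python list.

```python
import operator

# Hypothetical rules table: (rule_id, column, operator, threshold).
RULES = [
    (1, "c2", ">", 9999),
    (2, "c3", "=", 0),
]

OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt,
       "<=": operator.le, "=": operator.eq, "!=": operator.ne}


def matched_rules(row, rules=RULES):
    """Return the IDs of the rules that a row matches.

    Columns that are absent or None never match, mirroring SQL, where a
    comparison with NULL is not true.
    """
    hits = []
    for rule_id, column, op, threshold in rules:
        value = row.get(column)
        if value is not None and OPS[op](value, threshold):
            hits.append(rule_id)
    return hits


print(matched_rules({"c1": 7, "c2": 12000, "c3": 5}))  # [1]
print(matched_rules({"c2": 12000, "c3": 0}))           # [1, 2]
```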

4. Choose a trigger type.

4.1. Statement-level trigger (recommended for bulk writes)

4.2. Row-level trigger (recommended when writing one row at a time; the stress testing in the following section uses a row-level trigger)
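To make the trade-off between the two trigger types concrete, the following plain-Python simulation (not PostgreSQL code) counts how often the trigger logic runs for one 30,000-row INSERT. The row-level variant runs once per row and sends one message per hit; the statement-level variant runs once and sends a single aggregated message. In PostgreSQL 10 and later, an AFTER ... FOR EACH STATEMENT trigger can read all inserted rows through a transition table (`REFERENCING NEW TABLE AS ...`). The rule `c2 % 10000 == 0` is a hypothetical one-in-10,000 exception rule.

```python
def is_exception(row):
    """Hypothetical rule: one row in every 10,000 is an exception."""
    return row["c2"] % 10000 == 0


def row_level(batch, notify):
    """Row-level style: the trigger logic runs once per row, one message per hit."""
    calls = 0
    for row in batch:
        calls += 1
        if is_exception(row):
            notify([row["id"]])
    return calls


def statement_level(batch, notify):
    """Statement-level style: one invocation sees the whole batch, one message in total."""
    hits = [row["id"] for row in batch if is_exception(row)]
    if hits:
        notify(hits)
    return 1


batch = [{"id": i, "c2": i} for i in range(1, 30001)]
row_msgs, stmt_msgs = [], []
print(row_level(batch, row_msgs.append), row_msgs)          # 30000 [[10000], [20000], [30000]]
print(statement_level(batch, stmt_msgs.append), stmt_msgs)  # 1 [[10000, 20000, 30000]]
```

A NOTIFY payload must be shorter than 8000 bytes in a default configuration, so a statement-level trigger that aggregates many hits may need to split them across several messages or write them to a table.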

5. Agree on a channel name.

6. The application listens on the message channel.

7. As order data is written, each row passes through the trigger in real time, where the rule logic is evaluated. When a rule is matched, a message is sent to the agreed message channel.
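A runnable approximation of step 7 using Python's sqlite3 module. SQLite has no LISTEN/NOTIFY, so a Python callback registered under the name `pg_notify` stands in for PostgreSQL's `pg_notify(channel, payload)` and simply records each message. The channel `order_feed`, the trigger name, and the rule `c2 % 10000 = 0` are assumptions. Two differences from PostgreSQL are worth keeping in mind: there, the trigger body would be a PL/pgSQL function attached with `CREATE TRIGGER ... FOR EACH ROW EXECUTE PROCEDURE ...`, and notifications reach listeners only when the writing transaction commits.

```python
import sqlite3

notifications = []  # (channel, payload) pairs "sent" by the trigger


def pg_notify_stub(channel, payload):
    """Stand-in for PostgreSQL's pg_notify(): it only records the message."""
    notifications.append((channel, payload))


conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA trusted_schema = ON")  # allow app-defined functions inside triggers
conn.create_function("pg_notify", 2, pg_notify_stub)
conn.execute("CREATE TABLE feed (id INTEGER PRIMARY KEY, c1 INTEGER, c2 INTEGER)")

# Row-level trigger: evaluated for every inserted row; only rows matching the
# hypothetical rule push a message to the 'order_feed' channel.
conn.execute("""
    CREATE TRIGGER feed_rule AFTER INSERT ON feed
    FOR EACH ROW WHEN NEW.c2 % 10000 = 0
    BEGIN
        SELECT pg_notify('order_feed', 'id=' || NEW.id || ',c2=' || NEW.c2);
    END""")

conn.executemany("INSERT INTO feed VALUES (?, ?, ?)", [(i, i, i) for i in range(1, 20001)])
conn.commit()
print(notifications)
# [('order_feed', 'id=10000,c2=10000'), ('order_feed', 'id=20000,c2=20000')]
```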

8. The following is a sample of a received message:

9. Insert data in batches.

The following are the messages received in one batch:

10. Update data.

The following are samples of the asynchronous messages received:

Stress Testing 1: Write One Record Each Time in Real Time

1. Simulate a realistic scenario in which one record in every 10,000 is an exception that must be pushed.

2. The stress test shows a processing throughput of 167,000 rows/s.

3. The following are some samples of the asynchronous messages received by the listener.

Schemaless Design of Table Sharding on a Single Instance

For information about how to create and split tables into shards automatically, refer to the following articles.

PostgreSQL schemaless design and stress testing in China Railway Corporation’s ordering system

Implementation of on-demand slicing in PostgreSQL — plpgsql schemaless implementation of the automatic slicing feature of the TimescaleDB plug-in

PostgreSQL schemaless implementation

PostgreSQL time-series best practices — design a stock exchange system database — Alibaba Cloud RDS for PostgreSQL best practices

Stress Testing 2: Write to Table Shards on a Single Instance

1. Create a full wide table template for order feed data.

2. Define rules.

3. Define table sharding.

4. Define UDFs that write data into table shards dynamically. (This logic can also be implemented in the application layer; this example only shows the throughput that table sharding can reach on a single instance.)

Submit one record each time:

Batch submit records:

5. Simulate a realistic scenario in which one record in every 10,000 is an exception that must be pushed.

Submit one record each time:

Batch submit records:

6. Stress test results

If a single record is submitted each time, the processing throughput is 150,000 rows/s.

This is slightly lower than the throughput of writing single records into a single table, because the UDF uses dynamic SQL (it concatenates the shard table name). Moving this logic to the application side increases performance by 20%.

For batch commits (1,000 rows/batch), the processing throughput is 1,170,000 rows/s.

Batch submission significantly increases performance.
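The UDF in this test assembles the shard table name with dynamic SQL inside the database, and the results note that doing the routing on the application side is faster. A minimal sketch of that application-side variant, again with sqlite3 standing in for PostgreSQL: `write_one` issues one statement per record, while `write_batch` groups records by shard so that each shard table receives one batched call. The shard count, the `feed_<n>` names, and both helpers are hypothetical.

```python
import sqlite3
from collections import defaultdict

N_SHARDS = 4  # hypothetical number of shard tables

conn = sqlite3.connect(":memory:")
for s in range(N_SHARDS):
    conn.execute(f"CREATE TABLE feed_{s} (id INTEGER PRIMARY KEY, c1 INTEGER)")


def shard_table(order_id: int) -> str:
    # Built from an integer only, so the dynamic table name cannot inject SQL.
    return f"feed_{order_id % N_SHARDS}"


def write_one(order_id: int, c1: int) -> None:
    """One statement per record: the shard table is chosen for each call."""
    conn.execute(f"INSERT INTO {shard_table(order_id)} (id, c1) VALUES (?, ?)", (order_id, c1))


def write_batch(rows) -> None:
    """Group rows by shard, then send one executemany call per shard table."""
    by_shard = defaultdict(list)
    for order_id, c1 in rows:
        by_shard[shard_table(order_id)].append((order_id, c1))
    for table, shard_rows in by_shard.items():
        conn.executemany(f"INSERT INTO {table} (id, c1) VALUES (?, ?)", shard_rows)
    conn.commit()


write_one(1, 100)
write_batch((i, i) for i in range(2, 1002))
counts = [conn.execute(f"SELECT count(*) FROM feed_{s}").fetchone()[0] for s in range(N_SHARDS)]
print(counts)  # [250, 251, 250, 250]
```

Against real PostgreSQL instances, the same grouping would also be done per instance, with one connection per instance.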

Example of Using JDBC Asynchronous Messages

Method of Using libpq Asynchronous Messages

Trigger Usage

How to use PostgreSQL triggers -1

How to use PostgreSQL triggers — 2


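1. Asynchronous messages should be received promptly; otherwise, unconsumed messages accumulate and occupy space in the $PGDATA/pg_notify directory.

2. There is no limit on the number of asynchronous messages, but the storage that holds them is limited: the notification queue is 8 GB in a standard installation, and if it fills up, transactions that call NOTIFY fail at commit.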

Buffer size:

3. Reliability of asynchronous messages: for each session listening on a channel, PostgreSQL tracks the offset up to which that session has received messages.

A new listener receives only messages sent after its current offset (that is, after it starts listening); earlier messages are not sent to it.

Messages are cleared from the queue once no listening session still needs them.

The listening session must therefore stay connected. If it disconnects, it can no longer receive the messages it had not yet read, nor any messages generated between the disconnection and the moment it listens again.
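The delivery rules in item 3 can be illustrated with a toy model in plain Python. This is a simplification for illustration, not how PostgreSQL implements its queue: each listening session has its own inbox, a message is delivered only to sessions listening when it is sent, and a disconnect discards both the session's LISTENs and its unread messages. The class, session names, and channel name are hypothetical.

```python
from collections import defaultdict


class NotifyQueueSim:
    """Toy model of LISTEN/NOTIFY delivery semantics (not PostgreSQL's implementation)."""

    def __init__(self):
        self.listeners = defaultdict(set)  # channel -> sessions currently listening
        self.inbox = defaultdict(list)     # session -> unread (channel, payload) messages

    def listen(self, session, channel):
        self.listeners[channel].add(session)

    def notify(self, channel, payload):
        # Delivered only to sessions listening at this moment; no backlog for late joiners.
        for session in self.listeners[channel]:
            self.inbox[session].append((channel, payload))

    def poll(self, session):
        messages, self.inbox[session] = self.inbox[session], []
        return messages

    def disconnect(self, session):
        # A dropped connection loses its LISTENs and every unread message.
        for sessions in self.listeners.values():
            sessions.discard(session)
        self.inbox.pop(session, None)


q = NotifyQueueSim()
q.listen("A", "order_feed")
q.notify("order_feed", "m1")
q.listen("B", "order_feed")   # B starts listening after m1 was sent
q.notify("order_feed", "m2")
q.notify("order_feed", "m3")
q.disconnect("B")             # B drops before reading m2 and m3
q.notify("order_feed", "m4")  # sent while B is away
q.listen("B", "order_feed")
q.notify("order_feed", "m5")
print([p for _, p in q.poll("A")])  # ['m1', 'm2', 'm3', 'm4', 'm5']
print([p for _, p in q.poll("B")])  # ['m5']
```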

4. To strengthen reliability, replace asynchronous messages with a persistence model:

In the trigger, change pg_notify to insert into feedback_table .... ;

Then change message consumption to the following persisted method (asynchronous batch consumption with atomicity):

Persisted messages can still meet consumption requirements of over 100,000 rows. Exception messages are usually few, so consider using one exception table shared by multiple order tables.

However, this consumes more RDS for PostgreSQL IOPS, because writing and deleting messages generates WAL and the table later needs VACUUM, which generates more WAL.
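A minimal sketch of atomic batch consumption from a persisted message table, with sqlite3 standing in for PostgreSQL. The consumer reads a batch, deletes it, and processes it inside one transaction, so a crash during processing rolls the delete back and the messages stay queued. The `feedback_table` columns and the `consume_batch` helper are assumptions. In PostgreSQL, several consumers could run this concurrently by locking the batch with `SELECT ... FOR UPDATE SKIP LOCKED`, or the read and delete could be combined into a single `DELETE ... RETURNING` statement.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE feedback_table (id INTEGER PRIMARY KEY, order_id INTEGER, info TEXT)")


def consume_batch(conn, handle, limit=1000):
    """Read up to `limit` messages, delete them, and pass them to `handle` in one transaction.

    If `handle` raises, the transaction rolls back and the messages stay in the table.
    """
    with conn:  # sqlite3 connection context: commit on success, rollback on exception
        rows = conn.execute(
            "SELECT id, order_id, info FROM feedback_table ORDER BY id LIMIT ?", (limit,)
        ).fetchall()
        conn.executemany("DELETE FROM feedback_table WHERE id = ?", [(r[0],) for r in rows])
        handle(rows)
    return len(rows)


with conn:
    conn.executemany("INSERT INTO feedback_table (order_id, info) VALUES (?, ?)",
                     [(i, "rule 1 hit") for i in range(5)])

received = []
print(consume_batch(conn, received.extend, limit=3))  # 3
```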


A pushed exception may be pushed again after the row is updated. To avoid this, compare the OLD and NEW values in the trigger logic. This article does not handle this case; adapt the trigger code to your actual use case.
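The OLD/NEW comparison can be sketched in plain Python, assuming a message should be pushed only when a row newly starts to satisfy a rule. In a PostgreSQL row-level trigger, the same condition could be written in the trigger function or in the trigger's WHEN clause, which can reference OLD and NEW for UPDATE triggers. The rule and helper names are hypothetical.

```python
def exceeds_limit(row):
    """Hypothetical exception rule."""
    return row["c2"] > 9999


def should_push(old_row, new_row, rule=exceeds_limit):
    """Return True only when NEW matches the rule and OLD did not.

    old_row is None for an INSERT, where there is no OLD row.
    """
    if not rule(new_row):
        return False
    return old_row is None or not rule(old_row)


print(should_push(None, {"c2": 10000}))           # True: new exception on insert
print(should_push({"c2": 10000}, {"c2": 20000}))  # False: already pushed before
print(should_push({"c2": 1}, {"c2": 10000}))      # True: update made it an exception
```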

Compute Processing Throughput in Real Time

  1. A single RDS for PostgreSQL instance reaches a processing throughput of up to 1,170,000 rows/s, which is highly cost-effective.
  2. One hundred RDS for PostgreSQL instances can easily reach a processing throughput of 100 million rows/s (6 billion rows/minute).
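Assuming throughput scales linearly with the number of instances (the premise of the multi-instance design), these figures can be checked against the single-instance stress test results:

$$
\begin{aligned}
100 \times 1{,}170{,}000\ \text{rows/s} &= 1.17 \times 10^{8}\ \text{rows/s} \approx 7.0 \times 10^{9}\ \text{rows/min} && \text{(batch commits)}\\
100 \times 150{,}000\ \text{rows/s} &= 1.5 \times 10^{7}\ \text{rows/s} = 9 \times 10^{8}\ \text{rows/min} && \text{(single-record commits)}
\end{aligned}
$$

The 100 million rows/s (6 billion rows/minute) figure therefore corresponds to batch commits, with some headroom; with one record per commit, 100 instances would be closer to 15 million rows/s, which matches the example in the RDS for PostgreSQL design section.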


Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website: