PostgreSQL Asynchronous Message Practice — Real-Time Feed System Monitoring and Response in Proactive E-Commerce Services — from Minute-Level to Millisecond-Level Latency

Background

To locate problems and meet operations, analysis, and other requirements, many business systems configure event tracking to record logs of user behavior. These logs are also called feed logs.

Solution 1: Use RDS for PostgreSQL + OSS + HybridDB for PostgreSQL to Implement Data Cleaning and Proactive Detection in Minutes

After being consumed from message queues, data is written into RDS for PostgreSQL in real time. Order feeds are merged in RDS for PostgreSQL and written into external tables in OSS. (Compressed data is supported; around 100 MB of raw data can be written into OSS per second in each session.)
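To make the merge-and-export step concrete, here is a rough sketch. The OSS external (foreign) table definition depends on the OSS foreign data wrapper configuration and is not shown in the original; the table names feed_oss and feed_stage are hypothetical.

-- Hypothetical sketch: feed_stage holds raw, partially filled order feed rows inside
-- RDS for PostgreSQL; feed_oss is assumed to be a foreign table backed by OSS.
-- Merge rows by order id, then push the merged result out to OSS.
insert into feed_oss
select id, max(c1) as c1, max(c2) as c2, max(c3) as c3
from feed_stage
group by id;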

Solution 2: Millisecond-Level Feed Monitoring and Real-Time Feedback

Technology should always serve business needs. A latency of a few minutes is already fairly low, but some extreme scenarios require much lower latency.

Design of Millisecond-Level Feed Monitoring and Feedback Architecture

Another reason why millisecond-level feed monitoring was previously not on the agenda is that merging data in HBase has relatively high latency, which can cause exceptions when StreamCompute tries to complete missing fields. Using RDS for PostgreSQL for exception monitoring avoids the field-completion problem entirely, because the records in RDS for PostgreSQL already contain all fields and need no completion.

RDS for PostgreSQL Design

1. Use multiple instances to improve system throughput. (For example, if 150,000 rows can be processed per second on a single instance, then 100 instances can process 15 million rows per second.) A routing sketch follows the mapping below.

DB0, DB1, DB2, DB3, ..., DB255

Database-to-host mapping:
db0 -> host?, port?
db1 -> host?, port?
...

tbl0, tbl1, tbl2, ..., tbl127
tbl128, tbl129, tbl130, ..., tbl255

Table-to-database mapping:
tbl0 -> db?
tbl1 -> db?
...
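This mapping implies a simple client-side routing step: an order id is mapped to one of the 256 table shards, and the shard is looked up in a metadata table to find its database, host, and port. Below is a minimal SQL sketch of that routing; the shard_map table and the plain modulo hash are illustrative assumptions, and any stable hash function works just as well.

-- Hypothetical metadata table describing where each table shard lives.
create table shard_map(tbl_suffix int primary key, db_name text, host text, port int);

-- Route a given order id: map it to a table suffix (0..255), then look up the target database.
select mod(1234567890, 256) as tbl_suffix;

select db_name, host, port
from shard_map
where tbl_suffix = mod(1234567890, 256);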

HybridDB for PostgreSQL design

HybridDB for PostgreSQL is still retained for analyzing massive amounts of data (PB-scale data) in real time.

Demo

1. Create a full wide table for order feed data. (We can also use a single jsonb column to store all properties, because PostgreSQL supports JSONB; PostgreSQL also supports other multi-value types such as hstore and xml. A jsonb sketch follows the code below.)

create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);
insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;    

insert into feed (id, c3, c4) values (2,99,290001) on conflict (id) do update set c3=excluded.c3, c4=excluded.c4 ;
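For reference, here is a minimal sketch of the jsonb alternative mentioned above; the table name feed_jsonb is hypothetical. The upsert concatenates each new batch of properties into the stored document, mirroring the wide-table upserts.

create table feed_jsonb(id int8 primary key, props jsonb);

-- Merge partial property sets the same way the wide-table upserts above do.
insert into feed_jsonb (id, props) values (2, '{"c1": 2, "c2": 30001}')
on conflict (id) do update set props = feed_jsonb.props || excluded.props;

insert into feed_jsonb (id, props) values (2, '{"c3": 99, "c4": 290001}')
on conflict (id) do update set props = feed_jsonb.props || excluded.props;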
create or replace function tg1() returns trigger as 
$$

declare
begin
-- Rule definitions; in practice you can join a rule definition table
-- Send an asynchronous message when c2 is greater than 1000
perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(inserted)) from inserted where c2>1000;

-- How to send a single notify covering multiple rules.
-- perform pg_notify(
-- 'channel_1',
-- case
-- when c2>1000 then 'Resone:c2 overflow::'||row_to_json(inserted)
-- when c1>200 then 'Resone:c1 overflow::'||row_to_json(inserted)
-- end
-- )
-- from inserted
-- where
-- c2 > 1000
-- or c1 > 200;

-- For multiple rules, you can send multiple notifies or merge them into a single NOTIFY.

return null;
end;
$$
language plpgsql strict;
create or replace function tg2() returns trigger as 
$$

declare
begin
-- Rule definitions; in practice you can join a rule definition table

-- Send an asynchronous message when c2 is greater than 9999
perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;

-- Multiple rules: call a single notify and write to a single CHANNEL.
-- perform pg_notify(
-- 'channel_1',
-- case
-- when c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
-- when c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
-- end
-- )
-- where
-- NEW.c2 > 10000
-- or NEW.c1 > 200;

-- Multiple rules: call a single notify and write to multiple CHANNELs.
-- perform pg_notify(
-- case
-- when c2>1000 then 'channel_1'
-- when c1>200 then 'channel_2'
-- end,
-- case
-- when c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
-- when c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
-- end
-- )
-- where
-- NEW.c2 > 1000
-- or NEW.c1 > 200;

-- For multiple rules, you can send multiple notifies or merge them into a single NOTIFY.
-- For example:
-- perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2 > 1000;
-- perform pg_notify('channel_2', 'Resone:c1 overflow::'||row_to_json(NEW)) where NEW.c1 > 200;

-- Rules can also be defined in a TABLE to support dynamic rules.
-- Keep the rules short, otherwise write throughput drops, because rules are processed serially.
-- The UDF takes a feed row and a rule_table row as input and returns boolean; the matching logic lives in the UDF.
-- perform pg_notify(channel_column, resone_column||'::'||row_to_json(NEW)) from rule_table where udf(NEW::feed, rule_table);

return null;
end;
$$
language plpgsql strict;
-- Option 1: statement-level triggers with transition tables (PostgreSQL 10+), calling tg1()
create trigger tg1 after insert on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();
create trigger tg2 after update on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();

-- Option 2: row-level triggers calling tg2(). Create only one of the two options; the trigger names conflict.
create trigger tg1 after insert on feed for each row execute procedure tg2();
create trigger tg2 after update on feed for each row execute procedure tg2();
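The rule_table/UDF idea mentioned in the comments of tg2() can be made concrete. Below is a minimal sketch under assumptions of my own: rules are stored as (channel, reason, column, threshold), and match_rule plays the role of the generic udf named in the comment; the original does not define this table or function.

-- Hypothetical dynamic rule table.
create table rule_table(channel text, reason text, col text, threshold int);
insert into rule_table values
  ('channel_1', 'Resone:c2 overflow', 'c2', 9999),
  ('channel_2', 'Resone:c1 overflow', 'c1', 200);

-- Evaluate one rule against a feed row: read the column named by the rule and compare it with the threshold.
create or replace function match_rule(r feed, rt rule_table) returns boolean as $$
declare
  v int;
begin
  execute format('select ($1).%I', rt.col) into v using r;
  return coalesce(v, 0) > rt.threshold;
end;
$$ language plpgsql;

-- Inside the row-level trigger, the static PERFORM could then be replaced with:
-- perform pg_notify(rt.channel, rt.reason||'::'||row_to_json(NEW))
--   from rule_table rt where match_rule(NEW, rt);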
listen channel_1;    

Receive messages:

loop
  sleep ?;
  get messages;
end loop
postgres=# insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;    
INSERT 0 1
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2,"c1":2,"c2":30001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
postgres=# insert into feed (id, c1, c2)  select id,random()*100, random()*1001 from generate_series(1,10000) t(id) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;    
INSERT 0 10000
Time: 59.528 ms
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":362,"c1":92,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.    
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4061,"c1":90,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4396,"c1":89,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5485,"c1":72,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6027,"c1":56,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6052,"c1":91,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7893,"c1":84,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8158,"c1":73,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
postgres=# update feed set c1=1;    
UPDATE 10000
Time: 33.444 ms
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":1928,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.    
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2492,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2940,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2981,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4271,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4539,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7089,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7619,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8001,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8511,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8774,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":9394,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.

Stress Testing 1: Write One Record Each Time in Real Time

1. Consider a practical scenario where one exception record in every 10,000 records needs to be pushed; the note after the script below explains how the test models this.

vi test.sql    

\set id random(1,10000000)
\set c1 random(1,1001)
\set c2 random(1,10000)
insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
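The script models the one-in-10,000 rate: c2 is drawn from random(1,10000) and the row-level trigger fires only when c2 > 9999, so roughly one write in 10,000 produces a notification. Judging from the report that follows (prepared query mode, 56 clients, 56 threads, 120 s, per-statement latencies), the run was presumably made with a pgbench invocation along the lines of: pgbench -M prepared -n -r -f ./test.sql -c 56 -j 56 -T 120.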
transaction type: ./test.sql    
scaling factor: 1
query mode: prepared
number of clients: 56
number of threads: 56
duration: 120 s
number of transactions actually processed: 20060111
latency average = 0.335 ms
latency stddev = 0.173 ms
tps = 167148.009836 (including connections establishing)
tps = 167190.475312 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.002 \set id random(1,10000000)
0.001 \set c1 random(1,1001)
0.000 \set c2 random(1,10000)
0.332 insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
postgres=# listen channel_1;    
LISTEN
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3027121,"c1":393,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 738.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5623104,"c1":177,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 758.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3850742,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5244809,"c1":55,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 716.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4062585,"c1":380,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 722.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8536437,"c1":560,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7327211,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 728.
Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":431739,"c1":824,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 731.

Schemaless Design of Table Sharding on a Single Instance

For information about how to create and split tables into shards automatically, refer to the author's related articles on schemaless sharding.

Stress Testing 2: Write to Table Shards on a Single Instance

1. Create a full wide table template for order feed data.

create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);
create or replace function tg() returns trigger as 
$$

declare
begin
-- Send an asynchronous message when c2 is greater than 9999
perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;

-- Example of writing to per-shard channels: the channel suffix is passed in as a trigger parameter.
-- (A single channel can also be used; it depends on the design requirements.)
-- perform pg_notify('channel_'||TG_ARGV[0], 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;

return null;
end;
$$
language plpgsql strict;
do language plpgsql 
$$

declare
begin
for i in 1..512 loop
execute 'create table feed'||i||'(like feed including all) inherits (feed)';
-- Create the triggers (row-level). This example uses a static rule (WHEN (...)); in practice use dynamic rules that process all rows
execute 'create trigger tg1 after insert on feed'||i||' for each row WHEN (NEW.c2>9999) execute procedure tg()';
execute 'create trigger tg2 after update on feed'||i||' for each row WHEN (NEW.c2>9999) execute procedure tg()';
end loop;
end;
$$
;
create or replace function ins(int,int8,int,int) returns void as 
$$

declare
begin
execute format('insert into feed%s (id,c1,c2) values (%s,%s,%s) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2', $1, $2, $3, $4) ;
end;
$$
language plpgsql strict;
create or replace function ins(int, int8) returns void as 
$$

declare
begin
-- Batch-write 1,000 rows starting at the given id ($2) into shard feed$1.
execute format('insert into feed%s (id,c1,c2) %s on conflict (id) do update set c1=excluded.c1, c2=excluded.c2', $1, 'select id, random()*100, random()*10000 from generate_series('||$2||','||$2+1000||') t (id)') ;
end;
$$
language plpgsql strict;
vi test.sql    

\set suffix random(1,512)
\set id random(1,10000000)
\set c1 random(1,1001)
\set c2 random(1,10000)
select ins(:suffix, :id, :c1, :c2);
vi test.sql    

\set suffix random(1,512)
\set id random(1,10000000)
select ins(:suffix, :id);
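Two separate pgbench runs follow. The first report corresponds to the single-row script (select ins(:suffix, :id, :c1, :c2), 112 clients); the second corresponds to the 1,000-row batch script (select ins(:suffix, :id), 56 clients). Each batch transaction writes about 1,000 rows, which is why its per-transaction latency is much higher while the overall row throughput (roughly 1,170 tps × 1,000 rows ≈ 1.17 million rows per second) is also higher.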
transaction type: ./test.sql  
scaling factor: 1
query mode: prepared
number of clients: 112
number of threads: 112
duration: 120 s
number of transactions actually processed: 18047334
latency average = 0.744 ms
latency stddev = 0.450 ms
tps = 150264.463046 (including connections establishing)
tps = 150347.026261 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.002 \set suffix random(1,512)
0.001 \set id random(1,10000000)
0.001 \set c1 random(1,1001)
0.000 \set c2 random(1,10000)
0.742 select ins(:suffix, :id, :c1, :c2);
transaction type: ./test.sql  
scaling factor: 1
query mode: prepared
number of clients: 56
number of threads: 56
duration: 120 s
number of transactions actually processed: 140508
latency average = 47.820 ms
latency stddev = 17.175 ms
tps = 1169.851558 (including connections establishing)
tps = 1170.150203 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.002 \set suffix random(1,512)
0.000 \set id random(1,10000000)
47.821 select ins(:suffix, :id);

Example of Using JDBC Asynchronous Messages

https://jdbc.postgresql.org/documentation/81/listennotify.html

import java.sql.*;    

public class NotificationTest {

public static void main(String args[]) throws Exception {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://localhost:5432/test";

// Create two distinct connections, one for the notifier
// and another for the listener to show the communication
// works across connections although this example would
// work fine with just one connection.
Connection lConn = DriverManager.getConnection(url,"test","");
Connection nConn = DriverManager.getConnection(url,"test","");

// Create two threads, one to issue notifications and
// the other to receive them.
Listener listener = new Listener(lConn);
Notifier notifier = new Notifier(nConn);
listener.start();
notifier.start();
}

}

class Listener extends Thread {

private Connection conn;
private org.postgresql.PGConnection pgconn;

Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection)conn;
Statement stmt = conn.createStatement();
stmt.execute("LISTEN mymessage");
stmt.close();
}

public void run() {
while (true) {
try {
// issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();

org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
if (notifications != null) {
for (int i=0; i<notifications.length; i++) {
System.out.println("Got notification: " + notifications[i].getName());
}
}

// wait a while before checking again for new
// notifications
Thread.sleep(500);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}

}

class Notifier extends Thread {

private Connection conn;

public Notifier(Connection conn) {
this.conn = conn;
}

public void run() {
while (true) {
try {
Statement stmt = conn.createStatement();
stmt.execute("NOTIFY mymessage");
stmt.close();
Thread.sleep(2000);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}

}
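Note that this example prints only the channel name. The pgjdbc PGNotification object also exposes the payload via getParameter(), which is where the JSON produced by row_to_json() in the triggers above arrives.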

How to Use libpq Asynchronous Messages

https://www.postgresql.org/docs/10/static/libpq-notify.html

Trigger Usage

https://www.postgresql.org/docs/10/static/sql-createtrigger.html

Internally, the notification queue is stored in SLRU pages; the constant below, from the PostgreSQL 10 source (src/backend/commands/async.c), sets the number of SLRU page buffers used for it:

/*
 * The number of SLRU page buffers we use for the notification queue.
 */
#define NUM_ASYNC_BUFFERS 8

NOTIFY payloads are delivered only to sessions that are currently listening and are not persisted, so when messages must not be lost, one alternative is to write exceptions into a feedback table and let consumers delete them in small batches, for example:

with t1 as (select ctid from feedback_table order by crt_time limit 100)
delete from feedback_table where
ctid = any (array(select ctid from t1))
returning *;

Other

An exception that has already been pushed may be pushed again when the data is updated. You can avoid this by comparing OLD and NEW values in the trigger logic; this article does not cover that case, so adapt the trigger code to your actual use case. A minimal sketch follows.
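A minimal sketch, assuming the same feed table and threshold as above: the UPDATE trigger notifies only when c2 newly crosses the threshold, so an unchanged exceptional row is not pushed again. The function name tg_update is hypothetical.

create or replace function tg_update() returns trigger as $$
begin
  -- Notify only when c2 crosses the threshold in this update.
  perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW))
    where NEW.c2 > 9999 and (OLD.c2 is null or OLD.c2 <= 9999);
  return null;
end;
$$ language plpgsql strict;

-- create trigger tg2 after update on feed for each row execute procedure tg_update();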

Compute Processing Throughput in Real Time

  1. A single instance of RDS for PostgreSQL has processing throughput up to 1,170,000 rows/s. This is highly cost-effective.
  2. One hundred instances of RDS for PostgreSQL can easily achieve processing throughput up to 100 million rows/s (6 billion rows/minute). This is absolutely exceptional performance.
