Streaming Statistics in PostgreSQL with INSERT ON CONFLICT

Collecting Statistics with PostgreSQL

-- One row per sid holding the latest sample plus its running statistics.
-- Only sid, v1 and crt_time are supplied on insert; the statistic columns
-- start from the defaults declared here.
create table tbl (
    sid      integer primary key,
    v1       integer,
    crt_time timestamp,
    cnt      bigint           default 1,                    -- number of samples; a new row counts as the first one
    sum_v    double precision default 0,                    -- running sum, starts at 0
    min_v    double precision default 'Infinity'::float8,   -- running minimum, starts at +Infinity
    max_v    double precision default '-Infinity'::float8   -- running maximum, starts at -Infinity
);
-- Raw sample log: one row per (sid, v1) sample with its creation time.
-- No keys or defaults are declared on this table.
create table tbl_log (
    sid      integer,
    v1       integer,
    crt_time timestamp
);
-- Upsert one sample for :sid and fold it into the running statistics.
--
-- A freshly inserted row (cnt = 1) still carries the column defaults in
-- sum_v / min_v / max_v, so its first sample exists only in tbl.v1.
-- tbl.v1 is always a sample that was already received for this sid, so
-- folding it into least()/greatest() on every update is harmless and keeps
-- the first sample from being left out of min_v / max_v.
insert into tbl (sid, v1, crt_time) values (:sid, :v1, now())
on conflict (sid) do update set
    v1 = excluded.v1,
    crt_time = excluded.crt_time,
    cnt = tbl.cnt + 1,
    -- On the first conflict sum_v is still its default, so seed it from the first sample.
    sum_v = case tbl.cnt when 1 then tbl.v1 + excluded.v1 else tbl.sum_v + excluded.v1 end,
    min_v = least(tbl.min_v, tbl.v1, excluded.v1),
    max_v = greatest(tbl.max_v, tbl.v1, excluded.v1)
;
vi test.sql    

-- pgbench script: upsert one random sample into tbl and append the same
-- sample to tbl_log.
-- min_v / max_v also fold in tbl.v1: a freshly inserted row keeps the column
-- defaults, so its first sample exists only in v1 and would otherwise never
-- reach min_v / max_v.
\set sid random(1,1000000)
\set v1 random(1,100000000)
insert into tbl (sid, v1, crt_time)
values (:sid, :v1, now())
on conflict (sid) do update set
    v1 = excluded.v1,
    crt_time = excluded.crt_time,
    cnt = tbl.cnt + 1,
    sum_v = case tbl.cnt when 1 then tbl.v1 + excluded.v1 else tbl.sum_v + excluded.v1 end,
    min_v = least(tbl.min_v, tbl.v1, excluded.v1),
    max_v = greatest(tbl.max_v, tbl.v1, excluded.v1);
insert into tbl_log (sid, v1, crt_time) values (:sid, :v1, now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
postgres=# \timing    
Timing is on.
postgres=# select sid, count(*), sum(v1), min(v1), max(v1) from tbl_log group by sid order by sid limit 10;
sid | count | sum | min | max
-----+-------+-----------+----------+----------
1 | 14 | 740544728 | 11165285 | 90619042
2 | 10 | 414224202 | 2813223 | 83077953
3 | 11 | 501992396 | 13861878 | 79000001
4 | 17 | 902219309 | 23429 | 99312338
5 | 6 | 374351692 | 25582424 | 96340616
6 | 15 | 649447876 | 12987896 | 80478126
7 | 8 | 386687697 | 19697861 | 95097076
8 | 12 | 657650588 | 11339236 | 97211546
9 | 10 | 594843053 | 9192864 | 97362345
10 | 9 | 383123573 | 3877866 | 76604940
(10 rows)

Time: 1817.395 ms (00:01.817)


postgres=# select * from tbl order by sid limit 10;
sid | v1 | crt_time | cnt | sum_v | min_v | max_v
-----+----------+----------------------------+-----+-----------+----------+----------
1 | 26479786 | 2017-11-23 20:27:43.134594 | 14 | 740544728 | 11165285 | 90619042
2 | 25755108 | 2017-11-23 20:27:43.442651 | 10 | 414224202 | 2813223 | 83077953
3 | 51068648 | 2017-11-23 20:27:48.118906 | 11 | 501992396 | 13861878 | 79000001
4 | 81160224 | 2017-11-23 20:27:37.183186 | 17 | 902219309 | 23429 | 99312338
5 | 70208701 | 2017-11-23 20:27:35.399063 | 6 | 374351692 | 40289886 | 96340616
6 | 77536576 | 2017-11-23 20:27:46.04372 | 15 | 649447876 | 12987896 | 80478126
7 | 31153753 | 2017-11-23 20:27:46.54858 | 8 | 386687697 | 19697861 | 95097076
8 | 11339236 | 2017-11-23 20:27:40.947561 | 12 | 657650588 | 11339236 | 97211546
9 | 46103803 | 2017-11-23 20:27:38.450889 | 10 | 594843053 | 9192864 | 92049544
10 | 55630877 | 2017-11-23 20:27:28.944168 | 9 | 383123573 | 3877866 | 76604940
(10 rows)

Time: 0.512 ms

Single-Table Stress Testing

vi test.sql    

-- pgbench script: upsert one random sample per transaction into tbl.
-- min_v / max_v also fold in tbl.v1: a freshly inserted row keeps the column
-- defaults, so its first sample exists only in v1 and would otherwise never
-- reach min_v / max_v.
\set sid random(1,1000000)
\set v1 random(1,100000000)
insert into tbl (sid, v1, crt_time)
values (:sid, :v1, now())
on conflict (sid) do update set
    v1 = excluded.v1,
    crt_time = excluded.crt_time,
    cnt = tbl.cnt + 1,
    sum_v = case tbl.cnt when 1 then tbl.v1 + excluded.v1 else tbl.sum_v + excluded.v1 end,
    min_v = least(tbl.min_v, tbl.v1, excluded.v1),
    max_v = greatest(tbl.max_v, tbl.v1, excluded.v1);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 300
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 300 s
number of transactions actually processed: 57838943
latency average = 0.166 ms
latency stddev = 0.057 ms
tps = 192791.786864 (including connections establishing)
tps = 192805.650917 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set sid random(1,1000000)
0.000 \set v1 random(1,100000000)
0.164 insert into tbl (sid, v1, crt_time) values (:sid, :v1, now()) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);
top - 20:57:35 up 16 days, 3:44, 2 users, load average: 8.67, 2.08, 1.68
Tasks: 497 total, 28 running, 469 sleeping, 0 stopped, 0 zombie
%Cpu(s): 34.8 us, 13.7 sy, 0.0 ni, 51.3 id, 0.1 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 23094336+total, 79333744 free, 1588292 used, 15002134+buff/cache
KiB Swap: 0 total, 0 free, 0 used. 22219502+avail Mem

Single-Instance Streaming Statistics Performance

Single-Table and Batch Writing Performance Stress Testing

vi test.sql    

-- pgbench script: upsert a batch of 100 samples per transaction.
-- generate_series(1,100) yields sids :sid+1 .. :sid+100, so every row in one
-- statement targets a different sid.
-- min_v / max_v also fold in tbl.v1: a freshly inserted row keeps the column
-- defaults, so its first sample exists only in v1 and would otherwise never
-- reach min_v / max_v.
\set sid random(1,1000000)
\set v1 random(1,100000000)
insert into tbl (sid, v1, crt_time)
select :sid + id, :v1 + id, now()
from generate_series(1,100) t(id)
on conflict (sid) do update set
    v1 = excluded.v1,
    crt_time = excluded.crt_time,
    cnt = tbl.cnt + 1,
    sum_v = case tbl.cnt when 1 then tbl.v1 + excluded.v1 else tbl.sum_v + excluded.v1 end,
    min_v = least(tbl.min_v, tbl.v1, excluded.v1),
    max_v = greatest(tbl.max_v, tbl.v1, excluded.v1);
# Batch run: 28 clients / 28 threads for 120 s, the duration the reported results show.
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 120
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 1411743
latency average = 2.380 ms
latency stddev = 0.815 ms
tps = 11764.276597 (including connections establishing)
tps = 11765.715797 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set sid random(1,1000000)
0.000 \set v1 random(1,100000000)
2.378 insert into tbl (sid, v1, crt_time) select :sid+id, :v1+id, now() from generate_series(1,100) t(id) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);
34 processes: 22 running, 11 sleeping, 1 uninterruptable
CPU states: 28.5% user, 0.0% nice, 6.3% system, 65.0% idle, 0.1% iowait
Memory: 173G used, 47G free, 247M buffers, 168G cached
DB activity: 9613 tps, 0 rollbs/s, 0 buffer r/s, 100 hit%, 961050 row r/s, 960824 row w/s
DB I/O: 0 reads/s, 0 KB/s, 0 writes/s, 0 KB/s
DB disk: 1455.4 GB total, 441.9 GB free (69% used)
Swap:

Streaming Statistics Solution

-- Detail table with five integer columns.
create table tbl (
    c1 int,
    c2 int,
    c3 int,
    c4 int,
    c5 int
);

-- Row count per c1.
select c1, count(*)
from tbl
group by c1;

-- Sum of c5 and row count per (c2, c3).
select c2, c3, sum(c5), count(*)
from tbl
group by c2, c3;

... More dimensions

Process Design

Example

-- Detail table that receives the inserts; every column is mandatory.
create table tbl (
    c1 int not null,
    c2 int not null,
    c3 int not null,
    c4 int not null,
    c5 int not null
);

-- Counter per c1; a new row starts at cnt = 1.
create table cv1_tbl (
    c1  int,
    cnt bigint default 1,
    primary key (c1)
);

-- Counter and running sum per (c2, c3); c5 keeps the value supplied on insert.
create table cv2_tbl (
    c2    int,
    c3    int,
    c5    int,
    sum_v double precision default 0,
    cnt   bigint default 1,
    primary key (c2, c3)
);

... Other dimensions
-- Count rows per c1: insert the key, or bump the counter if it already exists.
insert into cv1_tbl (c1)
values (NEW.c1)
on conflict (c1) do update set
    cnt = cv1_tbl.cnt + 1;

-- Count rows and sum c5 per (c2, c3).
-- On the first conflict (cnt = 1) sum_v is seeded from the stored c5 plus the
-- incoming c5; afterwards the incoming c5 is added to the running sum.
insert into cv2_tbl (c2, c3, c5)
values (NEW.c2, NEW.c3, NEW.c5)
on conflict (c2, c3) do update set
    cnt = cv2_tbl.cnt + 1,
    sum_v = case
                when cv2_tbl.cnt = 1 then cv2_tbl.c5 + excluded.c5
                else cv2_tbl.sum_v + excluded.c5
            end;
-- r1: replace each insert into tbl with an upsert of the per-c1 counter.
create rule r1 as
    on insert to tbl
    do instead
        insert into cv1_tbl (c1)
        values (NEW.c1)
        on conflict (c1) do update set
            cnt = cv1_tbl.cnt + 1;

-- r2: replace each insert into tbl with an upsert of the per-(c2, c3) counter
-- and running sum of c5 (seeded from the stored c5 on the first conflict).
create rule r2 as
    on insert to tbl
    do instead
        insert into cv2_tbl (c2, c3, c5)
        values (NEW.c2, NEW.c3, NEW.c5)
        on conflict (c2, c3) do update set
            cnt = cv2_tbl.cnt + 1,
            sum_v = case
                        when cv2_tbl.cnt = 1 then cv2_tbl.c5 + excluded.c5
                        else cv2_tbl.sum_v + excluded.c5
                    end;
vi test.sql    
-- pgbench script: insert one random row into tbl per transaction.
-- The column list is spelled out so the statement does not depend on the
-- physical column order of tbl.
\set c1 random(1,1000000)
\set c2 random(1,1000000)
\set c3 random(1,1000000)
\set c4 random(1,1000000)
\set c5 random(1,1000000)
insert into tbl (c1, c2, c3, c4, c5) values (:c1, :c2, :c3, :c4, :c5);


pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
transaction type: ./test.sql    
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 18618957
latency average = 0.206 ms
latency stddev = 0.212 ms
tps = 155154.880841 (including connections establishing)
tps = 155174.283641 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set c1 random(1,1000000)
0.000 \set c2 random(1,1000000)
0.000 \set c3 random(1,1000000)
0.000 \set c4 random(1,1000000)
0.001 \set c5 random(1,1000000)
0.203 insert into tbl values (:c1, :c2, :c3, :c4, :c5);
postgres=# select * from cv2_tbl order by cnt desc limit 10;    
c2 | c3 | c5 | sum_v | cnt
--------+--------+--------+---------+-----
500568 | 119352 | 173877 | 436710 | 2
873168 | 20848 | 730385 | 1688835 | 2
90752 | 526912 | 622354 | 734505 | 2
273533 | 886999 | 766661 | 1085038 | 2
895573 | 466493 | 648095 | 1191965 | 2
338402 | 436092 | 940920 | 1372244 | 2
915723 | 866856 | 255638 | 947606 | 2
586692 | 543596 | 32905 | 996466 | 2
839232 | 928197 | 402745 | 1249665 | 2
401808 | 997216 | 493644 | 1423618 | 2
(10 rows)

postgres=# select * from cv1_tbl order by cnt desc limit 10;
c1 | cnt
--------+-----
952009 | 44
373778 | 43
483788 | 42
25749 | 42
93605 | 41
386201 | 41
596955 | 40
526220 | 40
91289 | 40
429061 | 40
(10 rows)
postgres=# select * from tbl;    
c1 | c2 | c3 | c4 | c5
----+----+----+----+----
(0 rows)

Parallel Design within an Instance

Example

Parallel Design Outside an Instance

Example

Use INSERT ON CONFLICT Together with Hyperloglog to Implement Real-Time UV Estimation

-- The hll type and the hll_* functions used below come from this extension.
create extension hll;

-- One row per (grpid, dt): userid holds the value supplied on insert, cnt
-- starts at 1 and hll_userid starts as an empty HLL sketch.
create table tbl (
    grpid      int,
    userid     int,
    dt         date,
    cnt        bigint default 1,
    hll_userid hll default hll_empty(),
    primary key (grpid, dt)
);

-- Upsert one (grpid, userid, dt) event; $1 = grpid, $2 = userid, $3 = dt.
-- A freshly inserted row keeps the default empty sketch, so its userid is not
-- in hll_userid yet. On the first conflict (cnt = 1) the stored userid is
-- added together with the incoming one; afterwards only the incoming userid
-- is added.
insert into tbl (grpid, userid, dt) values ($1, $2, $3) on conflict (grpid, dt) do update set
    cnt = tbl.cnt + 1,
    hll_userid =
        case tbl.cnt
            when 1 then hll_add(hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid)), hll_hash_integer(excluded.userid))
            else hll_add(tbl.hll_userid, hll_hash_integer(excluded.userid))
        end;
vi test.sql  

-- pgbench script: upsert one random (grpid, userid) event for 2017-11-24.
-- On the first conflict (cnt = 1) the stored userid is added to the sketch
-- before the incoming one; afterwards only the incoming userid is added.
\set grpid random(1,1000000)
\set userid random(1,1000000000)
insert into tbl (grpid, userid, dt)
values (:grpid, :userid, '2017-11-24')
on conflict (grpid, dt) do update set
    cnt = tbl.cnt + 1,
    hll_userid = hll_add(
        case
            when tbl.cnt = 1 then hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid))
            else tbl.hll_userid
        end,
        hll_hash_integer(excluded.userid)
    );
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 21713334
latency average = 0.155 ms
latency stddev = 0.071 ms
tps = 180938.313421 (including connections establishing)
tps = 180959.906404 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set grpid random(1,1000000)
0.000 \set userid random(1,1000000000)
0.153 insert into tbl (grpid, userid, dt) values (:grpid,:userid,'2017-11-24') on conflict (grpid, dt) do update set cnt=tbl.cnt+1, hll_userid= case tbl.cnt when 1 then hll_add(hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid)), hll_hash_integer(excluded.userid)) else hll_add(tbl.hll_userid, hll_hash_integer(excluded.userid)) end ;
postgres=# select * from tbl limit 10;  
grpid | userid | dt | cnt | hll_userid
--------+-----------+------------+-----+-------------------------------------------------------------------------------------------------------------------
71741 | 197976232 | 2017-11-24 | 5 | \x128b7fd534b8dfe5a72bbedd5b6c577ce9fb9fef7835561513628850f173084507f0bd7ed996166036a970
801374 | 373207765 | 2017-11-24 | 3 | \x128b7f1dd66eba7e70d9c550284e6d9870994f5f5b52f71f224d6e
565216 | 502576520 | 2017-11-24 | 7 | \x128b7f9c4eb2a37de228d8b959a3eb6875033eb9e5dae4c7a7a873037cc095c3f7b01506556992f5aeee9c2a29d4eeb4db71f92ce501619432a864
35036 | 868953081 | 2017-11-24 | 10 | \x128b7fa2249c2c7ca51016c477335c6c4e539dd369dd2ea9ab587ce6e3c3c88019dfc33361f5e97ab2db9e3475e0afefc5dc84547c9cc650d2c3ae61b7772ff8a3b36b63bfef7de0eff9f779d598d341edae11
950403 | 122708335 | 2017-11-24 | 9 | \x128b7fbb52bc26a18960fec0e5ef0b5d38015dc59f0bad2126d34ce0f19952682a1359257a39cb05a02cf0437f98ce664da1094e8173f33cc1df79547c86939e25bc096179d0a0cfe98b5c
173872 | 321068334 | 2017-11-24 | 7 | \x128b7fab5e34d66f513600c19356d876f80d37f13d28f4efc2d6ae0974487c0aa3f5e509affd49827908d35b7c4b009f57ff6376be2b1ea27b1204
786334 | 501502479 | 2017-11-24 | 5 | \x128b7f8b5e2d419433c147df779ac0ab34b25a060ecbdd5a896ee229a5ad32a00a060d516c141199609d3f
960665 | 855235921 | 2017-11-24 | 7 | \x128b7f95b32567416b5750ecb0c44a76480566f1d98aa6632a3ceeffe5dd8b8de96ffc2447dd5d74e20e993b38a6b242f2c78c678b60d542d68949
61741 | 945239318 | 2017-11-24 | 6 | \x128b7f885766f21f40b6b5b3783e764d90fd28c10af4a996cb5dcec8ea749905d0c5cb1de8b191b4f9e6775d597c247710ab71


postgres=# select grpid,userid,cnt,hll_cardinality(hll_userid) from tbl limit 10;
grpid | userid | cnt | hll_cardinality
--------+-----------+-----+-----------------
775333 | 642518584 | 13 | 13
17670 | 542792727 | 11 | 11
30079 | 311255630 | 14 | 14
61741 | 945239318 | 10 | 10
808051 | 422418318 | 14 | 14
620850 | 461130760 | 12 | 12
256591 | 415325936 | 15 | 15
801374 | 373207765 | 9 | 9
314023 | 553568037 | 12 | 12

Use INSERT ON CONFLICT Together with UDFs to Simplify the SQL Complexity of Stream Computing

-- func1(grpid, userid, dt): record one event in tbl.
--   $1  grpid   group id, first part of the conflict key
--   $2  userid  user id added to the HLL sketch
--   $3  dt      date, second part of the conflict key
-- Returns nothing. Declared STRICT, so a call with any NULL argument does not
-- run the body.
create or replace function func1(int, int, date) returns void as
$$
declare
    p_grpid  alias for $1;
    p_userid alias for $2;
    p_dt     alias for $3;
begin
    insert into tbl (grpid, userid, dt)
    values (p_grpid, p_userid, p_dt)
    on conflict (grpid, dt) do update set
        cnt = tbl.cnt + 1,
        -- On the first conflict (cnt = 1) the stored userid is added to the
        -- sketch first; the incoming userid is added in every case.
        hll_userid = hll_add(
            case
                when tbl.cnt = 1 then hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid))
                else tbl.hll_userid
            end,
            hll_hash_integer(excluded.userid)
        );
end;
$$
language plpgsql strict;

Design of Logs Plus Real-Time Computing

create or replace rule R1 AS on INSERT TO log_table do also XXXXXXXXXXXXXXX;
create or replace rule R1 AS on INSERT TO log_table WHERE (offset condition, e.g. id>10000000) do also XXXXXXXXXXXXXXX;

Original Source

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store