Displaying Massive Amounts of Data in Real Time

Background

Image for post
Image for post

Examples

1. Find the Latest Values Reported by All Sensors

-- Raw sensor reports: one row per report, so each sensor's history accumulates here.
-- Unlogged: writes skip the WAL for fast test loads; contents are not crash-safe.
create unlogged table sort_test (
    id       bigserial primary key,                 -- sequence-generated surrogate key
    s_id     int,                                   -- sensor ID
    val      numeric(10,2),                         -- reported sensor value
    crt_time timestamp default clock_timestamp()    -- time the report was written
);
-- Load 10 million random reports: s_id in 0..10000 (random()*10000 is rounded by the int cast), val in 0..100.
postgres=# insert into sort_test (s_id,val) select random()*10000, random()*100 from generate_series(1,10000000);  
INSERT 0 10000000
-- Composite index whose key order matches the "order by s_id, id desc limit 1" probes used by the recursive queries below.
postgres=# create index idx_test on sort_test using btree(s_id,id desc);
-- Composite (sensor ID, value) pair: the recursive queries below carry one
-- sensor's reading from step to step as a single value of this type.
create type r as (
    s_id int,            -- sensor ID
    val  numeric(10,2)   -- sensor value
);

-- Latest report of every sensor, using a recursive CTE that emulates a loose
-- index scan: each step jumps to the next distinct s_id with a "limit 1" probe
-- ordered like idx_test (s_id, id desc), and takes that sensor's row with the
-- highest id, which this example treats as its most recent report.
with recursive latest as (
    -- Seed: the newest report of the smallest non-null s_id.
    (
        select (s_id, val)::r as rec
        from sort_test
        where id in (
            select id
            from sort_test
            where s_id is not null
            order by s_id, id desc
            limit 1
        )
    )
    union all
    -- Step: the newest report of the next s_id above the previous one. Once no
    -- larger s_id exists, the scalar subquery yields NULL. The guard on
    -- (prev.rec).s_id then ends the recursion; without it, that NULL row would
    -- be fed back forever.
    (
        select (
            select (s_id, val)::r as rec
            from sort_test
            where id in (
                select id
                from sort_test as nxt
                where nxt.s_id > (prev.rec).s_id
                  and nxt.s_id is not null
                order by s_id, id desc
                limit 1
            )
        )
        from latest as prev
        where (prev.rec).s_id is not null
    )
)
-- Whole-row test drops the trailing NULL row from the last step but keeps
-- sensors whose latest val is NULL.
select (l.rec).s_id, (l.rec).val
from latest as l
where l.* is not null;
s_id  |  val    
-------+-------
0 | 83.55
1 | 91.62
2 | 72.70
3 | 45.46
4 | 99.97
5 | 17.04
6 | 8.96
7 | 25.83
8 | 28.10
9 | 26.19
10 | 83.03
11 | 1.30
......
Time: 128.779 ms
-- Same query behind a cursor. A (non-holdable) cursor must be declared inside a
-- transaction block, hence the begin; rows are then pulled in batches with fetch.
postgres=# begin;  
BEGIN
Time: 0.095 ms
postgres=# declare cur cursor for with recursive skip as (
(
select (s_id,val)::r as r from sort_test where id in (select id from sort_test where s_id is not null order by s_id,id desc limit 1)
)
union all
(
select (
select (s_id,val)::r as r from sort_test where id in (select id from sort_test t where t.s_id>(s.r).s_id and t.s_id is not null order by s_id,id desc limit 1)
) from skip s where (s.r).s_id is not null
-- "where (s.r).s_id is not null" must be included. Otherwise it will create an endless loop.
)
select (t.r).s_id, (t.r).val from skip t where t.* is not null;
DECLARE CURSOR
Time: 0.841 ms
-- The first 10 rows come back in 0.364 ms, versus 128.779 ms to produce the full result without a cursor.
postgres=# fetch 10 from cur;
s_id | val
------+-------
0 | 83.55
1 | 91.62
2 | 72.70
3 | 45.46
4 | 99.97
5 | 17.04
6 | 8.96
7 | 25.83
8 | 28.10
9 | 26.19
(10 rows)

Time: 0.364 ms
-- NOTE: this transcript does not close the cursor or end the transaction (close cur; commit;).

2. Find the 10 Busiest Intersections by Vehicle Traffic in a City

Image for post
Image for post
-- Ten sensors with the highest latest reading: the loose-index-scan CTE from
-- example 1 yields one row per sensor, then a top-N sort keeps the 10 largest values.
postgres=# with recursive skip as (
(
select (s_id,val)::r as r from sort_test where id in (select id from sort_test where s_id is not null order by s_id,id desc limit 1)
)
union all
(
select (
select (s_id,val)::r as r from sort_test where id in (select id from sort_test t where t.s_id>(s.r).s_id and t.s_id is not null order by s_id,id desc limit 1)
) from skip s where (s.r).s_id is not null
-- "where (s.r).s_id is not null" must be included. Otherwise it will create an endless loop.
)
select (t.r).s_id, (t.r).val from skip t where t.* is not null
order by (t.r).val desc -- sort by the value itself, not by output position ("order by 2")
limit 10;


s_id | val
------+-------
997 | 99.99
2233 | 99.97
610 | 99.97
4 | 99.97
6735 | 99.96
545 | 99.93
2992 | 99.91
4747 | 99.90
543 | 99.89
7229 | 99.88
(10 rows)

Time: 126.052 ms

3. Find the Busiest Shops by Foot Traffic in a Specific Period Within an Area

Image for post
Image for post
-- Per-shop foot-traffic uploads: one row per upload.
create table test (
    id       bigserial primary key,  -- sequence-generated surrogate key
    gid      int,                    -- shop ID
    val      int,                    -- foot traffic reported for the shop
    pos      point,                  -- shop location; planar (x, y) points stand in for longitude/latitude to keep the test simple
    crt_time timestamp               -- upload time
);
-- Generate 10 million uploads: shop ID in 0..10000, traffic in 0..100000, a random point in the 10000 x 10000 square, and clock_timestamp() as the upload time.
postgres=# insert into test (gid,val,pos,crt_time) select random()*10000, random()*100000, point(random()*10000, random()*10000), clock_timestamp() from generate_series(1,10000000);  

-- The generated rows span about 36 seconds of wall-clock time.
postgres=# select min(crt_time),max(crt_time) from test;
min | max
----------------------------+----------------------------
2017-04-13 20:04:18.969268 | 2017-04-13 20:04:54.578339
(1 row)
-- Partial index on (gid, val desc) covering only a 12-second window; the planner can use it only for queries whose crt_time condition falls inside this range.
create index idx_test_1 on test (gid, val desc) where crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268';
-- About 3.46 million of the 10 million rows fall inside that window.
postgres=# select count(*) from test where crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268';  
count
---------
3461005
(1 row)
-- Top 10 shops by peak foot traffic inside a circular region during a time window.
-- The recursive CTE emulates a loose index scan over gid. Each step is a "limit 1"
-- probe ordered by (gid, val desc), which is the key of the partial index idx_test_1.
-- For the next larger gid, the probe returns the whole row with the largest val
-- inside the window. The region filter and the top-10 sort then run only on that
-- per-shop result (one row per shop, about 10,000 rows).
-- NOTE(review): the crt_time bounds here must stay inside idx_test_1's predicate
-- for the planner to be able to use that partial index.
-- NOTE(review): the region test is applied to the pos of each shop's peak row, so
-- this assumes a shop's pos is the same on every row. In the generated test data,
-- pos is random per row, so results can differ from filtering by region first.
with recursive skip as (    
(
select t0 from test t0 where id in
(select id from test where gid is not null and crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268' order by gid,val desc limit 1) -- Seed: for the smallest gid, the row with the largest val inside the time window (time-window parameter).
)
union all
(
select (
select t1 from test t1 where id in (select id from test t where t.gid > (s.t0).gid and t.gid is not null
and crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268' -- Time-window parameter; keep identical to the seed's bounds.
order by gid,val desc limit 1)
) from skip s where (s.t0).gid is not null
) -- "where (s.t0).gid is not null" is required: once no larger gid exists, the step yields a NULL row, and without this guard that NULL row would be fed back into the recursion forever.
)
select (t.t0).* from skip t where t.* is not null -- drop the trailing NULL row emitted by the final step
and circle '((5000,5000), 1000)' @> (t.t0).pos -- Region parameter: circle centred at (5000,5000) with radius 1000.
order by (t.t0).val desc limit 10; -- Top 10 shops by their peak val.
id    | gid  |  val  |                 pos                 |          crt_time            
---------+------+-------+-------------------------------------+----------------------------
1754353 | 4001 | 99997 | (4755.64117543399,5253.53815406561) | 2017-04-13 20:04:24.563999
600729 | 5874 | 99996 | (5507.96090625226,4394.04523000121) | 2017-04-13 20:04:20.851141
1137330 | 4248 | 99995 | (4332.14340358973,4383.84034205228) | 2017-04-13 20:04:22.575639
2609044 | 7209 | 99995 | (5809.22217573971,4967.18854177743) | 2017-04-13 20:04:27.328745
1330926 | 2834 | 99994 | (4153.9505450055,4986.64934188128) | 2017-04-13 20:04:23.197925
208578 | 3439 | 99994 | (4186.14753056318,5103.39797474444) | 2017-04-13 20:04:19.598547
703010 | 5736 | 99993 | (4913.89285307378,4628.21466382593) | 2017-04-13 20:04:21.178653
298380 | 7680 | 99992 | (4539.91844784468,4454.29485291243) | 2017-04-13 20:04:19.884725
996318 | 7658 | 99992 | (4462.14715018868,5504.16304729879) | 2017-04-13 20:04:22.122626
3120169 | 3261 | 99991 | (4814.33014851063,4505.81138487905) | 2017-04-13 20:04:28.98197
(10 rows)

Time: 135.480 ms
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
Limit (cost=937.82..937.83 rows=1 width=40) (actual time=147.241..147.243 rows=10 loops=1)
Output: ((t.t0).id), ((t.t0).gid), ((t.t0).val), ((t.t0).pos), ((t.t0).crt_time)
Buffers: shared hit=80066
CTE skip
-> Recursive Union (cost=1.00..935.54 rows=101 width=64) (actual time=0.037..141.284 rows=10002 loops=1)
Buffers: shared hit=80066
-> Nested Loop (cost=1.00..9.03 rows=1 width=64) (actual time=0.036..0.036 rows=1 loops=1)
Output: t0.*
Inner Unique: true
Buffers: shared hit=8
-> HashAggregate (cost=0.57..0.58 rows=1 width=8) (actual time=0.022..0.023 rows=1 loops=1)
Output: test.id
Group Key: test.id
Buffers: shared hit=4
-> Limit (cost=0.43..0.55 rows=1 width=16) (actual time=0.017..0.018 rows=1 loops=1)
Output: test.id, test.gid, test.val
Buffers: shared hit=4
-> Index Scan using idx_test_1 on public.test (cost=0.43..431864.13 rows=3461209 width=16) (actual time=0.017..0.017 rows=1 loops=1)
Output: test.id, test.gid, test.val
Index Cond: (test.gid IS NOT NULL)
Buffers: shared hit=4
-> Index Scan using test_pkey on public.test t0 (cost=0.43..8.45 rows=1 width=72) (actual time=0.012..0.012 rows=1 loops=1)
Output: t0.*, t0.id
Index Cond: (t0.id = test.id)
Buffers: shared hit=4
-> WorkTable Scan on skip s (cost=0.00..92.45 rows=10 width=32) (actual time=0.014..0.014 rows=1 loops=10002)
Output: (SubPlan 1)
Filter: ((s.t0).gid IS NOT NULL)
Rows Removed by Filter: 0
Buffers: shared hit=80058
SubPlan 1
-> Nested Loop (cost=1.20..9.22 rows=1 width=64) (actual time=0.013..0.013 rows=1 loops=10001)
Output: t1.*
Inner Unique: true
Buffers: shared hit=80058
-> HashAggregate (cost=0.76..0.77 rows=1 width=8) (actual time=0.009..0.009 rows=1 loops=10001)
Output: t_1.id
Group Key: t_1.id
Buffers: shared hit=40033
-> Limit (cost=0.43..0.75 rows=1 width=16) (actual time=0.008..0.008 rows=1 loops=10001)
Output: t_1.id, t_1.gid, t_1.val
Buffers: shared hit=40033
-> Index Scan using idx_test_1 on public.test t_1 (cost=0.43..369056.35 rows=1153736 width=16) (actual time=0.008..0.008 rows=1 loops=10001)
Output: t_1.id, t_1.gid, t_1.val
Index Cond: ((t_1.gid > (s.t0).gid) AND (t_1.gid IS NOT NULL))
Buffers: shared hit=40033
-> Index Scan using test_pkey on public.test t1 (cost=0.43..8.45 rows=1 width=72) (actual time=0.003..0.003 rows=1 loops=10000)
Output: t1.*, t1.id
Index Cond: (t1.id = t_1.id)
Buffers: shared hit=40025
-> Sort (cost=2.28..2.29 rows=1 width=40) (actual time=147.240..147.241 rows=10 loops=1)
Output: ((t.t0).id), ((t.t0).gid), ((t.t0).val), ((t.t0).pos), ((t.t0).crt_time)
Sort Key: ((t.t0).val) DESC
Sort Method: top-N heapsort Memory: 26kB
Buffers: shared hit=80066
-> CTE Scan on skip t (cost=0.00..2.27 rows=1 width=40) (actual time=0.252..147.138 rows=317 loops=1)
Output: (t.t0).id, (t.t0).gid, (t.t0).val, (t.t0).pos, (t.t0).crt_time
Filter: ((t.* IS NOT NULL) AND ('<(5000,5000),1000>'::circle @> (t.t0).pos))
Rows Removed by Filter: 9685
Buffers: shared hit=80066
Planning time: 0.508 ms
Execution time: 147.505 ms
(62 rows)
Assume that roughly 10 million records are generated every two hours.
-- GiST index on pos so region filters such as "circle @> pos" can use an index scan.
postgres=# create index idx_test_gist on test using gist(pos);  
CREATE INDEX
-- Alternative approach: narrow to the region first (index scan on idx_test_gist),
-- then rank each shop's rows by val with a window function and keep rank 1.
-- NOTE(review): unlike the recursive query earlier in this section, this statement
-- has no crt_time filter. It therefore ranks every row in the region, not just the
-- chosen time window; its sample output includes rows stamped after 20:04:30.
-- Add the same crt_time bounds if both approaches are meant to answer the same question.
select rn, id, gid, val, pos, crt_time
from (
    select row_number() over per_shop as rn,
           id, gid, val, pos, crt_time
    from test
    where circle '((5000,5000), 1000)' @> pos    -- region parameter
    window per_shop as (partition by gid order by val desc)
) as ranked
where rn = 1          -- each shop's highest val within the region
order by val desc     -- highest shop values first
limit 10;             -- keep the top 10
rn |   id    | gid  |  val  |                 pos                 |          crt_time            
----+---------+------+-------+-------------------------------------+----------------------------
1 | 7859807 | 2311 | 99999 | (4900.04640072584,4950.79724118114) | 2017-04-13 20:04:46.013424
1 | 4658616 | 3699 | 99999 | (5625.03716442734,5338.90711143613) | 2017-04-13 20:04:35.467025
1 | 1754353 | 4001 | 99997 | (4755.64117543399,5253.53815406561) | 2017-04-13 20:04:24.563999
1 | 6076598 | 4610 | 99997 | (5679.03681658208,4793.08029171079) | 2017-04-13 20:04:40.09587
1 | 6139261 | 4069 | 99997 | (5225.87833926082,4101.83480009437) | 2017-04-13 20:04:40.301817
1 | 600729 | 5874 | 99996 | (5507.96090625226,4394.04523000121) | 2017-04-13 20:04:20.851141
1 | 4281282 | 9720 | 99996 | (5036.95292398334,4731.64941649884) | 2017-04-13 20:04:34.237957
1 | 5579952 | 1503 | 99996 | (4271.09604235739,5250.28191972524) | 2017-04-13 20:04:38.469311
1 | 5310205 | 1317 | 99995 | (4439.0160869807,4796.70224711299) | 2017-04-13 20:04:37.590451
1 | 1137330 | 4248 | 99995 | (4332.14340358973,4383.84034205228) | 2017-04-13 20:04:22.575639
(10 rows)

Time: 633.342 ms
Limit  (cost=39265.88..39265.91 rows=10 width=48) (actual time=730.704..730.706 rows=10 loops=1)  
Output: t.rn, t.id, t.gid, t.val, t.pos, t.crt_time
Buffers: shared hit=317037, temp read=1921 written=1928
-> Sort (cost=39265.88..39266.01 rows=50 width=48) (actual time=730.702..730.703 rows=10 loops=1)
Output: t.rn, t.id, t.gid, t.val, t.pos, t.crt_time
Sort Key: t.val DESC
Sort Method: top-N heapsort Memory: 26kB
Buffers: shared hit=317037, temp read=1921 written=1928
-> Subquery Scan on t (cost=38939.80..39264.80 rows=50 width=48) (actual time=520.846..728.927 rows=10001 loops=1)
Output: t.rn, t.id, t.gid, t.val, t.pos, t.crt_time
Filter: (t.rn = 1)
Rows Removed by Filter: 303477
Buffers: shared hit=317037, temp read=1921 written=1928
-> WindowAgg (cost=38939.80..39139.80 rows=10000 width=48) (actual time=520.844..703.933 rows=313478 loops=1)
Output: row_number() OVER (?), test.id, test.gid, test.val, test.pos, test.crt_time
Buffers: shared hit=317037, temp read=1921 written=1928
-> Sort (cost=38939.80..38964.80 rows=10000 width=40) (actual time=520.837..594.505 rows=313478 loops=1)
Output: test.gid, test.val, test.id, test.pos, test.crt_time
Sort Key: test.gid, test.val DESC
Sort Method: external merge Disk: 15368kB
Buffers: shared hit=317037, temp read=1921 written=1928
-> Index Scan using idx_test_gist on public.test (cost=0.42..38275.42 rows=10000 width=40) (actual time=0.240..336.235 rows=313478 loops=1)
Output: test.gid, test.val, test.id, test.pos, test.crt_time
Index Cond: ('<(5000,5000),1000>'::circle @> test.pos)
Buffers: shared hit=317037
Planning time: 0.140 ms
Execution time: 734.226 ms
(27 rows)

Kernel-Level Optimization (Support for Spatial Grid Table Partitions)

Image for post
Image for post

Business Optimization Methods

-- Redesigned latest-value table: exactly one row per sensor (s_id is the primary
-- key), kept current by an upsert on every report instead of scanning history.
-- NOTE(review): this reuses the name sort_test from the first example, whose table
-- has a different layout. Create it in a separate database or drop the earlier
-- table first; otherwise CREATE TABLE fails because the name already exists.
create unlogged table sort_test (
    s_id     int primary key,                       -- sensor ID (one row per sensor)
    val      numeric(10,2),                         -- most recent sensor value
    crt_time timestamp default clock_timestamp()    -- time of the most recent report
);

-- Record one sensor report in the latest-value table (the ? placeholders are the
-- bound parameters s_id, val, crt_time).
-- The WHERE guard makes the upsert independent of arrival order: a report whose
-- crt_time is older than the stored one (e.g. a delayed packet) does not overwrite
-- the newer value. A stored row with a NULL crt_time is always replaced; an
-- incoming report with a NULL crt_time does not replace a timestamped row.
insert into sort_test (s_id, val, crt_time)
values (?, ?, ?)
on conflict (s_id) do update
    set val      = excluded.val,
        crt_time = excluded.crt_time
    where sort_test.crt_time is null
       or excluded.crt_time >= sort_test.crt_time;
-- Append-only history of every sensor report. An AFTER INSERT row trigger on this
-- table (tg) upserts each new row into hist_stat.
create unlogged table hist (
    id       bigserial primary key,                 -- sequence-generated surrogate key
    s_id     int,                                   -- sensor ID
    val      numeric(10,2),                         -- sensor value
    crt_time timestamp default clock_timestamp()    -- report time; defaults to the wall-clock time of the insert
);

-- Latest reading of each sensor: one row per s_id, maintained from hist by trigger.
create unlogged table hist_stat (
    s_id     int primary key,                       -- sensor ID (one row per sensor)
    val      numeric(10,2),                         -- latest sensor value
    crt_time timestamp default clock_timestamp()    -- time of the latest reading
);



-- Row-level trigger function for hist: keeps hist_stat holding the latest reading
-- of each sensor by upserting the newly inserted hist row (NEW).
-- NOTE: hist.s_id is nullable but hist_stat.s_id is a primary key, so inserting a
-- hist row with a NULL s_id makes this upsert fail and aborts that hist insert.
create or replace function tg() returns trigger as
$$
begin
    -- The WHERE guard skips the update when the new row is older than the one
    -- already in hist_stat, so a back-dated or late-arriving report cannot
    -- replace a newer reading. A stored NULL crt_time is always replaced.
    insert into hist_stat (s_id, val, crt_time)
    values (NEW.s_id, NEW.val, NEW.crt_time)
    on conflict (s_id) do update
        set val      = excluded.val,
            crt_time = excluded.crt_time
        where hist_stat.crt_time is null
           or excluded.crt_time >= hist_stat.crt_time;

    -- The return value of an AFTER ROW trigger is ignored.
    return null;
end;
$$
language plpgsql;

-- Run tg() once for every row inserted into hist, after the row is written.
create trigger tg
    after insert on hist
    for each row
    execute procedure tg();
-- Insert five readings for sensor 1; crt_time defaults to clock_timestamp(), so each row gets its own insert time.
postgres=# insert into hist(s_id,val) values(1,1);  
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
-- Every reading is kept in hist.
postgres=# select * from hist;
id | s_id | val | crt_time
----+------+------+----------------------------
3 | 1 | 1.00 | 2017-04-13 22:23:25.165286
4 | 1 | 1.00 | 2017-04-13 22:23:26.23929
5 | 1 | 1.00 | 2017-04-13 22:23:26.646152
6 | 1 | 1.00 | 2017-04-13 22:23:26.991189
7 | 1 | 1.00 | 2017-04-13 22:23:27.376265
(5 rows)

-- hist_stat holds a single row for sensor 1, carrying the crt_time of the last insert (id 7 in hist).
postgres=# select * from hist_stat ;
s_id | val | crt_time
------+------+----------------------------
1 | 1.00 | 2017-04-13 22:23:27.376265
(1 row)
-- Re-running the query returns the same single row.
postgres=# select * from hist_stat ;  
s_id | val | crt_time
------+------+----------------------------
1 | 1.00 | 2017-04-13 22:23:27.376265
(1 row)

Original Source

Follow me to keep abreast of the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store