Displaying Massive Amounts of Data in Real Time

Background

  1. Present the latest value reported by each sensor.
  2. Select a time range and display people flow by region.

Examples

1. Find the Latest Values Reported by All Sensors

Design the table structure — s_id is the sensor ID, val is an uploaded value, and crt_time is the report time.

create unlogged table sort_test(
    id serial8 primary key,                       -- Auto-increment surrogate key; larger id = newer report
    s_id int,                                     -- Sensor ID
    val numeric(10,2),                            -- Sensor reading (exact decimal, 2 places)
    crt_time timestamp default clock_timestamp()  -- Report time, stamped at insert
);
postgres=# insert into sort_test (s_id,val) select random()*10000, random()*100 from generate_series(1,10000000);  
INSERT 0 10000000
postgres=# create index idx_test on sort_test using btree(s_id,id desc);
create type r as (s_id int, val numeric(10,2)); -- Composite type  

-- Recursive "skip scan": emit the latest row (largest id) for each distinct
-- s_id, jumping from one s_id to the next via the (s_id, id desc) index.
with recursive skip as (
(
-- Seed: the latest row of the smallest non-null s_id.
select (s_id,val)::r as r from sort_test where id in (select id from sort_test where s_id is not null order by s_id,id desc limit 1)
)
union all
(
-- Step: from the previous row's s_id, fetch the latest row of the next larger s_id.
select (
select (s_id,val)::r as r from sort_test where id in (select id from sort_test t where t.s_id>(s.r).s_id and t.s_id is not null order by s_id,id desc limit 1)
) from skip s where (s.r).s_id is not null
-- "where (s.r).s_id is not null" must be included. Otherwise it will create an endless loop.
)
) -- Closing parenthesis ending the CTE definition; it was missing in the original and made the query invalid.
select (t.r).s_id, (t.r).val from skip t where t.* is not null;
s_id  |  val    
-------+-------
0 | 83.55
1 | 91.62
2 | 72.70
3 | 45.46
4 | 99.97
5 | 17.04
6 | 8.96
7 | 25.83
8 | 28.10
9 | 26.19
10 | 83.03
11 | 1.30
......
Time: 128.779 ms
postgres=# begin;  
BEGIN
Time: 0.095 ms
postgres=# declare cur cursor for with recursive skip as (
(
-- Seed: the latest row of the smallest non-null s_id.
select (s_id,val)::r as r from sort_test where id in (select id from sort_test where s_id is not null order by s_id,id desc limit 1)
)
union all
(
-- Step: fetch the latest row of the next larger s_id.
select (
select (s_id,val)::r as r from sort_test where id in (select id from sort_test t where t.s_id>(s.r).s_id and t.s_id is not null order by s_id,id desc limit 1)
) from skip s where (s.r).s_id is not null
-- "where (s.r).s_id is not null" must be included. Otherwise it will create an endless loop.
)
) -- Closing parenthesis ending the CTE definition; it was missing in the original and made the query invalid.
select (t.r).s_id, (t.r).val from skip t where t.* is not null;
DECLARE CURSOR
Time: 0.841 ms
postgres=# fetch 10 from cur;
s_id | val
------+-------
0 | 83.55
1 | 91.62
2 | 72.70
3 | 45.46
4 | 99.97
5 | 17.04
6 | 8.96
7 | 25.83
8 | 28.10
9 | 26.19
(10 rows)

Time: 0.364 ms

2. Find the 10 Busiest Intersections by Vehicle in a City

postgres=# with recursive skip as (
(
-- Seed: the latest row of the smallest non-null s_id.
select (s_id,val)::r as r from sort_test where id in (select id from sort_test where s_id is not null order by s_id,id desc limit 1)
)
union all
(
-- Step: fetch the latest row of the next larger s_id.
select (
select (s_id,val)::r as r from sort_test where id in (select id from sort_test t where t.s_id>(s.r).s_id and t.s_id is not null order by s_id,id desc limit 1)
) from skip s where (s.r).s_id is not null
-- "where (s.r).s_id is not null" must be included. Otherwise it will create an endless loop.
)
) -- Closing parenthesis ending the CTE definition; it was missing in the original and made the query invalid.
select (t.r).s_id, (t.r).val from skip t where t.* is not null order by (t.r).val desc limit 10; -- order by the named column, not by position


s_id | val
------+-------
997 | 99.99
2233 | 99.97
610 | 99.97
4 | 99.97
6735 | 99.96
545 | 99.93
2992 | 99.91
4747 | 99.90
543 | 99.89
7229 | 99.88
(10 rows)

Time: 126.052 ms

3. Find the Busiest Shop by People Flow in a Specific Period in an Area

create table test (  
id serial8 primary key, -- Auto-increment primary key
gid int, -- Shop ID
val int, -- People flow (visitor count) at a shop
pos point, -- Shop location, indicated by points in place of longitudes and latitudes for test convenience
crt_time timestamp -- Time of uploading
);
postgres=# insert into test (gid,val,pos,crt_time) select random()*10000, random()*100000, point(random()*10000, random()*10000), clock_timestamp() from generate_series(1,10000000);  

postgres=# select min(crt_time),max(crt_time) from test;
min | max
----------------------------+----------------------------
2017-04-13 20:04:18.969268 | 2017-04-13 20:04:54.578339
(1 row)
create index idx_test_1 on test (gid, val desc) where crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268';
postgres=# select count(*) from test where crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268';  
count
---------
3461005
(1 row)
-- Recursive skip scan over the partial index idx_test_1 (gid, val desc,
-- filtered to the time window): visit each distinct gid exactly once,
-- keeping its largest val, then filter by region and take the top 10.
with recursive skip as (    
(
select t0 from test t0 where id in
(select id from test where gid is not null and crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268' order by gid,val desc limit 1) -- The time parameter. Obtain the largest val of the smallest GID. Use it as a startup record.
)
union all
(
select (
select t1 from test t1 where id in (select id from test t where t.gid > (s.t0).gid and t.gid is not null
and crt_time between '2017-04-13 20:04:18.969268' and '2017-04-13 20:04:30.969268' -- The time parameter
order by gid,val desc limit 1)
) from skip s where (s.t0).gid is not null
) -- "where (s.t0).gid is not null" must be included. Otherwise, it will create an endless loop.
)
select (t.t0).* from skip t where t.* is not null
and circle '((5000,5000), 1000)' @> (t.t0).pos -- The region parameter
order by (t.t0).val desc limit 10; -- Find the top 10 shops
id    | gid  |  val  |                 pos                 |          crt_time            
---------+------+-------+-------------------------------------+----------------------------
1754353 | 4001 | 99997 | (4755.64117543399,5253.53815406561) | 2017-04-13 20:04:24.563999
600729 | 5874 | 99996 | (5507.96090625226,4394.04523000121) | 2017-04-13 20:04:20.851141
1137330 | 4248 | 99995 | (4332.14340358973,4383.84034205228) | 2017-04-13 20:04:22.575639
2609044 | 7209 | 99995 | (5809.22217573971,4967.18854177743) | 2017-04-13 20:04:27.328745
1330926 | 2834 | 99994 | (4153.9505450055,4986.64934188128) | 2017-04-13 20:04:23.197925
208578 | 3439 | 99994 | (4186.14753056318,5103.39797474444) | 2017-04-13 20:04:19.598547
703010 | 5736 | 99993 | (4913.89285307378,4628.21466382593) | 2017-04-13 20:04:21.178653
298380 | 7680 | 99992 | (4539.91844784468,4454.29485291243) | 2017-04-13 20:04:19.884725
996318 | 7658 | 99992 | (4462.14715018868,5504.16304729879) | 2017-04-13 20:04:22.122626
3120169 | 3261 | 99991 | (4814.33014851063,4505.81138487905) | 2017-04-13 20:04:28.98197
(10 rows)

Time: 135.480 ms
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
Limit (cost=937.82..937.83 rows=1 width=40) (actual time=147.241..147.243 rows=10 loops=1)
Output: ((t.t0).id), ((t.t0).gid), ((t.t0).val), ((t.t0).pos), ((t.t0).crt_time)
Buffers: shared hit=80066
CTE skip
-> Recursive Union (cost=1.00..935.54 rows=101 width=64) (actual time=0.037..141.284 rows=10002 loops=1)
Buffers: shared hit=80066
-> Nested Loop (cost=1.00..9.03 rows=1 width=64) (actual time=0.036..0.036 rows=1 loops=1)
Output: t0.*
Inner Unique: true
Buffers: shared hit=8
-> HashAggregate (cost=0.57..0.58 rows=1 width=8) (actual time=0.022..0.023 rows=1 loops=1)
Output: test.id
Group Key: test.id
Buffers: shared hit=4
-> Limit (cost=0.43..0.55 rows=1 width=16) (actual time=0.017..0.018 rows=1 loops=1)
Output: test.id, test.gid, test.val
Buffers: shared hit=4
-> Index Scan using idx_test_1 on public.test (cost=0.43..431864.13 rows=3461209 width=16) (actual time=0.017..0.017 rows=1 loops=1)
Output: test.id, test.gid, test.val
Index Cond: (test.gid IS NOT NULL)
Buffers: shared hit=4
-> Index Scan using test_pkey on public.test t0 (cost=0.43..8.45 rows=1 width=72) (actual time=0.012..0.012 rows=1 loops=1)
Output: t0.*, t0.id
Index Cond: (t0.id = test.id)
Buffers: shared hit=4
-> WorkTable Scan on skip s (cost=0.00..92.45 rows=10 width=32) (actual time=0.014..0.014 rows=1 loops=10002)
Output: (SubPlan 1)
Filter: ((s.t0).gid IS NOT NULL)
Rows Removed by Filter: 0
Buffers: shared hit=80058
SubPlan 1
-> Nested Loop (cost=1.20..9.22 rows=1 width=64) (actual time=0.013..0.013 rows=1 loops=10001)
Output: t1.*
Inner Unique: true
Buffers: shared hit=80058
-> HashAggregate (cost=0.76..0.77 rows=1 width=8) (actual time=0.009..0.009 rows=1 loops=10001)
Output: t_1.id
Group Key: t_1.id
Buffers: shared hit=40033
-> Limit (cost=0.43..0.75 rows=1 width=16) (actual time=0.008..0.008 rows=1 loops=10001)
Output: t_1.id, t_1.gid, t_1.val
Buffers: shared hit=40033
-> Index Scan using idx_test_1 on public.test t_1 (cost=0.43..369056.35 rows=1153736 width=16) (actual time=0.008..0.008 rows=1 loops=10001)
Output: t_1.id, t_1.gid, t_1.val
Index Cond: ((t_1.gid > (s.t0).gid) AND (t_1.gid IS NOT NULL))
Buffers: shared hit=40033
-> Index Scan using test_pkey on public.test t1 (cost=0.43..8.45 rows=1 width=72) (actual time=0.003..0.003 rows=1 loops=10000)
Output: t1.*, t1.id
Index Cond: (t1.id = t_1.id)
Buffers: shared hit=40025
-> Sort (cost=2.28..2.29 rows=1 width=40) (actual time=147.240..147.241 rows=10 loops=1)
Output: ((t.t0).id), ((t.t0).gid), ((t.t0).val), ((t.t0).pos), ((t.t0).crt_time)
Sort Key: ((t.t0).val) DESC
Sort Method: top-N heapsort Memory: 26kB
Buffers: shared hit=80066
-> CTE Scan on skip t (cost=0.00..2.27 rows=1 width=40) (actual time=0.252..147.138 rows=317 loops=1)
Output: (t.t0).id, (t.t0).gid, (t.t0).val, (t.t0).pos, (t.t0).crt_time
Filter: ((t.* IS NOT NULL) AND ('<(5000,5000),1000>'::circle @> (t.t0).pos))
Rows Removed by Filter: 9685
Buffers: shared hit=80066
Planning time: 0.508 ms
Execution time: 147.505 ms
(62 rows)
Assume that around 10 million pieces of data are generated every two hours.
postgres=# create index idx_test_gist on test using gist(pos);  
CREATE INDEX
-- Alternative using a GiST index on pos plus a window function: fetch all
-- rows inside the circle, rank them per shop by val, then take the best
-- row of each shop and return the 10 largest. Note: unlike the recursive
-- version above, this variant has no time filter — it scans the whole table's
-- rows inside the region.
select * from  
(
select row_number() over(partition by gid order by val desc) as rn, * from test
where
circle '((5000,5000), 1000)' @> pos -- The region parameter
) t
where rn = 1 -- Keep only the largest value of each shop in this region
order by val desc limit 10; -- Find the 10 largest shop values
rn |   id    | gid  |  val  |                 pos                 |          crt_time            
----+---------+------+-------+-------------------------------------+----------------------------
1 | 7859807 | 2311 | 99999 | (4900.04640072584,4950.79724118114) | 2017-04-13 20:04:46.013424
1 | 4658616 | 3699 | 99999 | (5625.03716442734,5338.90711143613) | 2017-04-13 20:04:35.467025
1 | 1754353 | 4001 | 99997 | (4755.64117543399,5253.53815406561) | 2017-04-13 20:04:24.563999
1 | 6076598 | 4610 | 99997 | (5679.03681658208,4793.08029171079) | 2017-04-13 20:04:40.09587
1 | 6139261 | 4069 | 99997 | (5225.87833926082,4101.83480009437) | 2017-04-13 20:04:40.301817
1 | 600729 | 5874 | 99996 | (5507.96090625226,4394.04523000121) | 2017-04-13 20:04:20.851141
1 | 4281282 | 9720 | 99996 | (5036.95292398334,4731.64941649884) | 2017-04-13 20:04:34.237957
1 | 5579952 | 1503 | 99996 | (4271.09604235739,5250.28191972524) | 2017-04-13 20:04:38.469311
1 | 5310205 | 1317 | 99995 | (4439.0160869807,4796.70224711299) | 2017-04-13 20:04:37.590451
1 | 1137330 | 4248 | 99995 | (4332.14340358973,4383.84034205228) | 2017-04-13 20:04:22.575639
(10 rows)

Time: 633.342 ms
Limit  (cost=39265.88..39265.91 rows=10 width=48) (actual time=730.704..730.706 rows=10 loops=1)  
Output: t.rn, t.id, t.gid, t.val, t.pos, t.crt_time
Buffers: shared hit=317037, temp read=1921 written=1928
-> Sort (cost=39265.88..39266.01 rows=50 width=48) (actual time=730.702..730.703 rows=10 loops=1)
Output: t.rn, t.id, t.gid, t.val, t.pos, t.crt_time
Sort Key: t.val DESC
Sort Method: top-N heapsort Memory: 26kB
Buffers: shared hit=317037, temp read=1921 written=1928
-> Subquery Scan on t (cost=38939.80..39264.80 rows=50 width=48) (actual time=520.846..728.927 rows=10001 loops=1)
Output: t.rn, t.id, t.gid, t.val, t.pos, t.crt_time
Filter: (t.rn = 1)
Rows Removed by Filter: 303477
Buffers: shared hit=317037, temp read=1921 written=1928
-> WindowAgg (cost=38939.80..39139.80 rows=10000 width=48) (actual time=520.844..703.933 rows=313478 loops=1)
Output: row_number() OVER (?), test.id, test.gid, test.val, test.pos, test.crt_time
Buffers: shared hit=317037, temp read=1921 written=1928
-> Sort (cost=38939.80..38964.80 rows=10000 width=40) (actual time=520.837..594.505 rows=313478 loops=1)
Output: test.gid, test.val, test.id, test.pos, test.crt_time
Sort Key: test.gid, test.val DESC
Sort Method: external merge Disk: 15368kB
Buffers: shared hit=317037, temp read=1921 written=1928
-> Index Scan using idx_test_gist on public.test (cost=0.42..38275.42 rows=10000 width=40) (actual time=0.240..336.235 rows=313478 loops=1)
Output: test.gid, test.val, test.id, test.pos, test.crt_time
Index Cond: ('<(5000,5000),1000>'::circle @> test.pos)
Buffers: shared hit=317037
Planning time: 0.140 ms
Execution time: 734.226 ms
(27 rows)

Kernel-Level Optimization (Support for Spatial Grid Table Partitions)

Allow PostgreSQL to support spatial GRID partitions. (In fact, you can already implement this today with table inheritance, using grid + mod to determine which partition each row should be inserted into.)

Business Optimization Methods

1. In examples 1 and 2, the application layer only needs the most recent data, so historical data is not involved. In addition to the recursive optimization, two further optimization methods are available.

-- Optimization 1: make s_id the primary key so the table holds exactly one
-- (latest) row per sensor, maintained by the upsert below.
create unlogged table sort_test(  
s_id int primary key, -- Sensor ID
val numeric(10,2), -- Sensor value
crt_time timestamp default clock_timestamp() -- Report time
);

insert into sort_test(s_id,val,crt_time) values (?,?,?) on conflict (s_id) do update set val=excluded.val,crt_time=excluded.crt_time;
-- Optimization 2: keep the full report history here, and let a trigger
-- maintain the per-sensor snapshot in hist_stat.
create unlogged table hist(  
id serial8 primary key, -- Auto-increment primary key
s_id int, -- Sensor ID
val numeric(10,2), -- Sensor value
crt_time timestamp default clock_timestamp() -- Report time
);

-- Snapshot table: one row per sensor holding its latest value, kept current
-- by the trigger on hist.
create unlogged table hist_stat(
s_id int primary key, -- Sensor ID
val numeric(10,2), -- Sensor value
crt_time timestamp default clock_timestamp() -- Report time
);



-- Trigger function: mirror every row inserted into hist into the per-sensor
-- snapshot table hist_stat, overwriting any previous value for that sensor.
create or replace function tg() returns trigger as
$$
begin
  insert into hist_stat (s_id,val,crt_time)
    values (NEW.s_id,NEW.val,NEW.crt_time)
    on conflict (s_id) do update
      set val=excluded.val,crt_time=excluded.crt_time;
  -- Return value is ignored for AFTER row triggers; null is conventional.
  return null;
end;
$$
language plpgsql strict;

create trigger tg after insert on hist for each row execute procedure tg();
postgres=# insert into hist(s_id,val) values(1,1);  
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# insert into hist(s_id,val) values(1,1);
INSERT 0 1
postgres=# select * from hist;
id | s_id | val | crt_time
----+------+------+----------------------------
3 | 1 | 1.00 | 2017-04-13 22:23:25.165286
4 | 1 | 1.00 | 2017-04-13 22:23:26.23929
5 | 1 | 1.00 | 2017-04-13 22:23:26.646152
6 | 1 | 1.00 | 2017-04-13 22:23:26.991189
7 | 1 | 1.00 | 2017-04-13 22:23:27.376265
(5 rows)

postgres=# select * from hist_stat ;
s_id | val | crt_time
------+------+----------------------------
1 | 1.00 | 2017-04-13 22:23:27.376265
(1 row)
postgres=# select * from hist_stat ;  
s_id | val | crt_time
------+------+----------------------------
1 | 1.00 | 2017-04-13 22:23:27.376265
(1 row)

Original Source

https://www.alibabacloud.com/blog/displaying-massive-amounts-of-data-in-real-time_594988?spm=a2c41.13103891.0.0

--

--

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com