Real-time Statistical Analysis on Sliding Windows with PostgreSQL

Background

Aggregation analysis is a common real-world requirement, for example counting how many people are in each store of a mall at each point in time. Location-sensing technology makes this possible: when you enter an area, a record is written to indicate that you have entered; when you leave, a leave record is written; and if you stay in place for a long time, a heartbeat record is written at regular intervals.

Database Design

An IoT enterprise uses sensors to monitor the status of its system. When a sensor goes online, an online record is written, and when it goes offline, an offline record is written. The state of each sensor is also recorded at regular intervals, so a sensor with no record in the past hour is considered offline.

1. Table Structure

create table sensor_stat(
  sid int, -- sensor ID
  state boolean, -- sensor state: true = online, false = offline
  crt_time timestamp -- time the state was reported
);

2. Index

create index idx_sensor_stat_1 on sensor_stat(sid, crt_time desc);

-- Test data: 110.1 million records spread over sensor IDs 0 to 1000
insert into sensor_stat select random()*1000, (random()*1)::int::boolean, clock_timestamp() from generate_series(1,110100000);

3. TTL Data

Set a TTL on the data so that the table stays small and contains only the data within the heartbeat time range. The two tables below have the same structure as sensor_stat.

create table sensor_stat1 (
  sid int, -- sensor ID
  state boolean, -- sensor state: true = online, false = offline
  crt_time timestamp -- time the state was reported
);

create table sensor_stat2 (
  sid int, -- sensor ID
  state boolean, -- sensor state: true = online, false = offline
  crt_time timestamp -- time the state was reported
);
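
The article does not spell out how the two tables are used. One possible reading, shown here only as a sketch, is to alternate writes between them and empty the older table at each switch. The view name sensor_stat_all and the switching schedule are assumptions, not part of the original design.

```sql
-- Hypothetical view (name is an assumption) that exposes both tables as one.
create view sensor_stat_all as
  select sid, state, crt_time from sensor_stat1
  union all
  select sid, state, crt_time from sensor_stat2;

-- At each switch, empty the table that is about to receive new writes.
-- TRUNCATE releases the space immediately, unlike DELETE.
truncate sensor_stat2;
-- ... the application now writes to sensor_stat2 ...

-- At the next switch, the roles are reversed.
truncate sensor_stat1;
-- ... the application now writes to sensor_stat1 ...
```

With this scheme, the two tables together hold between one and two switching periods of data, so the period should be at least as long as the heartbeat time range.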

4. Recursive Query

Use a recursive query to efficiently fetch the latest state of each sensor.

with recursive t as   
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).* from t where t.* is not null;

Execution plan of the same query, restricted to records with state is true:

explain (analyze,verbose,timing,costs,buffers) with recursive t as   
(
(
select sensor_stat as sensor_stat from sensor_stat where state is true order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid and t1.state is true order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).* from t where t.* is not null;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTE Scan on t  (cost=70.86..72.88 rows=100 width=13) (actual time=0.037..10.975 rows=1001 loops=1)
   Output: (t.sensor_stat).sid, (t.sensor_stat).state, (t.sensor_stat).crt_time
   Filter: (t.* IS NOT NULL)
   Rows Removed by Filter: 1
   Buffers: shared hit=5926
   CTE t
     ->  Recursive Union  (cost=0.57..70.86 rows=101 width=37) (actual time=0.030..10.293 rows=1002 loops=1)
           Buffers: shared hit=5926
           ->  Subquery Scan on "*SELECT* 1"  (cost=0.57..0.63 rows=1 width=37) (actual time=0.029..0.029 rows=1 loops=1)
                 Output: "*SELECT* 1".sensor_stat
                 Buffers: shared hit=5
                 ->  Limit  (cost=0.57..0.62 rows=1 width=49) (actual time=0.028..0.028 rows=1 loops=1)
                       Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time
                       Buffers: shared hit=5
                       ->  Index Scan using idx_sensor_stat_1 on public.sensor_stat  (cost=0.57..3180100.70 rows=55369290 width=49) (actual time=0.027..0.027 rows=1 loops=1)
                             Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time
                             Filter: (sensor_stat.state IS TRUE)
                             Buffers: shared hit=5
           ->  WorkTable Scan on t t_1  (cost=0.00..6.82 rows=10 width=32) (actual time=0.010..0.010 rows=1 loops=1002)
                 Output: (SubPlan 1)
                 Filter: ((t_1.sensor_stat).sid IS NOT NULL)
                 Rows Removed by Filter: 0
                 Buffers: shared hit=5921
                 SubPlan 1
                   ->  Limit  (cost=0.57..0.66 rows=1 width=49) (actual time=0.009..0.009 rows=1 loops=1001)
                         Output: t1.*, t1.sid, t1.crt_time
                         Buffers: shared hit=5921
                         ->  Index Scan using idx_sensor_stat_1 on public.sensor_stat t1  (cost=0.57..1746916.71 rows=18456430 width=49) (actual time=0.009..0.009 rows=1 loops=1001)
                               Output: t1.*, t1.sid, t1.crt_time
                               Index Cond: (t1.sid > (t_1.sensor_stat).sid)
                               Filter: (t1.state IS TRUE)
                               Rows Removed by Filter: 1
                               Buffers: shared hit=5921
 Planning time: 0.180 ms
 Execution time: 11.083 ms
(35 rows)

Without the state filter, the query returns the latest state of every sensor:

 sid  | state |          crt_time
------+-------+----------------------------
    0 | t     | 2017-07-05 10:29:09.470687
    1 | f     | 2017-07-05 10:29:09.465721
    2 | t     | 2017-07-05 10:29:09.474216
    3 | f     | 2017-07-05 10:29:09.473176
    4 | t     | 2017-07-05 10:29:09.473179
    5 | t     | 2017-07-05 10:29:09.473842
......
  996 | t     | 2017-07-05 10:29:09.469787
  997 | f     | 2017-07-05 10:29:09.470983
  998 | t     | 2017-07-05 10:29:09.47268
  999 | t     | 2017-07-05 10:29:09.469192
 1000 | t     | 2017-07-05 10:29:09.472195
(1001 rows)

Time: 11.067 ms

Count the sensors whose latest state is online:

with recursive t as   
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;
count
-------
491
(1 row)

Time: 10.182 ms
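
For comparison, the result that the recursive queries in this section compute can also be written with PostgreSQL's DISTINCT ON. The sketch below is not from the original article. It is easier to read, but it generally has to read every record of every sensor, whereas the recursive form jumps from one sid to the next through the index. The second statement additionally applies the one-hour heartbeat rule from the design section, assuming that no record carries a timestamp in the future.

```sql
-- Latest record of every sensor (same result as the recursive query).
select distinct on (sid) sid, state, crt_time
from sensor_stat
order by sid, crt_time desc;

-- Sensors currently online, treating a sensor with no record
-- in the past hour as offline.
select count(*)
from (
  select distinct on (sid) sid, state
  from sensor_stat
  where crt_time > now() - interval '1 hour'
  order by sid, crt_time desc
) s
where s.state is true;
```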

5. Number of Sensors

Count the number of sensors that were online at any given point in time. If crt_time has one-second precision, there are at most 86,400 points in time per day, so at most 86,400 counts per day need to be computed regardless of the number of records.

with recursive t as   
(
(
select sensor_stat as sensor_stat from sensor_stat where crt_time <= '2017-07-05 10:29:09' order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.crt_time <= '2017-07-05 10:29:09' and t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;

count
-------
501
(1 row)

Time: 20.743 ms
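
To evaluate the count above for many points in time, the query can be wrapped in a SQL function that takes the timestamp as a parameter. This is a sketch only: the function name online_cnt and the parameter name p_ts are made up for illustration and do not appear in the original article.

```sql
-- Hypothetical helper: number of sensors online as of p_ts.
create or replace function online_cnt(p_ts timestamp) returns bigint as $$
  with recursive t as (
    (
      select sensor_stat as sensor_stat
      from sensor_stat
      where crt_time <= p_ts
      order by sid, crt_time desc limit 1
    )
    union all
    (
      select (select t1 from sensor_stat as t1
              where t1.crt_time <= p_ts and t1.sid > (t.sensor_stat).sid
              order by sid, crt_time desc limit 1)
      from t
      where (t.sensor_stat).sid is not null
    )
  )
  select count(*) from t
  where t.* is not null and (t.sensor_stat).state is true;
$$ language sql stable;

-- One count per second for a ten-second range.
select g.ts, online_cnt(g.ts)
from generate_series('2017-07-05 10:29:00'::timestamp,
                     '2017-07-05 10:29:09'::timestamp,
                     interval '1 second') as g(ts);
```

Each call runs the recursive query once, so the cost grows linearly with the number of points in time requested.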

6. Generate the Number of Online Objects for Each Past Second in One Query

This can be done with a window query that uses frames. (A frame is the range of rows up to the current record after the records are ordered by time.)
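
The article gives no query for this step, so the following is only one possible sketch of a frame-based approach. It turns every state change into +1 (a sensor comes online) or -1 (a sensor goes offline), sums the changes per second, and then accumulates them with a frame that runs from the first row to the current row. It assumes that state is never null, it ignores the one-hour heartbeat timeout, and it returns rows only for seconds in which at least one record was written.

```sql
with changes as (
  select
    date_trunc('second', crt_time) as ts,
    case
      -- offline (or never seen) -> online
      when state and not coalesce(lag(state) over w, false) then 1
      -- online -> offline
      when not state and coalesce(lag(state) over w, false) then -1
      -- heartbeat or repeated state: no change
      else 0
    end as delta
  from sensor_stat
  window w as (partition by sid order by crt_time)
),
per_second as (
  select ts, sum(delta) as delta
  from changes
  group by ts
)
select
  ts,
  -- running total over the frame from the first second up to this one
  sum(delta) over (order by ts
                   rows between unbounded preceding and current row) as online_cnt
from per_second
order by ts;
```

Because this reads every record once, it is better suited to a table that is kept small by a TTL than to the full test table.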

7. Count the Maximum and Minimum Number of Online Objects per Minute

Run the query once per second and write the counts to a result table.

create table result (crt_time timestamp(0) default now(), state boolean, cnt int);  
create index idx_result_1 on result using brin (crt_time);

insert into result (state,cnt)
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).state, count(*) from t where t.* is not null group by 1;

INSERT 0 2
Time: 12.061 ms

postgres=# select * from result ;
crt_time | state | cnt
---------------------+-------+-----
2017-07-05 11:11:03 | f | 510
2017-07-05 11:11:03 | t | 491
(2 rows)

Time: 0.274 ms
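
Then compute the minimum and maximum number of online objects (state is true) for a given minute:

select '2017-07-05 11:11:00', min(cnt), max(cnt) from result where state is true and crt_time >= '2017-07-05 11:11:00' and crt_time < '2017-07-05 11:12:00';

or, for every minute in a time range:

select to_char(crt_time, 'yyyy-mm-dd hh24:mi:00'), min(cnt), max(cnt) from result where state is true and crt_time between ? and ? group by 1;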

Optimizing with Large Number of Sensor IDs

When the number of sensor IDs reaches 100,000, the query time rises to 250 milliseconds.

Optimization Method 2: Subquery

If the sensor IDs are maintained in a separate table, a subquery can be used to optimize the query in this example.

create table a(id int primary key); -- id is the sensor ID
create table b(
  aid int, -- the sensor ID
  crt_time timestamp, -- time of reporting
  val numeric -- reported value
);
create index idx_b_1 on b(aid, crt_time desc); -- index
insert into a select generate_series(0,100000);
insert into b select random()*100000, clock_timestamp(), random() from generate_series(1,100000000);
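
Fetch the latest record of each sensor:

select (t.b).aid,(t.b).val,(t.b).crt_time 
from
(
select (select b from b where b.aid=a.id order by crt_time desc limit 1) -- subquery, executed once per row of a; fetches the latest value
from a limit 1000000000 -- without this LIMIT the query has a problem, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null; -- keep only sensors that have reported records in b

Fetch the latest record of each sensor within a time range:

select (t.b).aid,(t.b).val,(t.b).crt_time 
from
(
select
(
select b from b
where b.aid=a.id
and b.crt_time between ? and ? -- restrict the time range
order by crt_time desc limit 1
) -- subquery, executed once per row of a; fetches the latest value
from a limit 1000000000 -- without this LIMIT the query has a problem, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null; -- keep only sensors that have reported records in b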
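
Execution plan of the first query:

explain (analyze,verbose,timing,costs,buffers) select (t.b).aid,(t.b).val,(t.b).crt_time 
from
(
select (select b from b where b.aid=a.id order by crt_time desc limit 1) -- subquery, executed once per row of a; fetches the latest value
from a limit 1000000000 -- without this LIMIT the query has a problem, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null; -- keep only sensors that have reported records in b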
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=0.00..191854.32 rows=99500 width=44) (actual time=0.033..827.591 rows=100000 loops=1)
   Output: (t.b).aid, (t.b).val, (t.b).crt_time
   Filter: ((t.b).aid IS NOT NULL)
   Buffers: shared hit=500443
   ->  Limit  (cost=0.00..190854.32 rows=100000 width=32) (actual time=0.032..796.185 rows=100000 loops=1)
         Output: ((SubPlan 1))
         Buffers: shared hit=500443
         ->  Seq Scan on postgres.a  (cost=0.00..190854.32 rows=100000 width=32) (actual time=0.031..787.322 rows=100000 loops=1)
               Output: (SubPlan 1)
               Buffers: shared hit=500443
               SubPlan 1
                 ->  Limit  (cost=0.57..1.89 rows=1 width=55) (actual time=0.007..0.007 rows=1 loops=100000)
                       Output: b.*, b.crt_time
                       Buffers: shared hit=500000
                       ->  Index Scan using idx_b_1 on postgres.b  (cost=0.57..946.44 rows=713 width=55) (actual time=0.007..0.007 rows=1 loops=100000)
                             Output: b.*, b.crt_time
                             Index Cond: (b.aid = a.id)
                             Buffers: shared hit=500000
 Planning time: 0.144 ms
 Execution time: 832.539 ms
(20 rows)

Execution plan of the query with a time range:

explain (analyze,verbose,timing,costs,buffers) select (t.b).aid,(t.b).val,(t.b).crt_time 
from
(
select
(
select b from b
where b.aid=a.id
and b.crt_time between '2017-07-17 09:53:00.480416' and '2017-07-17 09:54:00.480416' -- restrict the time range
order by crt_time desc limit 1
) -- subquery, executed once per row of a; fetches the latest value
from a limit 1000000000 -- without this LIMIT the query has a problem, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null; -- keep only sensors that have reported records in b
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on t  (cost=0.00..192742.68 rows=99500 width=44) (actual time=0.039..671.069 rows=100000 loops=1)
   Output: (t.b).aid, (t.b).val, (t.b).crt_time
   Filter: ((t.b).aid IS NOT NULL)
   Buffers: shared hit=501263
   ->  Limit  (cost=0.00..191742.68 rows=100000 width=32) (actual time=0.036..643.263 rows=100000 loops=1)
         Output: ((SubPlan 1))
         Buffers: shared hit=501263
         ->  Seq Scan on postgres.a  (cost=0.00..191742.68 rows=100000 width=32) (actual time=0.035..634.038 rows=100000 loops=1)
               Output: (SubPlan 1)
               Buffers: shared hit=501263
               SubPlan 1
                 ->  Limit  (cost=0.57..1.90 rows=1 width=55) (actual time=0.006..0.006 rows=1 loops=100000)
                       Output: b.*, b.crt_time
                       Buffers: shared hit=500820
                       ->  Index Scan using idx_b_1 on postgres.b  (cost=0.57..134.12 rows=100 width=55) (actual time=0.006..0.006 rows=1 loops=100000)
                             Output: b.*, b.crt_time
                             Index Cond: ((b.aid = a.id) AND (b.crt_time >= '2017-07-17 09:53:00.480416'::timestamp without time zone) AND (b.crt_time <= '2017-07-17 09:54:00.480416'::timestamp without time zone))
                             Buffers: shared hit=500820
 Planning time: 0.183 ms
 Execution time: 676.006 ms
(20 rows)
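
The same per-sensor lookup can also be expressed with a LATERAL join, available since PostgreSQL 9.3. This is an alternative sketch, not the query used in the article, and its timing has not been measured here. The alias b1 is introduced only for illustration. A cross join drops sensors with no record in b, which plays the role of the aid is not null filter above.

```sql
select b1.aid, b1.val, b1.crt_time
from a
cross join lateral (
  -- executed once per row of a; idx_b_1 (aid, crt_time desc)
  -- lets it stop after the first matching index entry
  select b.aid, b.val, b.crt_time
  from b
  where b.aid = a.id
  order by b.crt_time desc
  limit 1
) as b1;
```

To restrict the time range, add a condition on b.crt_time inside the lateral subquery, as in the second query above.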

Summary

With the recursive query method described in this article, you can compute very fine-grained, real-time statistics on the state of a large number of tracked objects. A single machine can support real-time pivoting over any sliding window with a daily data increment of about 10 billion.

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com
