Merging Time Series Data in Different Scenarios with PostgreSQL

Sensor Case Example

Take the previously mentioned sensor case as an example. Assume that these sensors constantly upload data, and users need to query the latest value uploaded by each sensor.

-- Staging table for raw sensor readings. UNLOGGED skips WAL writes,
-- trading crash safety for faster loads — fine for disposable test data.
create unlogged table sort_test (
    id serial8 primary key, -- monotonically increasing row id; "latest" reading = max(id)
    c2 int,                 -- sensor ID
    c3 int                  -- sensor value
);

Write 10 million rows of sensor test data:
postgres=# insert into sort_test (c2,c3) select random()*100000, random()*100 from generate_series(1,10000000);
INSERT 0 10000000
postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=10001512045.83..10001837045.83 rows=50000 width=16) (actual time=23865.363..44033.984 rows=100001 loops=1)
Output: t.id, t.c2, t.c3
Filter: (t.rn = 1)
Rows Removed by Filter: 9899999
Buffers: shared hit=54055, temp read=93801 written=93801
-> WindowAgg (cost=10001512045.83..10001712045.83 rows=10000000 width=24) (actual time=23865.351..41708.460 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?)
Buffers: shared hit=54055, temp read=93801 written=93801
-> Sort (cost=10001512045.83..10001537045.83 rows=10000000 width=16) (actual time=23865.335..31540.089 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3
Sort Key: sort_test.c2, sort_test.id DESC
Sort Method: external merge Disk: 254208kB
Buffers: shared hit=54055, temp read=93801 written=93801
-> Seq Scan on public.sort_test (cost=10000000000.00..10000154055.00 rows=10000000 width=16) (actual time=0.021..1829.135 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3
Buffers: shared hit=54055
Planning time: 0.194 ms
Execution time: 44110.560 ms
(18 rows)
postgres=# create index sort_test_1 on sort_test(c2,id desc); 
CREATE INDEX
postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.43..542565.80 rows=50000 width=16) (actual time=0.048..33844.843 rows=100001 loops=1)
Output: t.id, t.c2, t.c3
Filter: (t.rn = 1)
Rows Removed by Filter: 9899999
Buffers: shared hit=10029020 read=1
-> WindowAgg (cost=0.43..417564.59 rows=10000097 width=24) (actual time=0.042..30490.662 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?)
Buffers: shared hit=10029020 read=1
-> Index Scan using sort_test_1 on public.sort_test (cost=0.43..242562.89 rows=10000097 width=16) (actual time=0.030..18347.482 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3
Buffers: shared hit=10029020 read=1
Planning time: 0.216 ms
Execution time: 33865.321 ms
(13 rows)
\timing
begin;
declare c1 cursor for select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
postgres=# fetch 100 from c1;
id | c2 | c3
---------+----+-----
9962439 | 0 | 93
9711199 | 1 | 52
9987709 | 2 | 65
9995611 | 3 | 34
9998766 | 4 | 12
9926693 | 5 | 81
....
9905064 | 98 | 44
9991592 | 99 | 99
(100 rows)
Time: 31.408 ms -- results are returned very quickly
drop index sort_test_1;
begin;
declare c1 cursor for select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
postgres=# fetch 100 from c1;
....
Time: 22524.783 ms -- results are returned after SORT is completed. This is very slow.

Example of Incremental Synchronization of Merged Data

When a materialized view is refreshed in Oracle, applying changes to a record that was updated several times only requires the last update — the intermediate versions of each update can be skipped.

-- hstore stores the full row image of each change as key/value pairs.
create extension hstore;

-- Change-log table: one row per insert/update/delete applied to the target table.
-- For a given target PK (c2), the row with the largest id is the latest state.
create unlogged table sort_test1 (
    id serial8 primary key, -- change sequence; orders changes within a target PK
    c2 int,                 -- target table PK
    c3 text,                -- operation: insert, update, or delete
    c4 hstore               -- full row image of the change
);
-- Composite index lets the "latest change per target PK" query walk the
-- index in (c2, id desc) order, avoiding an explicit sort.
create index idx_sort_test1_1 on sort_test1 (c2, id desc);

-- Latest change per target PK: the most recent (highest id) row wins.
select c2, c3, c4
from (
    select c2, c3, c4,
           row_number() over (partition by c2 order by id desc) as rn
    from sort_test1
) t
where rn = 1;

postgres=# explain select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1;
QUERY PLAN
---------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.15..46.25 rows=4 width=68)
Filter: (t.rn = 1)
-> WindowAgg (cost=0.15..36.50 rows=780 width=84)
-> Index Scan using idx_sort_test1_1 on sort_test1 (cost=0.15..22.85 rows=780 width=76)
(4 rows)

Excellent Optimization Method for Sparse Columns

As we can see, the preceding optimization method only eliminates the SORT step and does not reduce the number of blocks to be scanned.

-- Composite type carrying one (sensor id, value) pair through the recursive CTE.
create type r as (c2 int, c3 int);postgres=# explain (analyze,verbose,timing,costs,buffers) with recursive skip as (  
(
-- Anchor: the latest row (max id) for the smallest non-null c2,
-- found via one descent of the (c2, id desc) index.
select (c2,c3)::r as r from sort_test where id in (select id from sort_test where c2 is not null order by c2,id desc limit 1)
)
union all
(
-- Recursive step: from the previous sensor id (s.r).c2, jump to the next
-- larger c2 and fetch its latest row. This is a "loose index scan": one
-- short index probe per distinct c2 instead of reading all 10M rows.
select (
select (c2,c3)::r as r from sort_test where id in (select id from sort_test t where t.c2>(s.r).c2 and t.c2 is not null order by c2,id desc limit 1)
) from skip s where (s.r).c2 is not null
) -- "where (s.r).c2 is not null" must be kept; without it the recursion never terminates.
)
-- Recursion emits one trailing all-NULL row when it runs out of sensor ids;
-- the "t.* is not null" filter removes it from the final result.
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CTE Scan on skip t (cost=302.97..304.99 rows=100 width=8) (actual time=0.077..4184.770 rows=100001 loops=1)
Output: (t.r).c2, (t.r).c3
Filter: (t.* IS NOT NULL)
Rows Removed by Filter: 1
Buffers: shared hit=800947, temp written=476
CTE skip
-> Recursive Union (cost=0.91..302.97 rows=101 width=32) (actual time=0.066..3970.580 rows=100002 loops=1)
Buffers: shared hit=800947
-> Nested Loop (cost=0.91..2.95 rows=1 width=32) (actual time=0.064..0.066 rows=1 loops=1)
Output: ROW(sort_test_1.c2, sort_test_1.c3)::r
Buffers: shared hit=8
-> HashAggregate (cost=0.47..0.48 rows=1 width=8) (actual time=0.044..0.044 rows=1 loops=1)
Output: sort_test_2.id
Group Key: sort_test_2.id
Buffers: shared hit=4
-> Limit (cost=0.43..0.46 rows=1 width=12) (actual time=0.036..0.036 rows=1 loops=1)
Output: sort_test_2.id, sort_test_2.c2
Buffers: shared hit=4
-> Index Only Scan using sort_test_1 on public.sort_test sort_test_2 (cost=0.43..267561.43 rows=10000000 width=12) (actual time=0.034..0.034 rows=1 loops=1)
Output: sort_test_2.id, sort_test_2.c2
Index Cond: (sort_test_2.c2 IS NOT NULL)
Heap Fetches: 1
Buffers: shared hit=4
-> Index Scan using sort_test_pkey on public.sort_test sort_test_1 (cost=0.43..2.45 rows=1 width=16) (actual time=0.011..0.012 rows=1 loops=1)
Output: sort_test_1.id, sort_test_1.c2, sort_test_1.c3
Index Cond: (sort_test_1.id = sort_test_2.id)
Buffers: shared hit=4
-> WorkTable Scan on skip s (cost=0.00..29.80 rows=10 width=32) (actual time=0.037..0.038 rows=1 loops=100002)
Output: (SubPlan 1)
Filter: ((s.r).c2 IS NOT NULL)
Rows Removed by Filter: 0
Buffers: shared hit=800939
SubPlan 1
-> Nested Loop (cost=0.92..2.96 rows=1 width=32) (actual time=0.034..0.035 rows=1 loops=100001)
Output: ROW(sort_test.c2, sort_test.c3)::r
Buffers: shared hit=800939
-> HashAggregate (cost=0.49..0.50 rows=1 width=8) (actual time=0.023..0.023 rows=1 loops=100001)
Output: t_1.id
Group Key: t_1.id
Buffers: shared hit=400401
-> Limit (cost=0.43..0.48 rows=1 width=12) (actual time=0.021..0.021 rows=1 loops=100001)
Output: t_1.id, t_1.c2
Buffers: shared hit=400401
-> Index Only Scan using sort_test_1 on public.sort_test t_1 (cost=0.43..133557.76 rows=3333333 width=12) (actual time=0.019..0.019 rows=1 loops=100001)
Output: t_1.id, t_1.c2
Index Cond: ((t_1.c2 > (s.r).c2) AND (t_1.c2 IS NOT NULL))
Heap Fetches: 100000
Buffers: shared hit=400401
-> Index Scan using sort_test_pkey on public.sort_test (cost=0.43..2.45 rows=1 width=16) (actual time=0.006..0.007 rows=1 loops=100000)
Output: sort_test.id, sort_test.c2, sort_test.c3
Index Cond: (sort_test.id = t_1.id)
Buffers: shared hit=400538
Planning time: 0.970 ms
Execution time: 4209.026 ms
(54 rows)
postgres=# begin;
BEGIN
Time: 0.079 ms
postgres=# declare cur cursor for with recursive skip as (
(
select (c2,c3)::r as r from sort_test where id in (select id from sort_test where c2 is not null order by c2,id desc limit 1)
)
union all
(
select (
select (c2,c3)::r as r from sort_test where id in (select id from sort_test t where t.c2>(s.r).c2 and t.c2 is not null order by c2,id desc limit 1)
) from skip s where (s.r).c2 is not null
) -- "where (s.r).c2 is not null" must be added, otherwise it will end up with an endless loop.
)
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;
DECLARE CURSOR
Time: 1.240 ms
postgres=# fetch 100 from cur;
r
----------
(0,93)
(1,52)
(2,65)
.....
(97,78)
(98,44)
(99,99)
(100 rows)
Time: 4.314 ms

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com