Merging Time Series Data in Different Scenarios with PostgreSQL

Sensor Case Example

-- Sensor readings used by the examples below.
-- UNLOGGED: changes to the table are not written to the write-ahead log.
create unlogged table sort_test (
    id bigserial primary key,  -- primary key (bigserial is the same type as serial8)
    c2 integer,                -- sensor ID
    c3 integer                 -- sensor value
);

Insert 10 million rows of sensor test data
postgres=# insert into sort_test (c2,c3) select random()*100000, random()*100 from generate_series(1,10000000);
INSERT 0 10000000
postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=10001512045.83..10001837045.83 rows=50000 width=16) (actual time=23865.363..44033.984 rows=100001 loops=1)
Output: t.id, t.c2, t.c3
Filter: (t.rn = 1)
Rows Removed by Filter: 9899999
Buffers: shared hit=54055, temp read=93801 written=93801
-> WindowAgg (cost=10001512045.83..10001712045.83 rows=10000000 width=24) (actual time=23865.351..41708.460 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?)
Buffers: shared hit=54055, temp read=93801 written=93801
-> Sort (cost=10001512045.83..10001537045.83 rows=10000000 width=16) (actual time=23865.335..31540.089 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3
Sort Key: sort_test.c2, sort_test.id DESC
Sort Method: external merge Disk: 254208kB
Buffers: shared hit=54055, temp read=93801 written=93801
-> Seq Scan on public.sort_test (cost=10000000000.00..10000154055.00 rows=10000000 width=16) (actual time=0.021..1829.135 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3
Buffers: shared hit=54055
Planning time: 0.194 ms
Execution time: 44110.560 ms
(18 rows)
postgres=# create index sort_test_1 on sort_test(c2,id desc); 
CREATE INDEX
postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.43..542565.80 rows=50000 width=16) (actual time=0.048..33844.843 rows=100001 loops=1)
Output: t.id, t.c2, t.c3
Filter: (t.rn = 1)
Rows Removed by Filter: 9899999
Buffers: shared hit=10029020 read=1
-> WindowAgg (cost=0.43..417564.59 rows=10000097 width=24) (actual time=0.042..30490.662 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?)
Buffers: shared hit=10029020 read=1
-> Index Scan using sort_test_1 on public.sort_test (cost=0.43..242562.89 rows=10000097 width=16) (actual time=0.030..18347.482 rows=10000000 loops=1)
Output: sort_test.id, sort_test.c2, sort_test.c3
Buffers: shared hit=10029020 read=1
Planning time: 0.216 ms
Execution time: 33865.321 ms
(13 rows)
-- Toggle psql's statement timing so each fetch below reports its elapsed time.
\timing
-- A cursor declared without WITH HOLD only lives inside a transaction.
begin;
-- Cursor over the row with the largest id for each sensor (c2).
declare c1 cursor for
select id, c2, c3
from (
    select
        id,
        c2,
        c3,
        row_number() over (partition by c2 order by id desc) as rn
    from sort_test
) t
where rn = 1;
postgres=# fetch 100 from c1;
id | c2 | c3
---------+----+-----
9962439 | 0 | 93
9711199 | 1 | 52
9987709 | 2 | 65
9995611 | 3 | 34
9998766 | 4 | 12
9926693 | 5 | 81
....
9905064 | 98 | 44
9991592 | 99 | 99
(100 rows)
Time: 31.408 ms -- results are returned very quickly
-- Close the transaction that still holds the earlier cursor c1. While it is
-- open, DROP INDEX is rejected because the cursor is using the index, and the
-- cursor name c1 cannot be declared again. If no transaction is open, this
-- commit only raises a warning.
commit;
-- Remove the (c2, id desc) index to compare cursor latency without it.
drop index sort_test_1;
begin;
-- Same query as before: the row with the largest id for each sensor (c2).
declare c1 cursor for
select id, c2, c3
from (
    select
        id,
        c2,
        c3,
        row_number() over (partition by c2 order by id desc) as rn
    from sort_test
) t
where rn = 1;
postgres=# fetch 100 from c1;
....
Time: 22524.783 ms -- without the index, no rows are returned until the whole sort has completed, so even the first fetch is very slow.

Example of Incremental Synchronization of Merged Data

-- hstore supplies the key/value type used for column c4 below.
create extension hstore;

-- Table of captured changes to be merged; see the per-column notes.
create unlogged table sort_test1 (
    id bigserial primary key,  -- primary key (bigserial is the same type as serial8)
    c2 integer,                -- primary key of the row in the target table
    c3 text,                   -- operation: insert, update or delete
    c4 hstore                  -- the row
);
create index idx_sort_test1_1 on sort_test1(c2,id desc);
select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1;
postgres=# explain select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1;
QUERY PLAN
---------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.15..46.25 rows=4 width=68)
Filter: (t.rn = 1)
-> WindowAgg (cost=0.15..36.50 rows=780 width=84)
-> Index Scan using idx_sort_test1_1 on sort_test1 (cost=0.15..22.85 rows=780 width=76)
(4 rows)

Excellent Optimization Method for Sparse Columns

create type r as (c2 int, c3 int);
postgres=# explain (analyze,verbose,timing,costs,buffers) with recursive skip as (
(
select (c2,c3)::r as r from sort_test where id in (select id from sort_test where c2 is not null order by c2,id desc limit 1)
)
union all
(
select (
select (c2,c3)::r as r from sort_test where id in (select id from sort_test t where t.c2>(s.r).c2 and t.c2 is not null order by c2,id desc limit 1)
) from skip s where (s.r).c2 is not null
) -- "where (s.r).c2 is not null" must be added, otherwise it will end up with an endless loop.
)
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CTE Scan on skip t (cost=302.97..304.99 rows=100 width=8) (actual time=0.077..4184.770 rows=100001 loops=1)
Output: (t.r).c2, (t.r).c3
Filter: (t.* IS NOT NULL)
Rows Removed by Filter: 1
Buffers: shared hit=800947, temp written=476
CTE skip
-> Recursive Union (cost=0.91..302.97 rows=101 width=32) (actual time=0.066..3970.580 rows=100002 loops=1)
Buffers: shared hit=800947
-> Nested Loop (cost=0.91..2.95 rows=1 width=32) (actual time=0.064..0.066 rows=1 loops=1)
Output: ROW(sort_test_1.c2, sort_test_1.c3)::r
Buffers: shared hit=8
-> HashAggregate (cost=0.47..0.48 rows=1 width=8) (actual time=0.044..0.044 rows=1 loops=1)
Output: sort_test_2.id
Group Key: sort_test_2.id
Buffers: shared hit=4
-> Limit (cost=0.43..0.46 rows=1 width=12) (actual time=0.036..0.036 rows=1 loops=1)
Output: sort_test_2.id, sort_test_2.c2
Buffers: shared hit=4
-> Index Only Scan using sort_test_1 on public.sort_test sort_test_2 (cost=0.43..267561.43 rows=10000000 width=12) (actual time=0.034..0.034 rows=1 loops=1)
Output: sort_test_2.id, sort_test_2.c2
Index Cond: (sort_test_2.c2 IS NOT NULL)
Heap Fetches: 1
Buffers: shared hit=4
-> Index Scan using sort_test_pkey on public.sort_test sort_test_1 (cost=0.43..2.45 rows=1 width=16) (actual time=0.011..0.012 rows=1 loops=1)
Output: sort_test_1.id, sort_test_1.c2, sort_test_1.c3
Index Cond: (sort_test_1.id = sort_test_2.id)
Buffers: shared hit=4
-> WorkTable Scan on skip s (cost=0.00..29.80 rows=10 width=32) (actual time=0.037..0.038 rows=1 loops=100002)
Output: (SubPlan 1)
Filter: ((s.r).c2 IS NOT NULL)
Rows Removed by Filter: 0
Buffers: shared hit=800939
SubPlan 1
-> Nested Loop (cost=0.92..2.96 rows=1 width=32) (actual time=0.034..0.035 rows=1 loops=100001)
Output: ROW(sort_test.c2, sort_test.c3)::r
Buffers: shared hit=800939
-> HashAggregate (cost=0.49..0.50 rows=1 width=8) (actual time=0.023..0.023 rows=1 loops=100001)
Output: t_1.id
Group Key: t_1.id
Buffers: shared hit=400401
-> Limit (cost=0.43..0.48 rows=1 width=12) (actual time=0.021..0.021 rows=1 loops=100001)
Output: t_1.id, t_1.c2
Buffers: shared hit=400401
-> Index Only Scan using sort_test_1 on public.sort_test t_1 (cost=0.43..133557.76 rows=3333333 width=12) (actual time=0.019..0.019 rows=1 loops=100001)
Output: t_1.id, t_1.c2
Index Cond: ((t_1.c2 > (s.r).c2) AND (t_1.c2 IS NOT NULL))
Heap Fetches: 100000
Buffers: shared hit=400401
-> Index Scan using sort_test_pkey on public.sort_test (cost=0.43..2.45 rows=1 width=16) (actual time=0.006..0.007 rows=1 loops=100000)
Output: sort_test.id, sort_test.c2, sort_test.c3
Index Cond: (sort_test.id = t_1.id)
Buffers: shared hit=400538
Planning time: 0.970 ms
Execution time: 4209.026 ms
(54 rows)
postgres=# begin;
BEGIN
Time: 0.079 ms
postgres=# declare cur cursor for with recursive skip as (
-- Non-recursive term: seed with the first row in (c2, id desc) order,
-- i.e. the smallest non-null c2 and, within that c2, the largest id.
(
select (c2,c3)::r as r from sort_test where id in (select id from sort_test where c2 is not null order by c2,id desc limit 1)
)
union all
-- Recursive term: for the row produced by the previous step, look up the next
-- larger c2 (again taking its largest id). The scalar subquery yields NULL
-- when no larger c2 exists.
(
select (
select (c2,c3)::r as r from sort_test where id in (select id from sort_test t where t.c2>(s.r).c2 and t.c2 is not null order by c2,id desc limit 1)
) from skip s where (s.r).c2 is not null
) -- "where (s.r).c2 is not null" is required: it stops the recursion once the lookup has returned NULL; without it the query loops forever.
)
-- Drop the trailing NULL row emitted by the last recursive step.
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;
DECLARE CURSOR
Time: 1.240 ms
postgres=# fetch 100 from cur;
c2 | c3
----+----
0 | 93
1 | 52
2 | 65
.....
97 | 78
98 | 44
99 | 99
(100 rows)
Time: 4.314 ms

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store