Accelerating PostgreSQL Ad Hoc Query and Dictionary with RUM Index

By Digoal

This article discusses how to accelerate ad hoc queries over random field combinations in PostgreSQL by combining a data dictionary with RUM indexes.

Background

Data Size of System:

About 2 billion rows with 64 fields. The original data is mostly strings, and most fields have a limited number of unique values.

Requirements:

  1. Queries over random field combinations, returning aggregate values.
  2. About 1,000 concurrent queries, with a response time under 100 ms per query.
  3. Write and update latency under 1 second; peak write/update speed up to 200,000 rows/s; batch writes supported.
  4. Adding fields must be convenient.
  5. Real-time computing without modeling: adding statistical dimensions must be convenient, with no need to wait for modeling to complete.

PostgreSQL Features for This Scenario

PostgreSQL can be used to meet all these requirements. PostgreSQL has the following features for ad hoc non-modeling queries:

1. Index Interface:

Bloom interface. Supports multi-field combination indexes and queries on any combination of fields. Filtering is lossy: it narrows the target data down to a set of blocks, which are then rechecked.

GIN interface. Inverted index, widely used for multi-value types (such as full-text search, arrays, JSON, and K-V) and for multi-field combination indexes. Supports multi-value and random field combination searches via bitmap index scans, which narrow the target data down to a set of blocks and speed up queries.

RUM interface. The new version of RUM supports not only the tsvector type but also the array type. Its advantage is that it needs no bitmap scan, so there is no recheck step, and queries consume less CPU than with the GIN interface.

2. Index Scanning Methods

Index scan: directly locates the required data.

Bitmap index scan: returns the blocks that contain target data, after which the database performs a CPU recheck. This method supports combined scans across multiple fields.

3. Other Features:

  • Parallel computing (parallel scanning, filtering, sorting, JOINs, aggregation, index creation, and so on). For example, a parallel sort returns the top-k entries of 10 billion rows in only 40 seconds.
  • Asynchronous calls and aggregation; DBLINK asynchronous calls also enable parallel computing.
  • Horizontal sharding.
  • Sequences, which can be used to build dictionaries.
  • UDFs. Support very complex in-database function programming to implement complex logic.
  • RULEs. Data can be automatically converted into dictionary values when a write or update occurs.
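As a minimal sketch of the last two points (all table, column, and function names here are illustrative, not from the original system), a sequence can hand out dictionary IDs while an upsert translates raw values on write:

```sql
-- Hypothetical sketch: a sequence assigns dictionary IDs,
-- and an upsert maps a raw text value to its ID on write.
create sequence dict_seq;

create table dict (
  id  int  primary key default nextval('dict_seq'),
  val text not null unique
);

-- Return the dictionary ID for a value, inserting it if it is new.
create or replace function to_dict(v text) returns int as $$
  insert into dict (val) values (v)
  on conflict (val) do update set val = excluded.val
  returning id;
$$ language sql;
```

A RULE or trigger on the fact table could call such a function so that writes store dictionary IDs instead of raw strings.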

PostgreSQL Scenario Optimization Measures

  1. Dictionary (most fields have a limited number of unique values, about 1–50 million). About 30 of the fields need a data dictionary (an ETL process can maintain it in real time). Converting data into dictionary values saves storage space and improves processing efficiency. If performance is already acceptable, a dictionary is not required.
  2. Automatic dictionary conversion on write (can be implemented with a RULE).
  3. Automatic translation back on query.
  4. Bloom, RUM, GIN, arrays, tsvector, multi-field bitmap scans.
  5. Sharding, with dblink asynchronous parallel calls.

Introduction to the Horizontal Sharding Method

  1. Use plproxy for horizontal sharding
  2. Use postgres_fdw + pg_pathman horizontal sharding
  3. Other PostgreSQL-based NewSQL or MPP open source products

pg-xl
https://www.postgres-xl.org/

citusdb
https://www.citusdata.com/

greenplum
http://www.greenplum.org/

pg_shardman
https://github.com/postgrespro/pg_shardman

Solution 1 — Global Dictionary + Array Type + RUM Index

Global Dictionary

Global dictionary means that the value ranges of all fields constitute one large value range, within which each "field name + field value" pair is unique.

After making a dictionary, you can choose either INT4 or INT8 as the element type of the dictionary.
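A minimal sketch of such a global dictionary (table and column names are illustrative): each (field name, field value) pair maps to exactly one integer in a single shared range:

```sql
create table global_dict (
  id    int  primary key,  -- dictionary value, unique across ALL fields
  fname text not null,     -- field name
  fval  text not null,     -- field value
  unique (fname, fval)
);

-- Because the ID range is shared, "c1 = 'beijing'" and "c2 = 'beijing'"
-- receive different IDs, so one flat array can encode all fields
-- unambiguously.
```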

Array

Since a global dictionary is used, we can use one array field to replace all fields.

create table tbl (
  id int8 primary key,
  c1 int,
  c2 int,
  ...
  c50 int
);

Replaced with

create table tbl (
  id int8 primary key,
  dict int[]
);

There are many advantages to using an array. For example, adding a field becomes a breeze, because you don't need to change the table structure. You only need to append the new field's content to the array.

The original AND queries are replaced with array containment (@>) queries, and the original OR queries with array overlap (&&) queries.
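For example (with illustrative dictionary values), the rewrites look like this:

```sql
-- Original AND query on discrete columns:
--   select count(*) from tbl where c1 = 1 and c2 = 2;
-- becomes an array containment (@>) query:
select count(*) from tbl where dict @> array[1, 2];

-- Original OR query:
--   select count(*) from tbl where c1 = 1 or c2 = 2;
-- becomes an array overlap (&&) query:
select count(*) from tbl where dict && array[1, 2];
```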

RUM indexes

RUM indexes already support arrays, including both containment (@>) and overlap (&&) queries.

Solution Demo

The demo does not include converting text into dictionary values. The hardware is an Alibaba Cloud ECS instance: 56 cores, 224 GB of memory, local SSD storage.

1. Create an extension

create extension rum;

2. Create a function that generates a random value

Create a function that generates a random value (namely the dictionary value). When you enter a range, the function returns a random value in this range.

create or replace function gen_rand(
  int, -- Minimum value (inclusive)
  int  -- Maximum value (inclusive)
) returns int as
$$
select $1 + (random() * ($2 - $1))::int;
$$
language sql strict;

3. Create a function that generates a random array with a length of 50

The rule is: 16 fields draw from dictionary ranges of 1 million values each, 16 fields from ranges of 10 million values each, and 18 fields from ranges of 50 million values each.

In total, the 50 fields consume 1.076 billion dictionary values, so INT4 suffices as the dictionary element type.

create or replace function gen_ran_array() returns int[] as
$$
declare
  res int[] := '{}'; -- Result
  x int;             -- Range size per field
  offset1 int;       -- Offset
begin
  -- The first segment consumes 16 million values
  offset1 := (-2147483648); -- Offset of the first segment is the minimum value of int4
  x := 1000000; -- Value range for each field is 1 million
  for i in 1..16 loop
    res := res || gen_rand(offset1 + (i-1)*x, offset1 + i*x - 1);
  end loop;

  -- The second segment consumes 160 million values
  offset1 := (-2147483648) + 16*1000000; -- Offset of the second segment
  x := 10000000; -- Value range for each field is 10 million
  for i in 1..16 loop
    res := res || gen_rand(offset1 + (i-1)*x, offset1 + i*x - 1);
  end loop;

  -- The third segment consumes 900 million values
  offset1 := (-2147483648) + 16*1000000 + 16*10000000; -- Offset of the third segment
  x := 50000000; -- Value range for each field is 50 million
  for i in 1..18 loop
    res := res || gen_rand(offset1 + (i-1)*x, offset1 + i*x - 1);
  end loop;

  -- A total of 1.076 billion values are consumed from the INT4 value range
  return res;
end;
$$
language plpgsql strict;

4. Data example

postgres=# select gen_ran_array();    

gen_ran_array
--------------------------------------------------------------------------------------------------------
{-2146646308,-2145683415,-2145349222,-2143926381,-2143348415,-2141933614,-2141364249,-2140223009,-2138645116,-2138311094,-2137328519,-2136424380,-2134763612,-2134461767,-2132675440,-2131727900,-2125512613,-2117580976,-2108206637,-2093806503,-2084537076,-2072042857,-2071092129,-2060488058,-2043914532,-2039914771,-2025797284,-2021177739,-2004046058,-1997857659,-1988910392,-1975672648,-1963342019,-1901896072,-1864565293,-1806580356,-1724394364,-1708595351,-1643548404,-1582467707,-1549967665,-1485791936,-1429504322,-1413965811,-1334697903,-1289093865,-1226178368,-1204842726,-1169580505,-1109793310}
(1 row)

5. Create a table

create table tbl_test (
  id serial primary key,
  dict int[] -- Use one array to represent the 50 fields
);

6. Create array RUM indexes

create index idx_tbl_test on tbl_test using rum (dict rum_anyarray_ops);

7. Write 200 million pieces of test data into a single table of a single instance

vi test2.sql    
insert into tbl_test (dict) select gen_ran_array() from generate_series(1,10);

pgbench -M prepared -n -r -P 1 -f ./test2.sql -c 56 -j 56 -t 357143

8. The write speed of a single instance is about 33,000 rows/s

Write speed is about 33,000 rows/s; 10 nodes would scale to about 330,000 rows/s.
CPU usage is about 80%.

progress: 2.0 s, 3363.5 tps, lat 16.716 ms stddev 4.362    
progress: 3.0 s, 3568.0 tps, lat 15.707 ms stddev 3.707
progress: 4.0 s, 3243.0 tps, lat 17.239 ms stddev 4.529

9. Space proportion for 200 million pieces of data

Table: 49 GB
Indexes: 184 GB

10. Create a function that returns N random values within the valid ranges, for query testing

create or replace function gen_test_arr(int) returns int[] as
$$
select array(select * from unnest(gen_ran_array()) order by random() limit $1);
$$
language sql strict immutable;

Result example

postgres=# select gen_test_arr(4);    
gen_test_arr
---------------------------------------------------
{-2012641247,-2133910693,-1626085823,-2136987009}
(1 row)

postgres=# select gen_test_arr(4);
gen_test_arr
---------------------------------------------------
{-1664820600,-1321104348,-1410506219,-2116164275}
(1 row)

11. Ad hoc query stress test

Disable bitmap scan

set enable_bitmapscan=off;

1-field query

select * from tbl_test where dict @> gen_test_arr(1);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict @> gen_test_arr(1);
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=14.40..852142.09 rows=753011 width=228) (actual time=0.410..4.444 rows=132 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict @> '{-2139078302}'::integer[])
Buffers: shared hit=28 read=126 dirtied=10
Planning time: 0.616 ms
Execution time: 4.492 ms
(6 rows)

2-field AND query

select * from tbl_test where dict @> gen_test_arr(2);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict @> gen_test_arr(2);
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=28.80..4627.28 rows=3776 width=228) (actual time=0.084..0.084 rows=0 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict @> '{-1229103789,-2117549196}'::integer[])
Buffers: shared hit=27
Planning time: 0.428 ms
Execution time: 0.098 ms
(6 rows)

3-field AND query

select * from tbl_test where dict @> gen_test_arr(3);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict @> gen_test_arr(3);
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=43.20..67.53 rows=19 width=228) (actual time=0.145..0.145 rows=0 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict @> '{-1297850230,-1598505025,-1409870549}'::integer[])
Buffers: shared hit=32
Planning time: 0.621 ms
Execution time: 0.165 ms
(6 rows)

4-field AND query

select * from tbl_test where dict @> gen_test_arr(4);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict @> gen_test_arr(4);
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=57.60..60.01 rows=1 width=228) (actual time=0.301..0.301 rows=0 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict @> '{-2143045247,-1543382864,-2132603589,-2146917034}'::integer[])
Buffers: shared hit=37
Planning time: 0.651 ms
Execution time: 0.321 ms
(6 rows)

2-field OR query

select * from tbl_test where dict && gen_test_arr(2);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict && gen_test_arr(2);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=28.80..1626373.60 rows=1538286 width=228) (actual time=0.222..12.367 rows=308 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict && '{-2141077184,-2146768682}'::integer[])
Buffers: shared hit=40 read=295 dirtied=44
Planning time: 0.590 ms
Execution time: 12.439 ms
(6 rows)

3-field OR query

select * from tbl_test where dict && gen_test_arr(3);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict && gen_test_arr(3);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=43.20..2265424.89 rows=2282542 width=228) (actual time=0.254..19.038 rows=174 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict && '{-1620795514,-1639870542,-2139239663}'::integer[])
Buffers: shared hit=40 read=166 dirtied=31
Planning time: 0.612 ms
Execution time: 19.093 ms
(6 rows)

4-field OR query

select * from tbl_test where dict && gen_test_arr(4);    

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict && gen_test_arr(4);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=57.60..2847470.08 rows=3043456 width=228) (actual time=0.598..17.606 rows=328 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict && '{-1705307460,-2136144007,-2132774019,-1953195893}'::integer[])
Buffers: shared hit=46 read=319 dirtied=54
Planning time: 0.652 ms
Execution time: 17.690 ms
(6 rows)

More-field AND query

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict @> gen_test_arr(50);    

QUERY PLAN

---------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=600.00..602.41 rows=1 width=228) (actual time=2.203..2.203 rows=0 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict @> '{-2132669865,-2137249848,-2042878341,-2088316247,-2143000973,-2143620433,-2133871891,-1209554329,-1528596632,-2134772182,-1897199994,-1104232704,-1704082437,-2141239524,-1968035285,-2131776457,-139302331
4,-1622173835,-2021025608,-1143009897,-1793901515,-1510483843,-2142162388,-2000639730,-2139063117,-2079775594,-1329895944,-1447777707,-2145106996,-2059425427,-1307088506,-2136236994,-1731136990,-1257663719,-2110797445,-2094280348,-212741
5326,-1990393443,-2040274978,-2022798000,-2118667926,-2070083767,-2145499074,-1979076804,-2137973932,-2004407692,-2146950560,-2140049095,-1610110401,-1866288627}'::integer[])
Buffers: shared hit=217
Planning time: 1.124 ms
Execution time: 2.230 ms
(6 rows)

More-field OR query

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict && gen_test_arr(50);    

QUERY PLAN

------------------------------------------------------------------------
Index Scan using idx_tbl_test on public.tbl_test (cost=600.00..1271996.70 rows=6602760 width=228) (actual time=2.338..6.521 rows=547 loops=1)
Output: id, dict
Index Cond: (tbl_test.dict && '{-1610700436,-1085141127,-2014816431,-1549709010,-2137440391,-1263750440,-1973015812,-1129115246,-2007733110,-2081342072,-1654458135,-2062905475,-1702363876,-2141009261,-1948730625,-2035766373,-214289408
0,-1502295300,-1732512476,-2131960156,-2053099607,-2140187767,-2117547749,-2133816635,-1875496311,-2139047408,-2145616325,-1177249426,-2135287970,-2123144611,-1298794740,-1389925076,-2138430551,-2144850436,-2084170210,-2132759222,-214442
2424,-1819252191,-1995606281,-1988618306,-2135969961,-2105761786,-1435016071,-2141623972,-2147011919,-2049887148,-2100968914,-2030470574,-1368944612,-1826083272}'::integer[])
Buffers: shared hit=764 dirtied=1
Planning time: 0.627 ms
Execution time: 6.619 ms
(6 rows)

Stress Test Results

4-dimension AND query, input random conditions, stress test result: average RT 1.3 milliseconds, TPS 43,000+

vi test.sql  
select count(*) from tbl_test where dict @> gen_test_arr(4);

Since an IMMUTABLE function generates the query conditions, a prepared statement cannot be used for the measurement (the variable would be fixed at prepare time), so the extended protocol is used here.

pgbench -M extended -n -r -P 1 -f ./test.sql -c 56 -j 56 -T 120

The main bottleneck is I/O; more memory or higher I/O capability would further improve performance.

----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system--
usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw
 34   5  15  45   0   0| 937M    0 |5540B 5804B|   0     0 | 116k  132k
 33   5  15  46   0   0| 937M    0 |4616B 4976B|   0     0 | 115k  129k
transaction type: ./test.sql
scaling factor: 1
query mode: extended
number of clients: 56
number of threads: 56
duration: 120 s
number of transactions actually processed: 5190552
latency average = 1.295 ms
latency stddev = 0.791 ms
tps = 43242.325550 (including connections establishing)
tps = 43247.431982 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
1.296 select count(*) from tbl_test where dict @> gen_test_arr(4);

4-dimension OR query, input random conditions, stress test result: average RT 2.9 milliseconds, TPS 18,000+

vi test.sql  
select count(*) from tbl_test where dict && gen_test_arr(4);

Since an IMMUTABLE function generates the query conditions, a prepared statement cannot be used for the measurement (the variable would be fixed at prepare time), so the extended protocol is used here.

pgbench -M extended -n -r -P 1 -f ./test.sql -c 56 -j 56 -T 120

The main bottleneck is I/O; more memory or higher I/O capability would further improve performance.

transaction type: ./test.sql
scaling factor: 1
query mode: extended
number of clients: 56
number of threads: 56
duration: 120 s
number of transactions actually processed: 2260125
latency average = 2.973 ms
latency stddev = 2.724 ms
tps = 18828.318071 (including connections establishing)
tps = 18830.742359 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
2.974 select count(*) from tbl_test where dict && gen_test_arr(4);

PostgreSQL 11 HASH Partition Table Combined with RUM

PostgreSQL 11 supports HASH partitioned tables. Combined with smart partition-wise parallel aggregation, performance may be even better.

1. Create the RUM extension

create extension rum;

2. Create a partition table

create unlogged table tbl_test (
  id serial primary key,
  dict int[]
) partition by hash (id);

create index idx_tbl_test on tbl_test using rum (dict rum_anyarray_ops);

do language plpgsql
$$
declare
begin
  for i in 0..15 loop
    execute format('create unlogged table tbl_test_%s partition of tbl_test for values with (modulus 16, remainder %s)', i, i);
  end loop;
end;
$$;

3. Create random functions

create or replace function gen_rand(
  int, -- Minimum value (inclusive)
  int  -- Maximum value (inclusive)
) returns int as
$$
select $1 + (random() * ($2 - $1))::int;
$$
language sql strict;

4. Create a dictionary-generation function

create or replace function gen_ran_array() returns int[] as
$$
declare
  res int[] := '{}'; -- Result
  x int;             -- Range size per field
  offset1 int;       -- Offset
begin
  -- The first segment consumes 16 million values
  offset1 := (-2147483648); -- Offset of the first segment is the minimum value of int4
  x := 1000000; -- Value range for each field is 1 million
  for i in 1..16 loop
    res := res || gen_rand(offset1 + (i-1)*x, offset1 + i*x - 1);
  end loop;

  -- The second segment consumes 160 million values
  offset1 := (-2147483648) + 16*1000000; -- Offset of the second segment
  x := 10000000; -- Value range for each field is 10 million
  for i in 1..16 loop
    res := res || gen_rand(offset1 + (i-1)*x, offset1 + i*x - 1);
  end loop;

  -- The third segment consumes 900 million values
  offset1 := (-2147483648) + 16*1000000 + 16*10000000; -- Offset of the third segment
  x := 50000000; -- Value range for each field is 50 million
  for i in 1..18 loop
    res := res || gen_rand(offset1 + (i-1)*x, offset1 + i*x - 1);
  end loop;

  -- A total of 1.076 billion values are consumed from the INT4 value range
  return res;
end;
$$
language plpgsql strict;

5. Write test data

vi test2.sql  
insert into tbl_test (dict) select gen_ran_array() from generate_series(1,10);

nohup pgbench -M prepared -n -r -P 10 -f ./test2.sql -c 56 -j 56 -t 3571430 >./ins.log 2>&1 &

Using PostgreSQL 11 HASH partitions, the write speed is about 55,000 rows/s.

6. Ad hoc query performance is consistent with PostgreSQL 10

create or replace function gen_test_arr(int) returns int[] as
$$
select array(select * from unnest(gen_ran_array()) order by random() limit $1);
$$
language sql strict immutable;

explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict @> gen_test_arr(4);

explain (analyze,verbose,timing,costs,buffers) select * from tbl_test where dict && gen_test_arr(4);

Aggregate Computing

At present, an aggregation on a partitioned table requires scanning the partitions, appending the results, and then aggregating. There is still room for optimization, and the community is already working on it.

  1. Smart parallel aggregation of partitions
    https://commitfest.postgresql.org/17/1250/
  2. Smart parallel JOIN of partitions
    Let workers for each partition work in parallel, which is similar to the processing of the MPP architecture.
  3. DBLINK asynchronous call parallel aggregation
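A sketch of the dblink approach (connection names, shard layout, and the example predicate are assumptions for illustration): send the same aggregate to each shard without blocking, then gather and merge the partial results:

```sql
create extension if not exists dblink;

-- Open one connection per shard.
select dblink_connect('c0', 'dbname=shard0');
select dblink_connect('c1', 'dbname=shard1');

-- Fire the per-shard aggregate asynchronously on both shards...
select dblink_send_query('c0', 'select count(*) from tbl_test where dict @> array[1,2]');
select dblink_send_query('c1', 'select count(*) from tbl_test where dict @> array[1,2]');

-- ...then collect the partial counts and merge them.
select sum(cnt) from (
  select * from dblink_get_result('c0') as t(cnt int8)
  union all
  select * from dblink_get_result('c1') as t(cnt int8)
) s;
```

Because the queries run concurrently on the shards, total latency is roughly that of the slowest shard rather than the sum of all shards.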

Summary

This article uses “global dictionary + array + RUM index” to achieve efficient write and query performance.

These queries are particularly suitable for scenarios where all fields are used as equivalence conditions. If there are non-equivalence conditions, we recommend bucketing them into discrete classes so they can be converted into equivalence queries. Alternatively, extract the fields with non-equivalence conditions, index them with b-tree, and combine the indexes via a multi-index bitmap scan, which also accelerates queries; in that case the corresponding recheck is required.
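As a sketch of that b-tree fallback (the `crt_time` column is hypothetical, not part of the demo table), the planner can AND the two indexes together with a bitmap scan:

```sql
-- Keep the range-filtered field as its own column with a b-tree index;
-- the planner can then combine it with the RUM index via BitmapAnd.
create index idx_tbl_test_ts on tbl_test using btree (crt_time);

select count(*) from tbl_test
where dict @> array[1, 2]          -- equivalence conditions via the RUM index
  and crt_time >= '2018-01-01';    -- range condition via the b-tree index
```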

PG 10 single-instance, single-table write speed: about 33,000 rows/s. There is significant room for write performance improvement; the current bottleneck is mainly the WAL writer.

Queries concurrent with single-instance writes: for any-dimension queries, the response time is within 20 milliseconds.

4-dimension AND query, average RT 1.3 milliseconds, TPS 43,000+, far exceeding the 1000-concurrency business requirement.

4-dimension OR query, average RT 2.9 milliseconds, TPS 18,000+, far exceeding the 1000-concurrency business requirement.

With "global dictionary service + sharding", you can fulfill the requirements of an even greater number of ad hoc real-time queries.

References

Reference:https://www.alibabacloud.com/blog/accelerating-postgresql-ad-hoc-query-and-dictionary-with-rum-index_594599?spm=a2c41.12696450.0.0
