Real-Time Multi-Stream Aggregation with JSON

Alibaba Cloud
Jun 5, 2019


By Digoal

In some write scenarios, each new record must be merged with the previous record, and the merged result is what gets written. The latest record therefore holds the content of all previous records. Note that this means a snapshot along one dimension (for example, one order), not a snapshot of all records in the table.

For example, an e-commerce order may involve several systems and produce several records. Each system may generate different properties, and the properties of all systems together would form a large wide table. To simplify application design, JSON storage is usually chosen instead of a wide table. On every write, the application then expects all previous related records to be merged into a new record, which is written in turn.

However, data related to the same order may be written concurrently, unless the business system routes each order number by hash to a single thread on a single machine. If data for the same order is written concurrently, merging at write time does not produce the correct result.

Example:

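Assume that tbl (pk, caseid, info, crt_time) already contains the record (0, 1, 'test0', now()), and that each insert merges its info with that of the latest record for the same caseid.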

session A:

insert into tbl (pk, caseid, info, crt_time) values (1, 1, 'test1', now());

session B:

insert into tbl (pk, caseid, info, crt_time) values (2, 1, 'test2', now());

If sessions A and B run at the same time, both merge with the same previous record, 'test0', and the records written become:

(1, 1, 'test0_test1', now());

(2, 1, 'test0_test2', now());

However, the intended result is the following two records:

(1, 1, 'test0_test1', now());

(2, 1, 'test0_test1_test2', now());

This is similar to a blockchain: each new record must build on the record written immediately before it.
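The lost update can be reproduced with a small Python simulation. This is a sketch, not the article's code: the table is an in-memory list, `latest_info` and `merge` are hypothetical helpers, and the interleaving is scripted (both sessions read before either writes) so the result is deterministic.

```python
def latest_info(table, caseid):
    """Return the info of the most recently written record for caseid."""
    rows = [r for r in table if r["caseid"] == caseid]
    return rows[-1]["info"] if rows else ""

def merge(prev, new):
    """Merge-on-write: chain the new info onto the previous snapshot."""
    return f"{prev}_{new}" if prev else new

def concurrent_writes():
    table = [{"pk": 0, "caseid": 1, "info": "test0"}]
    # Sessions A and B both read the latest record before either one writes.
    prev_a = latest_info(table, 1)
    prev_b = latest_info(table, 1)
    table.append({"pk": 1, "caseid": 1, "info": merge(prev_a, "test1")})
    table.append({"pk": 2, "caseid": 1, "info": merge(prev_b, "test2")})
    return [r["info"] for r in table[1:]]

def serialized_writes():
    table = [{"pk": 0, "caseid": 1, "info": "test0"}]
    # Session B reads only after session A has written.
    table.append({"pk": 1, "caseid": 1, "info": merge(latest_info(table, 1), "test1")})
    table.append({"pk": 2, "caseid": 1, "info": merge(latest_info(table, 1), "test2")})
    return [r["info"] for r in table[1:]]

print(concurrent_writes())  # ['test0_test1', 'test0_test2']  (test1 is lost)
print(serialized_writes())  # ['test0_test1', 'test0_test1_test2']
```

Only serializing the writers per order restores the intended chain, which is exactly the constraint the business system may not be able to guarantee.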

Therefore, we obtain snapshots in a different way. The write path is left unchanged: each business line writes its own order records into a single table, using JSON to hold that business unit's description of the order. The snapshot is assembled at query time by aggregating these records.
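A minimal Python sketch of this write model, under stated assumptions: the in-memory `log` stands in for the table, the timestamps are hypothetical, and `snapshot` is an illustrative helper that does the merge at read time instead of at write time. Because no writer reads or rewrites earlier rows, concurrent inserts cannot overwrite each other, and the intended chain from the example above is recovered.

```python
from datetime import datetime

# Append-only table of (pk, caseid, info, crt_time). Writers only insert rows;
# nothing on the write path reads or rewrites earlier rows.
log = [
    (0, 1, "test0", datetime(2019, 6, 5, 10, 0, 0)),
    (2, 1, "test2", datetime(2019, 6, 5, 10, 0, 2)),  # arrival order may differ
    (1, 1, "test1", datetime(2019, 6, 5, 10, 0, 1)),
    (3, 2, "other", datetime(2019, 6, 5, 10, 0, 1)),  # a different case
]

def snapshot(log, caseid, as_of):
    """Merge at read time: every record of caseid with crt_time <= as_of,
    in crt_time order (a stand-in for the aggregation query used later)."""
    rows = sorted((r for r in log if r[1] == caseid and r[3] <= as_of),
                  key=lambda r: r[3])
    return "_".join(r[2] for r in rows)

print(snapshot(log, 1, datetime(2019, 6, 5, 10, 0, 1)))  # test0_test1
print(snapshot(log, 1, datetime(2019, 6, 5, 10, 0, 2)))  # test0_test1_test2
```

The cost moves from the write path to the read path, which is why the query performance measured below matters.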

JSON Write Performance

create table tbl_ord (  
ordid int8, -- Order no.
appid int, -- Application ID
info jsonb, -- Content
crt_time timestamp -- Write time
);

create index idx_tbl_ord on tbl_ord(ordid, crt_time);
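The composite index on (ordid, crt_time) is what makes the per-order snapshot query cheap: all entries of one order sit next to each other, already sorted by time, so `ordid = ? and crt_time <= ?` becomes a single range scan. The toy Python model below uses a sorted list and `bisect`; the class and helper names are hypothetical, and PostgreSQL's B-tree is of course a paged, on-disk structure rather than a list.

```python
import bisect
import math
from datetime import datetime

class CompositeIndex:
    """Toy stand-in for a B-tree on (ordid, crt_time): entries stay sorted by the
    key tuple, so one order's rows are contiguous and already in time order."""

    def __init__(self):
        self.entries = []  # sorted list of (ordid, crt_time, rowid)

    def insert(self, ordid, crt_time, rowid):
        bisect.insort(self.entries, (ordid, crt_time, rowid))

    def lookup(self, ordid, upto):
        """Row ids matching ordid = ? and crt_time <= ?, oldest first."""
        lo = bisect.bisect_left(self.entries, (ordid,))
        # math.inf sorts after every rowid, so entries at exactly `upto` are included.
        hi = bisect.bisect_right(self.entries, (ordid, upto, math.inf))
        return [rowid for _, _, rowid in self.entries[lo:hi]]

def t(second):
    """Hypothetical timestamps on one day, varying only in seconds."""
    return datetime(2017, 12, 9, 23, 0, second)

idx = CompositeIndex()
idx.insert(2, t(5), 10)
idx.insert(1, t(30), 11)
idx.insert(1, t(10), 12)
idx.insert(1, t(50), 13)
print(idx.lookup(1, t(30)))  # [12, 11]
```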

Stress test: write a single record per transaction

vi test.sql  

\set ordid random(1,10000000)
\set appid random(1,10)
insert into tbl_ord (ordid,appid,info,crt_time) values (:ordid,:appid,jsonb '{"a" : 1, "b" : 2}',now());

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 40 -j 40 -t 2500000

With single-record inserts, the stress test reaches about 234,000 rows/s.

transaction type: ./test.sql  
scaling factor: 1
query mode: prepared
number of clients: 40
number of threads: 40
number of transactions per client: 2500000
number of transactions actually processed: 100000000/100000000
latency average = 0.170 ms
latency stddev = 0.498 ms
tps = 234047.009786 (including connections establishing)
tps = 234060.902533 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set ordid random(1,10000000)
0.001 \set appid random(1,10)
0.168 insert into tbl_ord (ordid,appid,info,crt_time) values (:ordid,:appid,jsonb '{"a" : 1, "b" : 2}',now());

With bulk writes, throughput exceeds 1 million rows/s.
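The single-record numbers above are internally consistent. In a closed-loop benchmark such as pgbench, each client has one transaction in flight at a time, so by Little's law throughput is roughly the number of clients divided by the average latency. A quick check in Python (the helper names are illustrative):

```python
def littles_law_tps(clients, avg_latency_ms):
    """Closed-loop estimate: each client keeps exactly one transaction in flight."""
    return clients / (avg_latency_ms / 1000.0)

def run_time_s(total_transactions, tps):
    return total_transactions / tps

write_tps = littles_law_tps(40, 0.170)        # -c 40, latency average = 0.170 ms
duration = run_time_s(100_000_000, 234047.0)  # transactions processed / measured tps
print(f"predicted {write_tps:,.0f} tps vs measured 234,047 tps")
print(f"100 million inserts take about {duration:.0f} s")
```

The prediction (about 235,294 tps) is within about 1% of the measured value, and at that rate the 100 million inserts take roughly 427 seconds, or about 7 minutes.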

JSON Full Field Index

PostgreSQL can index every field of a jsonb column with a GIN index, and provides two operator classes for this. The following excerpt from the PostgreSQL documentation describes the searches they support.

GIN indexes can be used to efficiently search for keys or key/value pairs occurring within a large number of jsonb documents (datums).

Two GIN "operator classes" are provided, offering different performance and flexibility trade-offs.

The default GIN operator class for jsonb supports queries with top-level key-exists operators ?, ?& and ?| operators and path/value-exists operator @>. (For details of the semantics that these operators implement, see Table 9.44.) An example of creating an index with this operator class is:

CREATE INDEX idxgin ON api USING GIN (jdoc);

The non-default GIN operator class jsonb_path_ops supports indexing the @> operator only. An example of creating an index with this operator class is:

CREATE INDEX idxginp ON api USING GIN (jdoc jsonb_path_ops);
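
For the tbl_ord table in this article, a GIN index with the default operator class is created on the info column:

create index idx_tbl_ord_2 on tbl_ord using gin (info);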

Examples from the PostgreSQL documentation:

-- Find documents in which the key "company" has value "Magnafone"  
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc @> '{"company": "Magnafone"}';

-- Find documents in which the key "tags" contains key or array element "qui"
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc -> 'tags' ? 'qui';

-- Find documents in which the key "tags" contains array element "qui"
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc @> '{"tags": ["qui"]}';
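To make the operator semantics concrete, here is a simplified Python model of `@>` (containment) and `?` (key exists) on parsed JSON values. It illustrates the documented rules (structures must match, array order and duplicates are ignored, and a top-level array may contain a primitive value); it is not PostgreSQL's implementation, and the sample document is hypothetical, modeled on the documentation's example.

```python
import json

def _scalar_eq(a, b):
    # JSON true/false must not compare equal to 1/0 as they would in Python.
    if isinstance(a, bool) or isinstance(b, bool):
        return isinstance(a, bool) and isinstance(b, bool) and a == b
    return a == b

def _contains(left, right):
    if isinstance(left, dict) and isinstance(right, dict):
        return all(k in left and _contains(left[k], v) for k, v in right.items())
    if isinstance(left, list) and isinstance(right, list):
        # Order and duplicates are ignored: each right element must match some left element.
        return all(any(_contains(x, y) for x in left) for y in right)
    if isinstance(left, (dict, list)) or isinstance(right, (dict, list)):
        return False  # structures must match
    return _scalar_eq(left, right)

def contains(left, right):
    """Simplified model of jsonb @>."""
    # Top-level special case: an array may contain a primitive value.
    if isinstance(left, list) and not isinstance(right, (dict, list)):
        return any(_contains(x, right) for x in left)
    return _contains(left, right)

def key_exists(value, key):
    """Simplified model of jsonb ?: a top-level key, string array element, or string scalar."""
    if isinstance(value, dict):
        return key in value
    if isinstance(value, list):
        return any(isinstance(x, str) and x == key for x in value)
    return isinstance(value, str) and value == key

doc = json.loads('{"guid": "9c36adc1", "name": "Angela Barton", '
                 '"company": "Magnafone", "tags": ["enim", "aliquip", "qui"]}')
print(contains(doc, {"company": "Magnafone"}))  # True
print(key_exists(doc["tags"], "qui"))           # True, like jdoc -> 'tags' ? 'qui'
print(contains(doc, {"tags": ["qui"]}))         # True
print(contains(doc, {"tags": "qui"}))           # False: nested structures must match
```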

Use JSON Aggregation to Obtain a Snapshot at Any Point in Time

To get the snapshot of a caseid at a specific point in time, aggregate its records with jsonb_agg. This performs very well: a single-order snapshot takes under 1 ms in the tests below.

Aggregate all records of a caseid up to a given time into one record:

select caseid, jsonb_agg((pk,info,crt_time) order by crt_time) from tbl where caseid=? and crt_time<=? group by caseid;

Example of using jsonb_agg. Casting the row to a composite type makes the JSON keys use the type's field names:

postgres=# create type typ1 as (c1 int, c2 int);  
CREATE TYPE

postgres=# select jsonb_agg((c1,c2)::typ1 order by c1 desc) from (values (1,2),(2,3)) t(c1,c2);
jsonb_agg
------------------------------------------
[{"c1": 2, "c2": 3}, {"c1": 1, "c2": 2}]
(1 row)
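The output above can be reproduced with a short Python model of `jsonb_agg(row::type ORDER BY ...)`. One detail is worth knowing: jsonb does not keep object keys in declaration order. In PostgreSQL's implementation, keys are stored shorter-first, with ties broken by byte order. That is why `c1, c2` keep their order here, while the typ2 query below shows `info, appid, crt_time` even though the type declares `appid, info, crt_time`. The helper names are hypothetical, and this is a sketch of the observable behavior, not PostgreSQL code.

```python
import json

def jsonb_normalize(value):
    """Reorder object keys as jsonb stores them: shorter keys first, ties by bytes."""
    if isinstance(value, dict):
        ordered = sorted(value, key=lambda k: (len(k.encode("utf-8")), k.encode("utf-8")))
        return {k: jsonb_normalize(value[k]) for k in ordered}
    if isinstance(value, list):
        return [jsonb_normalize(v) for v in value]
    return value

def jsonb_agg(rows, fields, order_by, descending=False):
    """Model of jsonb_agg(row::composite_type ORDER BY order_by): each row becomes an
    object keyed by the type's field names, and the objects are emitted in order."""
    pos = fields.index(order_by)
    ordered = sorted(rows, key=lambda r: r[pos], reverse=descending)
    return json.dumps([jsonb_normalize(dict(zip(fields, r))) for r in ordered])

# select jsonb_agg((c1,c2)::typ1 order by c1 desc) from (values (1,2),(2,3)) t(c1,c2);
print(jsonb_agg([(1, 2), (2, 3)], ["c1", "c2"], "c1", descending=True))
# [{"c1": 2, "c2": 3}, {"c1": 1, "c2": 2}]

# A typ2-shaped row (appid, info, crt_time) comes out as info, appid, crt_time.
print(jsonb_agg([(6, {"a": 1, "b": 2}, "2017-12-09T23:24:56.659672")],
                ["appid", "info", "crt_time"], "crt_time"))
```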

Performance of the aggregate query for a single order: about 0.7 milliseconds.

create type typ2 as (appid int, info jsonb, crt_time timestamp);  

postgres=# select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=1 and crt_time<=now() group by ordid;

-[ RECORD 1 ]-------------------------------------------------------------
ordid | 1
jsonb_agg | [{"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:24:56.659672"}, {"info": {"a": 1, "b": 2}, "appid": 5, "crt_time": "2017-12-09T23:25:13.073163"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:25:49.94649"}, {"info": {"a": 1, "b": 2}, "appid": 10, "crt_time": "2017-12-09T23:26:23.523946"}, {"info": {"a": 1, "b": 2}, "appid": 2, "crt_time": "2017-12-09T23:26:49.900199"}, {"info": {"a": 1, "b": 2}, "appid": 7, "crt_time": "2017-12-09T23:27:10.643058"}, {"info": {"a": 1, "b": 2}, "appid": 8, "crt_time": "2017-12-09T23:27:20.937021"}, {"info": {"a": 1, "b": 2}, "appid": 8, "crt_time": "2017-12-09T23:27:21.446752"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:29:19.10536"}, {"info": {"a": 1, "b": 2}, "appid": 7, "crt_time": "2017-12-09T23:29:56.192353"}, {"info": {"a": 1, "b": 2}, "appid": 1, "crt_time": "2017-12-09T23:30:07.879201"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:30:31.487457"}]

Time: 0.696 ms
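The aggregated array keeps each business unit's record separately. If a single merged object is wanted instead (the wide-table view described at the start of the article), the array can be folded in crt_time order. The Python sketch below is a hypothetical post-processing step, not part of the original method: it assumes later records win on key conflicts, a shallow merge equivalent to applying jsonb's `||` operator to the info objects from left to right. The sample records are made up.

```python
import json

def merge_snapshot(agg_json):
    """Fold a jsonb_agg result, already ordered by crt_time, into one object.
    Assumption: on a key conflict the later record wins, and nested objects are
    replaced rather than merged (the behavior of jsonb || on two objects)."""
    merged = {}
    for record in json.loads(agg_json):
        merged.update(record["info"])
    return merged

agg = ('[{"info": {"status": "paid", "amount": 100}, "appid": 1, "crt_time": "2019-06-05T10:00:00"}, '
       '{"info": {"status": "shipped", "carrier": "c1"}, "appid": 2, "crt_time": "2019-06-05T11:00:00"}]')
print(merge_snapshot(agg))  # {'status': 'shipped', 'amount': 100, 'carrier': 'c1'}
```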

Stress testing

vi test.sql  

\set ordid random(1,10000000)
select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=:ordid and crt_time<=now() group by ordid;

Results

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 120  

transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 4677282
latency average = 0.718 ms
latency stddev = 0.463 ms
tps = 38977.016281 (including connections establishing)
tps = 38982.209839 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set ordid random(1,10000000)
0.717 select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=:ordid and crt_time<=now() group by ordid;

With 100 million records in the table, aggregating the snapshot of any order at any point in time achieves:

Queries per second: 38,982

Average response time: 0.718 ms
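These figures can be related to the data volume. The write test drew `ordid` uniformly from 1 to 10,000,000 for 100 million rows, so each order has about 10 records on average (the sample for ordid=1 above has 12), and each snapshot query aggregates about 10 rows. A small Python calculation from pgbench's own counters (variable names are illustrative):

```python
transactions = 4_677_282       # number of transactions actually processed
duration_s = 120               # -T 120
total_rows = 100_000_000       # rows in tbl_ord
distinct_orders = 10_000_000   # \set ordid random(1,10000000)

tps_from_count = transactions / duration_s          # about 38,977 snapshots/s
rows_per_order = total_rows / distinct_orders       # about 10 rows per snapshot
rows_per_second = tps_from_count * rows_per_order   # about 390,000 rows aggregated/s
print(f"{tps_from_count:,.0f} snapshots/s, {rows_per_order:.0f} rows each, "
      f"{rows_per_second:,.0f} rows/s")
```

The transaction count divided by the run time matches the tps that pgbench reports, and the server is aggregating roughly 390,000 JSON rows per second.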

Fast Batch Retrieval with JSON Indexes and Parallel Computing in PostgreSQL 10

A full table scan of 100 million records takes only about 1 second.

postgres=# select count(*) from tbl_ord;  
count
-----------
100000000
(1 row)

Time: 1014.201 ms (00:01.014)
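A parallel count(*) in PostgreSQL 10 has the plan shape Finalize Aggregate, Gather, Partial Aggregate, Parallel Seq Scan: each worker counts its share of the table and the leader adds the partial counts. The Python sketch below mirrors that shape only. It is an illustration under assumptions: Python threads show the structure, not the speedup (PostgreSQL uses separate worker processes), and the fixed contiguous chunks are a simplification, since PostgreSQL's parallel sequential scan hands out blocks to workers as they ask for them.

```python
from concurrent.futures import ThreadPoolExecutor

def partial_count(rows, start, stop, predicate):
    """One worker's share of the scan: count matching rows in rows[start:stop]."""
    return sum(1 for i in range(start, stop) if predicate(rows[i]))

def parallel_count(rows, workers=4, predicate=lambda row: True):
    """count(*) as per-worker partial counts, gathered and summed by the leader."""
    n = len(rows)
    # Fixed contiguous chunks keep the sketch simple; PostgreSQL assigns blocks dynamically.
    bounds = [(n * w // workers, n * (w + 1) // workers) for w in range(workers)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = pool.map(lambda b: partial_count(rows, b[0], b[1], predicate), bounds)
        return sum(partials)  # "Finalize Aggregate": combine the partial results

rows = list(range(1_000))
print(parallel_count(rows))                                              # 1000
print(parallel_count(rows, workers=3, predicate=lambda r: r % 2 == 0))  # 500
```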

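Index searches complete in milliseconds. Both parallel and single-process scans are supported. For a query this selective, the single-process plan is actually faster (0.098 ms vs 4.836 ms execution time), because launching a parallel worker costs more than the index scan itself: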

postgres=# explain (analyze,verbose,timing,costs,buffers) SELECT * from tbl_ord WHERE info @> '{"a": 5}';  
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
Gather  (cost=32241.40..142872.70 rows=100000 width=61) (actual time=3.878..3.878 rows=0 loops=1)
  Output: ordid, appid, info, crt_time
  Workers Planned: 1
  Workers Launched: 1
  Single Copy: true
  ->  Bitmap Heap Scan on public.tbl_ord  (cost=32241.40..142872.70 rows=100000 width=61) (actual time=0.158..0.158 rows=0 loops=1)
        Output: ordid, appid, info, crt_time
        Recheck Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
        Buffers: shared hit=6
        Worker 0: actual time=0.158..0.158 rows=0 loops=1
          Buffers: shared hit=6
        ->  Bitmap Index Scan on idx_tbl_ord_2  (cost=0.00..32216.40 rows=100000 width=0) (actual time=0.153..0.153 rows=0 loops=1)
              Index Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
              Buffers: shared hit=6
              Worker 0: actual time=0.153..0.153 rows=0 loops=1
                Buffers: shared hit=6
Planning time: 0.092 ms
Execution time: 4.836 ms
(18 rows)

Time: 5.416 ms
postgres=# set max_parallel_workers_per_gather =0;
SET
Time: 0.202 ms
postgres=# explain (analyze,verbose,timing,costs,buffers) SELECT * from tbl_ord WHERE info @> '{"a": 5}';
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on public.tbl_ord  (cost=32241.40..142872.70 rows=100000 width=61) (actual time=0.062..0.062 rows=0 loops=1)
  Output: ordid, appid, info, crt_time
  Recheck Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
  Buffers: shared hit=6
  ->  Bitmap Index Scan on idx_tbl_ord_2  (cost=0.00..32216.40 rows=100000 width=0) (actual time=0.060..0.060 rows=0 loops=1)
        Index Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
        Buffers: shared hit=6
Planning time: 0.091 ms
Execution time: 0.098 ms
(9 rows)

Time: 0.539 ms
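In the plans above the index scan returns no rows, since every inserted document is {"a": 1, "b": 2}. The Recheck Cond line is there because, with the default jsonb_ops class, keys and values are indexed as separate items: the index can only narrow the search to candidate rows that contain the key "a" and the value 5 somewhere, and the heap scan must confirm the actual pair. The toy Python inverted index below illustrates this. It is a hypothetical simplification (flat documents only, no nested paths, no on-disk structure), not PostgreSQL's GIN.

```python
import json
from collections import defaultdict

def same_json(a, b):
    """Compare values by JSON text, so true and 1 stay distinct as in JSON."""
    return json.dumps(a) == json.dumps(b)

class ToyJsonbGin:
    """Toy inverted index in the spirit of the default jsonb_ops class: each
    top-level key and each scalar value is a separate index item. Flat documents only."""

    def __init__(self):
        self.postings = defaultdict(set)  # index item -> row ids containing it
        self.rows = {}

    def insert(self, rowid, doc):
        self.rows[rowid] = doc
        for key, value in doc.items():
            self.postings[("key", key)].add(rowid)
            self.postings[("value", json.dumps(value))].add(rowid)

    def candidates(self, query):
        """Like the Bitmap Index Scan: rows having every key and every value of the query."""
        items = [("key", k) for k in query] + [("value", json.dumps(v)) for v in query.values()]
        return set.intersection(*(self.postings.get(item, set()) for item in items))

    def search_contains(self, query):
        """Rows where doc @> query; the recheck confirms each key/value pair."""
        return sorted(r for r in self.candidates(query)
                      if all(k in self.rows[r] and same_json(self.rows[r][k], v)
                             for k, v in query.items()))

gin = ToyJsonbGin()
gin.insert(1, {"a": 1, "b": 2})
gin.insert(2, {"a": 5, "b": 2})
gin.insert(3, {"a": 1, "b": 5})  # has key "a" and value 5, but not the pair a=5
print(gin.candidates({"a": 5}))       # {2, 3}
print(gin.search_contains({"a": 5}))  # [2]: row 3 is removed by the recheck
```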

Send Batch Results to Other Business Platforms by Writing to OSS

OSS external table document (RDS for PostgreSQL): https://www.alibabacloud.com/help/doc-detail/44461.htm

OSS external table document (AnalyticDB for PostgreSQL): https://www.alibabacloud.com/help/doc-detail/35457.htm

Summary

1. An e-commerce order may involve several systems and have several records. The properties generated in each system may vary, and properties of all systems can form a large wide table. To simplify application design, JSON storage is usually selected instead of large wide tables. All business data of an order at any point in time may need to be merged in the business system.

2. JSON aggregation meets the requirement of merging all business unit data for an order in chronological order.

3. The PostgreSQL jsonb type supports GIN indexes, enabling efficient retrieval of JSON data.

4. If filtered order data needs to be sent to other platforms, RDS for PostgreSQL can write it to OSS through external tables, where other business systems can pick it up.

5. PostgreSQL supports parallel computing for operations such as full table scans, index scans, sorting, and aggregation, which keeps queries fast even on tables with hundreds of millions of records.

References

https://www.postgresql.org/docs/10/static/functions-json.html
https://www.postgresql.org/docs/10/static/datatype-json.html

Reference: https://www.alibabacloud.com/blog/real-time-multi-stream-aggregation-with-json_594864?spm=a2c41.12952312.0.0
