Use Cases for Millisecond-Level Massive Multi-Dimensional Data Pivoting on PostgreSQL/GPDB
Background
For a typical e-commerce data pivoting service, the data set may contain user tag records, for example: the brand ID, the sales area ID, the IDs of the users belonging to that brand, as well as a number of user tag fields and time fields.
Tags may be classified according to different dimensions, such as tag1 for gender, tag2 for age, and tag3 for interests….
The business side mostly wants to pivot on the users of its own brand and count the number of users across different sales areas (channels), time periods, and tag dimensions, which is a very typical data pivoting requirement.
Example
Examples of Data Structures
Active user IDs per day, by area and sales channel
t1 (
UID, -- the user ID
groupid, -- ID of the sales channel and area
day -- the date
)
Users owned by each brand, maintained incrementally
t2 (
UID, -- the user ID
pinpai -- the brand
)
User tags, maintained incrementally
t3 (
UID, -- the user ID
tag1, -- tag 1, such as interest
tag2, -- tag 2, such as gender
tag3, -- tag 3, such as age
...
)
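For concreteness, the three structures above could be created as follows (a minimal sketch; the column types are assumptions, not from the original text):

```sql
-- Hedged sketch of the three source tables; column types are assumptions.
create table t1 (
  uid     int  not null,  -- user ID
  groupid int  not null,  -- sales channel / area ID
  day     date not null   -- date of activity
);

create table t2 (
  uid    int not null,    -- user ID
  pinpai int not null     -- brand ID
);

create table t3 (
  uid  int not null,      -- user ID
  tag1 int,               -- e.g. interest
  tag2 int,               -- e.g. gender
  tag3 int                -- e.g. age
);
```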
Examples of Data Pivoting
Pivot the data of a brand, a sales area, a tag and a day.
For example:
select
'interest' as tag,
t3.tag1 as tag_value,
count(1) as cnt
from
t1,
t2,
t3
where
t1.uid = t3.uid
and t1.uid = t2.uid
and t2.pinpai = ?
and t1.groupid = ?
AND t1.day = '2017-06-25'
group by t3.tag1
Such queries are computationally intensive, and analysts may need to perform comparative analysis across many dimensions. For this reason, optimizing with stream computing is recommended.
Stream Computing for Optimization
The results required for stream computing are as follows:
t_result (
day, -- the date
pinpai, -- the brand ID
groupid, -- the channel, area, and store ID
tag1, -- tag type 1
tag2, -- tag type 2
tag3, -- tag type 3
... -- tag type N
cnt, -- number of users
UIDs, -- user ID array. This is an optional field, and if you do not need to know the ID details, there is no need to save it
hll_uids -- user HLL estimate
)
For GPDB, column storage can be used. For table partitioning, the first-level partition is by day range and the second-level partition is by hash of pinpai and groupid. Random distribution is selected as the data distribution strategy. Finally, a separate index is created on each tag field (tag1, tag2, ...). This enables fast retrieval: a single pivoting request should complete within 100 milliseconds regardless of the data size.
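A hedged DDL sketch of this layout is shown below. Partition bounds and type choices are assumptions, and only the range level on day is spelled out; the pinpai/groupid level described above can be layered with `subpartition by list` on Greenplum versions that lack hash subpartitions. The `hll` column requires the postgresql-hll extension.

```sql
-- Hedged sketch: GPDB column-store result table (bounds/types assumed).
create extension if not exists hll;

create table t_result (
  day      date,
  pinpai   int,     -- brand ID
  groupid  int,     -- channel / area / store ID
  tag1     int,
  tag2     int,
  tag3     int,
  cnt      int8,
  uids     int[],   -- optional user ID details
  hll_uids hll      -- user HLL estimate
)
with (appendonly=true, orientation=column)   -- column storage
distributed randomly
partition by range (day)
( start (date '2017-06-01') inclusive
  end   (date '2017-07-01') exclusive
  every (interval '1 day') );

-- One index per tag field for fast retrieval.
create index idx_t_result_tag1 on t_result (tag1);
create index idx_t_result_tag2 on t_result (tag2);
create index idx_t_result_tag3 on t_result (tag3);
```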
After obtaining this result, the analyst’s query is simplified as follows (the first three conditions filter the data through partitions, and finally the result is quickly obtained according to tag? index):
select
day, pinpai, groupid, 'tag?' as tag, cnt, uids, hll_uids
from t_result
where
day = ?
and pinpai = ?
and groupid = ?
and tag? = ?
After stream computing, you can even achieve more complex dimensional analysis with very little computation, such as comparing the user sets of two specific days, or analyzing users that carry multiple tags.
Stream Computing Method
The SQL that produces the statistical results is as follows:
select
t1.day,
t2.pinpai,
t1.groupid,
t3.tag1,
t3.tag2,
t3.tag3,
...
count(1) as cnt,
array_agg(t1.uid) as uids,
hll_add_agg(hll_hash_integer(t1.uid)) as hll_uids
from
t1,
t2,
t3
where
t1.uid = t3.uid
and t1.uid = t2.uid
group by
t1.day,
t2.pinpai,
t1.groupid,
grouping sets (
(t3.tag1),
(t3.tag2),
(t3.tag3),
(...),
(t3.tagn)
)
Explanation:
1. Aggregate uid into an array
array_agg(uid)
2. Convert UID to hll hash val and aggregate to HLL type
hll_add_agg(hll_hash_integer(uid))
3. To produce statistics for each tag dimension, the multi-dimensional analysis syntax "grouping sets" can be used instead of writing multiple SQL statements. This way, the data is scanned only once and aggregated along each tag dimension.
grouping sets (
(t3.tag1),
(t3.tag2),
(t3.tag3),
(...),
(t3.tagn)
)
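To see the grouping sets semantics in isolation, here is a self-contained toy query (invented sample values, not the article's tables). Columns outside the active grouping set come back as NULL, which is how each result row is attributed to exactly one tag dimension:

```sql
-- Toy illustration: one scan, one aggregate row set per tag dimension.
select tag1, tag2, count(*) as cnt
from (values (1, 10), (1, 20), (2, 10)) t (tag1, tag2)
group by grouping sets ((tag1), (tag2));
-- Produces one row per tag1 value (tag2 is NULL) and one row per
-- tag2 value (tag1 is NULL); row order is not guaranteed.
```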
Stream Computing Results Pivoting Query
If you have complex pivoting, you can perform an array logic operation on different records of the analysis result to obtain the final UID set result.
I. Array Logic Operation
1. The value in array 1 but not in array 2
create or replace function arr_miner(anyarray, anyarray) returns anyarray as
$$
select array(select * from (select unnest($1) except select unnest($2)) t group by 1); $$
language sql strict;
2. The intersection of array 1 and array 2
create or replace function arr_overlap(anyarray, anyarray) returns anyarray as
$$
select array(select * from (select unnest($1) intersect select unnest($2)) t group by 1); $$
language sql strict;
3. The union of array 1 and array 2
create or replace function arr_merge(anyarray, anyarray) returns anyarray as
$$
select array(select unnest(array_cat($1,$2)) group by 1); $$
language sql strict;
For example, the set of users before the promotion (2017-06-24) is uid1[], and the set of users after the promotion (2017-06-25) is uid2[]. To find the new users gained through the promotion, use arr_miner(uid2[], uid1[]).
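A quick sanity check of the three helper functions defined above (because each uses `group by 1`, element order in the returned arrays is not guaranteed):

```sql
select arr_miner(array[1,2,3,4], array[1,2]);   -- elements 3 and 4, in some order
select arr_overlap(array[1,2,3], array[2,3,5]); -- elements 2 and 3
select arr_merge(array[1,2], array[2,5]);       -- elements 1, 2 and 5
```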
II. HLL Logic Operations
The HLL type supports logical computation on the stored hash values.
1. Compute the number of unique values
hll_cardinality(users)
2. Compute the union of two HLLs to obtain an HLL
hll_union(hll1, hll2)
For example, the user collection HLL before the promotion (2017–06–24) is uid1_hll, and the user collection HLL after the promotion (2017–06–25) is uid2_hll. And, you want to know the number of new users obtained through the promotion.
hll_cardinality(uid2_hll) - hll_cardinality(uid1_hll)
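Applied to the result table, this estimate can be computed in one query. The exact predicate set below is an assumption; placeholders follow the article's `?` convention:

```sql
-- Hedged sketch: estimated new users on the promotion day for one
-- brand/channel/tag slice of t_result.
select hll_cardinality(b.hll_uids) - hll_cardinality(a.hll_uids) as new_uv
from t_result a, t_result b
where a.day = date '2017-06-24'
  and b.day = date '2017-06-25'
  and a.pinpai  = ? and b.pinpai  = ?
  and a.groupid = ? and b.groupid = ?
  and a.tag1    = ? and b.tag1    = ?;
```

Note that subtracting two cardinalities only approximates new users when no users churned between the two days; `hll_cardinality(hll_union(a.hll_uids, b.hll_uids)) - hll_cardinality(a.hll_uids)` is the more robust estimate.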
Stream Computing Scheduling
The business used to obtain pivot results through ad hoc JOINs; now the results are obtained from pre-computed statistics, and the pre-computation itself needs to be scheduled.
The scheduling method depends on the source of the data and on how the data is merged (stream increments or batch increments).
I. Data Is Counted on a Daily Basis
Historical statistics are not updated; only new data is added.
Write and merge the statistical results into the t_result table at regular intervals.
insert into t_result
select
t1.day,
t2.pinpai,
t1.groupid,
t3.tag1,
t3.tag2,
t3.tag3,
...
count(1) as cnt,
array_agg(t1.uid) as uids,
hll_add_agg(hll_hash_integer(t1.uid)) as hll_uids
from
t1,
t2,
t3
where
t1.uid = t3.uid
and t1.uid = t2.uid
group by
t1.day,
t2.pinpai,
t1.groupid,
grouping sets (
(t3.tag1),
(t3.tag2),
(t3.tag3),
(...),
(t3.tagn)
)
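Since historical statistics are never updated, each scheduled run only needs to cover the newly arrived day. A minimal sketch, assuming the job runs each morning for the previous day (the `current_date - 1` window is an assumption):

```sql
-- Hedged sketch of the daily incremental job.
insert into t_result
select
  t1.day, t2.pinpai, t1.groupid,
  t3.tag1, t3.tag2, t3.tag3,
  count(1) as cnt,
  array_agg(t1.uid) as uids,
  hll_add_agg(hll_hash_integer(t1.uid)) as hll_uids
from t1, t2, t3
where t1.uid = t3.uid
  and t1.uid = t2.uid
  and t1.day = current_date - 1   -- yesterday's increment (assumption)
group by
  t1.day, t2.pinpai, t1.groupid,
  grouping sets ((t3.tag1), (t3.tag2), (t3.tag3));
```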
II. Merge Statistical Dimension Data
The daily statistical results are aggregated by day only. To obtain monthly or yearly statistics, the daily data must be queried and aggregated.
The business can also choose asynchronous aggregation, and the end user queries the aggregated results.
t_result_month (
month, -- yyyy-mm
pinpai, -- the brand ID
groupid, -- the channel, area, and store ID
tag1, -- tag type 1
tag2, -- tag type 2
tag3, -- tag type 3
... -- tag type N
cnt, -- number of users
UIDs, -- user ID array. This is an optional field, and if you do not need to know the ID details, there is no need to save it
hll_uids -- user HLL estimate
)
Array aggregation requires a custom aggregate function:
postgres=# create aggregate arragg (anyarray) ( sfunc=arr_merge, stype=anyarray);
CREATE AGGREGATE
postgres=# select arragg(c1) from (values (array[1,2,3]),(array[2,5,6])) t (c1);
arragg
-------------
{6,3,2,1,5}
(1 row)
Aggregate SQL by month as follows:
select
to_char(day, 'yyyy-mm'),
pinpai,
groupid,
tag1,
tag2,
tag3,
...
array_length(arragg(uids), 1) as cnt,
arragg(uids) as uids,
hll_union_agg(hll_uids) as hll_uids
from t_result
group by
to_char(day, 'yyyy-mm'),
pinpai,
groupid,
tag1,
tag2,
tag3,
...
Yearly aggregation works the same way.
III. Stream Scheduling
If the business has real-time statistical requirements, you can use the stream computation method to perform the above aggregation statistics in real time.
If the data size is very large, you can divide the data by partition key so that different data lands on different stream computing nodes, and finally summarize the stream computing results in HybridDB (based on GPDB).
Summary
- For pivoting analysis requirements, precompute the data with stream computing according to the query dimensions, so that a pivot only queries the precomputed results; a response within 100 ms can then be achieved for any pivot dimension.
- Using GROUPING SETS, multiple tag dimensions can be aggregated in a single pass, reducing repeated scanning and computation of the data and significantly improving processing efficiency.
- Using arrays, you can record the UID of each pivoting dimension to support not only the pivoting but also the needs of target selection. At the same time, it supports more complicated pivoting requirements in the future.
- The HLL type stores an estimated value. For complex pivots, multiple HLLs can be unioned and the number of unique values computed; this is typically used to estimate UV and incremental UV.
- Use stream computing. If the data requires real-time statistics, PipelineDB can be used for streaming analysis to compute the statistical results in real time. (PipelineDB is being converted into a plug-in and will become more convenient to use.)
- In combination with Alibaba Cloud components, OSS object storage holds the transition (raw) data and the oss_fdw external table connects to OSS, so transition data only needs to be stream-computed and does not have to be stored in the database. This significantly reduces the database's write and space requirements.
- Using the level 1 and level 2 partitions of Greenplum, the access requirements for pivoting data are scattered to smaller units, and then tag indexes are used to reduce the scope of data search again, thus realizing response within 100 ms for any data volume and any dimension pivoting request.
- Use column storage to increase compression ratio and save space for statistical data.