Millisecond Marketing with Trillions of User Tags Using PostgreSQL


Background

By taking advantage of just two small PostgreSQL functions, we can solve a long-standing industry problem: millisecond-level marketing against trillions of user tags.

Requirements of E-commerce Recommendation Systems

For example, how does a store discover its target customers?

Reach and Magnitude

The number of e-shoppers worldwide may reach several billion, and the number of stores may reach tens of millions.

Efficient Database Design

First, we need to organize key data.

Designing the table structure

1. Generate IDs for stores and products. The design uses two tables: a range (dictionary) table and a user tag table.

Range dictionary table (the name tbl_range is illustrative):

create table tbl_range (
  class int,          -- Dimension, corresponding to s1, s2, s3, s4 in the user tag table
  id int,             -- Offset amount (or enumeration value)
  description text    -- Description (such as 1-10,000, 10,001-100,000, etc.)
);

User tag table (the name tbl_user_tag is illustrative):

create table tbl_user_tag (
  uid int primary key,  -- User ID
  s1 int[],             -- Browsed stores and time ranges (store ID hash + range table ID)
  s2 int[],             -- Browsed products and time ranges (product ID hash + range table ID)
  s3 int[],             -- Purchased products and quantity ranges (product ID hash + range table ID)
  s4 int[]              -- ... Other dimensions and the like
);
One tag table is produced per time interval (e.g. interval = 1 day: each day's activity is counted and written into that day's table).

Step dictionary for dimension s1 (e.g. number of visits to a store):
1 -> 0
2 -> 1-10
3 -> 11-100
4 -> 101-500
5 -> 501-
...
9 -> 10000+
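
As a sketch, the step dictionary above could be loaded into the range table like this (class = 1 standing for the s1 dimension; table name as in the illustrative DDL above):

insert into tbl_range (class, id, description) values
  (1, 1, '0'),
  (1, 2, '1-10'),
  (1, 3, '11-100'),
  (1, 4, '101-500'),
  (1, 5, '501-'),
  -- ... intermediate steps as defined by the application ...
  (1, 9, '10000+');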
An example row:

userid | s1               | s2               | s3
-------+------------------+------------------+-----------------
     1 | {'1:2','109:9'}  | {'2:2','88:19'}  | {'2:1','88:2'}

Meaning:
• User ID: 1,
• Browsed store 1 (step=2) and store 109 (step=9),
• Browsed product 2 (step=2) and product 88 (step=19),
• Purchased product 2 (step=1) and product 88 (step=2).
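
In the row above each tag is written as 'id:step' for readability. If tags were stored literally that way in text[] columns (as in Example 2 below), discovery becomes a GIN-indexed containment query. A minimal sketch against the illustrative table:

select uid
from tbl_user_tag               -- text[] variant of the tag table
where s2 @> array['88:19'];     -- users who browsed product 88 at step 19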
In production, however, the ID and the step count are folded into a single integer. Constants and formulas:

new_start_val = 1  -- Starting ID of the new value. This constant can be freely defined but cannot be changed once it has been set.
step = 19          -- Steps for the dimension (e.g. store-visit steps). Each dimension's step count may differ, and it cannot be changed once set. Reserving step+1 consecutive values per store is what lets one integer carry both meanings.
dp_id              -- The original store ID.
new_val            -- The generated int/int8 value with two meanings (store ID and number of steps).

If the store ID is known, calculate new_val (the write and query path):

    new_val = new_start_val + (step+1) * (dp_id-1)

If new_val is known, calculate the store ID (the translation path):

    dp_id = 1 + (new_val - new_start_val) / (step+1)

Example: a user browsed store ID=1 at 1 step and store ID=192 at 15 steps. Applying the constants and the first formula yields the array {1, 3821} (for store 192: 1 + 20×191 = 3821). Translating back with the second formula recovers the store IDs: {1, 192}.
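
The two formulas translate directly into a pair of small immutable SQL functions. A minimal sketch (the function names and default constants are illustrative, matching the example above):

-- Encode: reserve step+1 consecutive values per original ID.
create or replace function dpid_to_newval(dp_id int, new_start_val int default 1, step int default 19)
returns int as $$
  select new_start_val + (step + 1) * (dp_id - 1);
$$ language sql immutable;

-- Decode: integer division recovers the original ID.
create or replace function newval_to_dpid(new_val int, new_start_val int default 1, step int default 19)
returns int as $$
  select 1 + (new_val - new_start_val) / (step + 1);
$$ language sql immutable;

A round trip reproduces the worked example: dpid_to_newval(192) returns 3821 and newval_to_dpid(3821) returns 192, so target discovery can filter on the encoded value, e.g. where s1 @> array[dpid_to_newval(192)].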

Performance indicators

Browsing and purchase activity for billions of users, tracked over the chosen time intervals, is aggregated into tag arrays; the result is over a trillion user tags.
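As a quick sanity check on that magnitude, the pressure-test scale used later in this article already lands in that range: 64 partitions × 5 million users × 4,000 tags per user = 1.28 trillion tags.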

Real-time Design

We've already explained how to efficiently obtain user data; now let's look at how to update user tags in real time.

Stream processing

The goal of stream processing is to update user tags in real time. For example, if a user generates hundreds of browsing records every second, those records need to be merged into the appropriate tags.
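
In PostgreSQL, that merge step can be expressed as an upsert. A minimal sketch against the illustrative tag table (a real pipeline would batch incoming records and de-duplicate or re-encode before writing):

-- Fold a freshly observed tag into the user's s1 array.
insert into tbl_user_tag (uid, s1)
values (1, array[3821])
on conflict (uid) do update
  set s1 = tbl_user_tag.s1 || excluded.s1;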

Why does stream processing need to be completed in the database?

Target customer discovery can only happen after the tag data has been committed to the database. If we do the stream computation outside the database, in a framework such as JStorm, we add an extra layer between the raw events and the tags; the point of merging in the stream layer is load reduction: 100 billion raw updates may be merged down to only 100 million actual writes.

Stress Testing

For the stress testing phase I chose a machine with 32 cores, 2 SSD cards, and 512GB memory.

Example 1

There are 10 tables, each with 10 million users, and 4 tag fields stored as tsvector.

postgres=# create tablespace tbs1 location '/u01/digoal/tbs1';  
CREATE TABLESPACE

postgres=# create tablespace tbs2 location '/u02/digoal/tbs2';
CREATE TABLESPACE

do language plpgsql $$
declare
  i int;
  suffix text;
  tbs text;
begin
  for i in 0..10 loop
    if i=0 then
      suffix := '';
      tbs := 'tbs1';
    elsif i>=1 and i<=5 then
      suffix := i::text;
      tbs := 'tbs1';
    else
      suffix := i::text;
      tbs := 'tbs2';
    end if;
    if i=0 then
      execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 tsvector, s2 tsvector, s3 tsvector, s4 tsvector) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;
    else
      execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 tsvector, s2 tsvector, s3 tsvector, s4 tsvector) inherits(test) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;
    end if;
    execute 'create index idx_test'||suffix||'_s1 on test'||suffix||' using rum(s1 rum_tsvector_ops) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s2 on test'||suffix||' using rum(s2 rum_tsvector_ops) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s3 on test'||suffix||' using rum(s3 rum_tsvector_ops) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s4 on test'||suffix||' using rum(s4 rum_tsvector_ops) tablespace '||tbs;
  end loop;
end;
$$;

select relname,reltablespace from pg_class where relname ~ 'test' order by 2,1;
vi test.sql  

\set uid1 random(1,10000000)
\set uid2 random(10000001,20000000)
\set uid3 random(20000001,30000000)
\set uid4 random(30000001,40000000)
\set uid5 random(40000001,50000000)
\set uid6 random(50000001,60000000)
\set uid7 random(60000001,70000000)
\set uid8 random(70000001,80000000)
\set uid9 random(80000001,90000000)
\set uid10 random(90000001,100000000)
insert into test1 (uid,s1,s2,s3,s4) select :uid1+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test2 (uid,s1,s2,s3,s4) select :uid2+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test3 (uid,s1,s2,s3,s4) select :uid3+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test4 (uid,s1,s2,s3,s4) select :uid4+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test5 (uid,s1,s2,s3,s4) select :uid5+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test6 (uid,s1,s2,s3,s4) select :uid6+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test7 (uid,s1,s2,s3,s4) select :uid7+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test8 (uid,s1,s2,s3,s4) select :uid8+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test9 (uid,s1,s2,s3,s4) select :uid9+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;
insert into test10 (uid,s1,s2,s3,s4) select :uid10+id, (select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)),(select array_to_tsvector(array_agg(trunc(5000000*random())||'_'||trunc(20*random()))) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;


nohup pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1000000 >/dev/null 2>&1 &

Example 2

There are 10 tables, each with 10 million users, and 4 tag fields stored as text[].

do language plpgsql $$
declare
  i int;
  suffix text;
  tbs text;
begin
  for i in 0..10 loop
    if i=0 then
      suffix := '';
      tbs := 'tbs1';
    elsif i>=1 and i<=5 then
      suffix := i::text;
      tbs := 'tbs1';
    else
      suffix := i::text;
      tbs := 'tbs2';
    end if;
    if i=0 then
      execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 text[], s2 text[], s3 text[], s4 text[]) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;
    else
      execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 text[], s2 text[], s3 text[], s4 text[]) inherits(test) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;
    end if;
    execute 'create index idx_test'||suffix||'_s1 on test'||suffix||' using gin(s1) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s2 on test'||suffix||' using gin(s2) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s3 on test'||suffix||' using gin(s3) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s4 on test'||suffix||' using gin(s4) tablespace '||tbs;
  end loop;
end;
$$;

select relname,reltablespace from pg_class where relname ~ 'test' order by 2,1;
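
The load script for Example 2 is not shown; a sketch adapted from Example 1 simply drops array_to_tsvector, since array_agg over the 'id_step' strings already yields a text[]. Only test1 is shown here; test2 through test10 follow the same pattern with their own uid ranges:

\set uid1 random(1,10000000)
insert into test1 (uid,s1,s2,s3,s4) select :uid1+id, (select array_agg(trunc(5000000*random())||'_'||trunc(20*random())) from generate_series(1,1000)),(select array_agg(trunc(5000000*random())||'_'||trunc(20*random())) from generate_series(1,1000)),(select array_agg(trunc(5000000*random())||'_'||trunc(20*random())) from generate_series(1,1000)),(select array_agg(trunc(5000000*random())||'_'||trunc(20*random())) from generate_series(1,1000)) from generate_series(1,1000) t(id) on conflict do nothing;

nohup pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1000000 >/dev/null 2>&1 &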

Example 3 (pressure testing)

There are 64 partition tables, each with 5 million records. Tags are int arrays drawn from a pool of 4 million distinct values, and each user carries 4,000 random tags (1,000 per field) to ensure that there are enough target customers.

alter role postgres set gin_pending_list_limit='128MB';  

do language plpgsql $$
declare
  i int;
  suffix text;
  tbs text;
begin
  for i in 0..64 loop
    if i=0 then
      suffix := '';
      tbs := 'tbs1';
    elsif i>=1 and i<=32 then
      suffix := i::text;
      tbs := 'tbs1';
    else
      suffix := i::text;
      tbs := 'tbs2';
    end if;
    if i=0 then
      execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 int[], s2 int[], s3 int[], s4 int[]) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;
    else
      execute 'create unlogged table test'||suffix||'(uid int primary key USING INDEX TABLESPACE '||tbs||', s1 int[], s2 int[], s3 int[], s4 int[]) inherits(test) with (autovacuum_enabled=off, toast.autovacuum_enabled=off) tablespace '||tbs;
    end if;
    execute 'create index idx_test'||suffix||'_s1 on test'||suffix||' using gin(s1) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s2 on test'||suffix||' using gin(s2) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s3 on test'||suffix||' using gin(s3) tablespace '||tbs;
    execute 'create index idx_test'||suffix||'_s4 on test'||suffix||' using gin(s4) tablespace '||tbs;
  end loop;
end;
$$;

select relname,reltablespace from pg_class where relname ~ 'test' order by 2,1;
vi test1.sh  

for ((i=1;i<=64;i++))
do
echo "\set uid random($((($i-1)*5000000+1)),$(($i*5000000)))" > test$i.sql

echo "insert into test$i (uid,s1,s2,s3,s4) select :uid, (select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s1,(select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s2,(select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s3, (select array_agg(trunc(random()*4000000)) from generate_series(1,1000)) s4 on conflict do nothing;" >> test$i.sql

done

. ./test1.sh
vi test2.sh  

for ((i=1;i<=64;i++))
do
nohup pgbench -M prepared -n -r -P 1 -f ./test$i.sql -c 1 -j 1 -T 1000000 >/dev/null 2>&1 &
done

. ./test2.sh

Target customer discovery requirements — performance testing

Carry out pressure testing on Example 3:

postgres=# begin;
BEGIN
Time: 0.030 ms
postgres=# declare a cursor for select uid from test where s1 @> array[1] and s2 @> array[4];
DECLARE CURSOR
Time: 6.679 ms
postgres=# fetch 100 in a;
uid
-----------
19246842
118611240
148504032
185844649
(4 rows)
Time: 101.041 ms
postgres=# begin;
BEGIN
postgres=# declare a cursor for select uid from test where s1 @> array[1] or s2 @> array[4];
DECLARE CURSOR
Time: 3.484 ms
postgres=# fetch 100 in a;
uid
---------
2911941
2373506
.....
29713
3353782
2836804
1602067
(100 rows)
Time: 3.892 ms
postgres=# fetch 100 in a;
uid
---------
384170
1332271
4282941
......
1190946
4524861
1110635
(100 rows)
Time: 4.005 ms
postgres=# explain (analyze,verbose,timing,costs,buffers) select uid from test where s1 @> array[1];
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
Append (cost=0.00..233541.24 rows=206876 width=4) (actual time=0.081..108.037 rows=15221 loops=1)
Buffers: shared hit=60641
-> Seq Scan on public.test (cost=0.00..0.00 rows=1 width=4) (actual time=0.001..0.001 rows=0 loops=1)
Output: test.uid
Filter: (test.s1 @> '{1}'::integer[])
-> Bitmap Heap Scan on public.test1 (cost=33.71..2901.56 rows=3188 width=4) (actual time=0.078..0.381 rows=242 loops=1)
Output: test1.uid
Recheck Cond: (test1.s1 @> '{1}'::integer[])
Heap Blocks: exact=238
Buffers: shared hit=243
-> Bitmap Index Scan on idx_test1_s1 (cost=0.00..32.91 rows=3188 width=0) (actual time=0.049..0.049 rows=242 loops=1)
Index Cond: (test1.s1 @> '{1}'::integer[])
Buffers: shared hit=5
... 62 tables in the middle are omitted ...
 -> Bitmap Heap Scan on public.test64 (cost=34.00..2935.31 rows=3225 width=4) (actual time=0.068..0.327 rows=214 loops=1)
Output: test64.uid
Recheck Cond: (test64.s1 @> '{1}'::integer[])
Heap Blocks: exact=211
Buffers: shared hit=216
-> Bitmap Index Scan on idx_test64_s1 (cost=0.00..33.19 rows=3225 width=0) (actual time=0.041..0.041 rows=214 loops=1)
Index Cond: (test64.s1 @> '{1}'::integer[])
Buffers: shared hit=5
Planning time: 2.016 ms
Execution time: 109.400 ms
(519 rows)
Time: 113.216 ms
postgres=# explain (analyze,verbose,timing,costs,buffers) select uid from test1 where s1 @> array[1];
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on public.test1 (cost=33.71..2901.56 rows=3188 width=4) (actual time=0.085..0.383 rows=242 loops=1)
Output: uid
Recheck Cond: (test1.s1 @> '{1}'::integer[])
Heap Blocks: exact=238
Buffers: shared hit=243
-> Bitmap Index Scan on idx_test1_s1 (cost=0.00..32.91 rows=3188 width=0) (actual time=0.051..0.051 rows=242 loops=1)
Index Cond: (test1.s1 @> '{1}'::integer[])
Buffers: shared hit=5
Planning time: 0.097 ms
Execution time: 0.423 ms
(10 rows)
Time: 1.011 ms

Sharding

When dealing with several billion users, we can shard them by user ID and spread the data across multiple hosts.
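
A minimal sketch of the routing step, assuming simple modulus hashing over 64 shards (the shard count and function name are illustrative):

-- Route a user ID to a shard; each shard can live on its own host.
create or replace function shard_of(uid int, n_shards int default 64)
returns int as $$
  select mod(uid, n_shards);
$$ language sql immutable;

select shard_of(19246842);  -- which shard holds this user's tag row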

Requirements and Performance for Target Customer Discovery

This requirement falls squarely within PostgreSQL's strengths: it is essentially a location-based KNN query, which PostgreSQL handles easily with a GiST index.
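
A minimal sketch of that KNN pattern, assuming user coordinates stored as point values (the table, column, and sample coordinate are illustrative):

create table user_pos (uid int primary key, pos point);
create index idx_user_pos on user_pos using gist (pos);

-- The <-> distance operator lets the GiST index return rows in
-- distance order, so the LIMIT stops the scan after 10 matches.
select uid
from user_pos
order by pos <-> point '(120.19,30.26)'
limit 10;

The plans below show the related partitioned case: with an index per partition, Merge Append returns the first ordered rows in a fraction of a millisecond, while a full ordered scan of all partitions costs far more.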

postgres=# explain (analyze,verbose,timing,costs,buffers) select * from t order by id limit 10;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.72..1.13 rows=10 width=4) (actual time=0.158..0.165 rows=10 loops=1)
Output: t.id
Buffers: shared hit=3 read=4
-> Merge Append (cost=0.72..819.74 rows=20001 width=4) (actual time=0.157..0.162 rows=10 loops=1)
Sort Key: t.id
Buffers: shared hit=3 read=4
-> Index Only Scan using idx on public.t (cost=0.12..2.14 rows=1 width=4) (actual time=0.003..0.003 rows=0 loops=1)
Output: t.id
Heap Fetches: 0
Buffers: shared hit=1
-> Index Only Scan using idx1 on public.t1 (cost=0.29..225.28 rows=10000 width=4) (actual time=0.107..0.107 rows=6 loops=1)
Output: t1.id
Heap Fetches: 6
Buffers: shared hit=1 read=2
-> Index Only Scan using idx2 on public.t2 (cost=0.29..225.28 rows=10000 width=4) (actual time=0.043..0.044 rows=5 loops=1)
Output: t2.id
Heap Fetches: 5
Buffers: shared hit=1 read=2
Planning time: 0.181 ms
Execution time: 0.219 ms
(20 rows)
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from t order by id ;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
Merge Append (cost=0.72..819.74 rows=20001 width=4) (actual time=0.043..10.324 rows=20000 loops=1)
Sort Key: t.id
Buffers: shared hit=149
-> Index Only Scan using idx on public.t (cost=0.12..2.14 rows=1 width=4) (actual time=0.004..0.004 rows=0 loops=1)
Output: t.id
Heap Fetches: 0
Buffers: shared hit=1
-> Index Only Scan using idx1 on public.t1 (cost=0.29..225.28 rows=10000 width=4) (actual time=0.021..3.266 rows=10000 loops=1)
Output: t1.id
Heap Fetches: 10000
Buffers: shared hit=74
-> Index Only Scan using idx2 on public.t2 (cost=0.29..225.28 rows=10000 width=4) (actual time=0.017..3.309 rows=10000 loops=1)
Output: t2.id
Heap Fetches: 10000
Buffers: shared hit=74
Planning time: 0.175 ms
Execution time: 11.791 ms
(17 rows)

Summary

Let's return to the three core questions of the target customer system: how to store trillions of user tags efficiently, how to keep them updated in real time, and how to discover target customers within milliseconds.

Related Articles

Application of Stream Processing for the “Internet of Things (IoT)” — Use PostgreSQL for Real-time Processing (Trillions Each Day)

Follow me to keep abreast of the latest technology news, industry insights, and developer trends.
