Tagging Trillions of Users in Milliseconds with varbitx on PostgreSQL

Alibaba Cloud
Jan 21, 2019

By Digoal

varbitx is an extension for BIT operations provided by Alibaba Cloud ApsaraDB for RDS PostgreSQL. With it, you can select the users that match any combination of tags in milliseconds, even across trillions of user-tag combinations, which makes real-time targeting practical.

Stream processing with atomic, asynchronous batch consumption, together with schemaless UDFs, lets you add and delete tags efficiently and select users by tag in milliseconds.

The target scenario has billions of user records and millions of tags. In this article, we explore how to build stream tagging on Alibaba Cloud RDS PostgreSQL with varbitx (asynchronous batch consumption with atomicity, stream/batch computing) so that users can be selected by any tags in milliseconds.

Architecture Diagram

Functional Interfaces

Add Tags with Alibaba Cloud varbitx

Using set_bit_array for one bit.

set_bit_array (  
varbit,
int, -- Target BIT (0|1)
int, -- Filled BIT (0|1)
int[] -- Target position
) returns varbit

Sets the bits at the given positions to the target bit (positions start at 0). If a position lies beyond the original length, the bitmap is extended and the new bits in between are filled with the fill bit.
For example, set_bit_array('111100001111', 0, 1, array[1,15]) returns 1011000011111110
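
To make the fill rule concrete, here is a minimal Python model of the 4-argument form. It only mirrors the semantics described above on a string of '0'/'1' characters; it is not the varbitx implementation, and the Python function is purely illustrative.

```python
def set_bit_array(bits: str, target: int, fill: int, positions: list[int]) -> str:
    """Model of the 4-argument form: set `positions` to `target`, pad with `fill`."""
    out = list(bits)
    for pos in positions:
        if pos >= len(out):
            # Extend the bitmap; every newly created bit starts as the fill bit.
            out.extend(str(fill) * (pos + 1 - len(out)))
        out[pos] = str(target)
    return "".join(out)


# Same call as the example above: clear bits 1 and 15, fill new bits with 1.
print(set_bit_array("111100001111", 0, 1, [1, 15]))  # 1011000011111110
```

Position 15 lies beyond the 12-bit input, so bits 12 to 14 are created with the fill bit 1 before bit 15 is cleared.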

This function handles the addition and deletion of bits simultaneously

Using set_bit_array for multiple bits.

set_bit_array (  
varbit,
int, -- Target BIT 1 (0|1)
int[], -- Target position 1
int, -- Target BIT 2 (0|1)
int[], -- Target position 2
int -- Filled BIT (0|1)
) returns varbit

Sets the bits at target position 1 to target BIT 1 and the bits at target position 2 to target BIT 2 (positions start at 0), and fills any bits beyond the original length with the fill bit
For example, by the rule above, set_bit_array('111100001111', 0, array[1,15], 1, array[0,4], 0) returns 1011100011110000

varbitx Obtaining Dictionary ID from Bitmap

bit_posite

bit_posite (    
varbit,
int, -- (0|1)
boolean
) returns int[]

Returns the positions of all bits equal to 0|1 (positions start at 0), in ascending order if the boolean is true and in descending order if it is false
For example, bit_posite ('11110010011', 1, true) returns [0,1,2,3,6,9,10]
bit_posite ('11110010011', 1, false) returns [10,9,6,3,2,1,0]
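
The same behavior can be modeled in a few lines of Python. This is only an illustration of the semantics described above, operating on a '0'/'1' string rather than on a real varbit value.

```python
def bit_posite(bits: str, value: int, ascending: bool) -> list[int]:
    """Model of bit_posite: positions (from 0) of every bit equal to `value`."""
    positions = [i for i, b in enumerate(bits) if b == str(value)]
    return positions if ascending else positions[::-1]


print(bit_posite("11110010011", 1, True))   # [0, 1, 2, 3, 6, 9, 10]
print(bit_posite("11110010011", 1, False))  # [10, 9, 6, 3, 2, 1, 0]
```

Each returned position is a dictionary ID, which is why the bitmap can be translated back into USERIDs with a single lookup.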

Obtain USERID from Dictionary ID

select uid from dict where id = any ( bit_posite(x,x,x) );

Demo Application

Dictionary Table

The dictionary table converts USERIDs to array subscripts, that is, to bit positions in the bitmaps.

The subscripts must be gapless, auto-incrementing IDs, because every unused ID leaves an unused bit in each bitmap. Create a dictionary table as follows:

create table dict(id int8 primary key, uid int8 not null);    

create unique index idx_dict_uid on dict (uid);

Create Existing Users

create table t_uid (  
uid int8 primary key -- USER ID of the existing user
);

Insert a set number of USERIDs (100 million)

insert into t_uid select id from generate_series(1,100000000) t(id) order by random() ;

Create Mapping Subscripts for Existing Users All at Once

create sequence seq minvalue 0 start 0;    


insert into dict select nextval('seq'), uid from t_uid;


select min(id),max(id),count(*) from dict;
 min |   max    |   count
-----+----------+-----------
   0 | 99999999 | 100000000
(1 row)

Gapless Auto-Incrementing Subscripts Function

For new users, use this function to assign the next subscript without leaving gaps:

create or replace function f_uniq(i_uid int8) returns int as
$$
declare
  newid int;
  i int := 0;
  res int;
begin
  -- Return the existing subscript if this USERID is already in the dictionary;
  -- otherwise the unique index on uid would make every retry fail again
  select id into res from dict where uid = i_uid;
  if found then
    return res;
  end if;

  loop
    -- Back off for a random interval on every retry after the first attempt
    if i>0 then
      perform pg_sleep(0.2*random());
    else
      i := i+1;
    end if;

    -- Obtain the existing maximum ID+1 (that is, the ID to be inserted)
    select max(id)+1 into newid from dict;
    if newid is not null then
      -- Try to obtain the advisory lock for this ID
      if pg_try_advisory_xact_lock(newid) then
        insert into dict (id,uid) values (newid,i_uid);
        -- Return the new dictionary ID
        return newid;
      else
        -- Retry if another session holds the advisory lock
        continue;
      end if;
    else
      -- The table is empty: this is the first record, so lock ID 0
      if pg_try_advisory_xact_lock(0) then
        insert into dict (id, uid) values (0, i_uid);
        return 0;
      else
        continue;
      end if;
    end if;
  end loop;

-- Call the function again if a PK conflict occurs in a transient state
exception when unique_violation then
  select f_uniq(i_uid) into res;
  return res;
end;
$$
language plpgsql strict;

For example, a USERID is added.

select f_uniq(?);
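
The idea behind f_uniq can be sketched in plain Python: concurrent callers must each receive max(id)+1 with no gaps and no duplicates. In this sketch a threading.Lock stands in for the advisory lock, and the class and function names are hypothetical. Returning the existing subscript for an already registered USERID is an assumption of the sketch.

```python
import threading


class GaplessDict:
    """In-memory stand-in for the dict table: uid -> gapless subscript."""

    def __init__(self):
        self._lock = threading.Lock()  # plays the role of the advisory lock
        self.uid_to_id = {}

    def assign(self, uid: int) -> int:
        with self._lock:
            if uid in self.uid_to_id:      # already registered
                return self.uid_to_id[uid]
            new_id = len(self.uid_to_id)   # equals max(id)+1, or 0 when empty
            self.uid_to_id[uid] = new_id
            return new_id


d = GaplessDict()


def register(start: int) -> None:
    for uid in range(start, start + 1000):
        d.assign(uid)


# Eight concurrent writers, each registering 1,000 distinct USERIDs.
threads = [threading.Thread(target=register, args=(n * 1000,)) for n in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every subscript from 0 to 7999 is used exactly once.
print(sorted(d.uid_to_id.values()) == list(range(8000)))  # True
```

A database sequence alone would not give this guarantee, because a rolled-back transaction still consumes a sequence value and leaves a gap.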

Obtain USERID from ID or Obtain ID from USERID

create or replace function get_id_from_uid (int8) returns int8 as 
$$

select id from dict where uid=$1;
$$
language sql strict;

create or replace function get_uid_from_id (int8) returns int8 as
$$

select uid from dict where id=$1;
$$
language sql strict;

Tag Description Table

create table t_tags(tagid int primary key, "desc" text); -- "desc" must be quoted because DESC is a reserved word

Stream Tag Table

The mapping table assigns each range of dictionary IDs to a table name suffix.

We recommend routing log entries to the matching t_tag_log subtable at write time, so that consumers do not have to filter by range.

create table t_mapping (  
dict_id int8range, -- Dictionary ID range
suffix text unique -- Table name suffix
);

alter table t_mapping add constraint ck_exclude_dict_id exclude using gist(dict_id with &&); -- Avoids intersection between different ranges
insert into t_mapping values (int8range(0,100000000), '0');
insert into t_mapping values (int8range(100000000,200000000), '1');
insert into t_mapping values (int8range(200000000,300000000), '2');
insert into t_mapping values (int8range(300000000,400000000), '3');

Stream tag master table

create table t_tag_log (  
dict_id int8, -- User subscript ID
action int2, -- 0 = delete the tag, 1 = add the tag
tagid int, -- Tag ID
crt_time timestamp -- Time
);

create index idx_t_tag_log on t_tag_log (crt_time);

Create the stream tag subtables. They are partitioned by mod(tagid, 64) and by the dict_id range, using the suffixes stored in t_mapping.

do language plpgsql
$$
declare
  x text;
begin
  for i in 0..63 loop
    for x in select suffix from t_mapping loop
      execute format('create table t_tag_log_%s_%s (like t_tag_log including all) inherits (t_tag_log)', i, x);
    end loop;
  end loop;
end;
$$
;

postgres=# \dt t_tag_log_*
            List of relations
 Schema |      Name      | Type  |  Owner
--------+----------------+-------+----------
 public | t_tag_log_0_0  | table | postgres
 public | t_tag_log_0_1  | table | postgres
 public | t_tag_log_0_2  | table | postgres
 public | t_tag_log_0_3  | table | postgres
 public | t_tag_log_10_0 | table | postgres
 public | t_tag_log_10_1 | table | postgres
 public | t_tag_log_10_2 | table | postgres
 public | t_tag_log_10_3 | table | postgres
 public | t_tag_log_11_0 | table | postgres
 public | t_tag_log_11_1 | table | postgres
.....................

Use schemaless UDF to write the tag information

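create or replace function ins_tag(v_uid int8, v_action int2, v_tagid int, v_crt_time timestamp) returns void as
$$
declare
  i int := mod(v_tagid, 64);                 -- Tag partition, 0..63
  v_dict_id int8 := get_id_from_uid(v_uid);  -- Dictionary subscript of this user
  x text;                                    -- Table name suffix of the dict_id range
begin
  -- The ranges in t_mapping are defined on dictionary IDs, so look up the suffix by dictionary ID, not by the raw USERID
  select suffix into x from t_mapping where dict_id @> v_dict_id;
  execute format ('insert into t_tag_log_%s_%s (dict_id, action, tagid, crt_time) values (%s, %s, %s, %L)', i, x, v_dict_id, v_action, v_tagid, v_crt_time);
end;
$$
language plpgsql strict;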
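
The routing rule used by the UDF can be written out as a small Python sketch. The MAPPING list mirrors the four rows inserted into t_mapping above (lower bound inclusive, upper bound exclusive), and target_table is a hypothetical helper name used only for illustration.

```python
# (lower bound inclusive, upper bound exclusive, suffix), mirroring t_mapping
MAPPING = [
    (0, 100_000_000, "0"),
    (100_000_000, 200_000_000, "1"),
    (200_000_000, 300_000_000, "2"),
    (300_000_000, 400_000_000, "3"),
]


def target_table(dict_id: int, tagid: int) -> str:
    """Name of the log subtable that receives an entry for (dict_id, tagid)."""
    for lower, upper, suffix in MAPPING:
        if lower <= dict_id < upper:
            return f"t_tag_log_{tagid % 64}_{suffix}"
    raise LookupError(f"no t_mapping range contains dict_id {dict_id}")


print(target_table(150_000_000, 70))  # t_tag_log_6_1
print(target_table(0, 64))            # t_tag_log_0_0
```

Because the table name is derived from the data, the caller never needs to know how many subtables exist, which is what makes the UDF schemaless.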

High-Speed Writing/Deleting Tags

vi test.sql  

\set uid random(1,100000000)
\set tagid random(1,100000)
\set action random(0,1)
select ins_tag (:uid::int8, :action::int2, :tagid, now()::timestamp);

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120

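Entry-by-entry write speed for a single table

The run below measures entry-by-entry writes and reaches about 158,000 rows/s. Note that the statement it reports inserts into t_tag_log directly. Batch writes to a single table or to multiple tables can exceed 1 million rows/s.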

transaction type: ./test.sql  
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 19013298
latency average = 0.202 ms
latency stddev = 0.267 ms
tps = 158442.952245 (including connections establishing)
tps = 158449.386772 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set uid random(1,100000000)
0.000 \set tagid random(1,100000)
0.000 \set action random(0,1)
0.200 insert into t_tag_log (dict_id, action, tagid, crt_time) values (get_id_from_uid(:uid), :action, :tagid, now());

Tag Reversing Table

The above splitting modes correspond to two storage methods: multi-table storage and single-table storage.

Multi-table storage

Tag reversing master table:

create table tag_userbitmap (  
tagid int primary key, -- Tag ID
userbitmap varbit -- User bitmap
);

Tag reversing subtable.

do language plpgsql
$$
declare
  x text;
begin
  for x in select suffix from t_mapping loop
    execute format('create table tag_userbitmap_%s (like tag_userbitmap including all) inherits (tag_userbitmap)', x);
  end loop;
end;
$$
;

The bitmap tables do not need to be partitioned by tag: incremental updates for different tags touch different rows, so there are no row lock conflicts, and each table stays at a manageable size.

postgres=# \dt tag_userbitmap*
             List of relations
 Schema |       Name       | Type  |  Owner
--------+------------------+-------+----------
 public | tag_userbitmap   | table | postgres
 public | tag_userbitmap_0 | table | postgres
 public | tag_userbitmap_1 | table | postgres
 public | tag_userbitmap_2 | table | postgres
 public | tag_userbitmap_3 | table | postgres
(5 rows)

Single-table storage

For single-table storage, add a bitmap_offset column that identifies which segment of the full bitmap each row holds.

create table tag_userbitmap_single (
tagid int , -- Tag ID
bitmap_offset int , -- bit segment offset value
userbitmap varbit, -- bitmap for the current segment
primary key (tagid, bitmap_offset)
);

Async Batch Consume with Atomicity: Merging Stream Tags into the Tag Reversing Table

The following statement consumes one batch from the master table and, for simplicity, does not consider the partition subtables.

with tmp as (delete from t_tag_log     -- Table to consume
  where ctid = any ( array (
    select ctid from t_tag_log order by crt_time limit 100000 -- Batch processes 100,000 entries
  )) returning *
)
select tagid, action, array_agg(dict_id) as dict_id from
(
  select row_number() over w1 as rn, *
  from tmp
  window w1 as (partition by dict_id, tagid order by crt_time desc) -- Retrieves the last entry for each ID and each tag
) t where rn=1
group by tagid, action -- Combined to a subscript array
;
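
The effect of the window function and the final group by can be traced with a small Python sketch. It models only the collapsing logic, not the atomic delete, and the function name and sample batch are hypothetical. When two entries for the same user and tag carry the same timestamp, the sketch keeps the first one it sees, whereas the SQL statement picks one of them arbitrarily.

```python
from collections import defaultdict


def collapse(batch):
    """batch: iterable of (dict_id, action, tagid, crt_time) log entries."""
    latest = {}
    for dict_id, action, tagid, crt_time in batch:
        key = (dict_id, tagid)
        # Keep only the most recent action per (user, tag), like rn = 1.
        if key not in latest or crt_time > latest[key][0]:
            latest[key] = (crt_time, action)

    # Group the surviving entries into one subscript array per (tagid, action).
    grouped = defaultdict(list)
    for (dict_id, tagid), (_, action) in latest.items():
        grouped[(tagid, action)].append(dict_id)
    return {key: sorted(ids) for key, ids in grouped.items()}


batch = [
    (1, 1, 10, 1),  # user 1 gets tag 10 ...
    (1, 0, 10, 2),  # ... and loses it again later
    (2, 1, 10, 1),
    (3, 1, 11, 5),
]
print(collapse(batch))  # {(10, 0): [1], (10, 1): [2], (11, 1): [3]}
```

Each resulting array can then be applied to the bitmap of its tag with a single set_bit_array call.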

Schemaless UDF, Async Batch Consume with Atomicity: Merging Stream Tags into the Tag Reversing Table

The schemaless UDF differs depending on the tag_userbitmap design (single-table or multi-table).

Multi-table: the UDF builds the table names dynamically and merges each bitmap into the corresponding subtable.

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as
$$
declare
  v_tagid int;
  v_action int2;
  v_dict_id int[];
  v_offset int;
  v_fixed_dict_id int[];
begin
  -- Lower bound of the dict_id range for this suffix, used as the bitmap offset
  select lower(dict_id) into v_offset from t_mapping where suffix=v_suffix2::text;

  for v_tagid, v_action, v_dict_id in
    execute format
    ('
      with tmp as (delete from t_tag_log_%s_%s -- Subtable to consume
        where ctid = any ( array (
          select ctid from t_tag_log_%s_%s order by crt_time limit %s -- Batch processes N entries
        )) returning *
      )
      select tagid, action, array_agg(dict_id) as dict_id from
      (
        select row_number() over w1 as rn, *
        from tmp
        window w1 as (partition by dict_id, tagid order by crt_time desc) -- Retrieves the last entry for each ID and each tag
      ) t where rn=1
      group by tagid, action
    ',
    v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)
  loop
    -- Convert global dictionary IDs to bit positions within this segment
    select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;
    -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;
    execute format('insert into tag_userbitmap_%s (tagid, userbitmap) values (%s, set_bit_array(''0'', %s, %s, %L))
      on conflict (tagid)
      do update set userbitmap=set_bit_array(tag_userbitmap_%s.userbitmap, %s, %s, %L)',
      v_suffix2, v_tagid, v_action, 0, v_fixed_dict_id, v_suffix2, v_action, 0, v_fixed_dict_id);
  end loop;
end;
$$
language plpgsql strict;
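
The final step of the UDF, subtracting the range offset and then setting bits in the segment bitmap, can be followed in this Python sketch. The set_bit_array function here is a string-based model of the documented semantics, and merge_into_segment is a hypothetical helper; neither is part of varbitx.

```python
def set_bit_array(bits, target, fill, positions):
    """String model of the 4-argument set_bit_array."""
    out = list(bits)
    for pos in positions:
        if pos >= len(out):
            out.extend(str(fill) * (pos + 1 - len(out)))
        out[pos] = str(target)
    return "".join(out)


def merge_into_segment(bitmap, action, dict_ids, offset):
    """Apply one (tagid, action) group to the bitmap of one segment.

    `bitmap` is None when the tag has no row yet, which corresponds to the
    insert branch that starts from the one-bit value '0'.
    """
    local_positions = [d - offset for d in dict_ids]
    return set_bit_array(bitmap if bitmap is not None else "0", action, 0, local_positions)


offset = 100_000_000  # lower bound of the range with suffix '1'
bitmap = merge_into_segment(None, 1, [100_000_002, 100_000_005], offset)
print(bitmap)  # 001001
bitmap = merge_into_segment(bitmap, 0, [100_000_002], offset)
print(bitmap)  # 000001
```

Subtracting the offset keeps each segment bitmap short: without it, a user in the second range would force at least 100 million leading fill bits.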

Single table:

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as
$$
declare
  v_tagid int;
  v_action int2;
  v_dict_id int[];
  v_offset int;
  v_fixed_dict_id int[];
begin
  -- Lower bound of the dict_id range for this suffix, used as the bitmap offset
  select lower(dict_id) into v_offset from t_mapping where suffix=v_suffix2::text;

  for v_tagid, v_action, v_dict_id in
    execute format
    ('
      with tmp as (delete from t_tag_log_%s_%s -- Subtable to consume
        where ctid = any ( array (
          select ctid from t_tag_log_%s_%s order by crt_time limit %s -- Batch processes N entries
        )) returning *
      )
      select tagid, action, array_agg(dict_id) as dict_id from
      (
        select row_number() over w1 as rn, *
        from tmp
        window w1 as (partition by dict_id, tagid order by crt_time desc) -- Retrieves the last entry for each ID and each tag
      ) t where rn=1
      group by tagid, action
    ',
    v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)
  loop
    -- Convert global dictionary IDs to bit positions within this segment
    select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;
    -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;
    -- The segment is identified by v_suffix2, which is stored in bitmap_offset
    execute format('insert into tag_userbitmap_single (tagid, bitmap_offset, userbitmap) values (%s, %s, set_bit_array(''0'', %s, %s, %L))
      on conflict (tagid, bitmap_offset)
      do update set userbitmap=set_bit_array(tag_userbitmap_single.userbitmap, %s, %s, %L)',
      v_tagid, v_suffix2, v_action, 0, v_fixed_dict_id, v_action, 0, v_fixed_dict_id);
  end loop;
end;
$$
language plpgsql strict;

Merge tags:

The functional interface is the same for the single-table and multi-table storage modes: both call the merge_tags function to apply incremental data to the tag bitmaps. Calls for different combinations of tag partition and suffix can run in parallel.

The following queries can be called in parallel:

select merge_tags(100000, 0, 0);  
select merge_tags(100000, 0, 1);
select merge_tags(100000, 0, 2);
select merge_tags(100000, 0, 3);
.....
select merge_tags(100000, 63, 0);
select merge_tags(100000, 63, 1);
select merge_tags(100000, 63, 2);
select merge_tags(100000, 63, 3);
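
With 64 tag partitions and 4 suffixes there are 256 independent calls. The following Python sketch only generates the statements; how they are dispatched (one connection per statement, a job scheduler, and so on) is left open, and the batch size of 100000 is simply the value used above.

```python
from itertools import product

BATCH_SIZE = 100000
TAG_PARTITIONS = range(64)   # first suffix: mod(tagid, 64)
RANGE_SUFFIXES = range(4)    # second suffix: rows of t_mapping

calls = [
    f"select merge_tags({BATCH_SIZE}, {partition}, {suffix});"
    for partition, suffix in product(TAG_PARTITIONS, RANGE_SUFFIXES)
]

print(len(calls))   # 256
print(calls[0])     # select merge_tags(100000, 0, 0);
print(calls[-1])    # select merge_tags(100000, 63, 3);
```

Each statement consumes a different log subtable, so the calls do not compete for the same log rows.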

Query for User IDs of Tags

To find the users that match a set of tags, combine the tag bitmaps, determine the dict_ids from the resulting bitmap, and then determine the user IDs from the dict_ids.

The statements below are outlines. In practice, wrap each bitmap in coalesce and pad it to the fixed segment length so that the offsets stay aligned after concatenation, and use a full outer join across the subtables to build the final varbit.

For multi-table storage:

select bit_posite(t1.userbitmap||t2.userbitmap||t3.userbitmap||t4.userbitmap, '1', true)
from tag_userbitmap_0 t1 , ....
where tx.tagid in (....);

For single-table storage:

create aggregate bit_agg (varbit) (sfunc = bitcat, stype = varbit);

select bit_posite(bit_and(tag), '1', true) from (
select bit_agg(userbitmap order by bitmap_offset) as tag from tag_userbitmap_single where tagid in (...) group by tagid
) t;
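
The reason for padding is easiest to see on a small model. In this Python sketch every segment covers 8 bits instead of the 100,000,000 used by the mapping above, bitmaps are '0'/'1' strings, and all names are hypothetical. A missing or short segment is padded with zeros before the segments are concatenated, so that position i of segment n always maps to dictionary ID n * SEGMENT_BITS + i.

```python
SEGMENT_BITS = 8  # illustration only; the ranges in t_mapping span 100,000,000 IDs


def pad(bitmap):
    """coalesce + right-pad: a missing or short segment becomes SEGMENT_BITS bits."""
    return (bitmap or "").ljust(SEGMENT_BITS, "0")


def full_bitmap(segments):
    """Concatenate the padded segments in offset order."""
    return "".join(pad(s) for s in segments)


def bit_and(a, b):
    """Bitwise AND of two equally long bitmaps."""
    return "".join("1" if x == "1" and y == "1" else "0" for x, y in zip(a, b))


def ones(bits):
    return [i for i, b in enumerate(bits) if b == "1"]


# One bitmap per segment (suffix 0..3) for two tags; None means no row exists.
tag_a = ["101", None, "01", ""]
tag_b = ["1", "1", "011", None]

# Users that carry both tags, as global dictionary IDs.
print(ones(bit_and(full_bitmap(tag_a), full_bitmap(tag_b))))  # [0, 17]
```

Without the padding, the bits of the third segment would slide forward and be reported as dictionary IDs from the first range.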

Other Procedures

You can optimize your database tagging further with the following procedures:

  1. Maintain the dictionary table (delete inactive users)
  2. Shrink the varbit bitmaps at the same time

Query for Tags of a User

To look up the tags of a given user, we recommend the following structure, which stores each user's tags as an array. This is the inverse of the tag-to-users bitmaps used in the rest of this article.

create table t_user_tags(   
uid int8 primary key, -- User ID
tagid int[] -- Tag ID
);
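
The relationship between the two structures can be illustrated with a short Python sketch that derives the per-user tag arrays from a tag-to-users mapping. The function name and sample data are hypothetical, and in a real system t_user_tags would more likely be maintained from the tag stream than rebuilt from the bitmaps.

```python
def invert(tag_to_users):
    """Turn {tagid: [uid, ...]} into {uid: [tagid, ...]}, like rows of t_user_tags."""
    user_tags = {}
    for tagid, uids in tag_to_users.items():
        for uid in uids:
            user_tags.setdefault(uid, []).append(tagid)
    return {uid: sorted(tags) for uid, tags in user_tags.items()}


tag_to_users = {10: [1001, 1002], 11: [1002], 12: [1001, 1003]}
print(invert(tag_to_users))
# {1001: [10, 12], 1002: [10, 11], 1003: [12]}
```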

To learn more about PostgreSQL on Alibaba Cloud, visit https://www.alibabacloud.com/product/apsaradb-for-rds-postgresql

To read more blogs from Digoal or to find related source codes, visit https://github.com/digoal/blog/blob/master/README.md

Reference: https://www.alibabacloud.com/blog/tagging-trillions-of-users-in-milliseconds-with-varbitx-on-postgresql_594370?spm=a2c41.12516335.0.0
