Applying PostgreSQL Graph Database to Social Scenarios

Alibaba Cloud
Jul 22, 2019

By Digoal

Background

Humans are social animals. As the population grows and communication becomes increasingly borderless, a huge network of relations has formed between people and people, people and events, and people and time.

Many scenarios are based on this huge network of relations. For example:

1. Headhunting

IT personnel, headhunters and HR are no strangers to LinkedIn. LinkedIn is actually a website that maintains interpersonal relations.

By searching your 1-depth contacts, you can find people who are directly related to you, and by searching your 2-depth contacts, you can find people who are indirectly related to you.

Of course, you can continue to search for N-depth contacts, but those may not be so relevant to you.

If you knew that you were only a few depths away from actor Fan Bingbing, would you get a little bit excited?

In fact, this kind of social relation has existed since ancient times, as has the occupation that specializes in it. Buying official positions, selling official positions, and similar acts all depended on networking contacts. After reading A Dream of Red Mansions, you may be amazed at how many relatives this family has!

2. Criminal investigation

Criminal investigation by public security authorities is another application of networking contacts. However, behaviors are becoming more and more complex nowadays, as are the relations between people. In ancient times, how far a person could reach basically depended on two legs, plus a horse at most.

Now, mobile phones, computers, ATMs, supermarkets, cameras, cars, and so on are all connected through the road network and the Internet.

The relations generated by a person's behavior are therefore more complicated, and criminal investigation becomes increasingly difficult if relation analysis is performed only manually.

3. Financial risk control

For example, when a bank reviews loan qualifications, it usually needs to check whether the applicant has the ability to repay, whether any false information was provided, as well as the behaviors & habits, assets, circle of friends and so on. It also involves complex analysis of relations between people, relations of human behaviors, and so on.

Such human-centered and event-related businesses gave rise to graph databases.

Currently, graph databases such as Neo4j are commonly used.

For more information, see

https://en.wikipedia.org/wiki/Graph_database

PostgreSQL is a full-featured database, and it is used as the storage backend of some graph systems, such as OpenCog and Cayley.

In addition, PostgreSQL is very mature at querying and managing relations.

This article shows how PostgreSQL meets the requirements of financial risk control, criminal investigation, social relations, and human relations.

Graph Data Model

In the design of many graph databases, events or people are used as nodes, and if a relation exists between events or people, the relation is established.

In PostgreSQL, we can use two columns to represent this relation. Each column represents an event or a person. If the two have a relation, a record is created.

This representation is adopted to keep consistent with the graph data.

Of course, optimizations such as arrays can also be used: the other events or people associated with an event can be stored in an array column, which can improve retrieval efficiency.
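As a minimal sketch of the two layouts described above, the same relations can be stored either as one row per pair or as one array per node. The table names rel_edge and rel_adj and the sample rows are made up for illustration and are not part of the test schema used later in this article.

```sql
-- Layout 1: one row per relation (edge list).
create temp table rel_edge (c1 int, c2 int, primary key (c1, c2));
insert into rel_edge values (1, 2), (1, 3), (2, 3);

-- Layout 2: one row per node, with its related nodes stored in an array.
create temp table rel_adj (c1 int primary key, c2 int[]);
insert into rel_adj
select c1, array_agg(c2 order by c2)
from rel_edge
group by c1;

-- Direct relations of node 1 in each layout.
select c2 from rel_edge where c1 = 1;
select unnest(c2) as c2 from rel_adj where c1 = 1;
```

Both queries list the same related nodes. The array layout reads a single row per node, while the two-column layout makes it easier to add or remove a single relation.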

Can PostgreSQL Recursive Queries Meet the Needs of Relation Derivation?

Recursive queries are an excellent PostgreSQL feature.

Relations can be circular. To avoid following loops, each derivation step should exclude all relations found in earlier steps, which a PostgreSQL recursive query currently cannot express directly.

For example, excluding loops would require syntax like the following:

postgres=# with recursive s as ( 
select c1,c2 from a where c1=1
union all
select a.c1,a.c2 from a join s on (a.c1=s.c2) where a.c1 not in (with t as (insert into tmp select * from s) select c2 from tmp ) and s.* is not null
)
select * from s;
ERROR: 42P19: recursive reference to query "s" must not appear within a subquery
LINE 4: ... not in (with t as (insert into tmp select * from s) select ...
^
LOCATION: checkWellFormedRecursionWalker, parse_cte.c:773

Currently, this is not supported.

A recursive query like the one above can only see the working table (the rows produced by the previous iteration), not the full set of rows accumulated over the whole recursion. If the relations contain a loop, the query recurses forever.

Legend:

As shown in the legend, many points together form a loop. For example, A and I are good friends, B and I are also good friends, and A and B are also good friends.

In the derivation, A and B are derived from me, then B is derived from A, then A is derived from B, and so on, which is an infinite loop.
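A common partial workaround inside a single recursive query is to carry the path walked so far in an array column and refuse to step onto a node that is already on that path. The following is only a sketch of that technique, not the approach this article takes. The table demo_rel and its rows are hypothetical. The check works per path, so it stops infinite loops but does not exclude nodes that were already reached through a different path.

```sql
-- Hypothetical sample data: 1 -> 2 -> 3 -> 1 forms a loop.
create temp table demo_rel (c1 int, c2 int, primary key (c1, c2));
insert into demo_rel values (1, 2), (2, 3), (3, 1), (1, 3);

with recursive s(level, path, c1, c2) as (
  select 1, array[c1, c2], c1, c2
  from demo_rel
  where c1 = 1
  union all
  select s.level + 1, s.path || r.c2, r.c1, r.c2
  from demo_rel r
  join s on r.c1 = s.c2
  where r.c2 <> all (s.path)  -- skip nodes already on this path
    and s.level < 10          -- depth cap as an extra safety net
)
select * from s order by level, path;
```

On this sample the query terminates with three rows (paths {1,2}, {1,3}, and {1,2,3}) instead of looping. Because every path is expanded separately, the number of rows can still grow very quickly on a dense graph, which is one reason to prefer the UDF approach below.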

PostgreSQL UDF to Solve the Issue of Relation Loops

User-defined functions (UDFs) solve the problem of infinite recursion caused by loops in the data very well.

Modeling

Create 100 million entries of relation data, and assume a total of 10 million people. For ease of testing, other attributes of people or events (such as Time, Location, and Intimacy) are omitted from the table here.

postgres=# create table a(c1 int, c2 int, primary key(c1,c2));
CREATE TABLE
postgres=# create index idx_a_1 on a(c1);
CREATE INDEX
postgres=# create index idx_a_2 on a(c2);
CREATE INDEX
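-- Random pairs can repeat; on conflict do nothing skips duplicates that would otherwise violate the primary key.
postgres=# insert into a select random()*10000000, random()*10000000 from generate_series(1,100000000) on conflict do nothing;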

Temporary tables used by the functions

create temp table if not exists tmp1(level int, c1 int, c2 int) ON COMMIT delete rows; 
create index if not exists idx_tmp1_1 on tmp1(level);
create index if not exists idx_tmp1_2 on tmp1(c1);
create index if not exists idx_tmp1_3 on tmp1(c2);
create unlogged table u1 (like tmp1);

create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level);
create index if not exists idx_tmp2_2 on tmp2(c1);
create index if not exists idx_tmp2_3 on tmp2(c2);
create unlogged table u2 (like tmp2);

Function 1: Output A-Centered and N-Level Relation Data

create or replace function find_rel(v_c1 int, v_level int) returns setof u1 as 
$$
declare
i int := 1;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;

-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp1(level int, c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp1_1 on tmp1(level);
create index if not exists idx_tmp1_2 on tmp1(c1);
create index if not exists idx_tmp1_3 on tmp1(c2);
-- To store the data of the initial level (namely the start point)
return query insert into tmp1 select i, * from a where c1=v_c1 returning *;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1. Group By removes duplicate nodes (for example, node 3 reached via both 1-2-3-4 and 1-5-3-4 is expanded only once), and not exists excludes nodes that have already been expanded, which breaks loops.
return query insert into tmp1 select i, a.c1, a.c2 from a join (select c2 from tmp1 where level=i-1 group by c2) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp1 where a.c1 = tmp1.c1) returning *;
end loop;
end;
$$
language plpgsql strict;

Function 2: Output A-Centered and N-Level Relation Data with Paths

A path is generated at the same time. Note: This path is a complete path, and a node may have multiple paths.

create or replace function find_rel_withpath(v_c1 int, v_level int) returns setof u2 as 
$$
declare
i int := 1;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;

-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level);
create index if not exists idx_tmp2_2 on tmp2(c1);
create index if not exists idx_tmp2_3 on tmp2(c2);
-- To store the data of the initial level (namely the start point)
return query insert into tmp2 select i, array[]::int[] || c1 || c2 , * from a where c1=v_c1 returning *;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1. Group By removes duplicate paths (for example, two copies of 1-2-3-4; in practice these cannot occur once (c1, c2) is unique), and not exists excludes nodes that have already been expanded, which breaks loops.
-- path indicates the current path.
-- If a primary key on (c1, c2) already exists, you can join with tmp2 directly and skip the Group By.
return query insert into tmp2 select i, tmp.path||a.c2, a.c1, a.c2 from a join (select c2,path from tmp2 where level=i-1 group by c2,path) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp2 where a.c1 = tmp2.c1) returning *;
end loop;
end;
$$
language plpgsql strict;

Function 3: Find the Shortest Relation Path Between Two People or Two Events

The search strategy here still leaves room for optimization, for example choosing which endpoint to start the search from, or searching outward from both endpoints simultaneously.

create or replace function find_rel_path(v_c1 int, v_c2 int) returns setof int[] as 
$$
declare
i int := 1;
begin
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level);
create index if not exists idx_tmp2_2 on tmp2(c1);
create index if not exists idx_tmp2_3 on tmp2(c2);
-- To store the data of the initial level (namely the start point)
insert into tmp2 select i, array[]::int[] || c1 || c2 , * from a where c1=v_c1;
loop
i := i+1;

perform 1 from tmp2 where c2=v_c2 limit 1;
if found then
return query select path from tmp2 where c2=v_c2 and level=i-1;
return;
end if;
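insert into tmp2 select i, tmp.path||a.c2, a.c1, a.c2 from a join (select c2,path from tmp2 where level=i-1 group by c2,path) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp2 where a.c1 = tmp2.c1);
-- Stop when no new relations were found; otherwise the loop never ends if v_c2 is unreachable from v_c1.
if not found then
return;
end if;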
end loop;
end;
$$
language plpgsql strict;

Test

1. Find the 3-level relation network of the user with ID 1

select * from find_rel(1,3);
select * from find_rel_withpath(1,3);

Result

postgres=# select * from find_rel(1,3);
NOTICE: relation "tmp1" already exists, skipping
NOTICE: relation "idx_tmp1_1" already exists, skipping
NOTICE: relation "idx_tmp1_2" already exists, skipping
NOTICE: relation "idx_tmp1_3" already exists, skipping
level | c1 | c2
------+---------+---------
1 | 1 | 5157873
1 | 1 | 3102468
1 | 1 | 8399312
1 | 1 | 1442141
1 | 1 | 5094441
1 | 1 | 1521897
1 | 1 | 401079
2 | 401079 | 1147078
2 | 401079 | 9740100
......
3 | 1731998 | 6503196
3 | 1731998 | 5112396
3 | 6525458 | 937613
3 | 6525458 | 8459123
3 | 6525458 | 8419231
3 | 6525458 | 4021987
(828 rows)
Time: 15.000 ms
postgres=# select * from find_rel_withpath(1,3);
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: relation "idx_tmp2_2" already exists, skipping
NOTICE: relation "idx_tmp2_3" already exists, skipping
level | path | c1 | c2
-------+-----------------------------+---------+---------
1 | {1,5157873} | 1 | 5157873
1 | {1,3102468} | 1 | 3102468
1 | {1,8399312} | 1 | 8399312
1 | {1,1442141} | 1 | 1442141
1 | {1,5094441} | 1 | 5094441
1 | {1,1521897} | 1 | 1521897
1 | {1,401079} | 1 | 401079
2 | {1,401079,1147078} | 401079 | 1147078
2 | {1,401079,9740100} | 401079 | 9740100
......
3 | {1,1442141,9719411,7811631} | 9719411 | 7811631
3 | {1,401079,9740100,5119416} | 9740100 | 5119416
3 | {1,401079,9740100,350046} | 9740100 | 350046
3 | {1,401079,9740100,8223067} | 9740100 | 8223067
3 | {1,401079,9740100,5608312} | 9740100 | 5608312
3 | {1,401079,9740100,7920319} | 9740100 | 7920319
3 | {1,401079,9740100,6416565} | 9740100 | 6416565
(828 rows)
Time: 19.524 ms
postgres=# select * from find_rel_withpath(1,5) order by level desc limit 10;
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: relation "idx_tmp2_2" already exists, skipping
NOTICE: relation "idx_tmp2_3" already exists, skipping
level | path | c1 | c2
-------+----------------------------------------+-----+---------
5 | {1,401079,3806993,3879310,165,8074824} | 165 | 8074824
5 | {1,401079,3806993,3879310,165,5983603} | 165 | 5983603
5 | {1,401079,3806993,3879310,165,9804825} | 165 | 9804825
5 | {1,401079,3806993,3879310,165,3848025} | 165 | 3848025
5 | {1,401079,3806993,3879310,165,2045526} | 165 | 2045526
5 | {1,401079,3806993,3879310,165,9783524} | 165 | 9783524
5 | {1,401079,3806993,3879310,165,386886} | 165 | 386886
5 | {1,401079,3806993,3879310,165,6318408} | 165 | 6318408
5 | {1,401079,3806993,3879310,165,1150578} | 165 | 1150578
5 | {1,401079,3806993,3879310,165,6702827} | 165 | 6702827
(10 rows)
Time: 1473.953 ms

2. Find the shortest relation path between ID 1 and ID 386886

select * from find_rel_path(1,386886);

Result

postgres=# select * from find_rel_path(1,386886);
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: relation "idx_tmp2_2" already exists, skipping
NOTICE: relation "idx_tmp2_3" already exists, skipping
find_rel_path
---------------------------------------
{1,401079,3806993,3879310,165,386886}
(1 row)
Time: 1069.781 ms
postgres=# select * from find_rel_path(1,3879310);
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: relation "idx_tmp2_2" already exists, skipping
NOTICE: relation "idx_tmp2_3" already exists, skipping
find_rel_path
----------------------------
{1,401079,3806993,3879310}
(1 row)
Time: 17.290 ms

Very fast.

UDF Optimization Idea 1 — Using a Cursor to Receive Streams

The previous UDFs start returning data only after the whole search has finished. In fact, output can start as soon as the first record is found, so that the client receives data continuously and latency is reduced.

This can be achieved by using cursors.
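Before the full function, here is a minimal sketch of how a client consumes a cursor in batches. The cursor name demo_cur and the generate_series source are placeholders chosen for illustration; any query can take their place.

```sql
begin;
-- A cursor declared without WITH HOLD only lives inside a transaction.
declare demo_cur cursor for
  select g as id from generate_series(1, 1000000) as g;
fetch 3 from demo_cur;   -- returns the first 3 rows
fetch 3 from demo_cur;   -- returns the next 3 rows
close demo_cur;
commit;
```

The function in the example below applies the same idea with one refcursor per relation level, so the client can fetch level 1 while deeper levels have not been computed yet.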

Cursor usage reference

https://www.postgresql.org/docs/9.6/static/plpgsql-cursors.html

Example

create or replace function find_rel_withpath_cur(v_c1 int, v_level int) returns setof refcursor as 
$$
declare
i int := 1;
ref refcursor[];
res refcursor;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
for x in 1..v_level loop
ref[x] := 'a'||x;
end loop;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level);
create index if not exists idx_tmp2_2 on tmp2(c1);
create index if not exists idx_tmp2_3 on tmp2(c2);
-- To store the data of the initial level (namely the start point)
res := ref[i];
open res for insert into tmp2 select i, array[]::int[] || c1 || c2 , * from a where c1=v_c1 returning *;
return next res;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1. Group By removes duplicate paths (for example, two copies of 1-2-3-4; in practice these cannot occur once (c1, c2) is unique), and not exists excludes nodes that have already been expanded, which breaks loops.
-- path indicates the current path.
-- If a primary key on (c1, c2) already exists, you can join with tmp2 directly and skip the Group By.
res := ref[i];
open res for insert into tmp2 select i, tmp.path||a.c2, a.c1, a.c2 from a join (select c2,path from tmp2 where level=i-1 group by c2,path) tmp on (a.c1=tmp.c2) where not exists (select 1 from tmp2 where a.c1 = tmp2.c1) returning *;
return next res;
end loop;
end;
$$
language plpgsql strict;
postgres=# begin;
BEGIN
Time: 0.390 ms
postgres=# select * from find_rel_withpath_cur(1,5);
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: relation "idx_tmp2_2" already exists, skipping
NOTICE: relation "idx_tmp2_3" already exists, skipping
find_rel_withpath_cur
-----------------------
a1
a2
a3
a4
a5
(5 rows)
Time: 4.008 ms

Return level-1 relations

postgres=# fetch all in a1;
level | path | c1 | c2
-------+-------------+----+---------
1 | {1,5157873} | 1 | 5157873
1 | {1,3102468} | 1 | 3102468
1 | {1,8399312} | 1 | 8399312
1 | {1,1442141} | 1 | 1442141
1 | {1,5094441} | 1 | 5094441
1 | {1,1521897} | 1 | 1521897
1 | {1,401079} | 1 | 401079
(7 rows)
Time: 0.958 ms

Return level-2 relations

postgres=# fetch all in a2;
level | path | c1 | c2
-------+---------------------+---------+---------
2 | {1,401079,1147078} | 401079 | 1147078
2 | {1,401079,9740100} | 401079 | 9740100
2 | {1,401079,8171779} | 401079 | 8171779
2 | {1,401079,3806993} | 401079 | 3806993
2 | {1,401079,2491387} | 401079 | 2491387
......
2 | {1,8399312,5886963} | 8399312 | 5886963
2 | {1,8399312,3652462} | 8399312 | 3652462
2 | {1,8399312,8148713} | 8399312 | 8148713
2 | {1,8399312,8282991} | 8399312 | 8282991
(75 rows)
Time: 3.173 ms

And so on for the deeper levels:

postgres=# fetch all in a3;
level | path | c1 | c2
-------+-----------------------------+---------+---------
3 | {1,1521897,47614,2653114} | 47614 | 2653114
3 | {1,1521897,47614,1354306} | 47614 | 1354306
3 | {1,1521897,47614,7452721} | 47614 | 7452721
...
3 | {1,401079,9740100,7920319} | 9740100 | 7920319
3 | {1,401079,9740100,6416565} | 9740100 | 6416565
(746 rows)
Time: 25.455 ms
postgres=# fetch all in a4;
......
4 | {1,401079,8171779,9968575,6388546} | 9968575 | 6388546
4 | {1,401079,8171779,9968575,8281935} | 9968575 | 8281935
4 | {1,401079,8171779,9968575,6076729} | 9968575 | 6076729
4 | {1,401079,8171779,9968575,7087557} | 9968575 | 7087557
(7383 rows)
Time: 14.482 ms

Note

Each cursor's query reads the rows inserted by the previous level's cursor, so fetch the cursors in order and drain each one completely before moving on to the next. A cursor fetched out of order finds no rows from the previous level and returns no data.

postgres=# begin;
BEGIN
Time: 0.561 ms
postgres=# select * from find_rel_withpath_cur(1,5);
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: relation "idx_tmp2_2" already exists, skipping
NOTICE: relation "idx_tmp2_3" already exists, skipping
find_rel_withpath_cur
-----------------------
a1
a2
a3
a4
a5
(5 rows)
Time: 2.161 ms

If the cursors are not fetched in order, the cursors fetched too early no longer return data.

postgres=# fetch all in a4;
level | path | c1 | c2
-------+------+----+----
(0 rows)
Time: 0.738 ms
postgres=# fetch all in a5;
level | path | c1 | c2
-------+------+----+----
(0 rows)
Time: 0.727 ms
postgres=# fetch all in a1;
level | path | c1 | c2
-------+-------------+----+---------
1 | {1,5157873} | 1 | 5157873
1 | {1,3102468} | 1 | 3102468
1 | {1,8399312} | 1 | 8399312
1 | {1,1442141} | 1 | 1442141
1 | {1,5094441} | 1 | 5094441
1 | {1,1521897} | 1 | 1521897
1 | {1,401079} | 1 | 401079
(7 rows)
Time: 0.954 ms

With cursors, the client can receive the results as a stream, one relation level at a time.

Cursor-Returning Function When c2 Is an Array

create or replace function find_rel_withpath_cur_array(v_c1 int, v_level int) returns setof refcursor as
$$
declare
i int := 1;
ref refcursor[];
res refcursor;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
for x in 1..v_level loop
ref[x] := 'a'||x;
end loop;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp2(level int, path int[], c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level);
create index if not exists idx_tmp2_2 on tmp2(c1);
create index if not exists idx_tmp2_3 on tmp2(c2);
-- To store the data of the initial level (namely the start point)
res := ref[i];
open res for insert into tmp2 select i as level, array[]::int[] || c1 || c2 as path , c1, c2 from (select c1,unnest(c2) as c2 from a where c1=v_c1) a returning *;
return next res;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1 and unnesting the array of related nodes. The condition tmp2.c1<>a.c1 skips self-referencing relations.
-- path indicates the current path.
res := ref[i];
open res for insert into tmp2 select i as level, a.path||a.c2 as path, a.c1, a.c2 from (select tmp2.path, a.c1, unnest(a.c2) c2 from a join tmp2 on (a.c1=tmp2.c2 and tmp2.level=i-1 and tmp2.c1<>a.c1)) a returning *;
return next res;
end loop;
end;
$$
language plpgsql strict;

UDF Optimization Idea 2 — Functions and Stream Receiving

Currently, insert into … returning * in PostgreSQL does not return the first record until the whole insert has completed. Therefore, the deeper the level, the longer the insert takes and the longer the client waits for that level's data. Is there a better optimization?

Besides modifying the PostgreSQL kernel so that a cursor over insert into … returning * streams its results, another approach is to stream the data back from a function.

Create 100 million users and treat every 50 thousand users as a group of associated users. Each user is associated with 1000 close users, forming a network of 100 billion relations.

postgres=# create table a(c1 int, c2 int[], primary key (c1));
CREATE TABLE
vi test.sql
\set id random(1,100000000)
insert into a select :id, (select array_agg(((width_bucket(:id,1,100000000,2000)-1)*50000 + (random()*50000)::int)) from generate_series(1,1000)) on conflict do nothing;
pgbench -M prepared -n -r -P 5 -f ./test.sql -c 64 -j 64 -T 100000

The resulting data set is about 520 GB.
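To make the grouping in the test script easier to follow, the sketch below evaluates the same width_bucket expression for a single, arbitrarily chosen user ID (123456 is just an example value). It shows which of the 2000 groups the user falls into and the ID range from which that user's 1000 related users are drawn.

```sql
-- width_bucket splits the ID range 1..100000000 into 2000 equal-width buckets.
-- The related IDs in test.sql are generated as
--   (bucket - 1) * 50000 + (random() * 50000)::int
-- so they always fall between group_low and group_high.
select g.bucket,
       (g.bucket - 1) * 50000         as group_low,
       (g.bucket - 1) * 50000 + 50000 as group_high
from (select width_bucket(123456, 1, 100000000, 2000) as bucket) g;
```

Because all related users come from the same range of about 50 thousand IDs, a user's relation network can never grow much beyond one group, which matches the count of 50001 reported in Test Scenario 2.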

Test Scenario

Deduce User A’s N-level relation network.

create or replace function find_rel_withpath_cur(v_c1 int, v_level int) returns setof record as 
$$

declare
i int := 1;
ref1 cursor(var1 int, var2 int) for select var1 as level, array[]::int[] || c1 || c2 as path , c1, c2 from (select c1,unnest(c2) as c2 from a where c1=var2) a ;
ref2 cursor(var1 int) for select var1 as level, a.path||a.c2 as path, a.c1, a.c2 from (select tmp2.path, a.c1, unnest(a.c2) c2 from a join tmp2 on (a.c1=tmp2.c2 and tmp2.level=i-1 and tmp2.c1<>a.c1)) a;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
-- Currently PL/pgSQL doesn't support stream returning, even if return next or return query is used
-- https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html
create temp table if not exists tmp2(level int, path int[] unique, c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level, c2);
-- To store the data of the initial level (namely the start point)
for rec in ref1(i, v_c1) loop
insert into tmp2 values (rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing;
if found then
return next rec;
end if;
end loop;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1 (see cursor ref2). Duplicate paths are dropped by the unique constraint on path together with on conflict do nothing, and only newly inserted rows are returned.
-- path indicates the current path.
for rec in ref2(i) loop
insert into tmp2 values(rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing;
if found then
return next rec;
end if;
end loop;
end loop;
return;
end;
$$
language plpgsql strict;

Currently, PL/pgSQL does not stream results to the caller, even when return next or return query is used. The documentation explains:

https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html

Note: The current implementation of RETURN NEXT and RETURN QUERY stores the entire result set before returning from the function, as discussed above. That means that if a PL/pgSQL function produces a very large result set, performance might be poor: 
Data will be written to disk to avoid memory exhaustion, but the function itself will not return until the entire result set has been generated.
A future version of PL/pgSQL might allow users to define set-returning functions that do not have this limitation.
Currently, the point at which data begins being written to disk is controlled by the work_mem configuration variable.
Administrators who have sufficient memory to store larger result sets in memory should consider increasing this parameter.

For now, implementing the preceding logic as a C function allows the results to be streamed from the function.

Test Scenario 2

Extract a user's 10-level relation network from 100 billion relations.

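-- Eliminate duplicate nodes. For example, if N paths pass through the same node, record that node only once.
-- Note: this version returns (level, c1) rows, so it assumes that tmp1 and u1 have been dropped and recreated with these two columns.
create or replace function find_rel(v_c1 int, v_level int) returns setof u1 as 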
$$
declare
i int := 1;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp1( level int, c1 int primary key) ON COMMIT delete rows;
create index if not exists idx_tmp1_1 on tmp1(level, c1);
-- To store the data of the initial level (namely the start point)
return query insert into tmp1 select i, c1 from a where c1=v_c1 union select i, unnest(c2) from a where c1=v_c1 returning *;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1. Group By removes duplicate nodes (for example, node 3 reached via both 1-2-3-4 and 1-5-3-4 is recorded only once), and the left join with tmp1 filtered by "is null" excludes nodes that were already found, which breaks loops.
return query
insert into tmp1
select t.i, t.c2 from
( select i, unnest(c2) c2 from a join tmp1 tmp on ( a.c1=tmp.c1 and tmp.level=i-1 and a.c1<>v_c1 )
) t
left join tmp1 on (t.c2=tmp1.c1)
where tmp1.* is null group by 1,2
on conflict do nothing
returning *;

end loop;
end;
$$
language plpgsql strict;
postgres=# select count(*) from find_rel(5000,10);
NOTICE: relation "tmp1" already exists, skipping
NOTICE: relation "idx_tmp1_1" already exists, skipping
count
-------
50001
(1 row)
Time: 17131.492 ms

The query takes about 17 seconds because, with this data model, 50 million records are searched at the third level. Another reason for the long run time is that parallel query is not enabled.

Studies suggest that each person maintains a network of around 200 close friends. Normal business models add restrictions at each level to converge the results and prevent unlimited spreading, so a query can typically complete within seconds.
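One possible form of such a per-level restriction is to cap how many related nodes are expanded for each source node. The sketch below is only an illustration of that idea on a hypothetical miniature table demo_adj; the cap of 2 and the sample rows are arbitrary.

```sql
-- Hypothetical miniature of the array model: node -> array of related nodes.
create temp table demo_adj (c1 int primary key, c2 int[]);
insert into demo_adj values (1, array[2, 3, 4, 5]), (2, array[6, 7, 8, 9]);

-- Expand nodes 1 and 2, keeping at most 2 related nodes per source node.
select d.c1, n.c2
from demo_adj d
cross join lateral (
  select x as c2
  from unnest(d.c2) as x
  limit 2
) n
where d.c1 in (1, 2);
```

A real system would more likely choose which related nodes to keep by an attribute such as intimacy or time (an order by before the limit) rather than taking them arbitrarily.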
create or replace function find_rel_cur(v_c1 int, v_level int) returns setof refcursor as
$$
declare
i int := 1;
ref refcursor[];
res refcursor;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
for x in 1..v_level loop
ref[x] := 'a'||x;
end loop;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
create temp table if not exists tmp1( level int, c1 int primary key) ON COMMIT delete rows;
create index if not exists idx_tmp1_1 on tmp1(level, c1);
-- To store the data of the initial level (namely the start point)
res := ref[i];
open res for insert into tmp1 select i, c1 from a where c1=v_c1 union select i, unnest(c2) from a where c1=v_c1 returning *;
return next res;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- Derive the next level by joining with the rows at level=i-1. Group By removes duplicate nodes (for example, node 3 reached via both 1-2-3-4 and 1-5-3-4 is recorded only once), and the left join with tmp1 filtered by "is null" excludes nodes that were already found, which breaks loops.
res := ref[i];
open res for
insert into tmp1
select t.i, t.c2 from
( select i, unnest(c2) c2 from a join tmp1 tmp on ( a.c1=tmp.c1 and tmp.level=i-1 and a.c1<>v_c1 )
) t
left join tmp1 on (t.c2=tmp1.c1)
where tmp1.* is null group by 1,2
on conflict do nothing
returning *;
return next res;
end loop;
end;
$$
language plpgsql strict;
postgres=# select * from find_rel_cur(1,10);
NOTICE: relation "tmp1" already exists, skipping
NOTICE: relation "idx_tmp1_1" already exists, skipping
find_rel_cur
--------------
a1
a2
a3
a4
a5
a6
a7
a8
a9
a10
(10 rows)
Time: 2.098 ms
postgres=# fetch 1 in a1;
level | c1
-------+-------
1 | 37394
(1 row)
Time: 3.138 ms
postgres=# fetch 1 in a2;
level | c1
-------+----
2 | 0
(1 row)
Time: 1345.473 ms

For the third level, 50 million records are searched because of the test model. Do not worry about this data volume, because normal business models generally do not have to process such a large volume of data.

postgres=# fetch 1 in a3;
level | c1
-------+----
(0 rows)
Time: 15587.686 ms
postgres=# fetch 1 in a4;
level | c1
-------+----
(0 rows)
Time: 0.143 ms
postgres=#

UDF Optimization Idea 3 — Asynchronous Messages

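Asynchronous messages (LISTEN/NOTIFY) can achieve the same effect: the function sends each relation it finds to a notification channel, and a client listening on that channel receives the results as they are produced.

Support for asynchronous messages makes PostgreSQL even more useful in this scenario.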

Example

create extension dblink;
CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
CREATE SERVER dst FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '127.0.0.1', port '1921', dbname 'postgres');
CREATE USER MAPPING FOR postgres SERVER dst OPTIONS (user 'postgres', password 'postgres');
create or replace function find_rel_withpath_notify(v_c1 int, v_level int, v_notify_channel text) returns void as
$$

declare
i int := 1;
query text;
ref1 cursor(var1 int, var2 int) for select var1 as level, array[]::int[] || c1 || c2 as path , c1, c2 from (select c1,unnest(c2) as c2 from a where c1=var2) a ;
ref2 cursor(var1 int) for select var1 as level, a.path||a.c2 as path, a.c1, a.c2 from (select tmp2.path, a.c1, unnest(a.c2) c2 from a join tmp2 on (a.c1=tmp2.c2 and tmp2.level=i-1 and tmp2.c1<>a.c1)) a;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
-- Determine whether or not a connection exists. If not, create one.
if array_position(dblink_get_connections(), v_notify_channel) is null then
perform dblink_connect(v_notify_channel, 'dst');
end if;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
-- Currently PL/pgSQL doesn't support stream returning, even if return next or return query is used
-- https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html
create temp table if not exists tmp2(level int, path int[] unique, c1 int, c2 int) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level, c2);
-- To store the data of the initial level (namely the start point)
for rec in ref1(i, v_c1) loop
insert into tmp2 values (rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing;
if found then
query := format($_$select 1 from pg_notify( %L , 'level: %s, path: %s, c1: %s, c2: %s')$_$, v_notify_channel, rec.level, rec.path, rec.c1, rec.c2);
-- Send asynchronous messages
perform * from dblink(v_notify_channel, query, true) as t(id int);
end if;
end loop;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
-- The next level of relations is derived through a join with the rows at level=i-1.
-- The unique constraint on path discards duplicate paths (for example, 1-2-3-4 found twice), and the condition tmp2.c1<>a.c1 excludes looping points.
-- path indicates the current path.
-- If PK constraints for c1 and c2 already exist, you can directly associate them with tmp2 without using group by.
for rec in ref2(i) loop
insert into tmp2 values(rec.level, rec.path, rec.c1, rec.c2) on conflict do nothing;
if found then
query := format($_$select 1 from pg_notify( %L , 'level: %s, path: %s, c1: %s, c2: %s')$_$, v_notify_channel, rec.level, rec.path, rec.c1, rec.c2);
-- Send asynchronous messages
perform * from dblink(v_notify_channel, query, true) as t(id int);
end if;
end loop;
end loop;
return;
end;
$$
language plpgsql strict;

Enable a listener in session A

postgres=# listen hello;

Start query in session B

postgres=# select find_rel_withpath_notify(1,5,'hello');

Session A can read asynchronous messages

...
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,15869}, c1: 32847, c2: 15869" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,21312}, c1: 32847, c2: 21312" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,22852}, c1: 32847, c2: 22852" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,8031}, c1: 32847, c2: 8031" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,45248}, c1: 32847, c2: 45248" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,7139}, c1: 32847, c2: 7139" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,28589}, c1: 32847, c2: 28589" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,8615}, c1: 32847, c2: 8615" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,49518}, c1: 32847, c2: 49518" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,35727}, c1: 32847, c2: 35727" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,2679}, c1: 32847, c2: 2679" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,34267}, c1: 32847, c2: 34267" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,13890}, c1: 32847, c2: 13890" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,20092}, c1: 32847, c2: 20092" received from server process with PID 33062.
Asynchronous notification "hello" with payload "level: 2, path: {1,32847,11795}, c1: 32847, c2: 11795" received from server process with PID 33062.
...
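
The level-by-level logic of find_rel_withpath_notify can also be sketched outside the database. The following Python generator is a minimal sketch under stated assumptions, not the function above: the adjacency dict `graph` and the name `find_rel_with_path` are hypothetical, and loops are avoided by skipping any node already on the current path, which is stricter than the tmp2.c1<>a.c1 condition. Like the asynchronous messages, it hands each row to the consumer as soon as it is found instead of waiting for all levels to finish.

```python
from typing import Dict, Iterator, List, Tuple

# One result row: (level, path, c1, c2), mirroring the columns of tmp2.
Row = Tuple[int, List[int], int, int]


def find_rel_with_path(graph: Dict[int, List[int]], start: int,
                       max_level: int) -> Iterator[Row]:
    """Yield relations level by level, each row as soon as it is found.

    `graph` maps a node (c1) to the list of its direct relations (c2),
    which is an assumed in-memory stand-in for table a(c1, c2 int[]).
    """
    if max_level < 1:
        return
    # Paths discovered at the previous level; level 0 is just the start node.
    frontier: List[List[int]] = [[start]]
    for level in range(1, max_level + 1):
        next_frontier: List[List[int]] = []
        for path in frontier:
            c1 = path[-1]
            for c2 in graph.get(c1, []):
                if c2 in path:  # skip nodes already on this path (no loops)
                    continue
                new_path = path + [c2]
                next_frontier.append(new_path)
                yield level, new_path, c1, c2
        if not next_frontier:  # nothing left to expand
            return
        frontier = next_frontier


if __name__ == "__main__":
    demo = {1: [2, 3], 2: [4, 1], 3: [4], 4: []}
    for row in find_rel_with_path(demo, 1, 2):
        print(row)
```

Because the function is a generator, a consumer can stop after the first few rows without paying for the deeper levels, which is the same benefit the listener in session A gets.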

Design Optimization Method 1 — Array

With one-to-one storage, each row holds a single relation. This storage method is simple, but it has a disadvantage. To find a user's direct relation network, N records must be scanned, and each additional search level multiplies the number of records.

Assume that each user has 100 direct relations on average. To search five levels, we have to scan and return 100¹+100²+100³+…+100⁵ records.

However, if array storage is used, the number of scanned records drops to 100⁰+100¹+100²+…+100⁴, so the overhead at each level is 100 times smaller.

Array storage is also very simple to use and is therefore not explained in detail in this article. Arrays support features such as the = any(array) query and the foreach loop in functions.
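
The arithmetic above can be checked with a few lines of Python. This is only a sketch of the counting argument; the function name `rows_scanned` and its parameters are made up for illustration, and the fan-out of 100 and the depth of 5 are the assumptions stated in the text.

```python
def rows_scanned(fanout: int, levels: int, array_storage: bool) -> int:
    """Total rows scanned when expanding `levels` levels from one user.

    One-to-one storage: level k touches fanout**k rows (k = 1..levels).
    Array storage: one row holds all relations of a user, so level k
    touches fanout**(k-1) rows.
    """
    first_exponent = 0 if array_storage else 1
    return sum(fanout ** k
               for k in range(first_exponent, first_exponent + levels))


one_to_one = rows_scanned(100, 5, array_storage=False)
as_array = rows_scanned(100, 5, array_storage=True)
print(one_to_one)              # 10101010100
print(as_array)                # 101010101
print(one_to_one // as_array)  # 100
```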

Design Optimization Method 2 — pgRouting

We all know that a relation has different levels of closeness. Similarly, events also have different levels of association. Therefore, simply using association or no association is far from enough to indicate event relationships in real life.

For example, although Ming, Shao Cong, and PostgreSQL are all my close friends, the closeness level may be totally different. This is particularly true in the case of showing or analyzing relationships among people.

Then how can we indicate the different association levels? The answer is very simple: adding a weight field.

a(c1 int, c2 int, weight numeric).

With weights, path calculation is no longer as simple as taking the first path found.

Assume that many paths lead from A to B. We cannot stop when the first path reaches B, because the total weight of another path may be smaller. Path calculation can stop only when every remaining candidate path (including paths that have not yet arrived at B) already weighs more than the lightest complete path found so far.

This is what pgRouting is good at. pgRouting can be completely applied in the graph database field.

pgRouting naturally supports path weight (for example, speed limits on roads, upward/downward slopes, number of road curves, number of highway lanes, and road congestion levels can all be dynamic weight values). pgRouting also supports multiple path planning algorithms and custom planning algorithms.
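
The stopping rule described above is what Dijkstra's algorithm provides when all weights are non-negative. The following Python sketch illustrates that general technique; it is not pgRouting's code or API, and the graph format and the name `lowest_weight_path` are assumptions for illustration. Candidate paths are always expanded in order of total weight, so the first time B is taken from the queue, no unfinished path can still beat it.

```python
import heapq
from typing import Dict, List, Optional, Tuple

# node -> list of (neighbor, weight); weights are assumed to be non-negative
Graph = Dict[str, List[Tuple[str, float]]]


def lowest_weight_path(graph: Graph, src: str,
                       dst: str) -> Optional[Tuple[float, List[str]]]:
    """Return (total_weight, path) of the lightest path, or None."""
    heap: List[Tuple[float, str, List[str]]] = [(0.0, src, [src])]
    settled = set()
    while heap:
        # The lightest candidate path found so far is always popped first.
        weight, node, path = heapq.heappop(heap)
        if node == dst:
            # Every path still in the heap weighs at least this much.
            return weight, path
        if node in settled:
            continue
        settled.add(node)
        for neighbor, edge_weight in graph.get(node, []):
            if neighbor not in settled:
                heapq.heappush(
                    heap, (weight + edge_weight, neighbor, path + [neighbor]))
    return None  # dst cannot be reached from src


if __name__ == "__main__":
    demo: Graph = {
        "A": [("B", 10), ("C", 1)],  # the direct edge A-B is found first
        "C": [("D", 2)],
        "D": [("B", 3)],
    }
    print(lowest_weight_path(demo, "A", "B"))  # (6.0, ['A', 'C', 'D', 'B'])
```

In the demo, the one-hop path A-B (weight 10) is discovered before A-C-D-B (weight 6), which is why stopping at the first path found would give the wrong answer.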

For more information, see

http://pgrouting.org/

Other Graph Databases

Neo4j is also a good graph database used by many people.

However, some research papers compare Neo4j with PostgreSQL and find that Neo4j also has some disadvantages. For example, memory may not hold all the data, and access speed can be unstable.

Neo4j allows creating indexes only per label for data search (similar to partitioned indexes rather than global indexes). To perform a global search, every user has to carry the same label.

References

https://bib.irb.hr/datoteka/690975.1268-4089-1-PB.pdf

https://numergent.com/2015-05/Tropology-performance-PostgreSQL-vs-Neo4j.html

Summary

As communication media multiply and communication costs fall, people, things, and events become more correlated, and social relations become more and more complex.

In a relation system, a node represents a point (a person, a thing, an event, or a specific time and space property). If two nodes are related, they are associated with each other.

Relation data will continue to see explosive growth. Nodes can number in the trillions, and each node can have relations with hundreds of thousands of other nodes, forming a network of thousands of trillions of relations.

However, most systems do not hold such a large volume of data. Only very large companies have a network of trillions of relations at any one point in time.

PostgreSQL is a comprehensive database that can be used in almost any scenario. This article uses some PostgreSQL features to design a demo for the relation network scenario that meets common requirements such as relation deduction and path search between two nodes.

Example Features Used in This Article

1. Arrays for storing forward relations

2. PL/pgSQL for writing logic such as deduction logic and path search logic

3. Cursors for stream returning

4. Asynchronous messages for stream data returning

5. Aggregation queries

6. pgRouting for retrieving the shortest path

7. Recursive queries

8. Computing methods similar to those of the PostgreSQL RUM plug-in can be used for closeness analysis, for example, scoring the degree of event overlap. (RUM currently supports outputting and sorting by approximate distance values for the tsvector type and can therefore be used for calculating closeness.)

https://en.wikipedia.org/wiki/PageRank

RUM reference

Full-text Search with PostgreSQL Is Way Too Fast — RUM Index Interface (Pandora Box)

Closeness relation level deduction example

Create 100 million users again and consider every 50 thousand users as a group of associated users. Each user is associated with 100 close users, forming a network of 10 billion relations.

CREATE EXTENSION rum;
create table rum1(c1 int, c2 tsvector, primary key (c1));
create index idx_rum1_1 on rum1 using rum(c2 rum_tsvector_ops);
vi test.sql
\set id random(1,100000000)
insert into rum1 select :id, (select to_tsvector(string_agg(((width_bucket(:id,1,100000000,2000)-1)*50000 + (random()*50000)::int)::text, ' ')) from generate_series(1,100)) on conflict do nothing;
pgbench -M prepared -n -r -P 5 -f ./test.sql -c 64 -j 64 -T 100000

Use rum_tsvector_ops of RUM to sort results by closeness and obtain closeness values at the same time.

postgres=# select c1, c2 <=> tsq as dis
from
rum1,
(select to_tsquery(replace(rtrim(ltrim(array_to_tsvector(tsvector_to_array(c2))::text, ''''), ''''), $$' '$$, ' | ')) tsq from rum1 where c1=1) as tmp
where
c2 @@ tsq
and
c1<>33233490
order by c2 <=> tsq
limit 10;

The smaller the value is, the closer the relation is.

c1   |   dis    
-------+----------
1 | 0.164493
42469 | 4.11234
28939 | 5.48311
45740 | 5.48311
15508 | 5.48311
11589 | 5.48311
12377 | 5.48311
34282 | 5.48311
16731 | 5.48311
6474 | 5.48311
(10 rows)
Time: 259.834 ms

Iterative computation

Perform deduction per level. Of course you can add a real-time closeness field so that you do not have to perform queries by using the RUM index each time.

create or replace function find_rel_pagerank_cur(
v_c1 int, -- the ID whose relations are to be deduced
v_level int, -- number of relation levels to deduce
v_rank numeric, -- closeness threshold. Values greater than this threshold are not displayed (the larger the value, the longer the distance and the lower the closeness)
v_limit_perlevel int -- the maximum number of records per level (for example, number of persons); the limit is applied to each cursor query
)
returns setof record as
$$

declare
i int := 1;
i_c1 int;
ref cursor(
var_level int,
var_c1 int
) for
select level,c1,c2,pagerank::numeric from
(select var_level as level, var_c1 as c1, c1 as c2, c2 <=> tsq as pagerank from
rum1,
(select to_tsquery(replace(rtrim(ltrim(array_to_tsvector(tsvector_to_array(c2))::text, ''''), ''''), $_$' '$_$, ' | ')) tsq from rum1 where c1=var_c1) as tmp
where
c2 @@ tsq
and
c1<>var_c1
order by c2 <=> tsq
limit v_limit_perlevel
) t where t.pagerank < v_rank ;
begin
if v_level <=0 then
raise notice 'level must >=1';
return;
end if;
-- Note: In version 9.6, frequent creation and deletion of inner temp tables may still cause garbage to be generated in catalog.
-- To temporarily store the relations at each level from the start point
-- Currently PL/pgSQL doesn't support stream returning, even if return next or return query is used
-- https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html
create temp table if not exists tmp2(level int, c1 int, c2 int, pagerank numeric, primary key(c1,c2)) ON COMMIT delete rows;
create index if not exists idx_tmp2_1 on tmp2(level, c2);
-- To store the data of the initial level (namely the start point)
for rec in ref(i,v_c1) loop
insert into tmp2 values (rec.level, rec.c1, rec.c2, rec.pagerank) on conflict do nothing;
if found then
raise notice 'level: %, c1: %, c2:% ,pagerank: %', rec.level, rec.c1, rec.c2, rec.pagerank;
return next rec;
end if;
end loop;
loop
i := i+1;
-- All levels of data have been found
if i > v_level then
return;
end if;
for i_c1 in select t2.c1 from rum1 t2 JOIN tmp2 t1 on (t1.c2=t2.c1 and t1.level=i-1 and t2.c1<>v_c1) where not exists (select 1 from tmp2 where tmp2.c1=t2.c1) group by 1
loop
for rec in ref(i,i_c1) loop
insert into tmp2 values (rec.level, rec.c1, rec.c2, rec.pagerank) on conflict do nothing;
if found then
raise notice 'level: %, c1: %, c2:% ,pagerank: %', rec.level, rec.c1, rec.c2, rec.pagerank;
return next rec;
end if;
end loop;
end loop;
end loop;
return;
end;
$$
language plpgsql strict;
postgres=# select * from find_rel_pagerank_cur(96807211,2,10000,10) as t(level int, c1 int, c2 int, pagerank numeric);
NOTICE: relation "tmp2" already exists, skipping
NOTICE: relation "idx_tmp2_1" already exists, skipping
NOTICE: level: 1, c1: 96807211, c2:96810420 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96849305 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96810740 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96839717 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96849378 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96800097 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96832351 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96839438 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96816466 ,pagerank: 5.48311
NOTICE: level: 1, c1: 96807211, c2:96836416 ,pagerank: 5.48311
NOTICE: level: 2, c1: 96800097, c2:96812430 ,pagerank: 4.11234
NOTICE: level: 2, c1: 96800097, c2:96802051 ,pagerank: 5.48311
NOTICE: level: 2, c1: 96800097, c2:96824209 ,pagerank: 5.48311
......
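
The control flow of find_rel_pagerank_cur (expand one level at a time, keep only the closest few candidates for each expanded node, and drop candidates beyond a distance threshold) can be sketched in Python as follows. This is a sketch under loud assumptions: Jaccard distance over shared contacts is swapped in as a stand-in for the RUM `<=>` operator, whose formula is not reproduced here, and the names `jaccard_distance`, `closest`, and `deduce` are hypothetical.

```python
from typing import Dict, Iterator, List, Set, Tuple


def jaccard_distance(a: Set[int], b: Set[int]) -> float:
    """Stand-in distance (NOT RUM's formula): 0.0 means identical sets."""
    union = a | b
    if not union:
        return 1.0
    return 1.0 - len(a & b) / len(union)


def closest(contacts: Dict[int, Set[int]], user: int,
            max_distance: float, limit: int) -> List[Tuple[float, int]]:
    """Closest users that share at least one contact with `user`."""
    mine = contacts[user]
    scored = sorted(
        (jaccard_distance(mine, theirs), other)
        for other, theirs in contacts.items()
        if other != user and mine & theirs
    )
    # As in the SQL cursor: apply the limit first, then the threshold.
    return [(d, o) for d, o in scored[:limit] if d < max_distance]


def deduce(contacts: Dict[int, Set[int]], start: int, levels: int,
           max_distance: float,
           limit: int) -> Iterator[Tuple[int, int, int, float]]:
    """Yield (level, c1, c2, distance) rows level by level."""
    expanded: Set[int] = set()  # nodes already used as c1
    frontier = [start]
    for level in range(1, levels + 1):
        next_frontier: List[int] = []
        for c1 in frontier:
            if c1 in expanded:
                continue
            expanded.add(c1)
            for distance, c2 in closest(contacts, c1, max_distance, limit):
                next_frontier.append(c2)
                yield level, c1, c2, distance
        frontier = next_frontier


if __name__ == "__main__":
    demo = {1: {10, 11, 12}, 2: {10, 11, 12}, 3: {10, 20}, 4: {20, 21}, 5: {99}}
    for row in deduce(demo, 1, 2, max_distance=1.0, limit=10):
        print(row)
```

As in the SQL function, a smaller distance means a closer relation, and a node that has already been expanded is not expanded again at a deeper level.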

PostgreSQL Improvements to Be Made

1. Although a cursor over insert into…returning… supports stream returning, results are currently not returned until the insert operation is completed. The insert and return operations cannot run at the same time.

2. Currently, a with recursive query can only query the intermediate state of the work table. Support for querying the full work table would be useful.

3. It would be better if the with recursive query exposed a loop counter variable so that users know which iteration they are currently in.

4. return next and return query in PL/pgSQL should support stream returning. (Currently, a function can return stream data only if it is written against the C interfaces. As the manual notes, PL/pgSQL may add support for stream returning later.)

5. Index interfaces in PostgreSQL are completely open. Users can customize how index storage is organized and how data is retrieved.

For graph database applications, efficiency can be improved in many other ways. For example, closeness currently means the closeness score of relations separated by one degree. (This scoring can be implemented quickly by using the similarity sorting of RUM.)

But how can we efficiently score the closeness of relations separated by two or more degrees?

Repeating the one-degree method for every round certainly does not give efficient multi-round retrieval.

In this case, the open index interfaces of PostgreSQL show their advantages. Users can customize more efficient index organizations and structures based on the actual scenario.

For more information, refer to how the RUM and bloom indexes are written.

https://github.com/postgrespro/rum

https://www.postgresql.org/docs/9.6/static/xindex.html

https://www.postgresql.org/docs/9.6/static/bloom.html
