How to Optimize Duplicate Data Cleansing in PostgreSQL

Alibaba Cloud
12 min read · Jun 9, 2020

Background

Duplicate data cleansing is a common business demand. For example, some databases do not support unique constraints, or unique constraints on certain columns are not taken into consideration during program design, which leads to duplicate data after the application has run for some time. This creates the demand for duplicate data cleansing.

It’s critical to ascertain which cleansing approaches are efficient. This small application scenario touches on 10 database optimization techniques. Let’s consider each of them in detail in the following sections.

Duplicate Data Cleansing Techniques

For example, assume there is a table in which the combination of several fields should be unique but contains duplicate values. The objective is to keep one row from each set of duplicates and remove the others.

Example

postgres=# create table tbl_dup(   
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
);

Remove duplicate (sid + crt_time) combinations and keep the row with the largest mdf_time in each set of duplicates. One million records of test data are generated with roughly a 1/10 duplication rate.

Each duplicate is generated 500 records apart so that the two copies of a pair do not land in the same data block. The following SQL statement accomplishes this.

insert into tbl_dup (sid, crt_time, mdf_time)   
select
case when mod(id,11)=0 then id+500 else id end,
case when mod(id,11)=0 then now()+(''||id+500||' s')::interval else now()+(''||id||' s')::interval end,
clock_timestamp()
from generate_series(1,1000000) t(id);

Verify the ctid to make sure duplicate data is not in the same data block.

Use a window query as the verification method.

postgres=# select * from (select ctid,sid,crt_time,mdf_time, count(*) over(partition by sid,crt_time) as cnt from tbl_dup) t where t.cnt>=2;  
ctid | sid | crt_time | mdf_time | cnt
------------+--------+----------------------------+----------------------------+-----
(0,11) | 511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.092625 | 2
(20,11) | 511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.102726 | 2
(20,22) | 522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.102927 | 2
(0,22) | 522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.09283 | 2
(21,8) | 533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.103155 | 2
(1,8) | 533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.093191 | 2
(21,19) | 544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.103375 | 2
(1,19) | 544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.093413 | 2
....

The above snippet shows that there are many duplicate values.

postgres=# select count(*) from (select * from (select ctid,sid,crt_time,mdf_time, count(*) over(partition by sid,crt_time) as cnt from tbl_dup) t where t.cnt=2) t;  
count
--------
181726
(1 row)
Time: 1690.709 ms

Next, let’s remove the duplicates.

Method 1) Insert

Insert the deduplication results into a new table. This takes about 5.8 seconds.

create table tbl_uniq(like tbl_dup including all);  

insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;

INSERT 0 909137
Time: 5854.349 ms

Check where optimization is possible. The execution plan shows that the external sort is the main cost, so the sort should be optimized.

postgres=# explain (analyze,verbose,timing,costs,buffers)  insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)  
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.tbl_uniq (cost=423098.84..458098.84 rows=5000 width=292) (actual time=5994.723..5994.723 rows=0 loops=1)
Buffers: shared hit=1021856 read=36376 dirtied=36375, temp read=37391 written=37391
-> Subquery Scan on t (cost=423098.84..458098.84 rows=5000 width=292) (actual time=1715.278..3620.269 rows=909137 loops=1)
Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8
Filter: (t.rn = 1)
Rows Removed by Filter: 90863
Buffers: shared hit=40000, temp read=37391 written=37391
-> WindowAgg (cost=423098.84..445598.84 rows=1000000 width=300) (actual time=1715.276..3345.392 rows=1000000 loops=1)
Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=40000, temp read=37391 written=37391
-> Sort (cost=423098.84..425598.84 rows=1000000 width=292) (actual time=1715.263..2174.426 rows=1000000 loops=1)
Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Sort Key: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time DESC
Sort Method: external sort Disk: 299128kB
Buffers: shared hit=40000, temp read=37391 written=37391
-> Seq Scan on public.tbl_dup (cost=0.00..50000.00 rows=1000000 width=292) (actual time=0.012..398.007 rows=1000000 loops=1)
Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=40000
Planning time: 0.174 ms
Execution time: 6120.921 ms
(20 rows)

Optimization 1

Use an index to eliminate the sort step.

After optimization, the process takes only 3.9 seconds.

For online businesses, PostgreSQL supports the CONCURRENTLY option, which builds the index without blocking DML.

postgres=# create index CONCURRENTLY idx_tbl_dup on tbl_dup(sid,crt_time,mdf_time desc);  
CREATE INDEX
Time: 765.426 ms

postgres=# truncate tbl_uniq;
TRUNCATE TABLE
Time: 208.808 ms
postgres=# insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;
INSERT 0 909137
Time: 3978.425 ms

postgres=# explain (analyze,verbose,timing,costs,buffers) insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.tbl_uniq (cost=0.42..159846.13 rows=5000 width=292) (actual time=4791.360..4791.360 rows=0 loops=1)
Buffers: shared hit=1199971 read=41303 dirtied=36374
-> Subquery Scan on t (cost=0.42..159846.13 rows=5000 width=292) (actual time=0.061..2177.768 rows=909137 loops=1)
Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8
Filter: (t.rn = 1)
Rows Removed by Filter: 90863
Buffers: shared hit=218112 read=4929
-> WindowAgg (cost=0.42..147346.13 rows=1000000 width=300) (actual time=0.060..1901.174 rows=1000000 loops=1)
Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=218112 read=4929
-> Index Scan using idx_tbl_dup on public.tbl_dup (cost=0.42..127346.13 rows=1000000 width=292) (actual time=0.051..601.249 rows=1000000 loops=1)
Output: tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=218112 read=4929
Planning time: 0.304 ms
Execution time: 4834.392 ms
(15 rows)
Time: 4835.484 ms

Optimization 2

Recursive query and convergence.

In certain cases, this method improves performance hundreds of times over. Use the following recursive skip scan to achieve better results when each key has many duplicate rows.

with recursive skip as (    
(
select tbl_dup as tbl_dup from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from tbl_dup order by sid,crt_time,mdf_time desc limit 1)
)
union all
(
select (
select tbl_dup from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from tbl_dup t where t.sid>(s.tbl_dup).sid or (t.sid=(s.tbl_dup).sid and t.crt_time>(s.tbl_dup).crt_time) and t.sid is not null order by t.sid,t.crt_time,t.mdf_time desc limit 1)
) from skip s where (s.tbl_dup).sid is not null
) -- The where (s.tbl_dup).sid is not null condition must be included here; otherwise the recursion never terminates.
)
select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;

Use it as shown below when the table has a unique key (UK).

with recursive skip as (    
(
select tbl_dup as tbl_dup from tbl_dup where (id) in (select id from tbl_dup order by sid,crt_time,mdf_time desc limit 1)
)
union all
(
select (
select tbl_dup from tbl_dup where id in (select id from tbl_dup t where t.sid>(s.tbl_dup).sid or (t.sid=(s.tbl_dup).sid and t.crt_time>(s.tbl_dup).crt_time) and t.id is not null order by t.sid,t.crt_time,t.mdf_time desc limit 1)
) from skip s where (s.tbl_dup).id is not null
) -- The where (s.tbl_dup).id is not null condition must be included here; otherwise the recursion never terminates.
)
select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;

Method 2) Delete

One option is to add a row_number field while importing the data into the table to be processed and to create a partial index with where row_number<>1, then delete the rows matched by that index (a sketch of this variant follows after the execution plan below).

Here, the duplicates are deleted directly through a window subquery; the requirement is fulfilled within about two seconds.

postgres=# delete from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from (select sid,crt_time,mdf_time,row_number() over(partition by sid,crt_time order by mdf_time desc) as rn from tbl_dup) t where t.rn<>1);  

DELETE 90863
Time: 2079.588 ms


postgres=# explain delete from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from (select sid,crt_time,mdf_time,row_number() over(partition by sid,crt_time order by mdf_time desc) as rn from tbl_dup) t where t.rn<>1);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
Delete on tbl_dup (cost=187947.63..283491.75 rows=995000 width=50)
-> Hash Semi Join (cost=187947.63..283491.75 rows=995000 width=50)
Hash Cond: ((tbl_dup.sid = t.sid) AND (tbl_dup.crt_time = t.crt_time) AND (tbl_dup.mdf_time = t.mdf_time))
-> Seq Scan on tbl_dup (cost=0.00..50000.00 rows=1000000 width=26)
-> Hash (cost=159846.13..159846.13 rows=995000 width=64)
-> Subquery Scan on t (cost=0.42..159846.13 rows=995000 width=64)
Filter: (t.rn <> 1)
-> WindowAgg (cost=0.42..147346.13 rows=1000000 width=28)
-> Index Only Scan using idx_tbl_dup on tbl_dup tbl_dup_1 (cost=0.42..127346.13 rows=1000000 width=20)
(9 rows)
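
For reference, the partial-index variant mentioned above could look like the following sketch. The rn column, the way it is populated, and the index name are illustrative; in practice the rank would be assigned while the data is loaded.

-- Hypothetical sketch: mark each row's rank within its (sid, crt_time) group,
-- index only the rows to be removed, then delete through that small index.
alter table tbl_dup add column rn int;

update tbl_dup d
set rn = t.rn
from (
  select ctid, row_number() over (partition by sid, crt_time order by mdf_time desc) as rn
  from tbl_dup
) t
where d.ctid = t.ctid;

create index idx_tbl_dup_rn on tbl_dup (rn) where rn <> 1;

delete from tbl_dup where rn <> 1;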

Verification

postgres=# select count(*) , count(distinct (sid,crt_time)) from tbl_dup;  
count | count
--------+--------
909137 | 909137
(1 row)

Fast Method 1

If the duplicate data comes from a text file, deduplicate it inside the database and then export the cleaned text.

First, consider file_fdw foreign tables together with the COPY channel. Refer to the documentation for more details.

postgres=# create extension file_fdw;  
CREATE EXTENSION


postgres=# copy tbl_dup to '/home/digoal/tbl_dup.csv' ;
COPY 1000000

postgres=# create server file foreign data wrapper file_fdw;
CREATE SERVER

CREATE FOREIGN TABLE ft_tbl_dup (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) server file options (filename '/home/digoal/tbl_dup.csv' );

postgres=# copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from ft_tbl_dup) t
where t.rn=1) to '/home/digoal/tbl_uniq.csv';

COPY 909137
Time: 10973.289 ms

The speed is still not fast enough, so let’s try an optimization.

Optimization: Concurrent Processing

If the file is split into multiple smaller files that are processed concurrently, each chunk completes in around 800 milliseconds. But this is not the end: the per-chunk results still need a merge sort for global deduplication.

split -l 50000 tbl_dup.csv load_test_  

for i in `ls load_test_??`
do
psql <<EOF &
drop foreign table "ft_$i";
CREATE FOREIGN TABLE "ft_$i" (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) server file options (filename '/home/digoal/$i' );

\timing

copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from "ft_$i") t
where t.rn=1) to '/home/digoal/uniq_csv.$i';

EOF
done

With this acceleration, the whole batch completes in about 1 second. Increasing the concurrency further can reduce the total time to around 200 milliseconds.

COPY 45500  
Time: 764.978 ms
COPY 45500
Time: 683.255 ms
COPY 45500
Time: 775.625 ms
COPY 45500
Time: 733.227 ms
COPY 45500
Time: 750.978 ms
COPY 45500
Time: 766.984 ms
COPY 45500
Time: 796.796 ms
COPY 45500
Time: 797.016 ms
COPY 45500
Time: 881.682 ms
COPY 45500
Time: 794.691 ms
COPY 45500
Time: 812.932 ms
COPY 45500
Time: 921.792 ms
COPY 45500
Time: 890.095 ms
COPY 45500
Time: 845.815 ms
COPY 45500
Time: 867.456 ms
COPY 45500
Time: 874.979 ms
COPY 45500
Time: 882.578 ms
COPY 45500
Time: 880.131 ms
COPY 45500
Time: 901.515 ms
COPY 45500
Time: 904.857 ms

Concurrency alone, however, is not enough: the per-chunk results still have to be merged and deduplicated globally. A sketch of that step follows.
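
A hedged sketch of that remaining step: expose each per-chunk result file as its own file_fdw foreign table (ft_uniq_aa and ft_uniq_ab are hypothetical names, one per uniq_csv.* file, and the output path is illustrative), union the chunks, and run the window deduplication once more across chunk boundaries.

copy (
  select id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8
  from (
    select row_number() over (partition by sid, crt_time order by mdf_time desc) as rn, *
    from (
      select * from ft_uniq_aa
      union all
      select * from ft_uniq_ab
      -- ... one UNION ALL branch per chunk file
    ) u
  ) t
  where t.rn = 1
) to '/home/digoal/tbl_uniq_global.csv';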

Let’s look at another method.

Fast Method 2

Import into a single table concurrently for processing and then export, without persisting the intermediate results. This calls for an UNLOGGED TABLE.

CREATE unlogged TABLE tmp (   
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);

create index idx_tmp_1 on tmp (sid,crt_time,mdf_time desc);
split -l 20000 tbl_dup.csv load_test_
date +%F%T.%N

# Truncate once before the concurrent loads; truncating inside each
# session would wipe the chunks loaded by the other sessions.
psql -c "truncate tmp"

for i in `ls load_test_??`
do
psql <<EOF &
copy tmp from '/home/digoal/$i';
EOF
done

for ((i=1;i>0;i=1))
do
sleep 0.0001
cnt=`ps -ewf|grep -v grep|grep -c psql`
if [ $cnt -eq 0 ]; then
break
fi
done

psql <<EOF
copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tmp) t
where t.rn=1) to '/dev/shm/tbl_uniq.csv';
EOF

date +%F%T.%N
2016-12-3000:59:42.309126109
2016-12-3000:59:47.589134168

It takes 5.28 seconds.

Concurrent Method: Stream Processing (Event Processing)

This approach relies on stream computing (the CREATE STREAM and CONTINUOUS TRANSFORM syntax below comes from PipelineDB): rows are deduplicated by a trigger as they flow through the stream.

CREATE stream stream_dup (   
id int8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) ;

CREATE unlogged table tbl_uniq (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text) ,
unique (sid,crt_time)
) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);
create or replace function filter() returns trigger as $$
declare
begin
insert into tbl_uniq values (NEW.id,NEW.sid, NEW.crt_time,NEW.mdf_time,NEW.c1,NEW.c2,NEW.c3,NEW.c4,NEW.c5,NEW.c6,NEW.c7,NEW.c8) on conflict (sid,crt_time) do update set
id=excluded.id, mdf_time=excluded.mdf_time, c1=excluded.c1,c2=excluded.c2,c3=excluded.c3,c4=excluded.c4,c5=excluded.c5,c6=excluded.c6,c7=excluded.c7,c8=excluded.c8
where tbl_uniq.mdf_time<excluded.mdf_time;
return new;
end;
$$ language plpgsql strict;
CREATE CONTINUOUS TRANSFORM ct AS
SELECT id::int8,sid::int,crt_time::timestamp,mdf_time::timestamp,c1::text,c2::text,c3::text,c4::text,c5::text,c6::text,c7::text,c8::text FROM stream_dup
THEN EXECUTE PROCEDURE filter();

activate;

This allows data to be written into the stream concurrently while the transform deduplicates it on the fly.
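
As a minimal sketch (assuming the PipelineDB stream defined above), each loading session simply inserts raw rows into the stream; the continuous transform routes them through the trigger, which keeps only the row with the largest mdf_time for each (sid, crt_time). The values below are illustrative.

-- Any number of sessions can run inserts like this concurrently; the
-- transform's trigger performs the upsert into tbl_uniq.
insert into stream_dup (id, sid, crt_time, mdf_time)
values (1, 511, now(), clock_timestamp());

-- The deduplicated result is available in tbl_uniq at any time.
select count(*) from tbl_uniq;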

How to Clear Duplicate Rows without a Unique ID?

Delete rows by their physical row identifiers (ctid) as shown below.

create index idx1 on tbl_dup(ctid);

pipeline=# explain delete from tbl_dup where (ctid) in (select ctid from (select ctid,row_number() over(partition by sid,crt_time order by ctid desc) as rn from tbl_dup) t where t.rn<>1);
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Delete on tbl_dup (cost=673139.27..683574.38 rows=1000000 width=36)
-> Nested Loop (cost=673139.27..683574.38 rows=1000000 width=36)
-> Unique (cost=673138.84..683088.84 rows=199 width=36)
-> Sort (cost=673138.84..678113.84 rows=1990000 width=36)
Sort Key: t.ctid
-> Subquery Scan on t (cost=332753.69..402753.69 rows=1990000 width=36)
Filter: (t.rn <> 1)
-> WindowAgg (cost=332753.69..377753.69 rows=2000000 width=18)
-> Sort (cost=332753.69..337753.69 rows=2000000 width=18)
Sort Key: tbl_dup_1.sid, tbl_dup_1.crt_time, tbl_dup_1.ctid DESC
-> Seq Scan on tbl_dup tbl_dup_1 (cost=0.00..100000.00 rows=2000000 width=18)
-> Index Only Scan using idx1 on tbl_dup (cost=0.43..2.43 rows=1 width=6)
Index Cond: (ctid = t.ctid)
(13 rows)
Time: 1.402 ms
pipeline=# delete from tbl_dup where (ctid) in (select ctid from (select ctid,row_number() over(partition by sid,crt_time order by ctid desc) as rn from tbl_dup) t where t.rn<>1);
DELETE 181726
Time: 3316.990 ms

Optimization Methods of Duplicate Data Cleansing — Technical Issues

The preceding sections have already introduced different optimization methods. Now let’s review each technique.

1) Window Query

The window query function is used to filter and mark duplicate values. Start by creating a composite index with the fields to be deduplicated as the window’s partition key and the rule field as the sort key.
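
For reference, the pattern used throughout this article looks like the sketch below: the composite index matches the window’s partition key plus the descending rule field, so the WindowAgg can read pre-sorted rows directly from the index (index and table names follow the earlier example).

-- Composite index: deduplication key first, ordering rule last (DESC).
create index idx_dup_win on tbl_dup (sid, crt_time, mdf_time desc);

-- Mark duplicates: rn = 1 is the row to keep, rn > 1 are the duplicates.
select *
from (
  select row_number() over (partition by sid, crt_time order by mdf_time desc) as rn, *
  from tbl_dup
) t
where t.rn <> 1;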

2) External Table

For cases where the data comes from text, adopt the fast method to deduplicate it: use the database as a text processing platform, access the file through a file_fdw foreign table of PostgreSQL, and remove the duplicates in SQL.

3) Parallel Computing

Also, when the data comes from text, split the text into several small files and use foreign tables for concurrent deduplication. After that pass, perform a merge sort for global deduplication. PostgreSQL 9.6 supports using multiple CPU cores to process a single query, which improves performance nearly linearly. (The merge step must still be performed to complete the deduplication.)
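
A small sketch of enabling PostgreSQL 9.6 parallel query for such work (the parameter value is illustrative, the planner decides whether workers are actually used, and window functions themselves are not parallelized in 9.6, so the gain applies mainly to the underlying scan and aggregation):

-- Illustrative setting; tune to the number of available CPU cores.
set max_parallel_workers_per_gather = 4;

-- An aggregate form of the deduplication check that can benefit from a parallel scan.
explain (costs off)
select sid, crt_time, max(mdf_time)
from tbl_dup
group by sid, crt_time;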

4) Recursive Query and Convergence

Recursive queries deliver significant improvement in scenarios with high repeatability, improving performance by dozens to hundreds of times.

5) Insert on Conflict

This feature, introduced in PostgreSQL 9.5, completes deduplication as the data is imported, so the deduplicated results can be exported directly.

CREATE unlogged TABLE tmp_uniq (   
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text),
unique (sid,crt_time)
) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);

Load the chunks concurrently. Note that a single command cannot update the same row more than once through ON CONFLICT; otherwise the following error is raised:

ERROR:  21000: ON CONFLICT DO UPDATE command cannot affect row a second time  
HINT: Ensure that no rows proposed for insertion within the same command have duplicate constrained values.
LOCATION: ExecOnConflictUpdate, nodeModifyTable.c:1133
split -l 20000 tbl_dup.csv load_test_

for i in `ls load_test_??`
do
psql <<EOF &
drop foreign table "ft_$i";

CREATE FOREIGN TABLE "ft_$i" (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) server file options (filename '/home/digoal/$i' );

\timing

insert into tmp_uniq select * from "ft_$i" on conflict (sid,crt_time) do update set
id=excluded.id, sid=excluded.sid, crt_time=excluded.crt_time, mdf_time=excluded.mdf_time,
c1=excluded.c1,c2=excluded.c2,c3=excluded.c3,c4=excluded.c4,c5=excluded.c5,c6=excluded.c6,c7=excluded.c7,c8=excluded.c8
where tmp_uniq.mdf_time<excluded.mdf_time
;

EOF
done

6) LLVM

This reduces context switching in multi-row processing, doubling the performance.

7) Stream Computing

It allows streaming deduplication as the data is imported, as shown below.

create stream ss_uniq (  
id int8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
);
CREATE CONTINUOUS VIEW cv_uniq as
select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from ss_uniq;

8) Concurrent Index Creation

Create indexes concurrently to avoid blocking DML operations. Increasing maintenance_work_mem during index creation speeds up the build.
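
For example (the memory value is illustrative and should fit the server’s RAM):

-- Raise session-level maintenance memory so the index build sorts in memory,
-- then build the index concurrently so DML is not blocked.
set maintenance_work_mem = '2GB';
create index concurrently idx_tbl_dup on tbl_dup (sid, crt_time, mdf_time desc);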

9) Concurrent Reading of File Fragments for Import

Split the file and import the fragments concurrently to speed up loading. In the future, concurrent reading of file fragments may be achieved directly through external access interfaces such as file_fdw.

10) Bulk Load Without Logging

Bulk imports into unlogged tables are appropriate when the database is only used for computation, that is, when the intermediate processing results do not need to be retained. This improves import speed, and autovacuum is disabled during the import.
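
A minimal sketch (the staging table name and file path are illustrative): an unlogged table with autovacuum disabled, loaded with COPY and used only for intermediate computation. Its contents are not crash-safe.

-- Unlogged staging table: no WAL, autovacuum off, intermediate use only.
create unlogged table stage (like tbl_dup)
with (autovacuum_enabled = off, toast.autovacuum_enabled = off);

copy stage from '/home/digoal/tbl_dup.csv';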

Summary

The following two points summarize the article:

  1. If data already exists in the database, deleting the duplicate data in the original table takes around 2 seconds.
  2. If data is imported from text and then exported after deduplication, the process takes around 5.28 seconds.

Original Source:


