How to Optimize Duplicate Data Cleansing in PostgreSQL

[Figure omitted]
[Figure omitted]

Background

Duplicate Data Cleansing Techniques

Example

-- Demo table: (sid, crt_time) is the logical key, mdf_time decides which copy
-- of a duplicated key survives, and c1..c8 are random 32-char hex filler columns.
postgres=# CREATE TABLE tbl_dup (
    id       serial8,
    sid      int,
    crt_time timestamp,
    mdf_time timestamp,
    c1       text DEFAULT md5(random()::text),
    c2       text DEFAULT md5(random()::text),
    c3       text DEFAULT md5(random()::text),
    c4       text DEFAULT md5(random()::text),
    c5       text DEFAULT md5(random()::text),
    c6       text DEFAULT md5(random()::text),
    c7       text DEFAULT md5(random()::text),
    c8       text DEFAULT md5(random()::text)
);
-- Load 1,000,000 rows. An id that is a multiple of 11 is shifted to sid = id + 500.
-- Since id + 500 is then never a multiple of 11, every such row with id <= 999500
-- collides on (sid, crt_time) with the unshifted row id + 500: 90863 duplicate
-- pairs. crt_time is now() + sid seconds; mdf_time is a per-row clock_timestamp().
INSERT INTO tbl_dup (sid, crt_time, mdf_time)
SELECT
    g.sid,
    now() + g.sid * interval '1 second',
    clock_timestamp()
FROM (
    SELECT CASE WHEN id % 11 = 0 THEN id + 500 ELSE id END AS sid
    FROM generate_series(1, 1000000) AS t(id)
) AS g;
-- List every row whose (sid, crt_time) pair occurs at least twice. ctid is the
-- row's physical address (block, item); cnt is the size of its (sid, crt_time) group.
postgres=# select * from (select ctid,sid,crt_time,mdf_time, count(*) over(partition by sid,crt_time) as cnt from tbl_dup) t where t.cnt>=2;  
ctid | sid | crt_time | mdf_time | cnt
------------+--------+----------------------------+----------------------------+-----
(0,11) | 511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.092625 | 2
(20,11) | 511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.102726 | 2
(20,22) | 522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.102927 | 2
(0,22) | 522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.09283 | 2
(21,8) | 533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.103155 | 2
(1,8) | 533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.093191 | 2
(21,19) | 544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.103375 | 2
(1,19) | 544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.093413 | 2
....
-- Count the rows that belong to a group of exactly two: 181726 = 2 x 90863,
-- i.e. 90863 surplus copies that a dedup pass has to drop.
postgres=# select count(*) from (select * from (select ctid,sid,crt_time,mdf_time, count(*) over(partition by sid,crt_time) as cnt from tbl_dup) t where t.cnt=2) t;  
count
--------
181726
(1 row)
Time: 1690.709 ms
-- Target table: copies tbl_dup's columns, defaults, constraints and indexes.
CREATE TABLE tbl_uniq (LIKE tbl_dup INCLUDING ALL);

-- Keep exactly one row per (sid, crt_time): the one ranked first by mdf_time
-- descending, i.e. the most recently modified copy.
INSERT INTO tbl_uniq (id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8)
SELECT ranked.id, ranked.sid, ranked.crt_time, ranked.mdf_time,
       ranked.c1, ranked.c2, ranked.c3, ranked.c4,
       ranked.c5, ranked.c6, ranked.c7, ranked.c8
FROM (
    SELECT row_number() OVER (PARTITION BY sid, crt_time ORDER BY mdf_time DESC) AS rn,
           *
    FROM tbl_dup
) AS ranked
WHERE ranked.rn = 1;

INSERT 0 909137
Time: 5854.349 ms
-- EXPLAIN ANALYZE really executes the INSERT, so this run appends another
-- 909137 rows to tbl_uniq (which carries no unique constraint); the table is
-- truncated before the next measurement.
-- Without a matching index the WindowAgg needs a full Sort of all 1,000,000 wide
-- rows, which spills to disk ("Sort Method: external sort Disk: 299128kB").
postgres=# explain (analyze,verbose,timing,costs,buffers)  insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)  
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.tbl_uniq (cost=423098.84..458098.84 rows=5000 width=292) (actual time=5994.723..5994.723 rows=0 loops=1)
Buffers: shared hit=1021856 read=36376 dirtied=36375, temp read=37391 written=37391
-> Subquery Scan on t (cost=423098.84..458098.84 rows=5000 width=292) (actual time=1715.278..3620.269 rows=909137 loops=1)
Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8
Filter: (t.rn = 1)
Rows Removed by Filter: 90863
Buffers: shared hit=40000, temp read=37391 written=37391
-> WindowAgg (cost=423098.84..445598.84 rows=1000000 width=300) (actual time=1715.276..3345.392 rows=1000000 loops=1)
Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=40000, temp read=37391 written=37391
-> Sort (cost=423098.84..425598.84 rows=1000000 width=292) (actual time=1715.263..2174.426 rows=1000000 loops=1)
Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Sort Key: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time DESC
Sort Method: external sort Disk: 299128kB
Buffers: shared hit=40000, temp read=37391 written=37391
-> Seq Scan on public.tbl_dup (cost=0.00..50000.00 rows=1000000 width=292) (actual time=0.012..398.007 rows=1000000 loops=1)
Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=40000
Planning time: 0.174 ms
Execution time: 6120.921 ms
(20 rows)
-- Index whose column order matches the window's PARTITION BY sid, crt_time
-- ORDER BY mdf_time DESC, so rows can be read pre-sorted instead of sorted.
-- CONCURRENTLY builds it without blocking concurrent writes to tbl_dup.
postgres=# create index CONCURRENTLY idx_tbl_dup on tbl_dup(sid,crt_time,mdf_time desc);  
CREATE INDEX
Time: 765.426 ms

-- Re-run the same dedup insert into an emptied tbl_uniq: 5854 ms before the
-- index, 3978 ms with it.
postgres=# truncate tbl_uniq;
TRUNCATE TABLE
Time: 208.808 ms
postgres=# insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;
INSERT 0 909137
Time: 3978.425 ms

-- The plan confirms why: the Sort node is gone, replaced by
-- "Index Scan using idx_tbl_dup", and no temp reads/writes are reported.
-- (EXPLAIN ANALYZE executes the INSERT again, appending another 909137 rows.)
postgres=# explain (analyze,verbose,timing,costs,buffers) insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
where t.rn=1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.tbl_uniq (cost=0.42..159846.13 rows=5000 width=292) (actual time=4791.360..4791.360 rows=0 loops=1)
Buffers: shared hit=1199971 read=41303 dirtied=36374
-> Subquery Scan on t (cost=0.42..159846.13 rows=5000 width=292) (actual time=0.061..2177.768 rows=909137 loops=1)
Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8
Filter: (t.rn = 1)
Rows Removed by Filter: 90863
Buffers: shared hit=218112 read=4929
-> WindowAgg (cost=0.42..147346.13 rows=1000000 width=300) (actual time=0.060..1901.174 rows=1000000 loops=1)
Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=218112 read=4929
-> Index Scan using idx_tbl_dup on public.tbl_dup (cost=0.42..127346.13 rows=1000000 width=292) (actual time=0.051..601.249 rows=1000000 loops=1)
Output: tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
Buffers: shared hit=218112 read=4929
Planning time: 0.304 ms
Execution time: 4834.392 ms
(15 rows)
Time: 4835.484 ms
-- Emulated loose index scan ("skip scan"). A recursive CTE visits only the
-- first row of each (sid, crt_time) group in (sid, crt_time, mdf_time desc)
-- order, i.e. the newest row per group, instead of reading every row.
-- Each step is a LIMIT 1 probe shaped like idx_tbl_dup (sid, crt_time,
-- mdf_time desc). Rows travel through the recursion as a whole-row composite
-- value named tbl_dup.
-- NOTE(review): the recursive step uses a scalar subquery. It fails with "more
-- than one row returned by a subquery used as an expression" if two rows share
-- the exact same (sid, crt_time, mdf_time). Matching on a unique id instead
-- avoids this.
with recursive skip as (
  (
    -- Seed: the newest row of the smallest (sid, crt_time) group.
    select tbl_dup as tbl_dup from tbl_dup
    where (sid, crt_time, mdf_time) in (
      select sid, crt_time, mdf_time from tbl_dup
      order by sid, crt_time, mdf_time desc limit 1
    )
  )
  union all
  (
    -- Step: the newest row of the next (sid, crt_time) group after the
    -- previous one. The scalar subquery yields NULL once no further group exists.
    -- The row comparison means
    --   t.sid > prev.sid or (t.sid = prev.sid and t.crt_time > prev.crt_time).
    -- It yields NULL (row excluded) when t.sid is NULL, without the AND/OR
    -- precedence trap of spelling the disjunction out. It is also a form
    -- PostgreSQL can use as a B-tree condition on the leading index columns.
    select (
      select tbl_dup from tbl_dup
      where (sid, crt_time, mdf_time) in (
        select sid, crt_time, mdf_time from tbl_dup t
        where (t.sid, t.crt_time) > ((s.tbl_dup).sid, (s.tbl_dup).crt_time)
        order by t.sid, t.crt_time, t.mdf_time desc limit 1
      )
    ) from skip s
    -- This filter is mandatory: without it the recursion never ends, because
    -- each NULL row would keep producing another NULL row.
    where (s.tbl_dup).sid is not null
  )
)
-- Drop the trailing NULL row emitted when the groups run out.
select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;
-- Emulated loose index scan ("skip scan"). It returns the newest row (highest
-- mdf_time) of every (sid, crt_time) group by jumping from group to group with
-- LIMIT 1 probes ordered like idx_tbl_dup (sid, crt_time, mdf_time desc).
-- Each probe selects a row's id and then fetches the whole row by that id.
-- This is unambiguous as long as id is unique; it is a serial8 column.
with recursive skip as (
  (
    -- Seed: the newest row of the smallest (sid, crt_time) group.
    select tbl_dup as tbl_dup from tbl_dup
    where id in (
      select id from tbl_dup
      order by sid, crt_time, mdf_time desc limit 1
    )
  )
  union all
  (
    -- Step: the newest row of the next (sid, crt_time) group after the
    -- previous one, or NULL when there is none. The row comparison means
    --   t.sid > prev.sid or (t.sid = prev.sid and t.crt_time > prev.crt_time).
    -- The NOT NULL test on t.id guards the whole predicate. A NULL id picked
    -- here would match nothing in the outer "id in (...)", yield NULL and end
    -- the recursion early.
    select (
      select tbl_dup from tbl_dup
      where id in (
        select id from tbl_dup t
        where (t.sid, t.crt_time) > ((s.tbl_dup).sid, (s.tbl_dup).crt_time)
          and t.id is not null
        order by t.sid, t.crt_time, t.mdf_time desc limit 1
      )
    ) from skip s
    -- This filter is mandatory: without it the recursion never ends, because
    -- each NULL row would keep producing another NULL row.
    where (s.tbl_dup).id is not null
  )
)
-- Drop the trailing NULL row emitted when the groups run out.
select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;
-- In-place cleanup: delete every row that is not the newest (by mdf_time) in
-- its (sid, crt_time) group. Victims are matched by their
-- (sid, crt_time, mdf_time) values.
-- NOTE(review): if two rows of a group share the same mdf_time, the survivor
-- (rn = 1) has the same (sid, crt_time, mdf_time) as a victim and is deleted
-- too, leaving the group empty. Matching victims by ctid instead avoids this.
postgres=# delete from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from (select sid,crt_time,mdf_time,row_number() over(partition by sid,crt_time order by mdf_time desc) as rn from tbl_dup) t where t.rn<>1);  

DELETE 90863
Time: 2079.588 ms


-- Plan: the ranking runs on an index-only scan of idx_tbl_dup, and victims are
-- found through a hash semi-join against a sequential scan of tbl_dup.
postgres=# explain delete from tbl_dup where (sid,crt_time,mdf_time) in (select sid,crt_time,mdf_time from (select sid,crt_time,mdf_time,row_number() over(partition by sid,crt_time order by mdf_time desc) as rn from tbl_dup) t where t.rn<>1);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
Delete on tbl_dup (cost=187947.63..283491.75 rows=995000 width=50)
-> Hash Semi Join (cost=187947.63..283491.75 rows=995000 width=50)
Hash Cond: ((tbl_dup.sid = t.sid) AND (tbl_dup.crt_time = t.crt_time) AND (tbl_dup.mdf_time = t.mdf_time))
-> Seq Scan on tbl_dup (cost=0.00..50000.00 rows=1000000 width=26)
-> Hash (cost=159846.13..159846.13 rows=995000 width=64)
-> Subquery Scan on t (cost=0.42..159846.13 rows=995000 width=64)
Filter: (t.rn <> 1)
-> WindowAgg (cost=0.42..147346.13 rows=1000000 width=28)
-> Index Only Scan using idx_tbl_dup on tbl_dup tbl_dup_1 (cost=0.42..127346.13 rows=1000000 width=20)
(9 rows)
-- Verify: the total row count equals the number of distinct (sid, crt_time)
-- pairs, so no key is duplicated any more.
postgres=# select count(*) , count(distinct (sid,crt_time)) from tbl_dup;  
count | count
--------+--------
909137 | 909137
(1 row)

Fast Method 1

-- Fast method 1: dump the table to a flat file and deduplicate straight from
-- the file through file_fdw.
postgres=# create extension file_fdw;  
CREATE EXTENSION


-- Dump to a server-side file. No FORMAT option is given, so COPY writes
-- PostgreSQL's default text format despite the .csv name. The foreign tables
-- use file_fdw's default (text) format too, so the two agree.
-- Writing a server-side file requires superuser (or, on PostgreSQL 11+, the
-- pg_write_server_files role).
postgres=# copy tbl_dup to '/home/digoal/tbl_dup.csv' ;
COPY 1000000

postgres=# create server file foreign data wrapper file_fdw;
CREATE SERVER

-- Foreign table over the dump. The column list mirrors tbl_dup; file_fdw is
-- read-only, so the DEFAULT clauses carried over from tbl_dup never fire.
CREATE FOREIGN TABLE ft_tbl_dup (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) server file options (filename '/home/digoal/tbl_dup.csv' );

-- Single-session dedup of the whole file, written back out as a file. A foreign
-- table has no index, so the window has to sort all 1,000,000 rows.
postgres=# copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from ft_tbl_dup) t
where t.rn=1) to '/home/digoal/tbl_uniq.csv';

COPY 909137
Time: 10973.289 ms
# Split the dump into 50,000-line chunks (load_test_aa, load_test_ab, ...) and
# deduplicate each chunk in its own background psql session via file_fdw.
#
# NOTE: each chunk is deduplicated independently, so a duplicate pair whose two
# rows land in different chunks is NOT removed. Here 20 chunks x 45500 rows =
# 910000 rows were written, versus 909137 from the single-pass dedup. For an
# exact result, either run a final pass over the concatenated uniq_csv.* files
# or split by a hash of (sid, crt_time) instead of by line count.
split -l 50000 tbl_dup.csv load_test_

for i in load_test_??
do
psql <<EOF &
-- IF EXISTS: on the first run the foreign table does not exist yet.
drop foreign table if exists "ft_$i";
CREATE FOREIGN TABLE "ft_$i" (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) server file options (filename '/home/digoal/$i' );

\timing

copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from "ft_$i") t
where t.rn=1) to '/home/digoal/uniq_csv.$i';

EOF
done

# Block until every background psql session started above has finished, so
# all uniq_csv.* files are complete when the script returns.
wait
COPY 45500  
Time: 764.978 ms
COPY 45500
Time: 683.255 ms
COPY 45500
Time: 775.625 ms
COPY 45500
Time: 733.227 ms
COPY 45500
Time: 750.978 ms
COPY 45500
Time: 766.984 ms
COPY 45500
Time: 796.796 ms
COPY 45500
Time: 797.016 ms
COPY 45500
Time: 881.682 ms
COPY 45500
Time: 794.691 ms
COPY 45500
Time: 812.932 ms
COPY 45500
Time: 921.792 ms
COPY 45500
Time: 890.095 ms
COPY 45500
Time: 845.815 ms
COPY 45500
Time: 867.456 ms
COPY 45500
Time: 874.979 ms
COPY 45500
Time: 882.578 ms
COPY 45500
Time: 880.131 ms
COPY 45500
Time: 901.515 ms
COPY 45500
Time: 904.857 ms

Fast Method 2

-- Staging table with the same columns as tbl_dup. It is unlogged (not
-- WAL-logged), and autovacuum is disabled for both the heap and its TOAST table.
CREATE UNLOGGED TABLE tmp (
    id       serial8,
    sid      int,
    crt_time timestamp,
    mdf_time timestamp,
    c1       text DEFAULT md5(random()::text),
    c2       text DEFAULT md5(random()::text),
    c3       text DEFAULT md5(random()::text),
    c4       text DEFAULT md5(random()::text),
    c5       text DEFAULT md5(random()::text),
    c6       text DEFAULT md5(random()::text),
    c7       text DEFAULT md5(random()::text),
    c8       text DEFAULT md5(random()::text)
) WITH (
    autovacuum_enabled       = off,
    toast.autovacuum_enabled = off
);

-- Same key order as PARTITION BY sid, crt_time ORDER BY mdf_time DESC, so the
-- dedup query can read rows in window order from the index.
CREATE INDEX idx_tmp_1 ON tmp (sid, crt_time, mdf_time DESC);
# Fast method 2: bulk-load 20,000-line chunks into the indexed staging table
# tmp in parallel, then deduplicate in a single pass. The index on tmp supplies
# the window's sort order. The two date calls bracket the whole run.
split -l 20000 tbl_dup.csv load_test_
date +%F%T.%N

# Empty the staging table ONCE, before the parallel loads start. If every
# background job truncated tmp itself, a late job would wipe rows that other
# jobs had already committed.
psql -c 'truncate tmp;'

for i in load_test_??
do
psql <<EOF &
copy tmp from '/home/digoal/$i';
EOF
done

# Wait for exactly the background loaders started above. Polling
# `ps | grep psql` would also count unrelated psql sessions on the host and
# could spin forever.
wait

psql <<EOF
copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from
(select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tmp) t
where t.rn=1) to '/dev/shm/tbl_uniq.csv';
EOF

date +%F%T.%N
2016-12-3000:59:42.309126109
2016-12-3000:59:47.589134168

Concurrent Method: Stream (Event) Processing

-- Event stream carrying rows shaped like tbl_dup.
CREATE STREAM stream_dup (
    id       int8,
    sid      int,
    crt_time timestamp,
    mdf_time timestamp,
    c1       text DEFAULT md5(random()::text),
    c2       text DEFAULT md5(random()::text),
    c3       text DEFAULT md5(random()::text),
    c4       text DEFAULT md5(random()::text),
    c5       text DEFAULT md5(random()::text),
    c6       text DEFAULT md5(random()::text),
    c7       text DEFAULT md5(random()::text),
    c8       text DEFAULT md5(random()::text)
);

-- Deduplicated result table. UNIQUE (sid, crt_time) allows at most one row per
-- key. The table is unlogged (not WAL-logged), with autovacuum disabled for
-- the heap and its TOAST table.
CREATE UNLOGGED TABLE tbl_uniq (
    id       serial8,
    sid      int,
    crt_time timestamp,
    mdf_time timestamp,
    c1       text DEFAULT md5(random()::text),
    c2       text DEFAULT md5(random()::text),
    c3       text DEFAULT md5(random()::text),
    c4       text DEFAULT md5(random()::text),
    c5       text DEFAULT md5(random()::text),
    c6       text DEFAULT md5(random()::text),
    c7       text DEFAULT md5(random()::text),
    c8       text DEFAULT md5(random()::text),
    UNIQUE (sid, crt_time)
) WITH (
    autovacuum_enabled       = off,
    toast.autovacuum_enabled = off
);
-- Row-level function invoked for each row produced by a continuous transform.
-- It upserts NEW into tbl_uniq keyed on (sid, crt_time). An existing row is
-- overwritten only when the incoming mdf_time is strictly newer, so tbl_uniq
-- keeps the copy with the greatest mdf_time seen so far; on a tie, the
-- earliest arrival wins.
-- The body must close with the same plain $$ delimiter it opens with.
create or replace function filter() returns trigger as $$
begin
  insert into tbl_uniq (id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8)
  values (NEW.id, NEW.sid, NEW.crt_time, NEW.mdf_time,
          NEW.c1, NEW.c2, NEW.c3, NEW.c4, NEW.c5, NEW.c6, NEW.c7, NEW.c8)
  on conflict (sid, crt_time) do update set
    id = excluded.id,
    mdf_time = excluded.mdf_time,
    c1 = excluded.c1, c2 = excluded.c2, c3 = excluded.c3, c4 = excluded.c4,
    c5 = excluded.c5, c6 = excluded.c6, c7 = excluded.c7, c8 = excluded.c8
  -- The existing row's column is qualified with the table name; excluded.* is
  -- the incoming row.
  where tbl_uniq.mdf_time < excluded.mdf_time;
  return new;
end;
$$ language plpgsql strict;
-- PipelineDB continuous transform. It reads every event arriving on stream_dup,
-- casts each column to a concrete type, and passes each output row to the
-- filter() function (THEN EXECUTE PROCEDURE), which upserts it into tbl_uniq.
CREATE CONTINUOUS TRANSFORM ct AS
SELECT id::int8,sid::int,crt_time::timestamp,mdf_time::timestamp,c1::text,c2::text,c3::text,c4::text,c5::text,c6::text,c7::text,c8::text FROM stream_dup
THEN EXECUTE PROCEDURE filter();

-- NOTE(review): PipelineDB-specific command that activates continuous queries.
-- Whether it exists depends on the PipelineDB version; confirm.
activate;

How Do You Remove Duplicate Rows Without a Unique ID?

-- With no unique key, ctid (the row's physical address) tells otherwise
-- identical rows apart. In each (sid, crt_time) group, the row with the highest
-- ctid is kept (order by ctid desc, rn = 1); all others are deleted. Note that
-- the survivor is chosen by physical position, not by mdf_time.
-- NOTE(review): the first statement builds an index on the ctid system column.
-- Recent PostgreSQL releases reject indexes on system columns, so confirm this
-- works on the target server (a PipelineDB session here). In this transcript,
-- the psql prompt of the following EXPLAIN was pasted onto the same line.
create index idx1 on tbl_dup(ctid);pipeline=# explain delete from tbl_dup where (ctid) in (select ctid from (select ctid,row_number() over(partition by sid,crt_time order by ctid desc) as rn from tbl_dup) t where t.rn<>1); 
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Delete on tbl_dup (cost=673139.27..683574.38 rows=1000000 width=36)
-> Nested Loop (cost=673139.27..683574.38 rows=1000000 width=36)
-> Unique (cost=673138.84..683088.84 rows=199 width=36)
-> Sort (cost=673138.84..678113.84 rows=1990000 width=36)
Sort Key: t.ctid
-> Subquery Scan on t (cost=332753.69..402753.69 rows=1990000 width=36)
Filter: (t.rn <> 1)
-> WindowAgg (cost=332753.69..377753.69 rows=2000000 width=18)
-> Sort (cost=332753.69..337753.69 rows=2000000 width=18)
Sort Key: tbl_dup_1.sid, tbl_dup_1.crt_time, tbl_dup_1.ctid DESC
-> Seq Scan on tbl_dup tbl_dup_1 (cost=0.00..100000.00 rows=2000000 width=18)
-> Index Only Scan using idx1 on tbl_dup (cost=0.43..2.43 rows=1 width=6)
Index Cond: (ctid = t.ctid)
(13 rows)
Time: 1.402 ms
-- Actual run: 181726 rows deleted in about 3.3 s.
pipeline=# delete from tbl_dup where (ctid) in (select ctid from (select ctid,row_number() over(partition by sid,crt_time order by ctid desc) as rn from tbl_dup) t where t.rn<>1);
DELETE 181726
Time: 3316.990 ms

Optimizing Duplicate Data Cleansing — Technical Issues

-- Upsert target for the parallel ON CONFLICT load. UNIQUE (sid, crt_time)
-- allows at most one row per key. The table is unlogged (not WAL-logged), and
-- autovacuum is disabled for both the heap and its TOAST table.
CREATE UNLOGGED TABLE tmp_uniq (
    id       serial8,
    sid      int,
    crt_time timestamp,
    mdf_time timestamp,
    c1       text DEFAULT md5(random()::text),
    c2       text DEFAULT md5(random()::text),
    c3       text DEFAULT md5(random()::text),
    c4       text DEFAULT md5(random()::text),
    c5       text DEFAULT md5(random()::text),
    c6       text DEFAULT md5(random()::text),
    c7       text DEFAULT md5(random()::text),
    c8       text DEFAULT md5(random()::text),
    UNIQUE (sid, crt_time)
) WITH (
    autovacuum_enabled       = off,
    toast.autovacuum_enabled = off
);
ERROR:  21000: ON CONFLICT DO UPDATE command cannot affect row a second time  
HINT: Ensure that no rows proposed for insertion within the same command have duplicate constrained values.
LOCATION: ExecOnConflictUpdate, nodeModifyTable.c:1133
# Alternative: upsert every 20,000-line chunk straight into tmp_uniq in
# parallel. Its unique (sid, crt_time) constraint collapses duplicates, keeping
# the row with the newest mdf_time.
# NOTE: a single INSERT ... ON CONFLICT DO UPDATE may not update the same
# target row twice. Any chunk that itself contains two rows with the same
# (sid, crt_time) therefore fails with "ON CONFLICT DO UPDATE command cannot
# affect row a second time", so duplicates must be removed within each chunk first.
split -l 20000 tbl_dup.csv load_test_

for i in load_test_??
do
psql <<EOF &
-- IF EXISTS: on the first run the foreign table does not exist yet.
drop foreign table if exists "ft_$i";

CREATE FOREIGN TABLE "ft_$i" (
id serial8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
) server file options (filename '/home/digoal/$i' );

\timing

-- DO UPDATE requires a conflict target. The key columns need no assignment,
-- since they are equal by definition of the conflict. The existing row's
-- mdf_time is qualified with the table name, because a bare mdf_time is
-- ambiguous next to EXCLUDED.
insert into tmp_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from "ft_$i"
on conflict (sid,crt_time) do update set
id=excluded.id, mdf_time=excluded.mdf_time,
c1=excluded.c1,c2=excluded.c2,c3=excluded.c3,c4=excluded.c4,c5=excluded.c5,c6=excluded.c6,c7=excluded.c7,c8=excluded.c8
where tmp_uniq.mdf_time<excluded.mdf_time
;

EOF
done

# Block until every background psql session started above has finished.
wait
-- Stream with the same columns as tbl_dup, feeding the continuous view below.
create stream ss_uniq (  
id int8,
sid int,
crt_time timestamp,
mdf_time timestamp,
c1 text default md5(random()::text),
c2 text default md5(random()::text),
c3 text default md5(random()::text),
c4 text default md5(random()::text),
c5 text default md5(random()::text),
c6 text default md5(random()::text),
c7 text default md5(random()::text),
c8 text default md5(random()::text)
);
-- Attempt to rank rows per (sid, crt_time) by mdf_time inside a continuous
-- view, so the newest row of each key (rn = 1) could be read from it.
-- NOTE(review): continuous views are maintained incrementally over an
-- unbounded stream. Whether a non-aggregate window such as row_number() is
-- accepted here depends on the PipelineDB version; confirm before relying on it.
CREATE CONTINUOUS VIEW cv_uniq as
select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from ss_uniq;

Summary

Original Source:

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App Store
A button that says 'Get it on Google Play', and if clicked it will lead you to the Google Play store