PostgreSQL Data Deduplication Methods


Background

Deduplication is a common process with many variants. For example:

1) Remove duplicates from a single column.

Retention Rule: Retain the latest or oldest record or the record with the largest value of a specified field.

2) Remove duplicates from multiple columns.

Retention Rule: Retain the latest or oldest record or the record with the largest value of a specified field.

3) Remove duplicate rows (rows whose values match in every column).

Retention Rule: Retain the latest or oldest record or the record with the largest value of a specified field.

4) Multi-column hybrid deduplication, where rows count as duplicates if they hold the same values in any column order (for example, ROW1: col1=1, col2=2 and ROW2: col1=2, col2=1 are duplicates).

Retention Rule: Retain the latest or oldest record or the record with the largest value of a specified field.

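5) Remove multiple versions of the same record that appear after pg_resetwal is run.

Retention Rule: Determine which versions to delete, or retain the version with the largest xmin value.

In this situation, creating a unique index on the key columns fails because duplicate keys exist:

create unique index idx on tbl (c1,c2);
-- error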
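All of these variants share one shape: group the rows by a deduplication key, then keep the single row that the retention rule prefers. The Python sketch below models that shape in memory only; the dict-based rows and the function name `dedup` are illustrative assumptions, not part of PostgreSQL.

```python
from typing import Callable, Hashable, Iterable


def dedup(rows: Iterable[dict],
          key: Callable[[dict], Hashable],
          rank: Callable[[dict], object]) -> list:
    """Keep, for each key, the row with the highest rank (the retention rule)."""
    best = {}
    for row in rows:
        k = key(row)
        # Replace the current survivor only if this row ranks strictly higher.
        if k not in best or rank(row) > rank(best[k]):
            best[k] = row
    return list(best.values())


# Hypothetical rows: id, deduplication column c1, and a timestamp-like ts.
rows = [
    {"id": 1, "c1": 10, "ts": 3},
    {"id": 2, "c1": 10, "ts": 1},
    {"id": 3, "c1": 20, "ts": 2},
]
# Retain the record with the largest id in each c1 group.
print(dedup(rows, key=lambda r: r["c1"], rank=lambda r: r["id"]))
# Retain the latest record (largest ts) in each c1 group.
print(dedup(rows, key=lambda r: r["c1"], rank=lambda r: r["ts"]))
```

To retain the oldest record instead, negate the rank (for example, `rank=lambda r: -r["ts"]`); for multi-column deduplication, return a tuple from `key`.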

The following examples show how to use different methods to remove duplicates. Choose the one that works best for you.

Single-column Deduplication

Consider the following test data.

create table test1(id int primary key, c1 int, c2 timestamp);  
insert into test1 select generate_series(1,1000000), random()*1000, clock_timestamp();

create index idx_test1 on test1(c1,id);
-- This index speeds up the deduplication. Without it, Method 2 is the most efficient of the three methods below, followed by Method 3.

Requirement: Remove rows with duplicate c1 values and retain the row with the largest ID in each group.

Method 1) Use Aggregation, NOT IN

postgres=# explain delete from test1 where id not in (select max(id) from test1 group by c1);  
QUERY PLAN
------------------------------------------------------------------------------------------------------------------
Delete on test1 (cost=35115.63..53023.01 rows=500055 width=6)
-> Seq Scan on test1 (cost=35115.63..53023.01 rows=500055 width=6)
Filter: (NOT (hashed SubPlan 1))
SubPlan 1
-> GroupAggregate (cost=0.42..35113.13 rows=1001 width=8)
Group Key: test1_1.c1
-> Index Only Scan using idx_test1 on test1 test1_1 (cost=0.42..30102.57 rows=1000110 width=8)
(7 rows)
Time: 0.564 ms

postgres=# delete from test1 where id not in (select max(id) from test1 group by c1);
DELETE 998999
Time: 1126.504 ms (00:01.127)
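Method 1 works in two steps: the subquery computes max(id) for every c1 value, and the outer DELETE removes every row whose id is not in that set. Because the subquery result is small (about 1,000 ids here), the plan hashes it (hashed SubPlan 1), so each row costs one hash probe. A rough in-memory Python model of the same two steps, assuming rows are hypothetical (id, c1) tuples:

```python
def method1_delete_ids(rows):
    """Ids removed by:
    delete from test1 where id not in (select max(id) from test1 group by c1)"""
    # Step 1 (the subquery): max(id) for each c1 value.
    max_id = {}
    for row_id, c1 in rows:
        max_id[c1] = max(row_id, max_id.get(c1, row_id))
    keep = set(max_id.values())  # plays the role of the hashed SubPlan
    # Step 2 (the outer DELETE): every row whose id is not a kept id.
    return {row_id for row_id, _ in rows if row_id not in keep}


rows = [(1, 7), (2, 7), (3, 8), (4, 7)]  # hypothetical (id, c1) rows
print(sorted(method1_delete_ids(rows)))
```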

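Method 2) Use a Window Function, IN

Note: because row_number() orders each c1 partition by id in ascending order and every row with rn <> 1 is deleted, this statement keeps the row with the smallest id in each group. To retain the largest id, as the requirement states, use order by id desc.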

postgres=# explain select id from (select row_number() over(partition by c1 order by id) as rn, id from test1) t where t.rn<>1;  
QUERY PLAN
--------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.42..60075.54 rows=995109 width=4)
Filter: (t.rn <> 1)
-> WindowAgg (cost=0.42..47574.17 rows=1000110 width=16)
-> Index Only Scan using idx_test1 on test1 (cost=0.42..30072.24 rows=1000110 width=8)
(4 rows)
Time: 0.512 ms

postgres=# delete from test1 where id in (select id from (select row_number() over(partition by c1 order by id) as rn, id from test1) t where t.rn<>1);
DELETE 998999
Time: 2430.276 ms (00:02.430)
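row_number() over(partition by c1 order by id) numbers the rows of each c1 group 1, 2, 3, ... in id order, and the DELETE removes every row numbered other than 1. The Python sketch below emulates that numbering in memory (rows are hypothetical (id, c1) tuples) and shows that the sort direction decides which row survives.

```python
from itertools import groupby


def row_number_delete_ids(rows, descending=False):
    """Ids removed by deleting every row whose
    row_number() over (partition by c1 order by id [desc]) is not 1."""
    # Sort by the partition key (c1), then by id in the requested direction.
    ordered = sorted(rows, key=lambda r: (r[1], -r[0] if descending else r[0]))
    doomed = set()
    for _, partition in groupby(ordered, key=lambda r: r[1]):
        for rn, (row_id, _) in enumerate(partition, start=1):
            if rn != 1:
                doomed.add(row_id)
    return doomed


rows = [(1, 7), (2, 7), (3, 8), (4, 7)]  # hypothetical (id, c1) rows
print(sorted(row_number_delete_ids(rows)))                   # order by id: smallest id survives
print(sorted(row_number_delete_ids(rows, descending=True)))  # order by id desc: largest id survives
```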

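Method 3) Use PL/pgSQL: Sort Internally and Delete Through a Cursor

This method costs only one sort plus one comparison per record. The cursor reads the rows in (c1, id) order and deletes each row whose c1 equals the c1 of the previous row. As written, it therefore keeps the row with the smallest id in each group; ordering the cursor by c1, id desc would keep the largest.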

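do language plpgsql $$
declare
  v_rec record;
  v_c1 int;  -- c1 of the previous row; starts as NULL, so the first row never matches
  -- Read rows in (c1, id) order and lock them so they can be deleted through the cursor
  cur1 cursor for select c1,id from test1 order by c1,id for update;
begin
  for v_rec in cur1 loop
    -- Same c1 as the previous row: this row is a duplicate
    if v_rec.c1 = v_c1 then
      delete from test1 where current of cur1;
    end if;
    v_c1 := v_rec.c1;
  end loop;
end;
$$;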

DO
Time: 7345.773 ms (00:07.346)
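The PL/pgSQL loop is a single pass over sorted input: it remembers the previous row's key and deletes the current row when the key repeats. The same control flow as a Python sketch, assuming rows are hypothetical (id, c1, ...) tuples; the `key` argument is an illustrative addition that lets the same loop handle one column or several.

```python
_NO_PREV = object()  # sentinel that plays the role of v_c1 starting as NULL


def cursor_delete_ids(rows, key=lambda r: r[1]):
    """Mimic the PL/pgSQL loop: walk rows in (key, id) order and delete a row
    whenever its key equals the key of the previous row."""
    doomed = []
    prev = _NO_PREV
    for row in sorted(rows, key=lambda r: (key(r), r[0])):
        k = key(row)
        if k == prev:              # same key as the previous row: a duplicate
            doomed.append(row[0])  # like "delete from test1 where current of cur1"
        prev = k                   # like "v_c1 := v_rec.c1"
    return doomed


rows = [(1, 7), (2, 7), (3, 8), (4, 7)]  # hypothetical (id, c1) rows
print(cursor_delete_ids(rows))
```

Passing `key=lambda r: (r[1], r[2])` on (id, c1, c2) rows gives the multi-column variant used later in this article.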

postgres=# select count(*) from test1;
count
-------
1001
(1 row)

Time: 61.672 ms
postgres=# select * from test1 limit 10;
id | c1 | c2
----+-----+----------------------------
1 | 582 | 2017-06-02 10:21:10.60918
2 | 278 | 2017-06-02 10:21:10.609331
3 | 659 | 2017-06-02 10:21:10.609338
4 | 372 | 2017-06-02 10:21:10.609341
5 | 184 | 2017-06-02 10:21:10.609343
6 | 121 | 2017-06-02 10:21:10.609345
7 | 132 | 2017-06-02 10:21:10.609347
8 | 290 | 2017-06-02 10:21:10.609348
9 | 980 | 2017-06-02 10:21:10.60935
10 | 305 | 2017-06-02 10:21:10.609352
(10 rows)

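Cutting Edge Technology: Incremental Sort

An index can supply presorted input even when it covers only the leading columns of the sort key. For example, an index on (c1) can be used for order by c1,id: the rows arrive sorted by c1, and only the rows within each group of equal c1 values still need to be sorted by id. This capability, known as incremental sort, became available in PostgreSQL 13.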
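A small Python sketch of the idea, assuming the input pairs are already ordered by c1 (as an index on c1 would deliver them): only each run of equal c1 values is sorted by id, instead of re-sorting the whole input.

```python
from itertools import groupby


def incremental_sort(rows_sorted_by_c1):
    """Turn (c1, id) pairs already ordered by c1 into pairs ordered by (c1, id)."""
    out = []
    # Each group is a run of equal c1 values; only that small run is sorted.
    for _, run in groupby(rows_sorted_by_c1, key=lambda r: r[0]):
        out.extend(sorted(run, key=lambda r: r[1]))
    return out


presorted = [(1, 9), (1, 3), (2, 5), (2, 1), (2, 4), (3, 2)]
print(incremental_sort(presorted))
```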

Multi-column Deduplication

Consider the following test data.

create table test1(id int primary key, c1 int, c2 int, c3 timestamp);  
insert into test1 select generate_series(1,1000000), random()*1000, random()*1000, clock_timestamp();

create index idx_test1 on test1(c1,c2,id);
-- This index speeds up the deduplication.

Requirement: Remove rows with duplicate (c1, c2) pairs and retain the row with the largest ID in each group.

Method 1

postgres=# explain (analyze,verbose,timing,costs,buffers) delete from test1 where id not in (select max(id) from test1 group by c1,c2);  
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.test1 (cost=40820.36..59690.36 rows=500000 width=6) (actual time=1634.960..1634.960 rows=0 loops=1)
Buffers: shared hit=1378788
-> Seq Scan on public.test1 (cost=40820.36..59690.36 rows=500000 width=6) (actual time=1090.956..1446.374 rows=367618 loops=1)
Output: test1.ctid
Filter: (NOT (hashed SubPlan 1))
Rows Removed by Filter: 632382
Buffers: shared hit=1011170
SubPlan 1
-> GroupAggregate (cost=0.42..40570.36 rows=100000 width=12) (actual time=0.035..842.497 rows=632382 loops=1)
Output: max(test1_1.id), test1_1.c1, test1_1.c2
Group Key: test1_1.c1, test1_1.c2
Buffers: shared hit=1004800
-> Index Only Scan using idx_test1 on public.test1 test1_1 (cost=0.42..32070.36 rows=1000000 width=12) (actual time=0.027..587.506 rows=1000000 loops=1)
Output: test1_1.c1, test1_1.c2, test1_1.id
Heap Fetches: 1000000
Buffers: shared hit=1004800
Planning time: 0.211 ms
Execution time: 1641.679 ms
(18 rows)

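Method 2

As in the single-column case, ordering each (c1, c2) partition by id in ascending order keeps the row with the smallest id; use order by id desc to keep the largest.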

postgres=# explain (analyze,verbose,timing,costs,buffers) delete from test1 where id in (select id from (select row_number() over(partition by c1,c2 order by id) as rn, id from test1) t where t.rn<>1);  
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.test1 (cost=83752.89..130385.27 rows=995000 width=34) (actual time=2199.376..2199.376 rows=0 loops=1)
Buffers: shared hit=1378741, temp read=6482 written=6420
-> Hash Semi Join (cost=83752.89..130385.27 rows=995000 width=34) (actual time=1381.636..1929.284 rows=367584 loops=1)
Output: test1.ctid, t.*
Hash Cond: (test1.id = t.id)
Buffers: shared hit=1011157, temp read=6482 written=6420
-> Seq Scan on public.test1 (cost=0.00..16370.00 rows=1000000 width=10) (actual time=0.013..140.130 rows=1000000 loops=1)
Output: test1.ctid, test1.id
Buffers: shared hit=6370
-> Hash (cost=64513.39..64513.39 rows=995000 width=32) (actual time=1377.349..1377.349 rows=367584 loops=1)
Output: t.*, t.id
Buckets: 65536 Batches: 32 Memory Usage: 1326kB
Buffers: shared hit=1004787, temp written=2591
-> Subquery Scan on t (cost=0.42..64513.39 rows=995000 width=32) (actual time=0.074..1269.919 rows=367584 loops=1)
Output: t.*, t.id
Filter: (t.rn <> 1)
Rows Removed by Filter: 632416
Buffers: shared hit=1004787
-> WindowAgg (cost=0.42..52013.39 rows=1000000 width=20) (actual time=0.054..1117.668 rows=1000000 loops=1)
Output: row_number() OVER (?), test1_1.id, test1_1.c1, test1_1.c2
Buffers: shared hit=1004787
-> Index Only Scan using idx_test1 on public.test1 test1_1 (cost=0.42..32013.39 rows=1000000 width=12) (actual time=0.035..627.329 rows=1000000 loops=1)
Output: test1_1.c1, test1_1.c2, test1_1.id
Heap Fetches: 1000000
Buffers: shared hit=1004787
Planning time: 0.565 ms
Execution time: 2199.450 ms
(27 rows)

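Method 3

This loop likewise keeps the row with the smallest id in each (c1, c2) group; order the cursor by c1,c2,id desc to keep the largest.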

postgres=# do language plpgsql $$
declare
  v_rec record;
  v_c1 int;
  v_c2 int;
  cur1 cursor for select c1,c2 from test1 order by c1,c2,id for update;
begin
  for v_rec in cur1 loop
    -- Same (c1, c2) as the previous row: delete this duplicate
    if v_rec.c1 = v_c1 and v_rec.c2 = v_c2 then
      delete from test1 where current of cur1;
    end if;
    v_c1 := v_rec.c1;
    v_c2 := v_rec.c2;
  end loop;
end;
$$;
DO

Time: 4637.183 ms (00:04.637)

Row-based Deduplication

Using ctid = any(array(select ctid from ...)) is the fastest way to remove duplicates.

Consider the following test data.

create table test1(c1 int, c2 int);  
insert into test1 select random()*1000, random()*1000 from generate_series(1,1000000);

-- The ctid (row number) system column cannot be indexed.

Requirement: Remove duplicate rows (rows with identical values in every column) and keep one copy of each.

Because the table has no primary key, the copy to keep is chosen by its row number (ctid).
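Without a key, the only thing that distinguishes two identical rows is their physical location, the ctid. The Python sketch below models a table as a list, uses the list position as a hypothetical stand-in for ctid, and keeps the lowest position for each distinct row value, which is what ordering by ctid and keeping rn = 1 does in Method 2 below.

```python
def duplicate_positions(heap):
    """Positions (stand-ins for ctid) of rows that repeat an earlier identical row."""
    seen = set()
    doomed = []
    for pos, row in enumerate(heap):  # pos plays the role of ctid
        if row in seen:
            doomed.append(pos)
        else:
            seen.add(row)
    return doomed


heap = [(5, 6), (1, 2), (5, 6), (5, 6), (1, 2)]  # hypothetical (c1, c2) rows
print(duplicate_positions(heap))
```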

Method 1

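We do not recommend NOT IN with ctid. Here it leads to a loop: instead of probing a hash table, the subquery result is effectively rescanned for every row of test1, which ruins performance. In the test below, the statement was cancelled after about seven minutes.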

postgres=# explain (analyze,verbose,timing,costs,buffers) delete from test1 where ctid not in (select max(ctid) from test1 group by c1,c2);  
^CCancel request sent
ERROR: 57014: canceling statement due to user request
LOCATION: ProcessInterrupts, postgres.c:2984
Time: 426433.450 ms (07:06.433)
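The gap comes from how membership is tested. With a hashed subplan, each outer row costs one hash probe; when the subquery result is instead scanned row by row for each outer row, the work grows with (outer rows x subquery rows). A Python sketch that counts comparisons for both strategies on made-up data (the function names are illustrative):

```python
def not_in_linear(outer, inner):
    """Loop-style NOT IN: compare each outer value with inner values until a match."""
    comparisons = 0
    kept = []
    for x in outer:
        found = False
        for y in inner:
            comparisons += 1
            if x == y:
                found = True
                break
        if not found:
            kept.append(x)
    return kept, comparisons


def not_in_hashed(outer, inner):
    """Hashed NOT IN: build a hash set once, then probe it once per outer value."""
    inner_set = set(inner)
    return [x for x in outer if x not in inner_set], len(outer)


outer = list(range(2000))
inner = list(range(0, 2000, 2))  # even numbers stand in for the values to keep
print(not_in_linear(outer, inner)[1], not_in_hashed(outer, inner)[1])
```

Even at 2,000 rows the loop performs about 1.5 million comparisons against 2,000 probes; at 1,000,000 rows the quadratic term dominates.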

Method 2

postgres=# explain (analyze,verbose,timing,costs,buffers) delete from test1 where ctid = any(array( select ctid from (select row_number() over(partition by c1,c2 order by ctid) as rn, ctid from test1) t where t.rn<>1));  
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.test1 (cost=422032.41..427035.41 rows=500000 width=36) (actual time=3525.690..3525.690 rows=0 loops=1)
Buffers: shared hit=376073 dirtied=4398, temp read=10658 written=10683
-> Merge Join (cost=422032.41..427035.41 rows=500000 width=36) (actual time=3003.047..3352.172 rows=367223 loops=1)
Output: test1.ctid, t.*
Inner Unique: true
Merge Cond: (test1.ctid = t.ctid)
Buffers: shared hit=8850, temp read=10658 written=10683
-> Sort (cost=122873.59..125373.59 rows=1000000 width=6) (actual time=786.814..883.721 rows=1000000 loops=1)
Output: test1.ctid
Sort Key: test1.ctid
Sort Method: external sort Disk: 15656kB
Buffers: shared hit=4425, temp read=1957 written=1957
-> Seq Scan on public.test1 (cost=0.00..14425.00 rows=1000000 width=6) (actual time=0.021..112.431 rows=1000000 loops=1)
Output: test1.ctid
Buffers: shared hit=4425
-> Sort (cost=299158.81..299159.31 rows=200 width=36) (actual time=2216.021..2268.235 rows=367223 loops=1)
Output: t.*, t.ctid
Sort Key: t.ctid
Sort Method: external sort Disk: 18688kB
Buffers: shared hit=4425, temp read=8701 written=8726
-> Unique (cost=294176.17..299151.17 rows=200 width=36) (actual time=1790.180..1949.522 rows=367223 loops=1)
Output: t.*, t.ctid
Buffers: shared hit=4425, temp read=6365 written=6390
-> Sort (cost=294176.17..296663.67 rows=995000 width=36) (actual time=1790.179..1874.394 rows=367223 loops=1)
Output: t.*, t.ctid
Sort Key: t.ctid
Sort Method: external merge Disk: 18744kB
Buffers: shared hit=4425, temp read=6365 written=6390
-> Subquery Scan on t (cost=125069.59..160069.59 rows=995000 width=36) (actual time=692.878..1542.122 rows=367223 loops=1)
Output: t.*, t.ctid
Filter: (t.rn <> 1)
Rows Removed by Filter: 632777
Buffers: shared hit=4425, temp read=4022 written=4039
-> WindowAgg (cost=125069.59..147569.59 rows=1000000 width=22) (actual time=692.858..1401.210 rows=1000000 loops=1)
Output: row_number() OVER (?), test1_1.ctid, test1_1.c1, test1_1.c2
Buffers: shared hit=4425, temp read=4022 written=4039
-> Sort (cost=125069.59..127569.59 rows=1000000 width=14) (actual time=692.850..947.055 rows=1000000 loops=1)
Output: test1_1.ctid, test1_1.c1, test1_1.c2
Sort Key: test1_1.c1, test1_1.c2, test1_1.ctid
Sort Method: external merge Disk: 25496kB
Buffers: shared hit=4425, temp read=4022 written=4039
-> Seq Scan on public.test1 test1_1 (cost=0.00..14425.00 rows=1000000 width=14) (actual time=0.010..131.128 rows=1000000 loops=1)
Output: test1_1.ctid, test1_1.c1, test1_1.c2
Buffers: shared hit=4425
Planning time: 0.247 ms
Execution time: 3547.727 ms
(46 rows)

Method 3

postgres=# do language plpgsql $$
declare
  v_rec record;
  v_c1 int;
  v_c2 int;
  -- Order by ctid within each (c1, c2) group, so the copy with the lowest ctid is kept
  cur1 cursor for select c1,c2 from test1 order by c1,c2,ctid for update;
begin
  for v_rec in cur1 loop
    if v_rec.c1 = v_c1 and v_rec.c2 = v_c2 then
      delete from test1 where current of cur1;
    end if;
    v_c1 := v_rec.c1;
    v_c2 := v_rec.c2;
  end loop;
end;
$$;
DO

Time: 5395.774 ms (00:05.396)

Multi-column Hybrid Deduplication

To deduplicate across columns regardless of their order, combine the columns into an array. However, array equality is order-sensitive: two arrays that contain the same elements in a different order are not equal.

postgres=# select array[1,2] = array[2,1];  
?column?
----------
f
(1 row)

postgres=# select array[1,2] @> array[2,1] and array[2,1] @> array[1,1,2];
?column?
----------
t
(1 row)

postgres=# select array[1,2] @> array[2,2,1] and array[2,1] @> array[1,1,2];
?column?
----------
t
(1 row)
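As an analogy only, Python lists and sets show the same three behaviors: list equality is order-sensitive like array =, mutual set containment ignores order and also ignores repeated elements like @> in both directions, and sorting before comparing removes order while still keeping repeated elements.

```python
a, b, c = [1, 2], [2, 1], [1, 1, 2]

print(a == b)                                  # order-sensitive, like array[1,2] = array[2,1]
print(set(a) >= set(b) and set(b) >= set(c))  # ignores order and repeated elements
print(sorted(a) == sorted(b))                  # sorting first makes order irrelevant
print(sorted(a) == sorted(c))                  # ...and still tells [1,2] from [1,1,2]
```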

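The containment operator (@>) applied in both directions ignores element order, but as the last query shows, it also ignores repeated elements, so it cannot tell {1,2} from {2,2,1}. Therefore, put the columns that require deduplication into an array and sort its elements, so that the same set of values always produces the same array.

Create a sorting function that accepts any number of columns and outputs a sorted array.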

postgres=# create or replace function sort_vals(variadic v_arr text[]) returns text[] as $$  
select array_agg(arr order by arr) from unnest(v_arr) t(arr);
$$ language sql strict;

postgres=# select sort_vals('a','a','b','a','c');
sort_vals
-------------
{a,a,a,b,c}
(1 row)

Consider the following test data.

create table test1(c1 int, c2 int);  
insert into test1 select random()*1000, random()*1000 from generate_series(1,1000000);

Requirement: Remove records that hold the same pair of values in c1 and c2 in either order (for example, (1,2) and (2,1) are duplicates), and retain any one record from each group.
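Stripped of the SQL, the requirement amounts to: build an order-independent key for each row by sorting its (c1, c2) values, then keep the first row seen for each key. A Python sketch, using list position as a hypothetical stand-in for ctid and mirroring sort_vals by converting the values to text before sorting:

```python
def sort_vals(*vals):
    """Python counterpart of the SQL sort_vals(): sort the values as text."""
    return tuple(sorted(str(v) for v in vals))


def hybrid_duplicate_positions(rows):
    """Positions of rows whose sorted (c1, c2) key already appeared at a lower position."""
    seen = set()
    doomed = []
    for pos, (c1, c2) in enumerate(rows):
        k = sort_vals(c1, c2)
        if k in seen:
            doomed.append(pos)
        else:
            seen.add(k)
    return doomed


rows = [(1, 2), (2, 1), (3, 4), (1, 2), (10, 9), (9, 10)]  # hypothetical (c1, c2) rows
print(hybrid_duplicate_positions(rows))
```

Sorting as text orders '10' before '9', which does not matter here: the key only has to be consistent, not numerically ordered.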

Method 1

Use sort_vals to sort and reorganize arrays. This method is simple and easy to understand.

postgres=# explain (analyze,verbose,timing,costs,buffers) delete from test1 where ctid = any(array (select ctid from (select row_number() over(partition by sort_vals(c1::text,c2::text) order by ctid) as rn, ctid from test1) t where t.rn<>1));  
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.test1 (cost=963704.16..968707.16 rows=500000 width=36) (actual time=16466.913..16466.913 rows=0 loops=1)
Buffers: shared hit=576071, temp read=18863 written=18901
-> Merge Join (cost=963704.16..968707.16 rows=500000 width=36) (actual time=15766.506..16202.766 rows=567213 loops=1)
Output: test1.ctid, t.*
Inner Unique: true
Merge Cond: (test1.ctid = t.ctid)
Buffers: shared hit=8858, temp read=18863 written=18901
-> Sort (cost=122873.59..125373.59 rows=1000000 width=6) (actual time=782.867..880.729 rows=1000000 loops=1)
Output: test1.ctid
Sort Key: test1.ctid
Sort Method: external sort Disk: 15656kB
Buffers: shared hit=4425, temp read=1957 written=1957
-> Seq Scan on public.test1 (cost=0.00..14425.00 rows=1000000 width=6) (actual time=0.009..110.757 rows=1000000 loops=1)
Output: test1.ctid
Buffers: shared hit=4425
-> Sort (cost=840830.56..840831.06 rows=200 width=36) (actual time=14983.595..15066.186 rows=567213 loops=1)
Output: t.*, t.ctid
Sort Key: t.ctid
Sort Method: external sort Disk: 28864kB
Buffers: shared hit=4433, temp read=16906 written=16944
-> Unique (cost=835847.92..840822.92 rows=200 width=36) (actual time=14316.637..14568.357 rows=567213 loops=1)
Output: t.*, t.ctid
Buffers: shared hit=4433, temp read=13298 written=13336
-> Sort (cost=835847.92..838335.42 rows=995000 width=36) (actual time=14316.636..14456.355 rows=567213 loops=1)
Output: t.*, t.ctid
Sort Key: t.ctid
Sort Method: external merge Disk: 28952kB
Buffers: shared hit=4433, temp read=13298 written=13336
-> Subquery Scan on t (cost=409241.34..701741.34 rows=995000 width=36) (actual time=12177.370..13945.667 rows=567213 loops=1)
Output: t.*, t.ctid
Filter: (t.rn <> 1)
Rows Removed by Filter: 432787
Buffers: shared hit=4433, temp read=9679 written=9704
-> WindowAgg (cost=409241.34..689241.34 rows=1000000 width=46) (actual time=12177.303..13765.873 rows=1000000 loops=1)
Output: row_number() OVER (?), test1_1.ctid, (sort_vals(VARIADIC ARRAY[(test1_1.c1)::text, (test1_1.c2)::text]))
Buffers: shared hit=4433, temp read=9679 written=9704
-> Sort (cost=409241.34..411741.34 rows=1000000 width=38) (actual time=12177.293..13163.065 rows=1000000 loops=1)
Output: test1_1.ctid, (sort_vals(VARIADIC ARRAY[(test1_1.c1)::text, (test1_1.c2)::text]))
Sort Key: (sort_vals(VARIADIC ARRAY[(test1_1.c1)::text, (test1_1.c2)::text])), test1_1.ctid
Sort Method: external merge Disk: 51904kB
Buffers: shared hit=4430, temp read=9679 written=9704
-> Seq Scan on public.test1 test1_1 (cost=0.00..274425.00 rows=1000000 width=38) (actual time=0.202..8735.620 rows=1000000 loops=1)
Output: test1_1.ctid, sort_vals(VARIADIC ARRAY[(test1_1.c1)::text, (test1_1.c2)::text])
Buffers: shared hit=4425
Planning time: 0.292 ms
Execution time: 16500.934 ms
(46 rows)

Method 2

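Use a subquery (unnest followed by array_agg with ORDER BY) to build the sorted arrays. This method is faster than calling sort_vals (about 10.2 s versus 16.5 s in these tests) but more complicated.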

explain (analyze,verbose,timing,costs,buffers)
delete from test1 where ctid = any(array(
  select rid from
  (
    select row_number() over(partition by val order by rid) as rn, rid from
    (
      select rid, array_agg(arr order by arr) val from
        (select ctid rid, unnest(array[c1,c2]) arr from test1) t
      group by rid
    ) t
  ) t
  where t.rn<>1
));

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Delete on public.test1 (cost=18979422.39..18984425.62 rows=199 width=36) (actual time=10186.459..10186.459 rows=0 loops=1)
Buffers: shared hit=575991, temp read=19174 written=19212
-> Merge Semi Join (cost=18979422.39..18984425.62 rows=199 width=36) (actual time=9421.861..9923.844 rows=567141 loops=1)
Output: test1.ctid, t.*
Merge Cond: (test1.ctid = t.rid)
Buffers: shared hit=8850, temp read=19174 written=19212
-> Sort (cost=122879.44..125379.56 rows=1000050 width=6) (actual time=796.023..894.723 rows=1000000 loops=1)
Output: test1.ctid
Sort Key: test1.ctid
Sort Method: external sort Disk: 15656kB
Buffers: shared hit=4425, temp read=1957 written=1957
-> Seq Scan on public.test1 (cost=0.00..14425.50 rows=1000050 width=6) (actual time=0.016..122.654 rows=1000000 loops=1)
Output: test1.ctid
Buffers: shared hit=4425
-> Sort (cost=18856542.95..18856543.45 rows=199 width=36) (actual time=8625.739..8775.583 rows=567141 loops=1)
Output: t.*, t.rid
Sort Key: t.rid
Sort Method: external merge Disk: 28952kB
Buffers: shared hit=4425, temp read=17217 written=17255
-> Subquery Scan on t (cost=18856528.85..18856535.35 rows=199 width=36) (actual time=6749.159..8251.185 rows=567141 loops=1)
Output: t.*, t.rid
Filter: (t.rn <> 1)
Rows Removed by Filter: 432859
Buffers: shared hit=4425, temp read=13598 written=13623
-> WindowAgg (cost=18856528.85..18856532.85 rows=200 width=46) (actual time=6749.138..8073.103 rows=1000000 loops=1)
Output: row_number() OVER (?), test1_1.ctid, (array_agg((unnest(ARRAY[test1_1.c1, test1_1.c2])) ORDER BY (unnest(ARRAY[test1_1.c1, test1_1.c2]))))
Buffers: shared hit=4425, temp read=13598 written=13623
-> Sort (cost=18856528.85..18856529.35 rows=200 width=38) (actual time=6749.128..7507.854 rows=1000000 loops=1)
Output: test1_1.ctid, (array_agg((unnest(ARRAY[test1_1.c1, test1_1.c2])) ORDER BY (unnest(ARRAY[test1_1.c1, test1_1.c2]))))
Sort Key: (array_agg((unnest(ARRAY[test1_1.c1, test1_1.c2])) ORDER BY (unnest(ARRAY[test1_1.c1, test1_1.c2])))), test1_1.ctid
Sort Method: external merge Disk: 44040kB
Buffers: shared hit=4425, temp read=13598 written=13623
-> GroupAggregate (cost=18106479.21..18856519.21 rows=200 width=38) (actual time=2315.955..4053.484 rows=1000000 loops=1)
Output: test1_1.ctid, array_agg((unnest(ARRAY[test1_1.c1, test1_1.c2])) ORDER BY (unnest(ARRAY[test1_1.c1, test1_1.c2])))
Group Key: test1_1.ctid
Buffers: shared hit=4425, temp read=5382 written=5382
-> Sort (cost=18106479.21..18356491.71 rows=100005000 width=10) (actual time=2315.934..2530.362 rows=2000000 loops=1)
Output: test1_1.ctid, (unnest(ARRAY[test1_1.c1, test1_1.c2]))
Sort Key: test1_1.ctid
Sort Method: external sort Disk: 43056kB
Buffers: shared hit=4425, temp read=5382 written=5382
-> ProjectSet (cost=0.00..521950.88 rows=100005000 width=10) (actual time=0.019..836.774 rows=2000000 loops=1)
Output: test1_1.ctid, unnest(ARRAY[test1_1.c1, test1_1.c2])
Buffers: shared hit=4425
-> Seq Scan on public.test1 test1_1 (cost=0.00..14425.50 rows=1000050 width=14) (actual time=0.010..137.319 rows=1000000 loops=1)
Output: test1_1.c1, test1_1.c2, test1_1.ctid
Buffers: shared hit=4425
Planning time: 0.241 ms
Execution time: 10228.121 ms
(49 rows)
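Method 2 builds the same sorted key without a user-defined function: unnest(array[c1,c2]) turns each row into one (rid, value) pair per column, and array_agg(arr order by arr) with group by rid reassembles each row's values in sorted order. A Python sketch of that explode-and-regroup pipeline, with list position as a hypothetical stand-in for ctid:

```python
from collections import defaultdict


def sorted_keys_by_regrouping(rows):
    """Mimic: select rid, array_agg(arr order by arr)
    from (select ctid rid, unnest(array[c1,c2]) arr from test1) t group by rid"""
    exploded = [(rid, v) for rid, row in enumerate(rows) for v in row]  # unnest
    groups = defaultdict(list)
    for rid, v in exploded:                                             # group by rid
        groups[rid].append(v)
    return {rid: tuple(sorted(vals)) for rid, vals in groups.items()}   # array_agg(... order by arr)


rows = [(1, 2), (2, 1), (10, 9)]  # hypothetical (c1, c2) rows
print(sorted_keys_by_regrouping(rows))
```

Because the unnested values stay integers here, they sort numerically, unlike the text-based sort_vals; either way, rows with the same values get the same key.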

Method 3

postgres=# do language plpgsql $$
declare
  v_rec record;
  v_arr text[];
  -- Sort rows by their order-independent key (the sorted values), then by ctid
  cur1 cursor for select sort_vals(c1::text,c2::text) as arr from test1 order by sort_vals(c1::text,c2::text),ctid for update;
begin
  for v_rec in cur1 loop
    if v_rec.arr = v_arr then
      delete from test1 where current of cur1;
    end if;
    v_arr := v_rec.arr;
  end loop;
end;
$$;
DO

Time: 18542.457 ms (00:18.542)

Summary

The performance of the three methods varies greatly depending on whether an index can accelerate them. The window-function method performs most consistently, so we recommend it. For hybrid deduplication, remember that array comparison is order-sensitive: first normalize the element order with a custom sorting function such as sort_vals.

Original Source:
