How Can We Monitor “No Incoming Messages” Data Exceptions?

Alibaba Cloud
5 min read · Oct 14, 2019

By Digoal

Stream computing fits many monitoring scenarios, but it struggles with a class of e-commerce problems such as delivery and refund timeouts: the exception is precisely that an expected message never arrives, so there is nothing in the stream to trigger a warning. How, then, can we raise timely alerts in these cases? The answer I will present in this blog is a solution built on a combination of timeout and scheduling operations in PostgreSQL.

PostgreSQL is a good fit for this task for several reasons. It supports flexible common table expression (CTE) syntax, and its data manipulation language (DML) statements can return rows, so an update and the read of its result can be combined in one CTE and interactive round trips are minimized. It offers partial indexes, which index only the rows we care about and skip unnecessary data. It also performs multi-index bitmap scans, which let it combine the indexes of multiple fields in a single scan when a query uses an OR condition.
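As a quick illustration of the CTE-plus-RETURNING pattern, here is a minimal sketch on a hypothetical jobs table (not part of this article's schema):

create table jobs (id int8 primary key, done boolean default false);

-- The UPDATE and the read of the rows it touched happen in one statement,
-- so the application does not need a second round trip.
with upd as (
  update jobs set done = true where id = 1 returning *
)
select * from upd;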

In the rest of this tutorial, I will walk through an example solution that combines timeout handling with scheduled batch processing in PostgreSQL.

Create a Demo Design

To build the demo, first design the structure of the monitored table. This table records the timeout deadline of order and refund events, along with the number of timeout notifications already sent, the next notification time, and the completion status. You can create it with the following command in PostgreSQL:

create table tbl (
  id int8,
  -- ... Other fields, such as the completion status
  state int,                    -- Completion status (1 indicates completed)
  deadts timestamp,             -- Timeout deadline
  nts interval,                 -- Notification interval, used to compute the next notification time (for example, one notification per day)
  notify_times int default 0,   -- Number of notifications sent so far
  deadts_next timestamp         -- Next notification time
);
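For context, here is how the application side might maintain rows in this table (an illustrative sketch; the ticket id and the 24-hour/1-day settings are assumptions rather than part of the original design):

-- Register a ticket that must be resolved within 24 hours;
-- if it is not, re-notify once per day afterwards.
insert into tbl (id, state, deadts, nts, notify_times, deadts_next)
values (123, 0, now() + interval '24 hours', interval '1 day', 0, null);

-- When the awaited message finally arrives, mark the ticket as completed
-- so it drops out of the monitoring set covered by the partial indexes below.
update tbl set state = 1 where id = 123;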

Next, you'll want to create two partial indexes: one on deadts for tickets that have never been notified, and one on deadts_next for tickets waiting for their next notification. Users mostly care about unfinished tickets, so indexing only those rows keeps the indexes small and the scans fast. To do this, enter this code:

create index idx_tbl_1 on tbl (deadts) where notify_times=0 and state<>1;  

create index idx_tbl_2 on tbl (deadts_next) where deadts_next is not null and state<>1;
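If you want to confirm that these partial indexes can be used, you can probe them with EXPLAIN; the query predicate must imply the index predicate for the planner to choose them. This check is my own addition for illustration:

explain
select ctid from tbl
where deadts < now() and notify_times = 0 and state <> 1;
-- The plan should show an Index Scan (or Bitmap Index Scan) using idx_tbl_1.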

Next, we need to fetch the tickets that are due for notification and, in the same statement, update their notification count and next notification time. You can do this by running this code:

with tmp1 as (
  update tbl set
    deadts_next = now() + nts,          -- schedule the next notification
    notify_times = notify_times + 1     -- count this notification
  where ctid = any (array(
    -- ctid is the physical row address; materializing the matching ctids once
    -- lets the update locate the rows with a fast TID scan.
    select ctid from tbl where
      ( deadts < now() and notify_times = 0 and state <> 1 )
    union all
    select ctid from tbl where
      ( deadts_next < now() and deadts_next is not null and state <> 1 )
    limit 10000   -- Obtains at most 10,000 timed-out records at a time.
  ))
  returning *
)
select * from tmp1;

The execution plan looks like this:

CTE Scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48)
CTE tmp1
-> Update on tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54)
InitPlan 1 (returns $0)
-> Limit (cost=0.13..18151.03 rows=10000 width=6)
-> Append (cost=0.13..764699.60 rows=421301 width=6)
-> Index Scan using idx_tbl_1 on tbl (cost=0.13..169527.13 rows=369766 width=6)
Index Cond: (deadts < now())
-> Index Scan using idx_tbl_2 on tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6)
Index Cond: (deadts_next < now())
-> Tid Scan on tbl tbl_2 (cost=0.01..12.21 rows=10 width=54)
TID Cond: (ctid = ANY ($0))
(12 rows)
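In practice this statement has to run on a schedule. The original article does not prescribe a scheduler, so the following is only one possible sketch using pg_cron; any external scheduler that calls the same SQL works, and the notify_queue table that receives the affected rows is hypothetical:

create extension if not exists pg_cron;

-- Run the notification batch once a minute and hand the affected rows to a
-- downstream table (assumed here to have the same columns as tbl) that the
-- notification sender reads.
select cron.schedule('* * * * *', $$
  with tmp1 as (
    update tbl set
      deadts_next = now() + nts,
      notify_times = notify_times + 1
    where ctid = any (array(
      select ctid from tbl where
        ( deadts < now() and notify_times = 0 and state <> 1 )
      union all
      select ctid from tbl where
        ( deadts_next < now() and deadts_next is not null and state <> 1 )
      limit 10000
    ))
    returning *
  )
  insert into notify_queue select * from tmp1
$$);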

Create Performance Indicators

Assume that 100 million completed records have been written, along with one million records whose processing has timed out. How long does it take to work through those one million timed-out records? Consider the following test data:

-- 100 million records whose processing has already completed (state = 1).
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);

-- One million records whose processing has timed out (state = 0).
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);
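Before running the batch, you can sanity-check how many tickets are currently due for their first notification; this probe query is my addition and simply reuses the predicate of idx_tbl_1:

select count(*) from tbl
where deadts < now() and notify_times = 0 and state <> 1;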

With this test data in place, we can measure the notification performance: each run picks up 10,000 timed-out records and generates their notifications.

Because only a small batch of data is retrieved and updated each time, the resulting garbage is small enough for autovacuum to collect in near real time. Consider the following timed run:

with tmp1 as (
  update tbl set
    deadts_next = now() + nts,
    notify_times = notify_times + 1
  where ctid = any (array(
    select ctid from tbl where
      ( deadts < now() and notify_times = 0 and state <> 1 )
    union all
    select ctid from tbl where
      ( deadts_next < now() and deadts_next is not null and state <> 1 )
    limit 10000   -- Obtains at most 10,000 timed-out records at a time.
  ))
  returning *
)
select * from tmp1;


-- The timed execution plan

CTE Scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)
Output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next
Buffers: shared hit=75094 read=49 dirtied=49
CTE tmp1
-> Update on public.tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)
Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next
Buffers: shared hit=75094 read=49 dirtied=49
InitPlan 1 (returns $0)
-> Limit (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)
Output: tbl.ctid
Buffers: shared hit=11395
-> Append (cost=0.13..764699.60 rows=421301 width=6) (actual time=31.264..35.354 rows=10000 loops=1)
Buffers: shared hit=11395
-> Index Scan using idx_tbl_1 on public.tbl (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)
Output: tbl.ctid
Index Cond: (tbl.deadts < now())
Buffers: shared hit=1
-> Index Scan using idx_tbl_2 on public.tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)
Output: tbl_1.ctid
Index Cond: (tbl_1.deadts_next < now())
Buffers: shared hit=11394
-> Tid Scan on public.tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)
Output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid
TID Cond: (tbl_2.ctid = ANY ($0))
Buffers: shared hit=21395
Planning time: 0.301 ms
Execution time: 79.905 ms

As you can see, the data is processed smoothly: each batch of 10,000 timed-out records completes in about 80 ms, or roughly 125,000 records per second on a single connection.

Time: 79.905 ms

Conclusion

To sum up, in this tutorial I have introduced a method for monitoring timeout data in e-commerce scenarios, one that not only catches the "no incoming messages" exceptions but also delivers good performance.

Original Source
