Apache Flink Best Practices: Constructing Real-Time Data Warehouses in the Financial Industry


Background

Finance is the core of the modern economy. China’s financial industry has continued to develop through market-oriented reform and opening up, alongside rapid growth in financial resources. Because the financial industry acts as a barometer of the economy, its stability is directly tied to the future of national economic development. An objective analysis of the current state of China’s financial industry and of its development trends helps eliminate hidden risks, so that the industry can develop in a healthy and orderly direction.

  1. Financial Services and Product Innovation: Massive amounts of users’ data, such as their interests and preferences, have been recorded by social networks and other platforms. With this data, new products that are closely related to users’ demands can be provided based on the analysis of their behavioral patterns.
  2. Improved User Experience: Analyzing user data makes it possible to improve the user experience by providing personalized services.

Service Scenarios

An insurance company has developed a financial app through which it delivers insurance advertisements and runs promotions. Users can get services, such as buying insurance products, through the app.

The business construction of the company mentioned above involves two aspects:

  • App: An application that acts as the access portal for users. In the app, users can browse insurance advertisements and promotions and pay for insurance products.
  • Background System:

For Operation Personnel:

  1. Based on the orders that users submit, the system calculates the total number of insured people and the total amount of insurance coverage in a specified period. Operation personnel use these figures to optimize the operation plan.
  2. The system also analyzes users’ daily behavior and the information they pay attention to. The results are used for personalized recommendations on the app.

For Business Manager:

The background system monitors changes in the insured amount of key users. Then, for those key users with large changes in the insured amount, the system sends their information to the business manager. This helps the business manager retain these users.

Architecture

  • Data Collection: In this scenario, the data for the data warehouse is collected into DataHub in real time and serves as the input to Apache Flink. This data comes mainly from the event tracking of systems such as the app.
  • Real-Time Data Warehouse Architecture: In this scenario, the Extract, Transform, and Load (ETL) and Business Intelligence (BI) parts of the real-time data warehouse are built with Apache Flink. Apache Flink reads and processes data from DataHub in real time and joins it with dimension tables. The final real-time statistics are written to the downstream Relational Database Service (RDS).

Business Metrics

Analyzing Operational Data:

  • Number of insured persons per hour
  • Total insurance premiums per hour
  • Total number of insurance orders per hour

Monitoring User Behavior:

  • Original insured amount of the user
  • Current insured amount of the user

Analyzing User Behavior:

  • Types of pages visited by the user
  • Address of the last page visited by the user

Data Structure

In this scenario, the total number of insured people and the total insurance premiums per hour are calculated.

After a user buys insurance, an order is generated that includes the user’s ID, name, order number, and so on. Apache Flink reads the order records in real time and uses a WHERE clause to discard records that were last modified before the start of the current day. Then, Apache Flink groups the records by user ID and uses the last_value function to obtain the latest order information of each user. Finally, the total insurance premiums and the total number of insured people are aggregated by hour.
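The three steps just described (filter, keep the latest record per user, aggregate by hour) can be sketched in plain Python. This is a batch approximation under stated assumptions, not the Flink job itself: the function name hourly_summary, the dict-based rows, and the default UTC time zone are all hypothetical, and "latest" here means the last record to arrive for a user ID.

```python
from collections import defaultdict
from datetime import datetime, timezone


def hourly_summary(orders, day_start_ms, tz=timezone.utc):
    """Summarize orders per hour.

    orders: iterable of dicts, in arrival order, with the keys id, order_no,
    customer_tel, gmt_modified (epoch milliseconds) and account_payble.
    day_start_ms: records modified before this instant are ignored.
    """
    # Step 1: filter, then keep the last-arrived record for each user id.
    last_order = {}
    for order in orders:
        if order["gmt_modified"] >= day_start_ms:
            last_order[order["id"]] = order

    # Step 2: bucket the remaining records by hour (yyyyMMddHH) and aggregate.
    buckets = defaultdict(lambda: {"premium": 0.0, "orders": set(), "tels": set()})
    for order in last_order.values():
        moment = datetime.fromtimestamp(order["gmt_modified"] / 1000, tz)
        bucket = buckets[moment.strftime("%Y%m%d%H")]
        bucket["premium"] += order["account_payble"] or 0.0  # missing amount counts as 0
        if order["order_no"] is not None:      # distinct counts skip missing values
            bucket["orders"].add(order["order_no"])
        if order["customer_tel"] is not None:
            bucket["tels"].add(order["customer_tel"])

    return {
        hour: {
            "total_premium": b["premium"],
            "policy_cnt": len(b["orders"]),
            "policy_holder_cnt": len(b["tels"]),
        }
        for hour, b in buckets.items()
    }
```

In the streaming job the same result is maintained incrementally: when a newer order arrives for a user, the previous contribution of that user is replaced rather than added to.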

Input Table

CREATE TABLE user_order
(
id bigint comment 'User id'
,order_no varchar comment 'Order number'
,order_type bigint comment 'Order type'
,pay_time bigint comment 'Payment time'
,order_price double comment 'Order price'
,customer_name varchar comment 'User name'
,customer_tel varchar comment 'User phone number'
,certificate_no varchar comment 'Certificate number'
,gmt_created bigint comment 'Creation time'
,gmt_modified bigint comment 'Modification time'
,account_payble double comment 'Payable amount'
) WITH (
type='datahub',
topic='user_order'
...
)

Output Table

CREATE TABLE hs_order (
biz_date varchar COMMENT 'yyyymmddhh'
,total_premium DOUBLE COMMENT 'Total premiums'
,policy_cnt BIGINT COMMENT 'Number of policies'
,policy_holder_cnt BIGINT COMMENT 'Number of insured persons'
,PRIMARY KEY (biz_date)
) WITH
(
type='rds',
tableName='adm_pfm_zy_order_gmv_msx_hs'
...
)
;

Business Code

  1. Data Cleansing
create view  last_order
as
select
id as id
,last_value(order_no) as order_no
,last_value(customer_tel) as customer_tel
,last_value(gmt_modified) as gmt_modified
,last_value(account_payble) as account_payble
from user_order
where gmt_modified >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
group by id
;
  2. Data Summarization
insert into hs_order
select
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH') as biz_date
,sum(coalesce(account_payble,0)) as total_premium
,count(distinct order_no) as policy_cnt
,count(distinct customer_tel) as policy_holder_cnt
from last_order a
group by
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a
;
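The timestamp arithmetic in the two statements above is compact, so it may help to see it spelled out. The following Python sketch mirrors what the nested UNIX_TIMESTAMP/FROM_UNIXTIME expression and the from_unixtime(..., 'yyyyMMddHH') call appear to compute: the start of the current day in epoch milliseconds, and the hour bucket of a record. The function names and the UTC+8 time zone are assumptions made for illustration; the actual job uses whatever time zone the Flink session is configured with.

```python
from datetime import datetime, timedelta, timezone

# Assumed session time zone for the example (UTC+8); adjust as needed.
CST = timezone(timedelta(hours=8))


def start_of_day_ms(now_ms, tz):
    """Epoch milliseconds of local midnight for the day that contains now_ms."""
    local = datetime.fromtimestamp(now_ms / 1000, tz)
    midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(midnight.timestamp()) * 1000


def hour_bucket(gmt_modified_ms, tz):
    """Format an epoch-millisecond timestamp as yyyyMMddHH in the given zone."""
    return datetime.fromtimestamp(gmt_modified_ms // 1000, tz).strftime("%Y%m%d%H")


def modified_today(gmt_modified_ms, now_ms, tz=CST):
    """True if the record passes the filter: modified at or after local midnight."""
    return gmt_modified_ms >= start_of_day_ms(now_ms, tz)
```

Because the filter boundary is the start of the day rather than the start of the hour, every hour bucket of the current day keeps receiving updates until midnight.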

In this scenario, key users whose insured amount changes significantly are monitored, for example, users whose total insured amount drops by more than 15%.

Apache Flink reads the new order data created by users in real time. Each new order contains the user’s ID and the current insured amount. Unsaved orders are filtered out with a WHERE clause. The dimension table stores the data of the key users being monitored, such as the original insured amount, so the user ID in each new order is used to join against the dimension table. If the user is found in the dimension table and the total insured amount has decreased by more than 15%, detailed information about the user is sent to the business manager. The user’s insured amount and insurance information are also updated in the dimension table.
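The monitoring rule can be illustrated with a small Python sketch. It assumes the dimension table behaves like a dictionary keyed by user ID and that premiums arrive as strings, as the varchar columns in the tables below suggest. The function name check_premium_change and the ratio parameter are hypothetical.

```python
def check_premium_change(dimension, event, ratio=0.85):
    """Return an alert row if the insured amount fell below ratio * original.

    dimension: dict mapping user id -> {"raw_premium": str, "raw_info": str}
    event: dict with the keys id, now_premium (str) and now_info
    """
    baseline = dimension.get(event["id"])
    if baseline is None:
        # Inner join semantics: users that are not in the dimension table are skipped.
        return None

    now_premium = float(event["now_premium"])
    raw_premium = float(baseline["raw_premium"])

    alert = None
    if now_premium < raw_premium * ratio:  # drop of more than 15% with ratio=0.85
        alert = {
            "id": event["id"],
            "raw_info": baseline["raw_info"],
            "now_info": event["now_info"],
            "raw_premium": baseline["raw_premium"],
            "now_premium": event["now_premium"],
        }

    # The current values become the new baseline for this user.
    dimension[event["id"]] = {
        "raw_premium": event["now_premium"],
        "raw_info": event["now_info"],
    }
    return alert
```

Note that refreshing the baseline on every event means each comparison is against the previous order, not against a fixed original amount. Whether that is the intended behavior depends on the business rule.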

Input Table

create table update_info
(
id bigint comment 'User id'
,channel varchar comment 'Channel number'
,open_id varchar comment 'Order id'
,event varchar comment 'Event type'
,now_premium varchar comment 'Current insured amount'
,creator varchar comment 'Creator'
,modifier varchar comment 'Modifier'
,gmt_modified bigint comment 'Modification time'
,now_info varchar comment 'Current insurance information'
) with (
type = 'datahub',
topic = 'dh_prd_dm_account_wechat_trace'
...

);

Dimension Table

create table raw_info
(
id bigint comment 'User id'
,raw_premium varchar comment 'Original insured amount'
,raw_info varchar comment 'Original insurance information'
,PRIMARY KEY(id)
,PERIOD FOR SYSTEM_TIME
) WITH (
type='ots',
tableName='adm_zy_acct_sub_wechat_list'
...
);

Output Table

-- The result table is named premium_change_alert here (an illustrative name),
-- because the input table above is already called update_info.
create table premium_change_alert
(
id bigint comment 'User id'
,raw_info varchar comment 'Original insurance information'
,now_info varchar comment 'Current insurance information'
,raw_premium varchar comment 'Original insured amount'
,now_premium varchar comment 'Current insured amount'
,PRIMARY KEY(id)
) WITH (
type='rds',
tableName='wechat_activity_user'
...
);

Business Code

create view info_join as
select
t1.id as id
,t2.raw_info as raw_info
,t1.now_info as now_info
,t2.raw_premium as raw_premium
,t1.now_premium as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into premium_change_alert
select
id as id
,raw_info as raw_info
,now_info as now_info
,raw_premium as raw_premium
,now_premium as now_premium
from info_join
-- The premiums are stored as varchar, so they are cast before the comparison.
where cast(now_premium as double) < cast(raw_premium as double) * 0.85
;
insert into raw_info
select
id as id
,now_premium as raw_premium
,now_info as raw_info
from info_join
;

In this scenario, the name and type of the last-visited page are recorded as the user’s feature values.

Apache Flink reads the logs of users’ browsing activities, including the user ID, page name, and page type. By grouping the logs by user ID and device ID, Apache Flink uses the last_value function to obtain the name and type of the last-visited page. This information is written to RDS as input for the recommendation system, so that relevant advertisements can be shown the next time the user logs on to the app. This is intended to improve the click-through rate of advertisements and the likelihood that users place orders.
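As a rough illustration of the grouping described above, the Python sketch below keeps the most recently arrived page type and page name for each (account ID, device ID) pair. It assumes that "last" means arrival order rather than log_time order and that both values are taken from the same log row. The function name last_visited_pages is hypothetical.

```python
def last_visited_pages(log_rows):
    """Map (account_id, device_id) -> (bury_point_type, page_name) of the last row seen.

    log_rows: iterable of dicts, in arrival order, with the keys
    account_id, device_id, bury_point_type and page_name.
    """
    latest = {}
    for row in log_rows:
        if row["account_id"] is None:
            continue  # rows without an account id are filtered out
        key = (row["account_id"], row["device_id"])
        latest[key] = (row["bury_point_type"], row["page_name"])  # later rows overwrite
    return latest
```

Each entry of the result corresponds to one row in the RDS table that the recommendation system reads.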

Input Table

create table user_log
(
log_time bigint comment 'Log collection time (Unix time)'
,device_id varchar comment 'Device id'
,account_id varchar comment 'Account id'
,bury_point_type varchar comment 'Page type or tracking type'
,page_name varchar comment 'Page name or tracking point level 1 directory'
) WITH (
type = 'datahub',
topic = 'edw_zy_evt_bury_point_log'
...
);

Output Table

create table user_last_log
(
account_id varchar comment 'Account id'
,device_id varchar comment 'Device id'
,bury_point_type varchar comment 'Page type or tracking type'
,page_name varchar comment 'Page name or tracking point level 1 directory'
,PRIMARY KEY(account_id,device_id)
) WITH (
type='rds',
...
);

Business Code

insert into user_last_log
select
account_id
,device_id
,last_value(bury_point_type) as bury_point_type
,last_value(page_name) as page_name
from user_log
where account_id is not null
group by account_id,device_id
;
