Data Processing with DataWorks

This is the third section of the DataWorks workshop. In this section, you will learn how to process log data that has been collected into MaxCompute through DataWorks: how to create data tables, how to design and run a business flow, and how to configure periodic scheduling properties. Before you begin, make sure that you have read the previous section, Data Acquisition with DataWorks.

Creating Data Tables

To create the data tables, follow the same procedure described in Data acquisition: log data upload.

  • Create ods_log_info_d table
  1. Right-click Table in the Workshop Business Flow, click Create Table, and enter the table name ods_log_info_d. Then click DDL Mode to enter the table creation SQL statements.

The following are table creation statements:

CREATE TABLE IF NOT EXISTS ods_log_info_d (
    ip STRING COMMENT 'IP address',
    uid STRING COMMENT 'User ID',
    time STRING COMMENT 'time:yyyymmddhh:mi:ss',
    status STRING COMMENT 'server return status code',
    bytes STRING COMMENT 'the number of bytes returned to the client',
    region STRING COMMENT 'region, get from IP',
    method STRING COMMENT 'HTTP request type',
    url STRING COMMENT 'url',
    protocol STRING COMMENT 'HTTP protocol version number',
    referer STRING COMMENT 'source url',
    device STRING COMMENT 'terminal type',
    identity STRING COMMENT 'Access type crawler feed user unknown'
)
PARTITIONED BY (
    dt STRING
);
  2. Click Submit to Development Environment, and then click Submit to Production Environment.
  • Create dw_user_info_all_d table

Create this table in the same way as ods_log_info_d. The following are table creation statements:

CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
    uid STRING COMMENT 'User ID',
    gender STRING COMMENT 'Gender',
    age_range STRING COMMENT 'Age range',
    zodiac STRING COMMENT 'Zodiac sign',
    region STRING COMMENT 'region, get from IP',
    device STRING COMMENT 'terminal type',
    identity STRING COMMENT 'Access type crawler feed user unknown',
    method STRING COMMENT 'HTTP request type',
    url STRING COMMENT 'url',
    referer STRING COMMENT 'source url',
    time STRING COMMENT 'time:yyyymmddhh:mi:ss'
)
PARTITIONED BY (
    dt STRING
);
  • Create rpt_user_info_d table

The following are table creation statements:

CREATE TABLE IF NOT EXISTS rpt_user_info_d (
    uid STRING COMMENT 'User ID',
    region STRING COMMENT 'region, get from IP',
    device STRING COMMENT 'terminal type',
    pv BIGINT COMMENT 'pv',
    gender STRING COMMENT 'Gender',
    age_range STRING COMMENT 'Age range',
    zodiac STRING COMMENT 'Zodiac sign'
)
PARTITIONED BY (
    dt STRING
);
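
Once the three tables have been submitted, a quick way to confirm that they exist with the expected columns is to describe them in an ad hoc query. This is only a minimal sketch; it assumes the table names used above and that your account is allowed to read the tables.

```sql
-- Show the columns and the partition column of each table created above
DESC ods_log_info_d;
DESC dw_user_info_all_d;
DESC rpt_user_info_d;
```

Each result should list dt as the partition column.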

Business Flow Design

Open the Workshop Business Flow, drag three ODPS SQL nodes onto the canvas, name them ods_log_info_d, dw_user_info_all_d, and rpt_user_info_d, and configure the dependencies between them.

Creating User-Defined Functions

1. Download ip2region.jar.

2. Right-click Resource, and select Create Resource > jar.

3. Click Select File, select the ip2region.jar file that you downloaded, and click OK.

4. After the resource has been uploaded to DataWorks, click Submit.

5. Right-click Function and select Create Function.

6. Enter the function name getregion, select the Business Flow to which the function should belong, and click Submit.

7. In the Registry Function dialog box, enter the function configuration: the class name, description, command format, and parameter description.

Here are the parameters you need to enter:

  • Function Name: getregion
  • Class Name: org.alidata.odps.udf.Ip2Region
  • Resource list: ip2region.jar
  • Description: IP address translation area
  • Command Format: getregion ('IP')
  • Parameter description: IP Address

8. Click Save and submit.
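
Before using the function in a node, it can help to try it on a few raw rows. The following query is only a sketch: it assumes that ods_raw_log_d already holds data for the chosen partition, and '<business date>' is a placeholder that you replace with a real partition value.

```sql
-- Smoke test for the getregion UDF: resolve the region of a few raw IP addresses
SELECT ip
     , getregion(ip) AS region
FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    FROM ods_raw_log_d
    WHERE dt = '<business date>'   -- placeholder, replace with a real partition value
) t
LIMIT 10;
```

If the query fails because the function cannot be found, check that both the resource and the function were submitted.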

Configuring the ODPS SQL Nodes

  • Configure the ods_log_info_d Node
  1. Double-click the ods_log_info_d node to go to the node configuration page and write the processing logic.

The SQL logic is as follows:

INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt='${bdp.system.bizdate}')
SELECT ip
    , uid
    , time
    , status
    , bytes
    -- Use the custom UDF to derive the region from the IP address
    , getregion(ip) AS region
    -- Split the request into three fields (method, url, protocol) with regular expressions
    , regexp_substr(request, '(^[^ ]+ )') AS method
    , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
    , regexp_substr(request, '([^ ]+$)') AS protocol
    -- Reduce the referer to its host name to get a cleaner value
    , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer
    -- Derive the terminal type and the access type from the user agent
    , CASE
        WHEN TOLOWER(agent) RLIKE 'android' THEN 'android'
        WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
        WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
        WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
        WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
        WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
        ELSE 'unknown'
    END AS device
    , CASE
        WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
        WHEN TOLOWER(agent) RLIKE 'feed'
            OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
        WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
            -- A group, not a character class: the agent must start with Mozilla or Opera
            AND agent RLIKE '^(Mozilla|Opera)'
            AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
        ELSE 'unknown'
    END AS identity
FROM (
    -- Each raw log line is a single column whose fields are separated by '##@@'
    SELECT SPLIT(col, '##@@')[0] AS ip
        , SPLIT(col, '##@@')[1] AS uid
        , SPLIT(col, '##@@')[2] AS time
        , SPLIT(col, '##@@')[3] AS request
        , SPLIT(col, '##@@')[4] AS status
        , SPLIT(col, '##@@')[5] AS bytes
        , SPLIT(col, '##@@')[6] AS referer
        , SPLIT(col, '##@@')[7] AS agent
    FROM ods_raw_log_d
    WHERE dt = '${bdp.system.bizdate}'
) a;
  2. Click Save.
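
To see what the three request expressions do, you can run them against a literal in an ad hoc query. The request string below is made up for illustration, and the query assumes that your project accepts a SELECT statement without a FROM clause.

```sql
-- 'GET /index.html HTTP/1.1' is a hypothetical request line, not taken from the workshop data
SELECT regexp_substr('GET /index.html HTTP/1.1', '(^[^ ]+ )') AS method
     , regexp_extract('GET /index.html HTTP/1.1', '^[^ ]+ (.*) [^ ]+$') AS url
     , regexp_substr('GET /index.html HTTP/1.1', '([^ ]+$)') AS protocol;
```

Because the method pattern ends with a space, the extracted method is expected to keep that trailing space; wrap the expression in TRIM if you need a clean value.
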
  • Configure the dw_user_info_all_d Node
  1. Double-click the dw_user_info_all_d node to go to the node configuration page and write the processing logic.

The SQL logic is as follows:

INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt='${bdp.system.bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
    , b.gender
    , b.age_range
    , b.zodiac
    , a.region
    , a.device
    , a.identity
    , a.method
    , a.url
    , a.referer
    , a.time
FROM (
    -- Cleaned log records for the business date
    SELECT *
    FROM ods_log_info_d
    WHERE dt = '${bdp.system.bizdate}'
) a
LEFT OUTER JOIN (
    -- User attributes for the same business date
    SELECT *
    FROM ods_user_info_d
    WHERE dt = '${bdp.system.bizdate}'
) b
ON a.uid = b.uid;
  2. Click Save.
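
Because the node uses a LEFT OUTER JOIN, log rows whose uid has no match in ods_user_info_d are kept, with empty user attributes. A rough way to see how many rows were matched is sketched below. It assumes that the node has already run for the partition and that gender is filled in for every user record; '<business date>' is a placeholder.

```sql
-- COUNT(gender) skips NULLs, so it counts only the rows that found user information
SELECT COUNT(*) AS total_rows
     , COUNT(gender) AS rows_with_user_info
FROM dw_user_info_all_d
WHERE dt = '<business date>';   -- placeholder, replace with a real partition value
```
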
  • Configure the rpt_user_info_d Node
  1. Double-click the rpt_user_info_d node to go to the node configuration page and write the processing logic.

The SQL logic is as follows:

INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt='${bdp.system.bizdate}')
SELECT uid
    , MAX(region) AS region
    , MAX(device) AS device
    -- One row per page view, so the row count per uid is that user's pv
    , COUNT(0) AS pv
    , MAX(gender) AS gender
    , MAX(age_range) AS age_range
    , MAX(zodiac) AS zodiac
FROM dw_user_info_all_d
WHERE dt = '${bdp.system.bizdate}'
GROUP BY uid;
  2. Click Save.
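
Since pv counts the rows of each uid, the sum of pv in the report should equal the number of rows in the source partition. The two queries below sketch that consistency check. They assume that both nodes have already run for the same business date; '<business date>' is a placeholder.

```sql
-- Total page views recorded in the report table
SELECT SUM(pv) AS total_pv
FROM rpt_user_info_d
WHERE dt = '<business date>';   -- placeholder, replace with a real partition value

-- Number of detail rows the report was aggregated from
SELECT COUNT(*) AS total_rows
FROM dw_user_info_all_d
WHERE dt = '<business date>';   -- same partition value as above
```

If the two numbers differ, the report partition was probably built from an older version of the detail partition.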

Submitting Business Flows

1. Click Submit to submit the node tasks that have been configured in the Business Flow.

2. In the Submit dialog box, select the nodes to submit, select the Ignore Warnings on I/O Inconsistency check box, and click Submit.

Running Business Flows

1. Click Run to verify the code logic.

2. Click Queries in the left-hand navigation bar.

3. Select New > ODPS SQL.

4. Write and execute SQL statements to query the task results and confirm the data output.

The query statement is as follows:

-- View the data in the result table; replace <business date> with the business date of the run
SELECT * FROM rpt_user_info_d WHERE dt = '<business date>' LIMIT 10;

Publishing Business Flow

Submitting the Business Flow places its tasks in the development environment, but tasks in the development environment are not scheduled automatically. To have the configured tasks scheduled, publish them to the production environment. Test the task code before you publish it.

1. Click Publish to go to the publish page.

2. Select the task to publish and click Add To Be-Published List.

3. Open the list of pending releases and click Pack and publish all.

4. View published content on the Publish Package List page.

Running Tasks in Production

1. After the task has been published successfully, click Operation center.

2. Select Workshop Business Flows in the Task List.

3. Right-click the workshop_start node in the DAG graph and select Patch Data > Current and downstream nodes.

4. Select the tasks for which you want to patch data, enter the business date, and click OK.

After you click OK, you are redirected automatically to the patch data task instance page.

5. Click Refresh until the SQL task runs successfully.
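
After the instances succeed, you can confirm from an ad hoc query that a partition was written for the patched business date. This is a minimal sketch that assumes you have read access to the report table in the production project.

```sql
-- List the partitions of the report table; the patched business date should appear as a dt value
SHOW PARTITIONS rpt_user_info_d;
```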

Next Step

Now that you’ve learned the content of this section, you can continue to the last section of this workshop, Data Quality Monitoring with DataWorks.
