UDF Development Guide with MaxCompute Studio

By Sixiang

Background

This article briefly describes how to create a project, add code, package it, upload the resource package, and register the function, so that new users can get started quickly.

Creating a UDF

  1. In IntelliJ IDEA, install the MaxCompute Studio plug-in: open the plug-in center, search for MaxCompute Studio, and install it. For detailed installation steps, refer to the official MaxCompute documentation.
  2. After the plug-in is installed, add your own package.
  3. Next, create a new project and select “MaxCompute Java”.
  4. After the new project is created, open the pom file to confirm that the related dependencies have been added.

Creating a Java Class of the UDF

Note: Create the Java class under src -> main -> java rather than in the examples directory, or the main class will not be found when you register the function later.

  1. Write the UDF code.
  2. Package the project. Right-click the Java class file and select “Run Maven” and then “clean install”.
  3. View the created package.
  4. Upload this package to the server under “MaxCompute” -> “Add Resource” in IntelliJ IDEA.
  5. Confirm the package to be uploaded and click OK to upload the package.
  6. Next, register the function under “MaxCompute” -> “Create Function” in IntelliJ IDEA.
  7. Select the resource and specify the name of the main class. (This is why the note above says the Java class should be created under src -> main -> java. If the class is created in the examples directory, the main class cannot be loaded here.) Enter a name for the function and click “OK” to confirm.
  8. After the function is registered, a tooltip is displayed.
  9. Finally, go to the DataWorks console and run the function.
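
For the “write code” step, a minimal sketch of a UDF class might look like the following. This is an illustration, not the author’s original code. In a real MaxCompute project the class would extend com.aliyun.odps.udf.UDF from the odps-sdk-udf dependency, and MaxCompute would call its evaluate method. The extends clause is left as a comment here so that the sketch compiles without the SDK, and the class name LowerUdfSketch is hypothetical.

```java
import java.util.Locale;

// Hypothetical example class. In a MaxCompute project it would be declared as
//   public class LowerUdfSketch extends com.aliyun.odps.udf.UDF
final class LowerUdfSketch {

    // A UDF maps one input row to one output value through evaluate().
    public String evaluate(String input) {
        if (input == null) {
            // Pass NULL through instead of throwing.
            return null;
        }
        return input.toLowerCase(Locale.ROOT);
    }
}
```

Once registered under a function name of your choice, such a class would be called in SQL like any built-in scalar function.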

MaxCompute Studio makes it a lot easier and more convenient to create UDFs. This article shows a very basic use case. If you are new to UDFs, you can follow the preceding steps.

Implement JSON parsing by using UDFs

Requirement

When performing data analysis, you may need to parse JSON data. The input data is a field, for example:

[{"id":"123","name":"jack","owner":"yixiu"},{"id":"456","name":"daniel","owner":"sixiang"}]

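The input data is a JSON array, which needs to be parsed into two rows of data (a one-to-many relationship):

+-----+--------+---------+
| id  | name   | owner   |
+-----+--------+---------+
| 123 | jack   | yixiu   |
| 456 | daniel | sixiang |
+-----+--------+---------+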

Implementation

This requirement can be implemented with a UDTF. However, fastjson cannot be used for parsing inside a UDTF, because JSON and protobuf libraries often rely on reflection, which the sandbox blocks. The solution is to parse the JSON data with Gson, which is built into ODPS.

Gson gson = new Gson();
List<PreProject> projectList = gson.fromJson(projectListString, new TypeToken<List<PreProject>>(){}.getType());
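
The snippet above relies on a PreProject class that the article does not show. As a rough sketch, it could be a plain class whose field names match the JSON keys, and the one-to-many step is then a loop that produces one row per list element. The field layout and the helper name toRows are assumptions. In an actual UDTF, each row would be emitted with forward(...) rather than collected into a list.

```java
import java.util.ArrayList;
import java.util.List;

final class JsonRowsSketch {

    // Assumed shape of PreProject: field names mirror the JSON keys.
    static final class PreProject {
        String id;
        String name;
        String owner;

        PreProject() { }

        PreProject(String id, String name, String owner) {
            this.id = id;
            this.name = name;
            this.owner = owner;
        }
    }

    // Turns the parsed list into one output row per JSON array element.
    // Inside a UDTF, forward(p.id, p.name, p.owner) would replace rows.add(...).
    static List<String[]> toRows(List<PreProject> projects) {
        List<String[]> rows = new ArrayList<>();
        for (PreProject p : projects) {
            rows.add(new String[] {p.id, p.name, p.owner});
        }
        return rows;
    }
}
```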

Use UDJ to Customize SQL Join Operations

Overview

Currently, MaxCompute provides many built-in JOIN operations, including Inner Join, Right Outer Join, Left Outer Join, Full Outer Join, Semi Join, and Anti Semi Join. These built-in JOIN operations are powerful enough to meet most users’ requirements. However, the standard JOIN implementation still cannot express many cross-table operations.

This article uses a sample scenario and describes how to use UDJ (User Defined Join), a new extension mechanism introduced in the UDF framework, to implement flexible cross-table operations. This is also a big step towards the NewSQL data processing framework based on the new-generation architecture of MaxCompute.

Background

Generally, the term UDF (User Defined Function) refers to the framework through which users supply their own code. The existing UDF/UDTF/UDAF interfaces are designed mainly for operations on a single table. To implement user-defined cross-table operations, users often have to combine built-in JOIN operations, various UDFs/UDTFs, and complex SQL statements. In some scenarios that involve multiple tables, users even have to give up SQL and implement the required computation with traditional, fully custom MR.

Both approaches set a relatively high bar for users. For the computing platform, the mix of complex Join operations and user code scattered across different places in the SQL statements creates many “logic black boxes” and makes it difficult to produce optimal execution plans. MR further reduces the opportunities for execution optimization. In addition, MR executes far less efficiently than the deeply optimized native runtime that MaxCompute generates with its LLVM-based code generator.

The general availability of MaxCompute 2.0 brings more flexibility to the development of computing platform frameworks. Building on this progress, we set the goal of creating a NewSQL ecosystem. With an extended SQL framework, NewSQL lets users express the main processing logic declaratively and write their own code only where the execution details of the distributed system are not involved. This design shifts the description of the computation from “How” (the specific steps needed to complete a distributed computation) to “What” (the tasks and data operations the user wants, described logically). Users can then focus on the “What” and optimize their business logic, while the computing platform handles the “How”, performs complex system optimizations, and produces optimal plans.

In this context, we introduced UDJ in the UDF framework, a new extension mechanism that targets multi-table data operations. This new mechanism is expected to enable both high usability and excellent system optimization by reducing the operations targeting the underlying details of distributed systems. Otherwise, users have to perform these operations by using methods like MR.

Sample scenario

Consider two log tables: payment and user_client_log. The payment table stores users’ payment records. Each payment record contains user ID, payment time, and payment content. The user_client_log table stores users’ client logs. Each log contains user ID, log time, and log content.

Now for each client log, we need to find that user’s payment record in the payment table that is the closest to the log time of that log. Then we merge the payment content of the matched payment record and the log content of that client log and return the result. Example data:

payment

[Image: sample rows of the payment table]

user_client_log

[Image: sample rows of the user_client_log table]

Take the following log record in the user_client_log table as an example.

[Image: one log record from user_client_log]

The following payment record has the time closest to that of the preceding log record.

[Image: the matching payment record]

These two records are merged as follows:

[Image: the merged record]

The merging result of the two tables is as follows:

[Image: the merged result of the two tables]

Before using UDJ, let’s see if we can use standard JOIN operations to solve this problem.

The answer is no, because in addition to joining records by user_id, we also need to find which record in the payment table has the smallest time offset with a specific record in the user_client_log table. SQL pseudocode for solving this problem may look like this:

SELECT
  p.user_id,
  p.time,
  merge(p.pay_info, u.content)
FROM payment p
RIGHT OUTER JOIN user_client_log u
ON p.user_id = u.user_id and abs(p.time - u.time) = min(abs(p.time - u.time))

This requires knowing the smallest offset between p.time and u.time for the same user_id at the moment the two tables are joined. However, aggregate functions are not allowed in join conditions. Therefore, standard Join operations cannot solve this seemingly simple problem. In a distributed system, a Join operation actually groups the data from the two tables by one or more fields and temporarily puts each group of data in an intermediate place. The critical limitation is that standard SQL offers few operations on the data after it has been joined. The problem can be solved if users are given a generic programming interface as a plug-in, process the joined data in a customized way in that interface, and return the results. This is what UDJ provides.
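
To make the idea concrete, here is a small conceptual sketch in plain Java of “group both tables by key, then hand each pair of groups to user code”. It is only a model of the mechanism described above, not MaxCompute’s implementation. The names CoGroupSketch, groupByKey, and customJoin are hypothetical, and each row is simplified to a {key, value} string pair.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;

final class CoGroupSketch {

    // Groups {key, value} rows by key; TreeMap keeps the keys sorted.
    static Map<String, List<String>> groupByKey(List<String[]> rows) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return groups;
    }

    // Calls the user-supplied function once per key with the left and right groups.
    static List<String> customJoin(
            List<String[]> left,
            List<String[]> right,
            BiFunction<List<String>, List<String>, List<String>> perKey) {
        Map<String, List<String>> leftGroups = groupByKey(left);
        Map<String, List<String>> rightGroups = groupByKey(right);
        Set<String> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        List<String> out = new ArrayList<>();
        for (String key : keys) {
            List<String> l = leftGroups.getOrDefault(key, Collections.emptyList());
            List<String> r = rightGroups.getOrDefault(key, Collections.emptyList());
            for (String result : perKey.apply(l, r)) {
                out.add(key + ": " + result);
            }
        }
        return out;
    }
}
```

The function passed as perKey plays the role that the user-written join logic plays in a UDJ: it sees all rows of one key from both sides and decides what to output.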

Use Java to write UDJ code

This section will walk you through how UDJ solves this problem. Since UDJ is a new feature, a new SDK is required.

<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-udf</artifactId>
  <version>0.30.0</version>
  <scope>provided</scope>
</dependency>

The new SDK contains a new abstract class UDJ. We implement the UDJ features by inheriting this class:

package com.aliyun.odps.udf.example.udj;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Yieldable;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.udf.DataAttributes;
import com.aliyun.odps.udf.ExecutionContext;
import com.aliyun.odps.udf.UDJ;
import com.aliyun.odps.udf.annotation.Resolve;

import java.util.ArrayList;
import java.util.Iterator;

/** For each record of right table, find the nearest record of left table and
 * merge two records.
 */
@Resolve("->string,bigint,string")
public class PayUserLogMergeJoin extends UDJ {

  private Record outputRecord;

  /** Will be called prior to the data processing phase. User could implement
   * this method to do initialization work.
   */
  @Override
  public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) {
    outputRecord = new ArrayRecord(new Column[]{
      new Column("user_id", OdpsType.STRING),
      new Column("time", OdpsType.BIGINT),
      new Column("content", OdpsType.STRING)
    });
  }

  /** Override this method to implement join logic.
   * @param key Current join key
   * @param left Group of records of left table corresponding to the current key
   * @param right Group of records of right table corresponding to the current key
   * @param output Used to output the result of UDJ
   */
  @Override
  public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
    outputRecord.setString(0, key.getString(0));
    if (!right.hasNext()) {
      // Empty right group, do nothing.
      return;
    } else if (!left.hasNext()) {
      // Empty left group. Output all records of right group without merge.
      while (right.hasNext()) {
        Record logRecord = right.next();
        outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
        outputRecord.setString(2, logRecord.getString(1));
        output.yield(outputRecord);
      }
      return;
    }

    ArrayList<Record> pays = new ArrayList<>();
    // The left group of records will be iterated from the start to the end
    // for each record of right group, but the iterator cannot be reset.
    // So we save every records of left to an ArrayList.
    left.forEachRemaining(pay -> pays.add(pay.clone()));

    while (right.hasNext()) {
      Record log = right.next();
      long logTime = log.getDatetime(0).getTime();
      long minDelta = Long.MAX_VALUE;
      Record nearestPay = null;
      // Iterate through all records of left, and find the pay record that has
      // the minimal difference in terms of time.
      for (Record pay : pays) {
        long delta = Math.abs(logTime - pay.getDatetime(0).getTime());
        if (delta < minDelta) {
          minDelta = delta;
          nearestPay = pay;
        }
      }
      // Merge the log record with nearest pay record and output to the result.
      outputRecord.setBigint(1, log.getDatetime(0).getTime());
      outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1)));
      output.yield(outputRecord);
    }
  }

  String mergeLog(String payInfo, String logContent) {
    return logContent + ", pay " + payInfo;
  }

  @Override
  public void close() {
  }
}

Note: In this example, the NULL values of the records are not processed. To simplify the data processing procedure for better demonstration, this example assumes that no NULL values are contained in the tables.

Each time the join method of the UDJ is called, it receives the records from the two tables that match the same key. To solve the preceding problem, the UDJ simply iterates over all records of the left group (payment) to find the one whose time is closest to a given log record in the right group (user_client_log), merges the log content with the payment content of that payment record, and returns the result.

In this example, we assume that each user has a relatively small number of payment records. Therefore, we can pre-load data groups in the left table into memory. (Normally a user will not produce payment records within a day that are too large to be stored in memory.) However, what if this assumption is not true? This issue will be discussed in the Use SORT BY for pre-sorting section later.
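
Stripped of the MaxCompute types, the matching logic for a single user_id can be sketched in plain Java as follows. This is a simplified model for experimenting outside the SDK. The classes Pay and Log and the method joinGroup are hypothetical stand-ins for the Record objects and the join method, and times are plain long values.

```java
import java.util.ArrayList;
import java.util.List;

final class NearestPaySketch {

    static final class Pay {
        final long time;
        final String info;
        Pay(long time, String info) { this.time = time; this.info = info; }
    }

    static final class Log {
        final long time;
        final String content;
        Log(long time, String content) { this.time = time; this.content = content; }
    }

    // For every log record, scan all in-memory payments of the same user and
    // merge with the one whose time is closest (first one wins on ties).
    static List<String> joinGroup(List<Pay> pays, List<Log> logs) {
        List<String> out = new ArrayList<>();
        for (Log log : logs) {
            Pay nearest = null;
            long minDelta = Long.MAX_VALUE;
            for (Pay pay : pays) {
                long delta = Math.abs(log.time - pay.time);
                if (delta < minDelta) {
                    minDelta = delta;
                    nearest = pay;
                }
            }
            // No payments at all: output the log content unmerged.
            out.add(nearest == null ? log.content : log.content + ", pay " + nearest.info);
        }
        return out;
    }
}
```

The cost is one full pass over the payments per log record, which is acceptable only while each user’s payment group stays small.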

Create a UDJ function in MaxCompute

After you have written the UDJ code in Java, you must register it with MaxCompute before it can be used in MaxCompute SQL as a plug-in. Assume that the preceding code is packaged into an odps-udf-example.jar file. Upload the packaged code to MaxCompute as a jar resource by using the “add jar” command:

add jar odps-udf-example.jar;

Use the “create function” statement to register the UDJ function, specify the UDJ function name in SQL as “pay_user_log_merge_join” and associate its corresponding jar resource “odps-udf-example.jar” with the class name “com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin” in the jar package:

create function pay_user_log_merge_join
as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin'
using 'odps-udf-example.jar';

Use the UDJ function in MaxCompute SQL

After you have registered the UDJ, the function can be used in MaxCompute SQL.

For demonstration, first create the two source tables:

CREATE TABLE payment (
  user_id string,
  time datetime,
  pay_info string
);

CREATE TABLE user_client_log (
  user_id string,
  time datetime,
  content string
);

Create some demo data:

INSERT OVERWRITE TABLE payment VALUES
('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'),
('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'),
('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'),
('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'),
('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'),
('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'),
('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'),
('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa');

INSERT OVERWRITE TABLE user_client_log VALUES
('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'),
('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'),
('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'),
('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'),
('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'),
('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'),
('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'),
('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'),
('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'),
('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'),
('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'),
('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'),
('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'),
('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'),
('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'),
('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'),
('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ');

In MaxCompute SQL, use the UDJ function that you have created:

SELECT r.user_id, from_unixtime(time/1000) as time, content FROM
(SELECT user_id, time as time, pay_info FROM payment) p JOIN
(SELECT user_id, time as time, content FROM user_client_log) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content);

The UDJ syntax is similar to the syntax of standard Join operations. In this example, the UDJ statement has one additional USING clause, where: “pay_user_log_merge_join” is the name under which the UDJ function is registered in SQL; “(p.time, p.pay_info, u.time, u.content)” lists the columns of the left and right tables that are passed to the UDJ; “r” is the alias of the UDJ result, used to reference that result elsewhere; “(user_id, time, content)” are the names of the columns produced by the UDJ.

Run the preceding SQL statement. The result shows that UDJ solves the problem described above:
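
One detail in the query is the expression from_unixtime(time/1000). The UDJ writes the time column with getTime(), which in Java returns milliseconds since the epoch, so the value is divided by 1000 before it is converted back to a datetime. This assumes that from_unixtime expects seconds. The helper below is a hypothetical illustration of that unit conversion, not part of the UDJ itself.

```java
import java.util.Date;

final class EpochUnitsSketch {

    // Date.getTime() is in milliseconds; integer division by 1000 yields whole seconds.
    static long toEpochSeconds(Date date) {
        return date.getTime() / 1000L;
    }
}
```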

+---------+---------------------+---------------------------------------+
| user_id | time                | content                               |
+---------+---------------------+---------------------------------------+
| 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB                 |
| 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ                 |
| 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn |
| 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn |
| 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT |
| 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT |
| 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn |
| 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn |
| 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb |
| 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb |
| 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier                 |
| 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier                 |
| 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
| 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
| 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
| 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb |
| 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa |
+---------+---------------------+---------------------------------------+

Use SORT BY for pre-sorting

As mentioned in the UDJ code section, to find the payment record with the smallest time offset we have to iterate over all of a user’s payment records, so we pre-load all the payment records of the same user_id into an ArrayList. This is practical when a user has a relatively small number of payment records. However, in some scenarios the data in one group may be too large to fit in memory. In this case, we can use the pre-sorting feature of UDJ. Assume that a user has so many payment records that they cannot be held in memory. The problem becomes easy if all the data in the group has already been sorted by time: the UDJ then only needs to compare the records at the head of the two iterators.

@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
  outputRecord.setString(0, key.getString(0));
  if (!right.hasNext()) {
    return;
  } else if (!left.hasNext()) {
    while (right.hasNext()) {
      Record logRecord = right.next();
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      outputRecord.setString(2, logRecord.getString(1));
      output.yield(outputRecord);
    }
    return;
  }

  long prevDelta = Long.MAX_VALUE;
  Record logRecord = right.next();
  Record payRecord = left.next();
  Record lastPayRecord = payRecord.clone();

  while (true) {
    long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
    if (left.hasNext() && delta > 0) {
      // The delta of time between two records is decreasing, we can still
      // explore the left group to try to gain a smaller delta.
      lastPayRecord = payRecord.clone();
      prevDelta = delta;
      payRecord = left.next();
    } else {
      // Hit to the point of minimal delta. Check with the last pay record,
      // output the merge result and prepare to process the next record of
      // right group.
      Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
      outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
      String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
      outputRecord.setString(2, mergedString);
      output.yield(outputRecord);
      if (right.hasNext()) {
        logRecord = right.next();
        prevDelta = Math.abs(
            logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime());
      } else {
        break;
      }
    }
  }
}

We only need to modify the preceding SQL statement slightly: add a SORT BY clause at the end of the UDJ statement so that the records of the left and right tables within each UDJ group are sorted by the time field. (If you are following the sample, do not forget to update the jar package of the UDJ.)

SELECT r.user_id, from_unixtime(time/1000) as time, content FROM
(SELECT user_id, time as time, pay_info FROM payment) p JOIN
(SELECT user_id, time as time, content FROM user_client_log) u
ON p.user_id = u.user_id
USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content)
r
AS (user_id, time, content)
SORT BY p.time, u.time;

In this example, once the data passed to the UDJ is pre-sorted with the SORT BY clause, at most three records need to be cached at the same time to achieve the same result as the previous algorithm.
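
The streaming idea can also be tried outside MaxCompute. The sketch below is an alternative formulation in plain Java, not the UDJ code above: it assumes both iterators are sorted by time in ascending order and keeps only the latest payment at or before the current log time, the next payment after it, and the current log record. The class and method names are hypothetical, and a tie between two payments is resolved in favor of the earlier one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SortedNearestSketch {

    static final class Pay {
        final long time;
        final String info;
        Pay(long time, String info) { this.time = time; this.info = info; }
    }

    static final class Log {
        final long time;
        final String content;
        Log(long time, String content) { this.time = time; this.content = content; }
    }

    // Both iterators must be sorted by time, ascending.
    static List<String> join(Iterator<Pay> pays, Iterator<Log> logs) {
        List<String> out = new ArrayList<>();
        Pay floor = null;                                  // latest payment with time <= log time
        Pay ahead = pays.hasNext() ? pays.next() : null;   // first payment not yet passed
        while (logs.hasNext()) {
            Log log = logs.next();
            // Log times only increase, so the payment cursor never moves backwards.
            while (ahead != null && ahead.time <= log.time) {
                floor = ahead;
                ahead = pays.hasNext() ? pays.next() : null;
            }
            Pay nearest;
            if (floor == null) {
                nearest = ahead;
            } else if (ahead == null) {
                nearest = floor;
            } else {
                nearest = (log.time - floor.time <= ahead.time - log.time) ? floor : ahead;
            }
            out.add(nearest == null ? log.content : log.content + ", pay " + nearest.info);
        }
        return out;
    }
}
```

Because each iterator is consumed exactly once, the work per user is linear in the number of payment and log records, and memory use does not grow with the size of the group.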

Reference: https://www.alibabacloud.com/blog/udf-development-guide-with-maxcompute-studio_594738?spm=a2c41.12825660.0.0
