MaxCompute Tunnel Offline Batch Data Channel FAQs

Best Practices for SDK Upload

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;

public class UploadSample {
    private static String accessId = "<your access id>";
    private static String accessKey = "<your access key>";
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";

    public static void main(String[] args) {
        // Measure twice, cut once
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);
        TableTunnel tunnel = new TableTunnel(odps);
        try {
            // Determine the partition to write to
            PartitionSpec partitionSpec = new PartitionSpec(partition);
            // Create a session on this table partition at the server. The session is
            // valid for 24 hours and can upload a total of 20,000 blocks in that time.
            // Creating a session takes only seconds, but it allocates server-side
            // resources and creates temporary directories, which makes it an expensive
            // operation. It is therefore strongly recommended to reuse one session to
            // upload as much data as possible for the same partition.
            UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
            System.out.println("Session Status is : " + uploadSession.getStatus().toString());
            TableSchema schema = uploadSession.getSchema();
            // Once the data is ready, open a Writer and write a block. Each block can
            // only be uploaded successfully once and must not be re-uploaded after
            // success. A successful close() means the block upload has completed;
            // otherwise the block can be uploaded again. A session allows at most
            // 20,000 block IDs (0-19999); if you need more, commit this session and
            // create a new one, and so on.
            // Writing too little data to a block produces a large number of small
            // files, which seriously degrades computing performance. We strongly
            // recommend writing more than 64 MB per block (a single block can hold
            // up to 100 GB).
            // Estimate the total from the average record size and the record count,
            // for example: 64 MB < average record size x record count < 100 GB.
            // The server caps maxBlockID at 20,000. Use however many blocks per
            // session your business requires (for example 100), but the more blocks
            // per session, the better, because creating a session is expensive.
            // Uploading only a small amount of data per session not only causes small
            // files and empty directories, it also seriously hurts overall upload
            // performance (creating a session takes seconds, while the actual upload
            // may take only tens of milliseconds).
            int maxBlockID = 20000;
            for (int blockId = 0; blockId < maxBlockID; blockId++) {
                // Prepare at least 64 MB of data before writing,
                // for example by reading several files or querying a database
                try {
                    // Open a Writer on the block. If fewer than 4 KB of data are
                    // written during any 2 consecutive minutes after the Writer is
                    // created, the connection is closed with a timeout, so it is
                    // recommended to prepare data that can be written straight from
                    // memory before opening the Writer.
                    RecordWriter recordWriter = uploadSession.openRecordWriter(blockId);
                    // Convert all the data you read into Tunnel Record format and write it
                    int recordNumber = 1000000;
                    for (int index = 0; index < recordNumber; index++) {
                        // Convert raw record "index" into an ODPS record
                        Record record = uploadSession.newRecord();
                        for (int i = 0; i < schema.getColumns().size(); i++) {
                            Column column = schema.getColumn(i);
                            switch (column.getType()) {
                                case BIGINT:
                                    record.setBigint(i, 1L);
                                    break;
                                case BOOLEAN:
                                    record.setBoolean(i, true);
                                    break;
                                case DATETIME:
                                    record.setDatetime(i, new Date());
                                    break;
                                case DOUBLE:
                                    record.setDouble(i, 0.0);
                                    break;
                                case STRING:
                                    record.setString(i, "sample");
                                    break;
                                default:
                                    throw new RuntimeException("Unknown column type: " + column.getType());
                            }
                        }
                        // Write the record to the server. A network transmission is
                        // triggered for every 4 KB of data written. If no transmission
                        // occurs for 120 seconds, the server closes the connection; the
                        // Writer then becomes unavailable and the block must be written again.
                        recordWriter.write(record);
                    }
                    // A successful close() means the block was uploaded, but the data
                    // sits in an ODPS temporary directory and is not visible until the
                    // whole session is committed
                    recordWriter.close();
                } catch (TunnelException e) {
                    // Retrying a certain number of times is recommended
                    e.printStackTrace();
                    System.out.println("write failed:" + e.getMessage());
                } catch (IOException e) {
                    // Retrying a certain number of times is recommended
                    e.printStackTrace();
                    System.out.println("write failed:" + e.getMessage());
                }
            }
            // Commit all blocks. uploadSession.getBlockList() lists the blocks this
            // call will commit. The data is not formally written to the ODPS partition
            // until the commit succeeds; retrying up to 10 times on failure is
            // recommended.
            for (int retry = 0; retry < 10; ++retry) {
                try {
                    // Takes seconds; formally commits the data
                    uploadSession.commit(uploadSession.getBlockList());
                    break;
                } catch (TunnelException e) {
                    System.out.println("uploadSession commit failed:" + e.getMessage());
                } catch (IOException e) {
                    System.out.println("uploadSession commit failed:" + e.getMessage());
                }
            }
            System.out.println("upload success!");
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
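The catch blocks above only log a failed write, while the comments recommend retrying the block. Below is a minimal sketch of such a retry loop; the helper name writeBlockWithRetry, the backoff policy, and the maxRetries parameter are illustrative, not part of the Tunnel SDK. Because a block only counts as uploaded once close() succeeds, the same blockId can safely be written again after a failure.

// Minimal retry sketch (assumptions: records are already converted to Tunnel
// Records; the helper name and the linear backoff are illustrative)
private static void writeBlockWithRetry(UploadSession session, long blockId,
        java.util.List<Record> records, int maxRetries)
        throws TunnelException, IOException, InterruptedException {
    for (int attempt = 1; ; attempt++) {
        try {
            RecordWriter writer = session.openRecordWriter(blockId);
            for (Record r : records) {
                writer.write(r);
            }
            writer.close(); // success: the block is now uploaded
            return;
        } catch (TunnelException | IOException e) {
            if (attempt >= maxRetries) {
                throw e; // give up and surface the last error
            }
            Thread.sleep(1000L * attempt); // simple linear backoff (illustrative)
        }
    }
}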
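The sizing rule in the comments (64 MB < average record size x record count < 100 GB) translates directly into a record count per block. A quick worked example, assuming an average record size of about 1 KB (the 1 KB figure is an assumption, not from the docs):

// Sizing sketch: how many records fit the recommended block range
long avgRecordBytes = 1024L;                        // assumed average record size (~1 KB)
long minBlockBytes = 64L * 1024 * 1024;             // 64 MB lower bound from the docs
long maxBlockBytes = 100L * 1024 * 1024 * 1024;     // 100 GB upper bound from the docs
long minRecordsPerBlock = minBlockBytes / avgRecordBytes; // 65,536 records
long maxRecordsPerBlock = maxBlockBytes / avgRecordBytes; // 104,857,600 records
System.out.println("Write between " + minRecordsPerBlock + " and "
        + maxRecordsPerBlock + " records per block");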
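Since creating the session is the expensive step, the advice to reuse one session also extends to parallel uploads: a single session can be shared while each worker writes its own block ID. The sketch below shows that pattern under a few assumptions: the method name parallelUpload, the pool size, the one-block-per-worker assignment, and the absence of retries are all illustrative.

// Sharing one UploadSession across a small thread pool; each worker writes its
// own block ID. Assumes the imports from the sample above plus
// java.util.ArrayList, java.util.Collections, java.util.List, and
// java.util.concurrent.*.
static void parallelUpload(TableTunnel tunnel, String project, String table,
        PartitionSpec partitionSpec) throws Exception {
    // Create the expensive session once and share it across all workers
    UploadSession session = tunnel.createUploadSession(project, table, partitionSpec);
    int threads = 4;
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Long> uploadedBlocks = Collections.synchronizedList(new ArrayList<Long>());
    for (int blockId = 0; blockId < threads; blockId++) {
        final long id = blockId;
        pool.submit(() -> {
            try {
                RecordWriter writer = session.openRecordWriter(id);
                // ... convert and write >= 64 MB of records here ...
                writer.close();
                uploadedBlocks.add(id); // track only blocks that closed successfully
            } catch (TunnelException | IOException e) {
                e.printStackTrace(); // in real code, retry as sketched above
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
    // Commit only the blocks that uploaded successfully
    session.commit(uploadedBlocks.toArray(new Long[0]));
}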

Frequently Asked Questions about MaxCompute Tunnel

Can block IDs be repeated?

Is there a restriction on block size?

Can a session be shared? Does a session have a lifecycle?

If a session is created but not used, does it consume system resources?

How can I process Write/Read timeout or I/O exceptions?

Is MaxCompute Tunnel suitable for batch uploading or stream uploading?

Are partitions required for data uploading through MaxCompute Tunnel?

What is the relationship between Dship and MaxCompute Tunnel?

Does data uploaded with Tunnel append to or overwrite existing data on a file?

What is the routing function of MaxCompute Tunnel?

How much data in a block is preferred when uploading data with MaxCompute Tunnel?

Why do I keep getting a timeout prompt when using MaxCompute Tunnel?

Why do I receive the exception, “You have NO privilege 'odps:Select' on {acs:odps:*:projects/XXX/tables/XXX}. project 'XXX' is protected” when I use Tunnel to download data?

Why do I receive the exception, “ErrorCode=FlowExceeded, ErrorMessage=Your flow quota is exceeded” when I use Tunnel to upload data?
