Best Practices of Designing MaxCompute Tables

We will be sharing the best practices of designing Alibaba Cloud MaxCompute tables by addressing several typical application scenarios of MaxCompute.

Operations that Produce Too Many Small Files

Since small files in MaxCompute tables influence storage and computing performance, we will first explain which operations will cause too many small files so that we can avoid these operations when designing tables.

During the process of uploading data by using MaxCompute Tunnel SDK, a file is generated for each commit operation. If each file is too small (for example, less than 10 KB) and data is continually uploaded (for example, every 5 seconds), then 720 small files will be generated in just one hour, and 17,280 in just one day. When MaxCompute Tunnel SDK is used to upload data, a lot of empty directories (small files on the server) will be generated if sessions are created and data is directly committed without being uploaded.

When the Tunnel command in the MaxCompute Console command-line tool is used to upload data, local large files will be split into too small files, causing too many small files after uploaded. When DataHub is used to archive data, each shard in DataHub writes data into MaxCompute under the following two conditions: If the total data volume reaches 64 MB, commit data to MaxCompute to form a file. Or commit every five minutes to form a file. Therefore, if there are too many shards (for example, 20 shards), each shard is far from having 64 MB data within five minutes (for example, several hundred KB of data), and a large number of small files will be produced. In this case, 241220=5760 files will be generated in just one day.

When data modeling tools such as DataWorks are used to incrementally insert data into a table (or table partition) in MaxCompute, each Insert Into operation will produce a file. If the Insert Into operation is run on 10 records each time, and a total of 10,000 records are inserted each day, then 1000 small files are created each day.

When Alibaba Cloud Data Transmission Service (DTS) is used to synchronize data from databases like ApsaraDB for RDS to MaxCompute, DTS will create full tables and increment tables. A relatively complete data sync is committed during inserting progress data in increment tables due to few entries inserted each time, causing too many small files in increment tables. For example, if data is synchronized every 5 minutes and 10 entries are synchronized each time, the incremental data in a day is 10,000 entries, producing a total of 1000 small files. In this case, full ultimate tables and incremental data tables need to be merged after data sync is completed.

In the event of too many source data collection clients, source data will directly go into a partition through Tunnel. Every time each source data collection client commits data, a separate file will be created in the same partition, causing a large number of small files.

Log Service triggers Function Compute to continuously and frequently write files into MaxCompute, and streaming data of small files goes into MaxCompute.

Divide Projects Based on Data

A project is an object in the highest layer in MaxCompute. Resources are assigned, isolated and managed by project, enabling multi-tenant management.

  1. If multiple applications need to share data, it is recommended to use the same project.
  2. If data requested by each application is relevant, it is recommended to use different projects. The tables and partitions across projects can be exchanged by granting package.

Designing Dimension Tables

Generally, tables that describe properties are dimension tables. A dimension table can be associated to any tables in any table groups and doesn’t require partition configuration at the time of creation. However, a dimension table has a limit on the data size of a form. Considerations in designing and using dimension tables:

  1. Generally a form in a dimension table should contain no more than 10 million entries.
  2. Large-scale data updates in dimension tables should be avoided.
  3. Mapjoin can be used to JOIN dimension tables and other tables.

Designing Zipper Tables–Ultimate Storage

The ultimate storage feature is coming soon. This section mainly describes its design ideas. When designing data models of data warehouses based on MaxCompute zipper tables, we may often encounter the following requirement scenarios:

  1. Data volume is very large. Some fields in tables may need to be updated, such as user address, product description, order status, and phone number.
  2. It is required to view snapshot information at a specific time point or within a specific period of time ( for example, to check the status of a specific order at a past time point or check how many times the information about a specific user has been updated within a specific period of time).
  3. Only a small portion of information is changed at a relatively long time interval. For example, a table contains a total of 10 million member info entries, and only around 100,000 entries are added or changed each day. If a full copy of member data is saved each day, each full copy will hold massive amounts of unchanged data, wasting a lot of storage resources.

If you encounter the previously mentioned scenarios, consider using ultimate storage. MaxCompute allows different tables to be converted to ultimate storage tables. The following is an ultimate storage operation example:

Create a source table.

create table src_tbl (key0 STRING, key1 STRING, col0 STRING, col1 STRING, col2 STRING) PARTITIO N (datestam p_x STRING, pt0 STRING);

Import data. Convert src_tbl to an ultimate storage table.

set odps.exstore.primarykey=key0,key1;
[set odps.exstore.ignorekey=col0;]
EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140801'); EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140802');

Designing Data Collection Source Tables

Data collecting methods: stream writing, batch writing, and periodical scheduling of inserting strip data.

In the event of large data volume, make sure that data in the same business unit uses partitions and tables; in the event of small data volume, optimize data collection frequency.

Stream writing:

  1. For stream writing of data, generally there are many collection channels. It is helpful to make table design plans according to collection channels. If massive amounts of data is written in a single data channel, design partitions based on time.
  2. If the data volume is relatively small in collection channels, use non-partitioned tables and design the terminal type and collection time into standard column fields.
  3. When Datahub is used to write data, the number of shards should be properly planned to avoid small collection channel traffic and too many channels due to too many shards.

Batch writing:

  1. For batch writing, the focus is on the data writing cycle and the periodical scheduling of inserting strip data.
  2. Avoid inserting data periodically, because inserting data periodically requires creating partitioned tables and inserting data in new partitions in order to reduce the influences on the original partitions.

Designing Log Tables

A log table is actually a journal entry table that keeps all incoming entries together and isn’t involved in updating already archived items.

create table src_tbl (key0 STRING, key1 STRING, col0 STRING, col1 STRING, col2 STRING) PARTITIO N (datestam p_x STRING, pt0 STRING);
set odps.exstore.primarykey=key0,key1;
[set odps.exstore.ignorekey=col0;]
EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140801'); EXSTO RE exstore_tbl PARTITIO N (datestam p_x='20140802');

The following are several considerations in designing log tables:

  1. Consider whether it is necessary to remove duplicated entries in log tables.
  2. Consider whether dimension properties need to be extended.
  3. Consider the following two factors when determining if it is necessary to associate dimension properties in a dimension table: how often the business service is used, and whether association will cause latency in output.
  4. Consider whether to scale dimension tables.
  5. Consider partition terminal types.

There are usually a large number of log tables, and data statistics and analysis are usually performed separately on PCs and on the mobile app side during the business analysis process. In the meantime, data collection on PCs and the mobile app side is done by using two different systems. Therefore, the common practice is to design multiple detail DWD tables based on the terminal type.

In the event of too many terminals but relatively small data size (for example, data on one terminal is less than 1 TB but is frequently collected), consider setting the terminal information as an ordinary column instead of partitioning the terminal.

Note:

  1. When designing log table partitions, you can partition log tables by day based on the log collection time. Before writing data, collect and group data, and perform a commit each time a batch of data (usually 64 MB) is written.
  2. For log data, because updating information in original partitions is very rare, the insert operation can be performed to insert a very small amount of data. However, it is generally recommended to perform as few Insert operations as possible.
  3. If a large amount of data needs to be updated, use the INSERT OVERWRITE operation to avoid producing too many small files.
  4. Set proper partitions for log tables and configure archiving operations for cold data that has not been accessed for a long time.

Designing Interaction Detail Tables

Periodic snapshot tables where all collected records are stored as snapshots on a daily basis.

Common Problem: A large number of records are accumulated, and to generate snapshots on a specific day requires merging the increment table that day and the full table before that day, which will consume a lot of resources. To get the snapshots newly collected on the last day, the full table needs to be scanned. The question is how to reduce resource consumption.

Recommended Solution: Create a transactional fact table, and create a periodic snapshot table that holds currently valid collected snapshots to meet statistics and analysis requirements in different business scenarios.

Note:

  1. The most important consideration in designing interaction detail tables is to understand the relationship between stock data and incremental data. Data in new partitions can be written as incremental data.
  2. Avoid inserting data into old partitions or modifying data in old partitions.
  3. When inserting data or overwriting data throughout a table, use “Insert Overwrite” whenever possible instead of INSERT INTO.

Updating and Deleting MaxCompute Table Data

The following section shows how to implement delete/update/merge SQL (supported operations in relational databases) on MaxCompute:

Prepare tables

-- Full table for the previous day
table1(key1 string,key2 string,col1 string,col2 string);
-- Today's incremental table
table2(key1 string,key2 string,col1 string,col2 string);
-- Today's incremental table (to be deleted)
table3(key1 string,key2 string,col1 string,col2 string);

Update (update records in table2 into table1)

insert overwrite table table1 select t1.key1
,t1.key2
,case when t2.key1 is not null then t2.col1 else t1.col1 end as col1 ,case when t2.key1 is not null then t2.col2 else t1.col2 end as col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 ;

Delete (delete records in table2 from table1)

insert overwrite table table1 select t1.key1
,t1.key2 ,t1.col1 ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 where t2.key1 is null
;

Merge (not deleted)

insert overwrite table table1 select
from (
-- Exclude the records shared in table1 and table2 from table1. The remaining records are records that are not updated today select t1.key1
,t1.key2 ,t1.col1 ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 where t2.key1 is null
union all
-- Merge today's incremental records to get full records select t2.key1
select t2.key1
,t2.key2
,t2.col1
,t2.col2
from table2 t2)tt
;

Merge (deleted)

insert overwrite table table1 select
from (
-- Exclude the records shared between the previous day's table and today's table from the previous day's table, then exclude records deleted today. The remaining records are records that are not updated today
select t1.key1
,t1.key2 ,t1.col1 ,t1.col2
from table1 t1
left outer join table2 t2 on t1.key1=t2.key1 and t1.key2 = t2.key2 left outer join table3 t3 on t1.key1=t3.key1 and t1.key2 = t3.key2
where t2.key1 is null or t2.key1 is null
union all
-- Merge today's incremental records to get full records select t2.key1
,t2.key2 ,t2.col1 ,t2.col2
from table2 t2)tt ;

Managing Lifecycles

Data lifecycle management is available in MaxCompute tables and partitions. A table (partition), if not changed within a specified period of time starting from the last update time, will be automatically recycled in MaxCompute. This specified period of time is the lifecycle, which is set at the table level.

create table test_lifecycle(key string) lifecycle 100;/alter table test_l ifecycle set lifecycle 50;

MaxCompute determines whether to recycle a non-partitioned table or a partition in a partitioned table based on the LastDataModifiedTime and lifecycle settings of that non-partitioned table or partition. MaxCompute SQL supports the touch operation to modify LastDataModifiedTime of a partition. This operation can modify LastDataModifiedTime of a partition to the current time. If the LastDataModifiedTime value is modified, MaxCompute will consider that data in that table or partition has changed, and the lifecycle timer will be reset.

ALTER TABLE table_nam e TO UCH PARTITIO N(partition_col='partition_col_valu e', ...) ;

Note:

  1. Set a proper lifecycle for a table upon the creation of the table to reduce storage space usage.
  2. Any table data changes will affect the data lifecycle timer, including merging small files.

Avoiding Whole Table Scans

A full table scan can be time consuming especially when you have a large table. To avoid this, consider the following:

  1. Create a partitioned table or limit a scan to only certain table columns.
  2. Partition a data table properly. Set the common query criteria as column names.
  3. Perform hash clustering when reading common query criteria.
  4. Add partition filtering criteria, reduce the number of partitions to be scanned, or separate small tables and then scan the partitions of small-sized middle tables to reduce the amount of data to be scanned.
  5. Store the middle results of the globally scanned table in a middle table.
  6. Scanning all partitions within a year each day will consume a lot of computing resources. It is advised to create a separate middle table, summarize data once daily, and then scan the partitions within a year in this middle table to reduce the data volume to be scanned.

Avoiding Small Files

As mentioned at the beginning of the article, small files in MaxCompute tables can affect storage and computing performance. Therefore, we should reduce the number of small files produced during the computing process. To achieve this, simply perform the INSERT OVERWRITE operation on a source table (or partition) or write data into a new table and delete the source table.

Suggestions on small files produced when Tunnel collects data:

  1. Commit data each time the buffer reaches 64 MB when invoking the tunnelsdk;
  2. Avoid uploading small files frequently when using Console. Instead, upload data until accumulated data becomes large enough; if a partitioned table is imported, it is recommended to set a lifecycle so that data unused within the period can be automatically cleaned.
  3. As previously described, perform the InsertOverwrite operation on a source table (or partition);
  4. Alter the merging mode, and merge data by using console commands.

You should set lifecycles for all temporary tables at the time of table creation so that junk data can be automatically recycled after a lifecycle has expired. Applying for too many datahub shards will cause excessive small files. Suggestions on the number of DataHub shards:

  1. The default throughput for each shard is 1 Mbit/s. It is suggested to determine how many shards should be assigned based on this default throughput configuration (You can also add several additional shards);
  2. The logic of synchronizing odps is that each shard has a separate task (committed every 5 minute or data reaches 64 MB). By default, the interval of the commit operation is set to 5 minutes so as to find data in odps as soon as possible. If partitions are created by hour, each shard has 12 files every hour.
  3. If data volumes are very small but many shards are included, then there are too many small files in odps (the number of shards*12/hour).
  4. Don’t assign too many shards. Instead, assign shards as needed.

Converting to Hash Clustering Tables

Advantages of hash clustering tables: optimize bucket pruning, aggregation, and storage. If CLUSTERED BY is used to specify Hash Key when you create a table, MaxCompute will perform Hash operations on the specified columns and spread data into buckets based on hash values.

Principles for selecting hash key values:

  1. Select columns that contain few repeated key values.
  2. Use SORTED BY to specify how fields are sorted in a bucket.

How to convert to hash clustering tables:

ALTER TABLE table_nam e [CLUSTERED BY (col_nam e [, col_nam e, ...]) [SORTED BY (col_nam e [ASC | DESC] [, col_nam e [ASC | DESC] ...])] INTO number_of_buckets BUCKETS]

The ALTER TABLE statement is suitable for stock tables. After new aggregation properties are added, new partition are stored as hash clusters. After a HashClustering table is created, use Insert Overwrite to convert from a source table to the created table.

Note that a hash clustering table has the following limitations:

  1. Insert Into is not supported. Only Insert Overwrite can be used to add data.
  2. It is not supported to upload data to a range cluster table directly by using Tunnel, because data uploaded by Tunnel is unordered.

Summary: Table Creation Example

Now that we know the best practices of designing a table in MaxCompute, let us apply our knowledge by using a simple example of weather information collection.

Firstly, data in this scenario includes weather information and geographic data such as place’s names, area, basic population. We know that Geographic data doesn’t change a lot. However, weather information changes significantly and frequently. Weather information is collected by using multiple terminals, and has massive amounts of data. Weather information traffic is usually steady when the number of terminals remains steady.

Table design guide:

  1. It is recommended that the relevant data in this scenario should be divided into a geographic data table and a weather log table to distinguish data that is less likely to change from data that changes significantly and frequently.
  2. Since the involved data volume is very large, it is recommended to partition the weather log table by area. The weather log table can also be partitioned by time unit (for example, by day). This partition method can avoid changing irrelevant data due to weather changes in a specific place or at a specific time.
  3. DataHub is used on collection terminals to aggregate data. Choose a proper number of shard channels based on the stable traffic volume and batch write data into the weather log table instead of using INSERT INTO.

To learn more about Alibaba Cloud MaxCompute, visit https://www.alibabacloud.com/product/maxcompute

Reference:https://www.alibabacloud.com/blog/best-practices-of-designing-maxcompute-tables_594373?spm=a2c41.12516450.0.0

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store