How to Synchronize Data from Hive to MaxCompute?

By Yunhua, Intelligent Product Expert at Alibaba Cloud

The following content is based on the PowerPoint slides of Yunhua’s presentation on MaxCompute Migration Assist (MMA). This article covers two parts:

1) Features, Technical Architecture, and Principles of MMA
2) Data Migration Demonstration on MMA

1) Features, Technical Architecture, and Principles of MMA

1.1 MMA Features

MaxCompute Migration Assist (MMA) is a data migration tool for MaxCompute. It covers the migration of batch processing, storage, data integration, and job orchestration and scheduling. Its migration evaluation and analysis feature automatically generates migration evaluation reports, which help you identify compatibility issues, such as data type mapping issues and syntax issues, when synchronizing data from Hive to MaxCompute.

MMA supports batch table creation and automatic batch data migration. It also provides a job syntax analysis feature to check whether Hive SQL can be run on MaxCompute. In addition, MMA supports workflow migration, job migration and transformation for the mainstream data integration tool Sqoop, and automatic creation of DataWorks data integration jobs.

1.2 MMA Architecture

The following figure displays the MMA architecture. The left side shows the customer’s Hadoop cluster, and the right side shows Alibaba Cloud big data services, mainly DataWorks and MaxCompute.

MMA runs on your Hadoop cluster, on a server that must be able to access the Hive Server. Once deployed on a host, the MMA client automatically obtains the Hive metadata: it reads the metadata from MySQL and converts it to MaxCompute Data Definition Language (DDL) statements.

Next, you run the DDL statements to create tables on MaxCompute in batches, start batch synchronization jobs, and submit concurrent Hive SQL jobs to the Hive Server. Each Hive SQL job calls a user-defined function (UDF). The UDF integrates the Tunnel SDK and writes data to MaxCompute tables in batches through Tunnel.

When migrating jobs and workflows, you can check workflow jobs based on the Hive metadata that MMA discovers automatically. This includes batch converting the workflow configurations in workflow components into DataWorks workflow configurations, from which DataWorks workflows are generated. After these steps, the data, jobs, and workflows have been migrated. Once the migration is complete, you need to connect your business systems to the MaxCompute and DataWorks architecture.

1.3 Technical Architecture and Principles of MMA Agent

MMA supports the batch migration of data and workflows through the client and server. The MMA client installed on your server provides the following features:

  • Automatically obtain the Hive metadata
  • Generate DDL and user-defined table function (UDTF) statements
  • Create tables in batches and migrate Hive data in batches

Accordingly, MMA contains four components:

  • Meta Carrier automatically extracts the Hive metadata and generates a Hive Metastore structure locally.
  • Meta Processor batch converts Hive metadata into MaxCompute DDL statements based on the results generated by Meta Carrier, including the table creation statements and data type conversion statements.
  • The built-in ODPS Console component allows you to batch create MaxCompute tables by using the MaxCompute DDL statements generated by Meta Processor.
  • Finally, the Data Carrier batch creates Hive SQL jobs. Each Hive SQL job is equivalent to the concurrent data synchronization of multiple tables or partitions.
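To make the Meta Processor step more concrete, here is a minimal sketch of turning table metadata into a MaxCompute table creation statement. It is not MMA's implementation: the TableMeta class, the TYPE_MAP dictionary, and both function names are hypothetical, and the type mapping is a small illustrative subset that assumes the target project accepts these type names.

```python
from dataclasses import dataclass, field

# Hypothetical, partial Hive -> MaxCompute type mapping (illustration only).
TYPE_MAP = {
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "int": "INT",
    "bigint": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "string": "STRING",
    "timestamp": "TIMESTAMP",
    "date": "DATE",
}


@dataclass
class TableMeta:
    """Hypothetical, simplified stand-in for the metadata of one Hive table."""
    name: str
    columns: list  # list of (column_name, hive_type) pairs
    partition_columns: list = field(default_factory=list)


def map_type(hive_type: str) -> str:
    """Translate one Hive type name, failing loudly on unmapped types."""
    key = hive_type.strip().lower()
    if key not in TYPE_MAP:
        raise ValueError(f"no mapping defined for Hive type: {hive_type}")
    return TYPE_MAP[key]


def to_maxcompute_ddl(project: str, table: TableMeta) -> str:
    """Build a CREATE TABLE statement for one table."""
    cols = ", ".join(f"`{n}` {map_type(t)}" for n, t in table.columns)
    ddl = f"CREATE TABLE IF NOT EXISTS {project}.`{table.name}` ({cols})"
    if table.partition_columns:
        parts = ", ".join(
            f"`{n}` {map_type(t)}" for n, t in table.partition_columns
        )
        ddl += f" PARTITIONED BY ({parts})"
    return ddl + ";"
```

Raising an error on an unmapped type, rather than silently passing it through, mirrors the idea that incompatible types should surface before any table is created.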

2) Data Migration Demonstration on MMA

2.1 Prepare the Environment

MMA requires JDK V1.6 or later and Python V3 or later, as shown in the following figure. The host that runs MMA submits Hive SQL jobs through the Hive client, so it must be able to access the Hive Server and connect to MaxCompute.

The right side of the following figure shows a scenario in which a customer ran into a network issue when synchronizing data with MMA. In this example, you have an IDC and an Elastic Compute Service (ECS) instance in Alibaba Cloud, and the IDC is connected to Alibaba Cloud through a private line. Before installing MMA, you can access MaxCompute directly from the ECS instance but not from the machines in the IDC. In this case, add a virtual border router (VBR) to the private line so that the IDC can reach the ECS instance and, through it, MaxCompute.
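Before installing MMA, it can help to confirm from the MMA host that both sides are reachable. The following is a minimal sketch using only the Python standard library. The function names are hypothetical, and the check only proves that a TCP connection can be opened, not that authentication or the service itself works.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


def check_targets(targets: dict, timeout: float = 3.0) -> dict:
    """Map each label to True/False depending on TCP reachability.

    targets maps a label to a (host, port) pair.
    """
    return {
        label: can_connect(host, port, timeout)
        for label, (host, port) in targets.items()
    }
```

For example, you could call check_targets with one entry for your Hive Server (HiveServer2 listens on port 10000 by default) and one for the host and port of the MaxCompute endpoint of your region. If the second entry fails from the IDC but succeeds from the ECS instance, you are likely in the routing scenario described above.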

2.2 Download and Compile the Toolkit

Download the compiled toolkit, as shown in the following figure. You can also download the source code from the GitHub address available on the MMA website, and then compile it locally based on your Hive version.

2.3 Perform MMA Agent Operations

  • Use meta-carrier to ingest Hive metadata: Install the Hadoop environment on the host in advance, with a local Hive Server. Download the odps-data-carrier.zip package and decompress it locally. After decompression, the following directories are displayed. The bin directory contains the key files of MMA: meta-carrier, meta-processor, odps_ddl_runner (used to batch create tables), and hive_udtf_sql_runner (used to synchronize data). The libs directory contains the JAR packages and libraries that MMA depends on. The res/console/bin directory contains the ODPSCMD tool and the odps_config.ini configuration file.

The local Hive has three databases, of which the dma_demo database has five tables. These five tables are automatically batch synchronized to MaxCompute. Create a project in the DataWorks console. Open a new command window and connect to the new project by running the local ODPSCMD tool.

On the host where Hive is installed, decompress the MMA package and go to the odps-data-carrier directory. Run the bin/meta-carrier -h command to view the parameter description. -d specifies the database from which to extract metadata. If it is not specified, the metadata of all databases in Hive is pulled. -o specifies the output directory, -t specifies the table, and -u specifies the URL, which is the address of the Hive Metastore. The address is a thrift address, so the connection uses the thrift protocol. In this demonstration, the Hive metadata is stored locally, so you only need to pull the metadata of the dma_demo database and use -o to specify the output directory.

Run tree meta to view the structure of the metadata directory. The dma_demo directory is generated in the metadata directory, with the same name as the database. The JSON file in the dma_demo directory describes the metadata of the database. The two tables under partition_meta are partitioned tables. The table_meta directory covers the non-partitioned tables and records the metadata of all tables.
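The meta-carrier invocation described above can be sketched as a small helper that only assembles the command line from the -u, -d, -t, and -o flags. The helper name is hypothetical, and the Metastore address in the usage note is a placeholder (9083 is the default Hive Metastore thrift port). Nothing is executed here.

```python
import shlex


def meta_carrier_command(metastore_uri, output_dir, database=None, table=None):
    """Assemble an argv list for bin/meta-carrier.

    Only the flags described in the text are used:
    -u (Metastore address), -d (database), -t (table), -o (output directory).
    """
    argv = ["bin/meta-carrier", "-u", metastore_uri]
    if database:
        argv += ["-d", database]
    if table:
        argv += ["-t", table]
    argv += ["-o", output_dir]
    return argv


def render(argv):
    """Render an argv list as a shell-safe command string for display."""
    return shlex.join(argv)
```

With a local Metastore, render(meta_carrier_command("thrift://127.0.0.1:9083", "meta", database="dma_demo")) yields a command that pulls only the dma_demo metadata into the meta directory. Run bin/meta-carrier -h on your own installation to confirm the flags before relying on this.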

  • Use network-measurement-tool: The network-measurement-tool tests the network from a Hive cluster to each MaxCompute region. With the measured network transmission speed and the approximate data volume, you can estimate the data transmission time. The tool connects to the endpoints of MaxCompute nodes in all regions and sorts the endpoints from the fastest to the slowest connection. As shown in the following figure, HANGZHOU is the fastest, with a connection time of 51 ms, while KUALA_LUMPUR is the slowest, with a connection time of 3393 ms.
  • Use sql-checker to check whether Hive SQL can be executed on MaxCompute: The sql-checker checks Hive SQL syntax compatibility to determine whether a Hive SQL statement can be run on MaxCompute. Specify the metadata directory as the input parameter, and also specify the default project and the SQL statement. If the execution result indicates good compatibility, the SQL statement can be run on MaxCompute.
  • Use meta-processor to generate MaxCompute DDL statements and Hive UDTF SQL statements: The metadata of Hive Metastore was pulled in Step 1. The next step is to convert the Hive metadata to MaxCompute DDL statements. Run the bin/meta-processor -h command to view the parameters. -i specifies the input directory and -o specifies the output directory. The input is the output of the first command, that is, the metadata directory that stores the Hive metadata pulled by meta-carrier, for example, bin/meta-processor -i meta -o output. The execution result of bin/meta-processor is saved in the output directory. View the tree structure of the output directory. The dma_demo directory, named after the MaxCompute project, is generated in the output directory. The .sql files in the hive_udtf_sql directory in dma_demo are used for batch data migration. The dma_demo directory also contains the odps_ddl directory, whose .sql files contain the table creation statements used to batch create tables.
  • Use odps_ddl_runner.py to create tables and partitions in batches: After generating the DDL statements, create tables in batches. Tables are batch created through the ODPSCMD tool (client tool). The odps_config file in the first-level directory of the toolkit contains basic parameters, among which project_name, access_id, access_key, and end_point are required. After setting the parameters, run the python36 bin/odps_ddl_runner.py -h command to view the parameters of the batch table creation tool. The input parameter points to the output that meta-processor generated. The odpscmd parameter is not required because the tool locates the ODPSCMD directory by default. During table creation, the tool starts ODPSCMD and submits the table creation statements to MaxCompute through the client tool. Run show tables to check whether the five tables were created, and then check whether the partitions were created. Once the partitions exist on both sides, the table structures in Hive and MaxCompute should be the same.
  • Use hive_udtf_sql_runner.py to migrate data: The python36 bin/hive_udtf_sql_runner.py command reads the SQL statements in the .sql files in the output directory. Check its parameters: input_all batch migrates all data in the output directory, input_single_file migrates the data of a single table or a single partition, and parallelism sets the degree of parallelism (DOP). After the data is migrated, check whether the table in MaxCompute contains data and compare it with the corresponding table in Hive. If the sizes match, the data in the Hive table is consistent with the data in MaxCompute, which indicates that the data was completely migrated.
  • Advanced feature 1: metadata generation for a specified database or table: Specify a table for which you want to generate metadata. The meta-carrier tool captures tables from a specified database.
  • Advanced feature 2: flexible mapping from Hive to MaxCompute: To customize a table on MaxCompute, you can change the table name, for example by adding a prefix or suffix, and change field names. To do so, modify the JSON file from which the MaxCompute DDL statement is generated.
  • Advanced feature 3: data migration from a single table or partition: In the preceding example, data was batch migrated from five tables. To demonstrate data migration from a single partition, first run the drop table inventory command. Because the table no longer exists, you need to create it again before you can synchronize a partition. Run the python36 bin/odps_ddl_runner.py command, specifying the output directory, to create tables in batches. The inventory table and its five partitions now exist again, and the partitions contain no data. Use the input_single_file parameter to specify the SQL file of one partition (for example, the second partition): output/dma_demo/hive_udtf_sql/single_partition/inventory_1.sql. Check the execution result and compare the data in the second partition in Hive with the single partition data migrated to MaxCompute. If the data is consistent, the migration is complete.
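The remaining steps (meta-processor, odps_ddl_runner.py, and hive_udtf_sql_runner.py) can be sketched the same way, as helpers that only assemble command lines. The helper names are hypothetical. The -i and -o flags come from the text above, but the long-option spellings --input, --input_all, --input_single_file, and --parallelism are an assumption based on the parameter names mentioned there, so check each tool's -h output first.

```python
import shlex


def meta_processor_command(meta_dir, output_dir):
    """bin/meta-processor reads meta-carrier output (-i) and writes to -o."""
    return ["bin/meta-processor", "-i", meta_dir, "-o", output_dir]


def ddl_runner_command(output_dir, python="python36"):
    """Batch table creation from the meta-processor output directory.

    Assumption: the "input" parameter is passed as --input.
    """
    return [python, "bin/odps_ddl_runner.py", "--input", output_dir]


def udtf_runner_command(output_dir=None, single_file=None,
                        parallelism=None, python="python36"):
    """Data migration for everything (output_dir) or one .sql file.

    Assumption: parameters are passed as --input_all,
    --input_single_file, and --parallelism.
    """
    if (output_dir is None) == (single_file is None):
        raise ValueError("pass exactly one of output_dir or single_file")
    argv = [python, "bin/hive_udtf_sql_runner.py"]
    if output_dir is not None:
        argv += ["--input_all", output_dir]
    else:
        argv += ["--input_single_file", single_file]
    if parallelism is not None:
        argv += ["--parallelism", str(parallelism)]
    return argv


def render(argv):
    """Render an argv list as a shell-safe command string for display."""
    return shlex.join(argv)
```

For the single-partition demonstration, udtf_runner_command(single_file="output/dma_demo/hive_udtf_sql/single_partition/inventory_1.sql") builds the command for the second partition of the inventory table. Keeping the batch and single-file modes mutually exclusive avoids accidentally migrating everything when only one partition was intended.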

2.4 Use DataWorks to Automatically Migrate Data and Workflows

MMA V1.0 does not provide workflow migration as a service, so you currently have to use an offline tool. Generate the directory according to the template, as shown in the following figure. If your workflows run on open-source components, store their configuration in the corresponding directory of the template. If they do not, for example because you run an in-house workflow scheduling and orchestration service, generate the workflow data based on the directory structure of the template.

At present, MMA V1.0 requires you to package the directory into a ZIP archive and upload it to DataWorks. The backend automatically parses the archive and loads it into a DataWorks workflow. When the upload is completed, DataWorks batch creates MaxCompute tables according to the MaxCompute DDL SQL statements and then initiates a DataX data synchronization job to complete the batch data migration.

The following figure shows the configurable project description file project.xml, where project information is customizable, and the workflow description file workflow.xml, which contains custom workflow parameters. Modify the parameters in the configuration file.
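The packaging step can be sketched with the Python standard library. The function name is hypothetical, and the sketch makes no assumption about the template layout beyond zipping whatever the directory contains with paths relative to its root. Whether DataWorks expects the files at the archive root or under a top-level folder is something to confirm against the template.

```python
import zipfile
from pathlib import Path


def package_workflow(template_dir, archive_path):
    """Zip every file under template_dir, keeping paths relative to it."""
    root = Path(template_dir)
    if not root.is_dir():
        raise NotADirectoryError(str(root))
    archive = Path(archive_path)
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(root.rglob("*")):
            # Skip directories, and skip the archive itself if it was
            # created inside the directory being packaged.
            if path.is_file() and path.resolve() != archive.resolve():
                zf.write(path, path.relative_to(root).as_posix())
    return archive
```

Calling package_workflow on a directory that holds project.xml and workflow.xml produces a ZIP archive ready for manual upload. Sorting the paths keeps the archive contents in a stable order between runs, which makes it easier to compare two exports.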

2.5 Migration Solutions for Other Types of Jobs

  • UDF and MapReduce job migration: Upload the JAR package directly to MaxCompute. This requires MaxCompute V2.0 support and the Hive compatibility flag: set the Hive compatibility flag to true, and then migrate the UDFs and MapReduce jobs from Hive to MaxCompute. Note that UDFs and MapReduce jobs cannot directly access the file system, networks, or external data sources.
  • External table migration: In principle, you can migrate structured data to tables on MaxCompute. If you need to access external files through external tables, we recommend that you migrate data from Hadoop Distributed File System (HDFS) to OSS or Tablestore and then create external tables on MaxCompute to access the files.
  • Spark job migration: Spark on MaxCompute is fully compatible with open-source Spark syntax. You only need to download the Spark on MaxCompute client and add the MaxCompute connection parameters when writing Spark SQL jobs. Everything else is the same as the open-source Spark SQL syntax.
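As an illustration of "adding MaxCompute connection parameters" for Spark jobs, the sketch below renders them as spark-defaults.conf style lines. The function name is hypothetical. The four property names are the ones I recall from the Spark on MaxCompute client's sample configuration, so treat them as an assumption and verify them against the client you download.

```python
def spark_odps_conf(project, access_id, access_key, endpoint):
    """Return spark-defaults.conf style lines with MaxCompute settings.

    Assumption: the property names below match your version of the
    Spark on MaxCompute client.
    """
    settings = {
        "spark.hadoop.odps.project.name": project,
        "spark.hadoop.odps.access.id": access_id,
        "spark.hadoop.odps.access.key": access_key,
        "spark.hadoop.odps.end.point": endpoint,
    }
    # One "key value" pair per line, the plain spark-defaults.conf form.
    return [f"{key} {value}" for key, value in settings.items()]
```

In practice you would read the AccessKey pair from a secure source rather than hard-coding it, and write the returned lines into the client's configuration file.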

2.6 View Migration Evaluation Reports

When the MaxCompute DDL statements are generated, the system produces both the DDL SQL statements and the migration evaluation report, report.html. The migration evaluation report is a compatibility report. It specifies whether the mappings between the data structures of Hive tables and those of MaxCompute tables carry risks, and it identifies the level of each risk. The report also provides details and warning messages, such as incompatible data types or syntax warnings. Before migration, view this report to evaluate the migration risks.

3) Conclusion

In this article, you have learned how to synchronize Hive data to MaxCompute using MaxCompute Migration Assist (MMA) on Alibaba Cloud. We have also explored the features, technical architecture, and implementation principles of MMA with a demonstration of the data migration using MMA.
