Using Message Service to Schedule Jobs in Batch Compute

Overview

Batch Compute is a distributed cloud service suitable for processing massive volumes of data concurrently. The system automates resource management, job scheduling, and data loading and supports billing on a Pay-As-You-Go basis. This tutorial demonstrates how you can submit or schedule jobs automatically simply by sending message from Alibaba Cloud Message Service.

  • You can run various system maintenance jobs depending on the alerts generated from system health monitoring applications
  • Trigger data load job as soon as file arrives from external system. File watcher application can send file arrival message to Message Service queue and consumer of the message will submit the file load job in batch compute.
  • Analysis jobs of call-center calls can be triggered as soon as the recorded audio file is uploaded into cloud storage. File watcher program can send job-triggering message into the message queue for analysis job to start.

Architecture

Prerequisites

This tutorial can easily be understood and followed if you are familiar with the below technologies and conceipts,

  • Alibaba Cloud Environment
    If you do not have access, get 1 year free access with $10 credit using the below link
    https://www.alibabacloud.com/referral?referralCode=kwfk44
  • Alibaba Cloud Message Service
    Click here to know basics of Cloud Message service
  • Batch Compute
    Click here to know about Batch Compute product
  • Python Programing Knowledge

Server Configurations

First, you’ll need Alibaba cloud account. If you don’t have one, then sign up for free cloud account with $5/$10 credit using the following link,

  • Object Storage Service (OSS)
  • Cloud Messaging Service
  • Batch Compute

Developing Batch Job Scripts and Uploading into OSS Bucket

Write 3 different python scripts for 3 different jobs as follows,

  • Script1.py
print ("This is Script1 Output!")
  • Script2.py
print ("This is Script2 Output!")
  • Script3.py
print ("This is Script3 Output!")
$> tar -czf job1.tar.gz Script1.py$> tar -czf job2.tar.gz Script2.py$> tar -czf job3.tar.gz Script3.py$> python osscmd config --id=<AccessKeyID>--key=<AccessKeySecret>--host=oss-cn-shenzhen.aliyuncs.com$> python osscmd put job1.tar.gz oss://mybatchjobscripts/job1.tar.gz$> python osscmd put job2.tar.gz oss://mybatchjobscripts/job2.tar.gz$> python osscmd put job3.tar.gz oss://mybatchjobscripts/job3.tar.gz

Creating Messaging Queue

  • Login to Alibaba Cloud Console and open Message Service home page
  • Create new message queue as follows, make sure you choose the region appropriately and copy the end point url.

Consumer Python Scripts Walkthrough

  • Download/clone sample python code from github repository
[Base]AccessKeyId = <>AccessKeySecret = <>Endpoint = http://<>.mns.us-east-1.aliyuncs.com/
cfg_fn = os.path.join(os.path.dirname(os.path.abspath(__file__)) + "\MsgService.cfg")parser = ConfigParser.ConfigParser()parser.read(cfg_fn)
accessKeyId = parser.get("Base", "AccessKeyId")accessKeySecret = parser.get("Base", "AccessKeySecret")endpoint = parser.get("Base", "Endpoint")
from mns.account import Accountfrom mns.queue import *from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
job_desc = """{"Name": "RunJob-%s","Type": "DAG","JobFailOnInstanceFail": true,"Description": "","Priority": 0,"DAG": {"Tasks": {"T1": {"ClusterId": "","InstanceCount": 1,"MaxRetryCount": 0,"Parameters": {"Command": {"CommandLine": "python %s.py","PackagePath": "oss://mybatchjobscripts/%s.tar.gz"---------------------
#Load config valuesaccid,acckey,endpoint,token = MNSCommon.LoadConfig()#Instantiate batchcompute clientclient=Client(REGION, accid, acckey)# Instantiate message service account classmy_account = Account(endpoint, accid, acckey, token)
recv_msg = my_queue.receive_message(wait_seconds)
if recv_msg.message_body.lower() == "job1":job_json_argument = job_desc % ("job1","Script1","job1")elif recv_msg.message_body.lower() == "job2":job_json_argument = job_desc % ("job2","Script2","job2")elif recv_msg.message_body.lower() == "job3":job_json_argument = job_desc % ("job3","Script3","job3")else:continuejob_id = client.create_job(json.loads(job_json_argument)).Id
my_queue.delete_message(recv_msg.receipt_handle)print "Delete Message Succeed!  ReceiptHandle:%s" % recv_msg.receipt_handle

Run and Test

Now let’s execute the python code. Pass the message queue name as argument to the script. Here “ExecuteBatchJob” is the message queue name

PS> python ConsumerMsg.py ExecuteBatchJob

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com