Transferring IoT Terminal Device Data using Message Queues

In a typical Internet of Things (IoT) scenario, a business server needs to receive the messages that devices upload in real time, along with notifications when devices switch between online and offline status. By configuring server subscription, the IoT platform forwards this data to Message Service (MNS) queues, and the business server obtains it directly from those queues.

Alibaba Cloud IoT platform uses MNS to provide server-side message subscription. This approach has two main advantages. First, MNS stores messages reliably, so none are lost while the server is unavailable. Second, MNS smooths out traffic peaks when many messages arrive concurrently, so a sudden burst does not overwhelm the server. This article shows how a user's server obtains device data after the devices send it to the Alibaba Cloud IoT platform.

Configuring Server Subscription from the Alibaba Cloud IoT Console

To enable Alibaba Cloud IoT, please visit www.alibabacloud.com/product/iot

Create a Product

We create a product named "Air Testing" in the Alibaba Cloud IoT Console and select the basic type.

Add a Device Under the Product

In the Alibaba Cloud IoT console, we add a specific device under Device Management > Air Testing.

Configure Server Subscription under the Product

In the Alibaba Cloud IoT console, we enable server subscription for the Air Testing product and select both Device-reported Messages and Device Status Change Notifications. Once the subscription is enabled, the console shows the MNS region ("cn-shanghai") and the queue ("aliyun-iot-a1jnUEKYhw4") that the messages are forwarded to.

By reading related Alibaba Cloud IoT documents, we understand the architecture of messages in the queue, as shown below:

{
    "payload": "Base64 Encode data",
    "messagetype": "status",
    "messageid": 996000000000000001,
    "topic": "the Topic of a specific device",
    "timestamp": 1526450324
}
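
  1. messageid: a message ID generated by Alibaba Cloud IoT.
  2. messagetype: the message type, either "status" (device status change notification) or "upload" (device-reported message).
  3. topic: the topic of the device that the message came from.
  4. payload: the message data, Base64-encoded.
  5. timestamp: the time of the message, in Unix epoch seconds.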
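
To make the structure concrete, here is a minimal Node.js sketch that decodes one message body of the shape above. The function name decodeQueueMessage and the sample values are illustrative and not part of any SDK, and the sketch assumes the payload is Base64-encoded UTF-8 text. Note that messageid values of this size exceed JavaScript's safe integer range, so JSON.parse rounds them. Extract the ID as a string if you need its exact value.

```javascript
// Sketch: decode a queue message body of the shape shown above.
// decodeQueueMessage is a hypothetical helper, not an SDK function.
function decodeQueueMessage(body) {
  const message = JSON.parse(body);
  // The payload field carries Base64-encoded data (assumed UTF-8 text here).
  const text = Buffer.from(message.payload, 'base64').toString('utf8');
  let payload;
  try {
    payload = JSON.parse(text); // device data is often JSON ...
  } catch (err) {
    payload = text;             // ... otherwise keep the decoded string
  }
  return Object.assign({}, message, { payload });
}

// Illustrative sample built locally; real bodies come from the MNS queue.
const sampleBody = JSON.stringify({
  payload: Buffer.from(JSON.stringify({ temperature: 14, humidity: 116 })).toString('base64'),
  messagetype: 'upload',
  topic: '/a1jnUEKYhw4/suw8umOqgJ72yCADerZp/update',
  timestamp: 1526450334
});

console.log(decodeQueueMessage(sampleBody));
```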

Device-side Development

We use a Node.js script to simulate a device. The script connects the device to the IoT platform and then uploads data.

Get IoT SDK for Node.js

Add the "aliyun-iot-mqtt" module, version "0.0.4", as an npm dependency in package.json.

Write Device-side Application Code

From the console, we need to obtain the device identity triplets (productKey, deviceName, and deviceSecret) and regionId.

/**
 * package.json dependency: "aliyun-iot-mqtt": "0.0.4"
 */
const mqtt = require('aliyun-iot-mqtt');

// Device triplet and region, copied from the console
const options = {
    productKey: "product",
    deviceName: "device",
    deviceSecret: "secret",
    regionId: "cn-shanghai"
};

// Establish the connection between the device and the cloud
const client = mqtt.getAliyunIotMqttClient(options);

// Topic the device publishes to (note the leading slash)
const topic = `/${options.productKey}/${options.deviceName}/update`;
const data = {
    temperature: Math.floor((Math.random()*20)+10),
    humidity: Math.floor((Math.random()*100)+20),
};
// Publish the data to the cloud on the specified topic
client.publish(topic, JSON.stringify(data));
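
The script above publishes a single reading. As a rough sketch of how the topic and payload fit together, the helper below builds both from the device triplet and hands them to any object that exposes publish(topic, message), which is how the client returned by getAliyunIotMqttClient is used above. The names buildReading and report and the stub client are assumptions for illustration only. The leading slash in the topic matches the topic that appears in the queue output later in this article.

```javascript
// Sketch: build the update topic and a random reading, then publish it.
// buildReading and report are hypothetical helpers, not part of aliyun-iot-mqtt.
function buildReading(random = Math.random) {
  return {
    temperature: Math.floor(random() * 20 + 10), // 10..29
    humidity: Math.floor(random() * 100 + 20)    // 20..119, same range as the script above
  };
}

function report(client, options, random) {
  const topic = `/${options.productKey}/${options.deviceName}/update`;
  const message = JSON.stringify(buildReading(random));
  client.publish(topic, message);
  return { topic, message };
}

// Stub client that records calls, so the sketch runs without a network connection.
const published = [];
const stubClient = {
  publish: (topic, message) => published.push({ topic, message })
};

// A fixed "random" source keeps the demo reproducible.
report(stubClient, { productKey: 'product', deviceName: 'device' }, () => 0.5);
console.log(published[0]);
```

With a real connection, the stub would be replaced by the client object created in the script above.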

Run the Simulation Device Script

$ node iot-mns.js

After running the script, in the IoT cloud console, we can check the device behavior analysis log under Product > Log Service. We can see that behaviors are logged in chronological order:

  1. The device established a connection and went online at 10:53:05;

By checking the device behavior analysis log, we can see that behaviors are logged in chronological order:

  1. First, the device published messages;

After switching to the MNS console and selecting the "Shanghai" region, we can see that there are three active messages in the message queue "aliyun-iot-a1jnUEKYhw4".


Device Messages in a Message Queue

Use MNS

This example uses Java. Add the "aliyun-sdk-mns", "httpasyncclient", and "fastjson" dependencies to pom.xml, as shown below:

<dependencies>
    <dependency>
        <groupId>com.aliyun.mns</groupId>
        <artifactId>aliyun-sdk-mns</artifactId>
        <version>1.1.8</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpasyncclient</artifactId>
        <version>4.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.42</version>
    </dependency>
</dependencies>

We use the MNS SDK to connect to the queue and retrieve messages by polling. For easier reading, we Base64-decode the payload before printing it. The complete application code is as follows:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import org.apache.commons.codec.binary.Base64;

import java.nio.charset.StandardCharsets;

public class ComsumerDemo {

    public static String accessKeyId = "AK";
    public static String accessKeySecret = "AK secret";
    public static String endpoint = "mns public network endpoint";

    public static void main(String[] args) {
        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient();
        // Get the message queue created by the server subscription
        CloudQueue queue = client.getQueueRef("aliyun-iot-a1jnUEKYhw4");

        while (true) {
            // Wait up to 10 seconds for a message
            Message popMsg = queue.popMessage(10);
            if (popMsg != null) {
                System.out.println("message id: " + popMsg.getMessageId());
                System.out.println("message body: \n" + decodeBase64(popMsg.getMessageBodyAsString()));
                // Delete the message once it has been processed
                queue.deleteMessage(popMsg.getReceiptHandle());
            }
        }
    }

    // Replace the Base64-encoded payload with its decoded JSON content
    public static String decodeBase64(String jsonString) {
        try {
            JSONObject json = JSON.parseObject(jsonString);
            String payload = new String(Base64.decodeBase64(json.getString("payload")), StandardCharsets.UTF_8);
            json.put("payload", JSON.parseObject(payload));
            return json.toJSONString();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Fall back to the raw body if it cannot be decoded
        return jsonString;
    }
}
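
The loop above follows a receive, process, delete pattern: a message is removed from the queue only after it has been handled. A minimal Node.js sketch of the same pattern is shown below, written against an in-memory stand-in rather than the MNS SDK. The InMemoryQueue class and its popMessage and deleteMessage methods are hypothetical and merely mirror the Java calls used above. They do not model MNS behavior such as message visibility timeouts.

```javascript
// Sketch of the receive -> process -> delete pattern used by the Java consumer.
// InMemoryQueue is a hypothetical stand-in for an MNS queue, for illustration only.
class InMemoryQueue {
  constructor(bodies) {
    this.pending = bodies.map((body, i) => ({ receiptHandle: `handle-${i}`, body }));
  }
  async popMessage() {
    // Return the oldest pending message, or null when the queue is empty.
    return this.pending.length > 0 ? this.pending[0] : null;
  }
  async deleteMessage(receiptHandle) {
    this.pending = this.pending.filter((m) => m.receiptHandle !== receiptHandle);
  }
}

// Drain the queue. A message is deleted only after the handler has succeeded;
// if the handler throws, the message stays in the queue and drain rejects.
async function drain(queue, handler) {
  let handled = 0;
  for (;;) {
    const message = await queue.popMessage();
    if (message === null) break;
    await handler(message.body);
    await queue.deleteMessage(message.receiptHandle);
    handled += 1;
  }
  return handled;
}

const demoQueue = new InMemoryQueue(['first', 'second', 'third']);
const seen = [];
const drained = drain(demoQueue, async (body) => { seen.push(body); });
drained.then((count) => console.log(`handled ${count} messages`, seen));
```

Deleting only after successful handling means a crash in the handler cannot silently drop a message, at the cost of possibly processing the same message more than once.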

Run the Application

The console output is as follows:

  1. At timestamp=1526450324, the device came online
message id: E2CF179AD5686386-2-16367878417-200000009
message body:
{
    "payload": {
        "lastTime": "2018-05-16 13:58:44.413",
        "clientIp": "42.120.74.246",
        "time": "2018-05-16 13:58:44.427",
        "productKey": "a1jnUEKYhw4",
        "deviceName": "suw8umOqgJ72yCADerZp",
        "status": "online"
    },
    "messagetype": "status",
    "messageid": 996631012001751041,
    "timestamp": 1526450324
}

  2. At timestamp=1526450334, the device uploaded data
message id: 656A4F66B391367-1-1636787AAC0-20000000C
message body:
{
    "payload": {
        "temperature": 14,
        "humidity": 116
    },
    "messagetype": "upload",
    "topic": "/a1jnUEKYhw4/suw8umOqgJ72yCADerZp/update",
    "messageid": 996631053735047169,
    "timestamp": 1526450334
}

  3. At timestamp=1526450353, the device went offline
message id: E2CF179AD5686386-2-1636787F5F1-20000000A
message body:
{
    "payload": {
        "lastTime": "2018-05-16 13:59:04.381",
        "time": "2018-05-16 13:59:13.571",
        "productKey": "a1jnUEKYhw4",
        "deviceName": "suw8umOqgJ72yCADerZp",
        "status": "offline"
    },
    "messagetype": "status",
    "messageid": 996631134240534528,
    "timestamp": 1526450353
}
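
As a small illustration of how a business server might interpret these decoded messages, the Node.js sketch below turns each one into a one-line summary. It assumes the timestamp field is in Unix epoch seconds, which is consistent with the time values in the status payloads above if those are read as UTC+8, and that upload topics have the form /productKey/deviceName/update. The function name describe is hypothetical.

```javascript
// Sketch: summarize a decoded message according to its messagetype.
// describe is a hypothetical helper written for this illustration.
function describe(message) {
  // timestamp is assumed to be Unix epoch seconds; Date expects milliseconds.
  const when = new Date(message.timestamp * 1000).toISOString();
  if (message.messagetype === 'status') {
    const { deviceName, status } = message.payload;
    return `${when} ${deviceName} is ${status}`;
  }
  if (message.messagetype === 'upload') {
    // Topic form assumed: /<productKey>/<deviceName>/update
    const [, , deviceName] = message.topic.split('/');
    return `${when} ${deviceName} reported ${JSON.stringify(message.payload)}`;
  }
  return `${when} unknown message type: ${message.messagetype}`;
}

// Sample values copied from the console output above.
const statusMessage = {
  payload: { productKey: 'a1jnUEKYhw4', deviceName: 'suw8umOqgJ72yCADerZp', status: 'online' },
  messagetype: 'status',
  timestamp: 1526450324
};
const uploadMessage = {
  payload: { temperature: 14, humidity: 116 },
  messagetype: 'upload',
  topic: '/a1jnUEKYhw4/suw8umOqgJ72yCADerZp/update',
  timestamp: 1526450334
};

console.log(describe(statusMessage));
console.log(describe(uploadMessage));
```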

Source Code

You can find the complete source code on our GitHub blog page: github.com/iot-blog/aliyun-iot-mns

Reference: https://www.alibabacloud.com/blog/transferring-iot-terminal-device-data-using-message-queues_594094?spm=a2c41.12185746.0.0
