Transferring IoT Terminal Device Data using Message Queues

In a typical Internet of Things (IoT) scenario, a business server constantly receives message uploads by devices in real time. The server then specifies the requirements for switching between online and offline device statuses and forwards data to Message Service (MNS) queues by configuring server subscription. This allows data to be obtained directly from MNS queues.

Alibaba Cloud IoT platform utilizes MNS to provide the feature of subscribing messages on the server. This method has several advantages because MNS can ensure message reliability and avoid message loss when the server is unavailable. Additionally, MNS can balance traffic between peak and off-peak hours when handling a large volume of message concurrency, avoiding server unavailability due to drastic concurrency stress. This article shows how users' servers get device data after device data is sent to Alibaba Cloud's IoT platform.

Configuring Server Subscription from the Alibaba Cloud IoT Console

Create a Product

Add a Device Under the Product

Configure Server Subscription under the Product

By reading related Alibaba Cloud IoT documents, we understand the architecture of messages in the queue, as shown below:

{
"payload": "Base64 Encode data",
"messagetype": "status",
"messageid": 996000000000000001,
"topic": "the Topic of a specific device",
"timestamp": 1526450324
}
  1. Messageid: a message ID generated by Alibaba Cloud IoT.
  2. Messagetype: refers to message types: status (device status) and upload (device-reported messages).
  3. topic: indicates which topic a message is derived from. When "messageType" is "status", "topic" is "null"; when "messageType" is "upload", "topic" is the Topic of a specific device.
  4. payload: refers to Base64 Encode data. When "messageType" is "status", it is device status data; when "messageType" is "upload", it is the original data that a device publishes to Topic.
  5. Timestamp: refers to message generation timestamps in a queue, not business timestamps.

Device-side development

Get IoT SDK for Node.js

Write Device-side Application Code

/**
* package.json add dependency:"aliyun-iot-mqtt": "0.0.4"
*/
const mqtt = require('aliyun-iot-mqtt');

//Device triplets
const options = {
productKey: "product",
deviceName: "device",
deviceSecret: "secret",
regionId: "cn-shanghai"
};

//device and cloud establish connection, device connected
const client = mqtt.getAliyunIotMqttClient(options);

//topic
const topic = `${options.productKey}/${options.deviceName}/update`;
const data = {
temperature: Math.floor((Math.random()*20)+10),
humidity: Math.floor((Math.random()*100)+20),
};
//specified topic publishes data to cloud
client.publish(topic, JSON.stringify(data));

Run the Simulation Device Script

$node iot-mns.js

After running the script, in the IoT cloud console, we can check the device behavior analysis log under Product > Log Service. We can see that behaviors are logged in chronological order:

  1. The device established a connection and went online at 10:53:05;
  2. The device was disconnected and went offline at 10:53:12.

By checking the device behavior analysis log, we can see that behaviors are logged in chronological order:

  1. First, the device published messages;
  2. Then, the device sent messages to RuleEngine;
  3. Finally, the device transmitted the MNS message queue aliyun-iot-a1jnUEKYhw4.

After switching to the MNS console and selecting the "Shanghai" region, we can see that there are three active messages in the message queue "aliyun-iot-a1jnUEKYhw4".

Device Messages in a Message Queue

Use MNS

<dependencies>
<dependency>
<groupId>com.aliyun.mns</groupId>
<artifactId>aliyun-sdk-mns</artifactId>
<version>1.1.8</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.42</version>
</dependency>
</dependencies>

We use MNS SDKs to create connections and get messages in a queue in a polling manner. For easier reading, we decode payload data in Base64. The complete application code is shown as follows:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import org.apache.commons.codec.binary.Base64;

public class ComsumerDemo {

public static String accessKeyId = "AK";
public static String accessKeySecret = "AK secret";
public static String endpoint = "mns public network endpoint";

public static void main(String[] args) {
CloudAccount account = new CloudAccount(accessKeyId,accessKeySecret,endpoint);
MNSClient client = account.getMNSClient();
//Get message queue
CloudQueue queue = client.getQueueRef("aliyun-iot-a1jnUEKYhw4");

while (true) {
Message popMsg = queue.popMessage(10);
if (popMsg != null) {
System.out.println("message id: " + popMsg.getMessageId());
System.out.println("message body: \n" + decodeBase64(popMsg.getMessageBodyAsString()));
//Delete messages
queue.deleteMessage(popMsg.getReceiptHandle());
}
}
}

public static String decodeBase64(String jsonString) {
try {
JSONObject json = JSON.parseObject(jsonString);
String payload = new String(Base64.decodeBase64(json.getString("payload")));
json.put("payload", JSON.parseObject(payload));
return json.toJSONString();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}

Run the application

  1. At timestamp=1526450324, the device came online
  2. At timestamp=1526450334, the device uploaded data (complete data can be checked by using payload)
  3. At timestamp=1526450353, the device went offline
message id: E2CF179AD5686386-2-16367878417-200000009
message body:
{
"payload": {
"lastTime": "2018-05-16 13:58:44.413",
"clientIp": "42.120.74.246",
"time": "2018-05-16 13:58:44.427",
"productKey": "a1jnUEKYhw4",
"deviceName": "suw8umOqgJ72yCADerZp",
"status": "online"
},
"messagetype": "status",
"messageid": 996631012001751041,
"timestamp": 1526450324
}


message id: "656A4F66B391367-1-1636787AAC0-20000000C"
message body:
{
"payload": {
"temperature": 14,
"humidity": 116
},
"messagetype": "upload",
"topic": "/a1jnUEKYhw4/suw8umOqgJ72yCADerZp/update",
"messageid": 996631053735047169,
"timestamp": 1526450334
}

message id: E2CF179AD5686386-2-1636787F5F1-20000000A
message body:
{
"payload": {
"lastTime": "2018-05-16 13:59:04.381",
"time": "2018-05-16 13:59:13.571",
"productKey": "a1jnUEKYhw4",
"deviceName": "suw8umOqgJ72yCADerZp",
"status": "offline"
},
"messagetype": "status",
"messageid": 996631134240534528,
"timestamp": 1526450353
}

Source Code

Reference: https://www.alibabacloud.com/blog/transferring-iot-terminal-device-data-using-message-queues_594094?spm=a2c41.12185746.0.0

Follow me to keep abreast with the latest technology news, industry insights, and developer trends.