How We Developed DingTalk: Implementing the Message System Architecture

By Wang Tantan, nicknamed Siming at Alibaba.

Developing a messaging app is one of the major real-world scenarios for Alibaba Cloud Tablestore because its data storage structure is well suited to storing message-type data. Tablestore encapsulates the Timeline model, which allows you to create messaging models and meet messaging-related requirements.

The highly popular Timeline model comes in two versions: v1 and v2. Some users may be unsure about Tablestore concepts such as framework, structure, and model, or about how to apply the Timeline model in different scenarios. This article helps these users by showing how to implement a simple instant messaging (IM) system.

Common instant chat tools, such as DingTalk and WeChat, provide the IM capability. Chat sessions are classified into two types: one-on-one chat and group chat. Chat with official accounts is similar to one-on-one chat. This article uses DingTalk as an example to explain how to implement functions, such as new message reminders, unread message counting, viewing historical chat content, fuzzy searches for group names, history search by keyword, and multi-client synchronization, based on the Timeline model of Tablestore.

This article will help you gain a better understanding of the implementation solution, abstract concepts, and APIs of the Timeline model. The following section describes each chat system function module with reference to various aspects, including function, solution, table design, and implementation code.

Function Modules

Message Storage

The following figure shows the schema of message data (table design: im_timeline_store_table).

Repository

Function: Message Display in Session Windows

The message body includes the sender, message ID (used to remove duplicate messages), message sending time, message body content, and message type (which may be image, file, or text). However, this article only describes text-type messages. The following snapshot shows a session in a public chat group.

As shown in the above screenshot, clicking a session displays one page of that session's latest messages in a window. The displayed messages are pulled from the repository. The system obtains the queue instance of the session based on the TimelineID, calls the scan operation of the queue instance with a ScanParameter that sorts messages by SequenceID in descending order, and displays the latest messages on one page.

When you scroll up and reach the top of the page, the client issues a second request that starts from the minimum SequenceID returned by the first request to obtain the message records on the second page. Each page shows 20 to 30 message records. The client persistently stores session messages and updates the local copy after detecting new messages. This local cache reduces network I/O. Refer to the core code shown in the following snippet.

public List<AppMessage> fetchConversationMessage(String timelineId, long sequenceId) {
    TimelineStore store = timelineV2.getTimelineStoreTableInstance();
    TimelineIdentifier identifier = new TimelineIdentifier.Builder()
            .addField("timeline_id", timelineId)
            .build();
    // Scan from sequenceId toward older messages, at most 30 per page.
    ScanParameter parameter = new ScanParameter()
            .scanBackward(sequenceId)
            .maxCount(30);
    Iterator<TimelineEntry> iterator = store.createTimelineQueue(identifier).scan(parameter);
    List<AppMessage> appMessages = new LinkedList<AppMessage>();
    int counter = 0;
    // Stop after 30 entries even if the iterator keeps fetching more.
    while (iterator.hasNext() && counter++ < 30) {
        TimelineEntry timelineEntry = iterator.next();
        AppMessage appMessage = new AppMessage(timelineId, timelineEntry);
        appMessages.add(appMessage);
    }
    return appMessages;
}

The repository provides full message storage for the entire application so that the messages are permanently stored. The time to live (TTL) of repository-stored data must be set to -1.
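
The paging behavior described above (load the latest page, then use the smallest SequenceID of that page as the cursor for the next request) can be illustrated without a Tablestore instance. The following is a minimal in-memory sketch, not the Timeline SDK: the class PagedHistorySketch and its methods are hypothetical, and the cursor is assumed to be exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for one session's message queue, keyed by SequenceID. */
final class PagedHistorySketch {
    private final NavigableMap<Long, String> queue = new TreeMap<>();

    void append(long sequenceId, String text) {
        queue.put(sequenceId, text);
    }

    /**
     * Returns up to pageSize SequenceIDs strictly below beforeSequenceId,
     * newest first. Pass Long.MAX_VALUE to load the first (latest) page.
     */
    List<Long> pageBefore(long beforeSequenceId, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (Long id : queue.headMap(beforeSequenceId, false).descendingKeySet()) {
            if (page.size() == pageSize) {
                break;
            }
            page.add(id);
        }
        return page;
    }

    public static void main(String[] args) {
        PagedHistorySketch session = new PagedHistorySketch();
        for (long id = 1; id <= 50; id++) {
            session.append(id, "message " + id);
        }
        List<Long> first = session.pageBefore(Long.MAX_VALUE, 20);
        // The smallest SequenceID of the current page is the cursor for the next one.
        long cursor = first.get(first.size() - 1);
        List<Long> second = session.pageBefore(cursor, 20);
        System.out.println("page 1: " + first);
        System.out.println("page 2: " + second);
    }
}
```

Because the cursor is excluded from the next page, a message is never shown twice when the user keeps scrolling up.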

Functions: Multidimensional Combination and Full-text Retrieval

The following snippet shows the core code.

public List<AppMessage> fetchConversationMessage(String timelineId, long sequenceId) {
    TimelineStore store = timelineV2.getTimelineStoreTableInstance();
    TimelineIdentifier identifier = new TimelineIdentifier.Builder()
            .addField("timeline_id", timelineId)
            .build();
    ScanParameter parameter = new ScanParameter()
            .scanBackward(sequenceId)
            .maxCount(30);
    Iterator<TimelineEntry> iterator = store.createTimelineQueue(identifier).scan(parameter);
    List<AppMessage> appMessages = new LinkedList<AppMessage>();
    int counter = 0;
    // Stop after 30 entries even if the iterator keeps fetching more.
    while (iterator.hasNext() && counter++ < 30) {
        TimelineEntry timelineEntry = iterator.next();
        AppMessage appMessage = new AppMessage(timelineId, timelineEntry);
        appMessages.add(appMessage);
    }
    return appMessages;
}

You’ll want to allow users to retrieve only the messages that they are authorized to view by extending the recipient ID array in the message body field. When a user searches through all groups, the recipient field must be set to the corresponding user ID. The permission management function is unavailable in the example. So, you’ll want to add and modify this function as needed.
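
The permission rule above (a user may only retrieve messages whose recipient array contains that user's ID) can be sketched as a filter that is always combined with the keyword condition. This is an in-memory illustration only: StoredMessage, its receivers field, and search are hypothetical names, and in a real deployment both conditions would presumably be expressed as one query against the search index rather than evaluated in application code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AuthorizedSearchSketch {
    /** Hypothetical message shape: text plus the IDs allowed to read it. */
    static final class StoredMessage {
        final String text;
        final List<String> receivers;

        StoredMessage(String text, String... receivers) {
            this.text = text;
            this.receivers = Arrays.asList(receivers);
        }
    }

    /** Keeps messages that contain the keyword AND list userId as a receiver. */
    static List<StoredMessage> search(List<StoredMessage> all, String userId, String keyword) {
        List<StoredMessage> hits = new ArrayList<>();
        for (StoredMessage message : all) {
            if (message.receivers.contains(userId) && message.text.contains(keyword)) {
                hits.add(message);
            }
        }
        return hits;
    }
}
```

Treating the recipient check as a mandatory part of every search keeps a keyword match in someone else's group from leaking into the result.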

Synchronization Database

Function: Real-time Statistics on New Messages

As shown in the above screenshot, it is easy to maintain a total unread message counter for an online client, and an unread message counter (a red dot) for each session of the client. The count results are stored on the client or persistently stored with Redis. Unread messages are those that have been pulled from the synchronization database and counted but have not been read by the user.

After pulling the new message list, the client or application layer traverses all new messages and increases the unread message count of the corresponding session by 1 for each message. This provides instant awareness and update of unread messages. The unread message count of a session is cleared after the user clicks to view the session.

When the number of unread messages is updated, the session list shows a digest and sending time of the latest message. This type of information constantly updates when the user traverses the new message list. The statistics and briefing features depend on the synchronization database rather than the repository.
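
The counting and digest update described above can be sketched as a small state holder that is fed every message pulled from the synchronization database. The class and method names below are illustrative assumptions, as is the 20-character digest length; persistence (on the client or in Redis) is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UnreadCounterSketch {
    static final class SessionState {
        int unread;
        String digest = "";
        long lastSendTime;
    }

    private final Map<String, SessionState> sessions = new LinkedHashMap<>();

    /** Called once for every new message pulled from the user's sync queue. */
    void onNewMessage(String sessionId, String text, long sendTime) {
        SessionState state = sessions.computeIfAbsent(sessionId, id -> new SessionState());
        state.unread++;
        // The session list shows a shortened version of the latest message.
        state.digest = text.length() <= 20 ? text : text.substring(0, 20) + "...";
        state.lastSendTime = sendTime;
    }

    /** Called when the user clicks the session to view it. */
    void markRead(String sessionId) {
        SessionState state = sessions.get(sessionId);
        if (state != null) {
            state.unread = 0;
        }
    }

    int unread(String sessionId) {
        SessionState state = sessions.get(sessionId);
        return state == null ? 0 : state.unread;
    }

    String digest(String sessionId) {
        SessionState state = sessions.get(sessionId);
        return state == null ? "" : state.digest;
    }

    /** Total shown on the app icon: the sum over all sessions. */
    int totalUnread() {
        int total = 0;
        for (SessionState state : sessions.values()) {
            total += state.unread;
        }
        return total;
    }
}
```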

The following snippet shows the core code.

public List<AppMessage> fetchSyncMessage(String userId, long lastSequenceId) {
    TimelineStore sync = timelineV2.getTimelineSyncTableInstance();
    // In the synchronization database, the timeline_id is the user ID.
    TimelineIdentifier identifier = new TimelineIdentifier.Builder()
            .addField("timeline_id", userId)
            .build();
    // Scan from the last synchronized SequenceID toward newer messages.
    ScanParameter parameter = new ScanParameter()
            .scanForward(lastSequenceId)
            .maxCount(30);
    Iterator<TimelineEntry> iterator = sync.createTimelineQueue(identifier).scan(parameter);
    List<AppMessage> appMessages = new LinkedList<AppMessage>();
    int counter = 0;
    while (iterator.hasNext() && counter++ < 30) {
        AppMessage appMessage = new AppMessage(userId, iterator.next());
        appMessages.add(appMessage);
    }
    return appMessages;
}

The client initiates an additional request when a new message belongs to a session that is not yet in the session list. The system obtains basic session information (such as the group display picture, friend display picture, and group name) based on the TimelineID, initializes the unread message counter to 0, accumulates the number of new messages, and updates the latest message digest.

The synchronization database instantly perceives and collects statistics on new messages in the IM scenario through redundant writing to make new message reading and statistics faster and more efficient. The synchronization database does not permanently store redundant messages because the inbox concept does not exist in the IM scenario. Redundant messages expire after seven days. You’ll want to adjust the TTL of the synchronization database as needed.
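
The redundant writing mentioned above means that a message is stored once in the repository (keyed by session) and copied into the synchronization queue of every recipient (keyed by user). The sketch below shows only that data flow with in-memory maps; the class and method names are assumptions, and expiry of the redundant copies (the seven-day TTL) is not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WriteFanOutSketch {
    // Repository: one queue per session, kept permanently.
    private final Map<String, List<String>> store = new HashMap<>();
    // Synchronization database: one queue per user, holding redundant copies.
    private final Map<String, List<String>> sync = new HashMap<>();

    /** Writes the message once to the session queue and once per member. */
    void send(String sessionId, List<String> members, String text) {
        store.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(text);
        for (String member : members) {
            sync.computeIfAbsent(member, k -> new ArrayList<>()).add(sessionId + ":" + text);
        }
    }

    /** History of one session, read from the repository. */
    List<String> history(String sessionId) {
        return store.getOrDefault(sessionId, new ArrayList<>());
    }

    /** New messages of one user across all sessions, read from a single queue. */
    List<String> pending(String userId) {
        return sync.getOrDefault(userId, new ArrayList<>());
    }
}
```

The cost is one extra write per group member, and the benefit is that a client learns about new messages in all of its sessions by reading a single queue.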

Function: Asynchronous Write Divergence

In scenarios with a large number of users and a high degree of activity, synchronous writing may cause performance issues. Therefore, we recommend implementing write divergence for groups through asynchronous tasks.

You’ll want to implement a task queue based on Tablestore and return the result to the sender as soon as a write divergence task is written to the queue. Other processes consume the task queue. The task queue stores the group ID and the complete message information. A consumer process reads new tasks through continuous polling, obtains the complete group member list from the group relationship table, and performs write divergence.

The task queue is implemented directly on Tablestore. The table has two primary key columns: the first stores the topic, and the second is an auto-increment column. Each topic corresponds to a single queue, and tasks are written to a queue sequentially. As concurrency grows, hash tasks into buckets so that they are spread across multiple topics. This increases the number of consumers (consumption concurrency) and makes write divergence more efficient. Maintain a checkpoint for each topic to track consumption progress: all tasks ordered before the checkpoint have already been executed. The system obtains new tasks after the checkpoint in sequence through the getRange operation to ensure that every task is executed. To improve fault tolerance, the system re-writes a failed task to the task queue and increments its retry counter. After repeated failures, the system stops re-writing the task and writes it to a special queue so that application developers can query it and locate the problem.
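
The queue mechanics described above (topics, per-topic checkpoint, retry counter, special queue for repeatedly failing tasks) can be sketched in memory as follows. Everything here is an assumption made for illustration: the list index stands in for the auto-increment column, tasks are bucketed by hashing the group ID, and a task is attempted at most three times before it is moved to the special queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class FanOutTaskQueueSketch {
    static final class Task {
        final String groupId;
        final String message;
        final int retries;

        Task(String groupId, String message, int retries) {
            this.groupId = groupId;
            this.message = message;
            this.retries = retries;
        }
    }

    static final int MAX_ATTEMPTS = 3; // assumed limit, not from the article

    private final Map<String, List<Task>> topics = new HashMap<>();
    private final Map<String, Integer> checkpoints = new HashMap<>();
    private final List<Task> failedTasks = new ArrayList<>();
    private final int bucketCount;

    FanOutTaskQueueSketch(int bucketCount) {
        this.bucketCount = bucketCount;
    }

    /** Picks a topic by hashing the group ID, appends the task, returns the topic. */
    String enqueue(Task task) {
        String topic = "topic-" + Math.floorMod(task.groupId.hashCode(), bucketCount);
        topics.computeIfAbsent(topic, k -> new ArrayList<>()).add(task);
        return topic;
    }

    /** Runs every task after the checkpoint, then advances the checkpoint. */
    int consume(String topic, Predicate<Task> executor) {
        List<Task> queue = topics.computeIfAbsent(topic, k -> new ArrayList<>());
        int position = checkpoints.getOrDefault(topic, 0);
        int succeeded = 0;
        // queue.size() is re-read each round, so re-written tasks are consumed too.
        while (position < queue.size()) {
            Task task = queue.get(position++);
            if (executor.test(task)) {
                succeeded++;
            } else if (task.retries + 1 < MAX_ATTEMPTS) {
                // Re-write the failed task with an incremented retry counter.
                queue.add(new Task(task.groupId, task.message, task.retries + 1));
            } else {
                failedTasks.add(task); // special queue for later inspection
            }
        }
        checkpoints.put(topic, position);
        return succeeded;
    }

    int checkpoint(String topic) {
        return checkpoints.getOrDefault(topic, 0);
    }

    int failedTaskCount() {
        return failedTasks.size();
    }
}
```

Hashing by group ID is a design choice of this sketch: it keeps all tasks of one group in the same topic, so their relative order is preserved while different groups are consumed in parallel.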

Metadata Management

User Metadata

Table design: im_user_table.

The user_id identifies user metadata and has a one-to-one mapping to the timeline_id in the synchronization database. When synchronizing the new messages of a user, the system only pulls messages from the single user-related message queue (TimelineQueue) in the synchronization database. You'll want to set user_id to be the same as timeline_id in the synchronization database to facilitate unique ID management. By doing so, message write divergence can be completed based on the user_id list of the group or the friend user_id list.

Function: User Retrieval

  • User_id: The primary key query
  • QR code (including the user_id): The primary key query
  • User name: The search index, segmented string setting for the user name field
  • User tag: The search index, tag retrieval for array string indexes, multi-tag score retrieval and sorting for nested indexes
  • People nearby: The search index, a query of people nearby and people in specific geo-fences through GEO

Session Metadata

The Timeline model provides the Timeline Meta management capability that supports session metadata management through APIs.

The message queues (TimelineQueue) of sessions are managed in the repository and have one-to-one mapping to the rows in session metadata. After a user selects a session, the application batch-pulls messages from the corresponding message queue in descending order and displays the messages on the client. This process is the same for group chats and one-on-one chats, regardless of the session category.

Function: Group Retrieval

  • Group ID: The primary key query
  • QR code (including the group ID): The primary key query
  • Group name: The search index, segmented string setting for the group name field
  • Group tag: The search index, tag retrieval for array string indexes, multi-tag score retrieval and sorting for nested indexes

Note: The mapping between one-on-one chat sessions and users can be maintained directly in session metadata. Adding a users field that stores the two user_ids to the one-on-one chat session metadata removes the need for a separate relationship table. Alternatively, create a secondary index on the one-on-one chat relationship table im_user_relation_table that uses timeline_id as its first primary key column.

Relationship Maintenance

One-on-one Chat Relationship

Function: Relationship Between Users and One-on-One Chat Sessions

Table design: im_user_relation_table.

The first primary key column contains the main user_id, and the second contains the sub user_id. Two data rows are inserted into the relationship table after two users become friends. In each row, one user_id is regarded as main_user and the other user_id (the friend) is regarded as sub_user. The timeline_id of the session between the two users is stored in a property column. Also, maintain the nicknames and display pictures of the two users.

Create search indexes based on the one-on-one chat relationship table to obtain a user's friend list and sort it by the time each friend was added or by nickname.

Considering latency and fees, use search indexes and the getRange operation to quickly pull the friends' list to maintain and query friend relationships.

Function: Interpersonal Relationship

This design allows you to obtain session information based on your ID and the ID of your friend as long as the two written rows are consistent with each other.

When A removes B from the friend list or vice versa, the system builds the primary keys of the two relationship rows from the two user IDs and ends the friendship between A and B through physical deletion (row deletion) or a tombstone (a status change in a property column).

Refer to the following core code.

public void establishFriendship(String userA, String userB, String timelineId) {
    // Row 1: A is the main user, B is the friend.
    PrimaryKey primaryKeyA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
            .build();
    RowPutChange rowPutChangeA = new RowPutChange(userRelationTable, primaryKeyA);
    rowPutChangeA.addColumn("timeline_id", ColumnValue.fromString(timelineId));

    // Row 2: the mirrored row, so that B can also look up A.
    PrimaryKey primaryKeyB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
            .build();
    RowPutChange rowPutChangeB = new RowPutChange(userRelationTable, primaryKeyB);
    rowPutChangeB.addColumn("timeline_id", ColumnValue.fromString(timelineId));

    BatchWriteRowRequest request = new BatchWriteRowRequest();
    request.addRowChange(rowPutChangeA);
    request.addRowChange(rowPutChangeB);
    syncClient.batchWriteRow(request);
}

public void breakupFriendship(String userA, String userB) {
    // Physical deletion: remove both mirrored rows.
    PrimaryKey primaryKeyA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
            .build();
    RowDeleteChange rowDeleteChangeA = new RowDeleteChange(userRelationTable, primaryKeyA);

    PrimaryKey primaryKeyB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
            .build();
    RowDeleteChange rowDeleteChangeB = new RowDeleteChange(userRelationTable, primaryKeyB);

    BatchWriteRowRequest request = new BatchWriteRowRequest();
    request.addRowChange(rowDeleteChangeA);
    request.addRowChange(rowDeleteChangeB);
    syncClient.batchWriteRow(request);
}

Group Chat Relationship

Function: Relationship Between Group Chat Sessions and Group Members

We recommend using two primary key columns in the table design, with the first column storing the group ID and the second column storing user IDs. This design enables pulling group member information through the getRange operation.

In addition to the group-to-user mapping maintained by the group chat relationship table, the user-to-group mapping is also needed. The costs of redundancy and consistency maintenance may greatly increase if you create a separate table for querying the list of groups to which a user belongs.

Use either of two index types to support the reverse mapping. In the example, a secondary index uses the user_id field as the index primary key so that the list of groups to which a user belongs can be queried directly from the index. This improves real-time synchronization and lowers costs.

Also, index groups, users, and group joining time by using search indexes to query the list of groups to which a user belongs, and sort these groups by group joining time.

Table design: im_group_relation_table.

You can directly obtain all the members of a group by calling the getRange operation on the primary table of the group relationship. Because the IDs of group members are directly available during write divergence, write divergence becomes more efficient. This also facilitates the display of group members in a list.
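
To make the two access paths concrete, the sketch below models the primary table (group ID first, user ID second) and the reverse index (user ID first, group ID second) as sorted sets of composite keys, with a range scan that fixes the first key part and leaves the second open. It is an in-memory illustration only: the class is hypothetical, IDs are assumed not to contain the separator characters, and unlike a real secondary index the reverse entries are written by hand here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class GroupRelationSketch {
    // SEP sorts below every printable character, so the keys of group "g1"
    // never interleave with the keys of group "g10".
    private static final char SEP = '\u0000';
    private static final char END = '\u0001';

    private final TreeSet<String> primary = new TreeSet<>(); // group_id + user_id
    private final TreeSet<String> index = new TreeSet<>();   // user_id + group_id

    void join(String groupId, String userId) {
        primary.add(groupId + SEP + userId);
        index.add(userId + SEP + groupId);
    }

    void leave(String groupId, String userId) {
        primary.remove(groupId + SEP + userId);
        index.remove(userId + SEP + groupId);
    }

    /** Group-to-user mapping: range scan on the primary table. */
    List<String> membersOf(String groupId) {
        return scan(primary, groupId);
    }

    /** User-to-group mapping: range scan on the reverse index. */
    List<String> groupsOf(String userId) {
        return scan(index, userId);
    }

    /** Returns the second key part of every key whose first part equals first. */
    private static List<String> scan(TreeSet<String> table, String first) {
        List<String> result = new ArrayList<>();
        // [first + SEP, first + END) plays the role of INF_MIN .. INF_MAX.
        for (String key : table.subSet(first + SEP, first + END)) {
            result.add(key.substring(first.length() + 1));
        }
        return result;
    }
}
```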

The following snippet shows the core code.

public List<Conversation> listMySingleConversations(String userId) {
    PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userId))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MIN)
            .build();
    PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userId))
            .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MAX)
            .build();
    RangeRowQueryCriteria criteria = new RangeRowQueryCriteria(userRelationTable);
    criteria.setInclusiveStartPrimaryKey(start);
    criteria.setExclusiveEndPrimaryKey(end);
    criteria.setMaxVersions(1);
    criteria.setLimit(100);
    criteria.setDirection(Direction.FORWARD);
    criteria.addColumnsToGet(new String[] {"timeline_id"});
    GetRangeRequest request = new GetRangeRequest(criteria);
    GetRangeResponse response = syncClient.getRange(request);
    List<Conversation> singleConversations = new ArrayList<Conversation>(response.getRows().size());
    for (Row row : response.getRows()) {
        String timelineId = row.getColumn("timeline_id").get(0).getValue().asString();
        String subUserId = row.getPrimaryKey().getPrimaryKeyColumn("sub_user").getValue().asString();
        User friend = describeUser(subUserId);
        Conversation conversation = new Conversation(timelineId, friend);
        singleConversations.add(conversation);
    }
    return singleConversations;
}

Function: Relationship Between Users and Group Chat Sessions

To list all groups that a user joins, create a secondary index based on the primary table and set the user field as the first primary key column for the index.

The following figure shows the data structure of the index. Use the secondary index to directly obtain the TimelineID list for the groups that a user joins by calling the getRange operation.

Secondary index: im_group_relation_global_index.

Consider the core code as shown below.

public List<Conversation> listMyGroupConversations(String userId) {
    PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
            .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MIN)
            .build();
    PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
            .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MAX)
            .build();
    RangeRowQueryCriteria criteria = new RangeRowQueryCriteria(groupRelationGlobalIndex);
    criteria.setInclusiveStartPrimaryKey(start);
    criteria.setExclusiveEndPrimaryKey(end);
    criteria.setMaxVersions(1);
    criteria.setLimit(100);
    criteria.setDirection(Direction.FORWARD);
    criteria.addColumnsToGet(new String[] {"group_id"});
    GetRangeRequest request = new GetRangeRequest(criteria);
    GetRangeResponse response = syncClient.getRange(request);
    List<Conversation> groupConversations = new ArrayList<Conversation>(response.getRows().size());
    for (Row row : response.getRows()) {
        String timelineId = row.getPrimaryKey().getPrimaryKeyColumn("group_id").getValue().asString();
        Group group = describeGroup(timelineId);
        Conversation conversation = new Conversation(timelineId, group);
        groupConversations.add(conversation);
    }
    return groupConversations;
}

Instant Awareness

Session Pool Solution

One method is to configure online clients to refresh and pull new messages periodically. This method works but can cause unnecessary waste of network resources. As the number of users increases, the application comes under a heavier burden. The burden is all the more prominent when many inactive users stay online during the day.

To solve this problem, the application usually maintains a session push pool that records information about online clients and users. When a user sends a new message to another online user, the application obtains the user’s session through the session push pool and instructs the client to pull the new message from the synchronization database. By using this method, the burden from message synchronization increases only with the number of real messages, which relieves the synchronization database of many unnecessary query requests. You can implement the session push pool through a memory-type database or a Tablestore database. However, be sure to guarantee pool persistence.
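
A session push pool can be pictured as a map from user ID to that user's currently connected clients. The sketch below is a minimal in-memory version under stated assumptions: ClientChannel is a hypothetical callback standing in for a long-lived connection, and the pool is not persisted, which a production pool would need to be.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SessionPushPoolSketch {
    /** Hypothetical handle to one online client (for example, a socket). */
    interface ClientChannel {
        void notifyNewMessage();
    }

    // userId -> (clientId -> channel); one user may be online on several clients.
    private final Map<String, Map<String, ClientChannel>> online = new ConcurrentHashMap<>();

    void connect(String userId, String clientId, ClientChannel channel) {
        online.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()).put(clientId, channel);
    }

    void disconnect(String userId, String clientId) {
        Map<String, ClientChannel> clients = online.get(userId);
        if (clients != null) {
            clients.remove(clientId);
        }
    }

    /**
     * Tells every online client of the user to pull from the synchronization
     * database. Returns the number of clients notified; 0 means the user is
     * offline and will pull on the next logon instead.
     */
    int push(String userId) {
        Map<String, ClientChannel> clients = online.get(userId);
        if (clients == null) {
            return 0;
        }
        int notified = 0;
        for (ClientChannel channel : clients.values()) {
            channel.notifyNewMessage();
            notified++;
        }
        return notified;
    }
}
```

The notification carries no message content. It only triggers a pull, so the synchronization database remains the single source of new messages.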

The instant awareness feature counts unread messages in a session table. The method for new message counting is explained in detail in the section, Synchronization Database, under Function: Real-time Statistics on New Messages in this article.

The unread message count must be persistently stored. Otherwise, it may be cleared after a device change or re-logon. This leads to ignoring many new message reminders, which is unacceptable.

More Information

Multi-client Synchronization

  1. In the single-client scenario, the synchronization database provides checkpoint persistence to indicate the SequenceID of the latest read message. However, the checkpoint does not differentiate between clients. If each client persists its state locally, unread message counts may become inconsistent among clients. To keep the unread message count consistent, the application server must maintain the checkpoint and the unread message count. The application server must also detect that the unread message count is cleared when a session is clicked and instruct the other online clients to update their counts in real time.
  2. In the multi-client scenario, when a user sends a new message from one client, the user's other clients are not aware of the sent message and do not refresh if they receive no other new messages. One simple solution is to also write the sent message to the sender's own queue in the synchronization database. When counting unread messages, the client does not count messages sent by the user but still uses them to update the latest message digest. This solution also solves the preceding problem in multi-client synchronization.
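
The two points above can be combined into one piece of server-side state per user: a checkpoint, an unread count per session, and a latest-message digest per session. The following sketch is an assumption-laden illustration (class and method names are invented, and notifying the other clients is left out). It skips the user's own messages when counting but still lets them update the digest.

```java
import java.util.HashMap;
import java.util.Map;

final class MultiClientUnreadSketch {
    private final String owner;
    private final Map<String, Integer> unread = new HashMap<>();
    private final Map<String, String> digest = new HashMap<>();
    private long checkpoint; // SequenceID of the last processed sync message

    MultiClientUnreadSketch(String owner) {
        this.owner = owner;
    }

    /** Processes one message from the owner's sync queue, at most once. */
    void onSyncMessage(long sequenceId, String sessionId, String senderId, String text) {
        if (sequenceId <= checkpoint) {
            return; // already processed, for example by a request from another client
        }
        checkpoint = sequenceId;
        digest.put(sessionId, text);
        if (!owner.equals(senderId)) {
            unread.merge(sessionId, 1, Integer::sum);
        }
    }

    /** Any client opening the session clears the count for all clients. */
    void onSessionOpened(String sessionId) {
        unread.put(sessionId, 0);
    }

    int unread(String sessionId) {
        return unread.getOrDefault(sessionId, 0);
    }

    String digest(String sessionId) {
        return digest.getOrDefault(sessionId, "");
    }

    long checkpoint() {
        return checkpoint;
    }
}
```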

Add Friends and Apply for Joining Groups

The target user or group moderator can perceive the request through the synchronization database. You’ll need to prompt the group moderator to review the request promptly, in the form of a new message type or a special session. You’ll also need to persistently store the request list in a separate table, while still ensuring that users are instantly aware of new requests.

Practice

Before running the code locally, ensure that you meet the following conditions:

  • Activate the service and create an instance
  • Obtain the AccessKey ID and AccessKey secret
  • Set the sample profile
  • The instance supports the secondary index

Open-source Code URL

Sample Configuration

# On macOS or Linux: /home/userhome/tablestoreCong.json
# On Windows: C:\Documents and Settings\%username%\tablestoreCong.json
{
"endpoint": "http://instanceName.cn-hangzhou.ots.aliyuncs.com",
"accessId": "***********",
"accessKey": "***********************",
"instanceName": "instanceName"
}

  • endpoint: the access address of the instance, available on the instance details page in the console
  • accessId: the AccessKey ID, which is available in the provided URL
  • accessKey: the AccessKey secret, which is available in the provided URL
  • instanceName: the name of the used instance

Sample Portal
