Knowledge sharing — Introduction to the Spring Cloud Bus message bus

In the previous article, Introduction to the Spring Cloud Stream System, we discussed the details of Spring Cloud Stream. In this article, we discuss another component of the Spring Cloud system. Spring Cloud Bus is positioned as the message bus of the Spring Cloud system: it uses a message broker to connect all nodes of a distributed system. The official reference documentation for the Bus is very brief and does not even contain a diagram.

The code base of the latest Spring Cloud Bus is short. [Figure: code structure of Spring Cloud Bus]

The Bus Demo

Before we analyze how the Bus is implemented, let us take a look at two simple examples that use the Bus.

Add a New Configuration Item to All Nodes

The Bus example is relatively simple because the default configuration is already provided by the auto-configuration layer of the Bus. You only need to add the Bus dependency together with the Spring Cloud Stream dependency that corresponds to your message middleware. After that, all running applications use the same topic to send and receive messages.

The demo for the Bus has been uploaded to GitHub: https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo

This demo starts five nodes. If you add a new configuration item through any one of these five nodes, it is added to all of them.

Access the configuration-retrieving address provided by the controller of any node (the key is hangzhou):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

Every node returns unknown, because the key does not exist in the configuration of any node.
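The demo controller itself is not shown in this article. A minimal sketch of the lookup it appears to perform, assuming a plain map stands in for the Spring Environment and that unknown is the fallback value (the class and method names are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the demo's /bus/env handler: a plain map plays
// the role of the Spring Environment.
class EnvLookupSketch {
    private final Map<String, String> properties = new ConcurrentHashMap<>();

    // Returns the configured value, or "unknown" when the key is absent.
    String get(String key) {
        return properties.getOrDefault(key, "unknown");
    }

    // Adds or updates a configuration item.
    void set(String key, String value) {
        properties.put(key, value);
    }
}
```

Looking up hangzhou on a fresh instance yields the fallback; after set("hangzhou", "alibaba") the same lookup yields alibaba.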

The Bus has a built-in endpoint, exposed in the demo at /actuator/bus-env, which is used to add or update configuration through the message broker.

Access the endpoint (URL) of a node to add a new configuration item to this node (for example, access the URL of node1):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

Then access all nodes to obtain the configuration:

$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba

You can see that a new configuration item has been added to all nodes. The key of the configuration item is hangzhou, and the value is alibaba. The configuration item is added through the bus-env endpoint.

Modify the Configuration of Some Nodes

For example, on node1, set the destination to rocketmq-bus-node2 to modify the configuration (the spring.cloud.bus.id of node2 is set to rocketmq-bus-node2:10002, which matches the destination given on node1):

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

Access all nodes to obtain the configuration (the message is sent from node1, so the Bus also modifies the configuration of node1):

$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu

You can see that only the configuration of node1 and node2 is modified, and that of the remaining three nodes remains unchanged.
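This outcome can be reproduced with a small model. The following is a simplified sketch, not the Bus's actual matcher: it assumes that a destination of ** addresses every node, that any other destination addresses the nodes whose Bus ID equals it or starts with it followed by a colon, and that the sending node always applies the change locally. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class TargetedUpdateSketch {
    // Simplified destination check; an assumption, not the library's matcher.
    static boolean matches(String destination, String busId) {
        return destination.equals("**")
                || busId.equals(destination)
                || busId.startsWith(destination + ":");
    }

    // Returns the Bus IDs whose configuration changes: the sender plus every
    // node addressed by the destination.
    static List<String> updatedNodes(String sender, String destination, List<String> allNodes) {
        List<String> updated = new ArrayList<>();
        for (String node : allNodes) {
            if (node.equals(sender) || matches(destination, node)) {
                updated.add(node);
            }
        }
        return updated;
    }
}
```

With the five demo nodes, a sender of rocketmq-bus-node1:10001 and a destination of rocketmq-bus-node2 select only node1 and node2, while a destination of ** selects all five.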

Implementation of the Bus

Concepts

Event

The Bus defines a remote event, RemoteApplicationEvent. This event inherits from Spring's ApplicationEvent and has four implementations:

  • EnvironmentChangeRemoteApplicationEvent: The remote environment change event. This event mainly carries Map<String, String> data, which is inserted into the environment of the Spring context. The Bus demo is built on this event in conjunction with the bus-env endpoint and EnvironmentChangeListener.
  • AckRemoteApplicationEvent: The remote acknowledgement event. The Bus sends an AckRemoteApplicationEvent upon successfully receiving a remote event.
  • RefreshRemoteApplicationEvent: The remote configuration refresh event. This event is used in conjunction with the refresh-related annotations to dynamically refresh the @Configuration classes that carry them.
  • UnknownRemoteApplicationEvent: The remote unknown event. If an exception is thrown when you convert internal messages of the Bus into a remote event, the exception will be encapsulated as this event.

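The Bus also has one event that is not a RemoteApplicationEvent: SentApplicationEvent, the message sending event. This event is used in conjunction with Trace to record the remote messages that have been sent.

All these events are used in conjunction with an ApplicationListener. For example, EnvironmentChangeRemoteApplicationEvent is used in conjunction with EnvironmentChangeListener to add or update the configuration: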

public class EnvironmentChangeListener
        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
    private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);
    @Autowired
    private EnvironmentManager env;
    @Override
    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
        Map<String, String> values = event.getValues();
        log.info("Received remote environment change request. Keys/values to update "
                + values);
        for (Map.Entry<String, String> entry : values.entrySet()) {
            env.setProperty(entry.getKey(), entry.getValue());
        }
    }
}

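Upon receiving an EnvironmentChangeRemoteApplicationEvent from another node, the listener calls EnvironmentManager.setProperty to set the configuration. This method publishes a local environment change event for each changed configuration item. A rebinding listener picks up that event, performs the rebinding operation, and thereby adds or updates the configuration.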
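The chain described here (set a property, publish a change notification, let a listener react) can be modeled in plain Java. This is a sketch under stated assumptions: PropertyStore and its callback are hypothetical stand-ins for the Spring Cloud classes, and no Spring API is used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the environment manager: every property change
// notifies the registered listeners with the changed key.
class PropertyStore {
    private final Map<String, String> properties = new LinkedHashMap<>();
    private final List<Consumer<String>> changeListeners = new ArrayList<>();

    void onChange(Consumer<String> listener) {
        changeListeners.add(listener);
    }

    void setProperty(String key, String value) {
        properties.put(key, value);
        // One notification per changed item.
        for (Consumer<String> listener : changeListeners) {
            listener.accept(key);
        }
    }

    String getProperty(String key) {
        return properties.get(key);
    }

    // Applies a remote change request by looping over its key/value pairs,
    // as the listener shown earlier does.
    void applyRemoteChange(Map<String, String> values) {
        values.forEach(this::setProperty);
    }
}
```

A listener registered through onChange plays the role of the rebinding step: it is called once for each key contained in the remote change request.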

Actuator Endpoint

The Bus provides two actuator endpoints: one adds or updates the configuration, and the other refreshes the global configuration. The first is the bus-env endpoint used in the demo (/actuator/bus-env).
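For readers who prefer Java over curl, the demo requests can be built with the JDK's java.net.http client. This sketch only constructs the request; the host, port, and bus-env path are taken from the demo above, while the class and method names are hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class BusEnvRequestSketch {
    // Builds POST {baseUrl}/actuator/bus-env[/{destination}]?name=..&value=..
    // Pass null as the destination to address every node.
    static HttpRequest build(String baseUrl, String destination, String name, String value) {
        String path = "/actuator/bus-env" + (destination == null ? "" : "/" + destination);
        String query = "name=" + URLEncoder.encode(name, StandardCharsets.UTF_8)
                + "&value=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + path + "?" + query))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), provided a demo node is listening on the given port.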

Configuration

Message sending in the Bus inevitably involves the topic and group information. These settings are encapsulated in the Bus's configuration properties class. The default prefix is spring.cloud.bus. For example, the properties under this prefix include:

  • A switch to enable or disable the listener that listens for the global refresh event.
  • A switch to enable or disable the endpoint for adding or updating the configuration.
  • A switch to enable or disable the sending of the AckRemoteApplicationEvent.
  • A switch to enable or disable the listener that listens for the Trace.

Messages are sent to a default topic, which can be changed through configuration. For the group, you can either use the broadcasting mode or give each instance a UUID-based group that consumes from the latest offset.

Each Bus application has a unique Bus ID. The official format of the Bus ID is complex:

${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}

We recommend that you set the Bus ID manually, because the destination of a remote event needs to match the Bus ID.

spring.cloud.bus.id=${spring.application.name}-${server.port}
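The default expression resolves to three colon-separated parts: an application name, an index or port, and an instance ID or random value. A minimal sketch of that fallback order, assuming the relevant properties are available in a plain map. This is not Spring's placeholder resolver, the Cloud Foundry vcap.* keys are left out for brevity, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.UUID;

class BusIdSketch {
    // Returns the value of the first key that is set, or the fallback.
    static String firstSet(Map<String, String> props, String fallback, String... keys) {
        for (String key : keys) {
            String value = props.get(key);
            if (value != null) {
                return value;
            }
        }
        return fallback;
    }

    // Mirrors the name:index:instance shape of the default Bus ID.
    static String defaultStyleId(Map<String, String> props) {
        String name = firstSet(props, "application", "spring.application.name");
        String index = firstSet(props, "0",
                "spring.application.index", "local.server.port", "server.port");
        String instance = UUID.randomUUID().toString(); // stands in for ${random.value}
        return name + ":" + index + ":" + instance;
    }
}
```

Because the last part is random, an ID built this way differs on every start, which is one more reason to set spring.cloud.bus.id explicitly.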

Underlying Analysis of the Bus

Underlying analysis of the Bus involves the following questions:

  • How does the Bus send messages?
  • How does the Bus receive messages?
  • How does the Bus match the destination?
  • How does the Bus trigger the next action after receiving a remote event?

The automatic configuration class, BusAutoConfiguration, is annotated by the Spring Cloud Stream binding annotation.

This annotation was covered in the previous article, Knowledge sharing - Introduction to the Spring Cloud Stream system and how it works. Its value is the SpringCloudBusClient interface, and it creates the message channels for the input and output methods declared in SpringCloudBusClient, in the same way as for the Source and Sink interfaces:

public interface SpringCloudBusClient {
    String INPUT = "springCloudBusInput";
    String OUTPUT = "springCloudBusOutput";
    @Output(SpringCloudBusClient.OUTPUT)
    MessageChannel springCloudBusOutput();
    @Input(SpringCloudBusClient.INPUT)
    SubscribableChannel springCloudBusInput();
}

The properties of the springCloudBusInput and springCloudBusOutput bindings can be changed in the configuration file (for example, to use a different topic):

spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic

Send and Receive Messages

// BusAutoConfiguration
@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) { // 2
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
    }
}
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) { // 5
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) { // 6
        if (!this.serviceMatcher.isFromSelf(event)) { // 7
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) { // 8
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
        // We are set to register sent events so publish it for local consumption,
        // irrespective of the origin
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}

1. Use the Spring event listener to listen for every RemoteApplicationEvent published inside the application. For example, the bus-env endpoint publishes an EnvironmentChangeRemoteApplicationEvent, and the refresh endpoint publishes a RefreshRemoteApplicationEvent. This listener receives all of these events.

2. Verify that the event is not an AckRemoteApplicationEvent (otherwise, the Bus would get stuck in an endless loop of receiving and sending messages) and that it was sent by the application itself. If both conditions are met, perform Step 3.

3. Create a message with the remote event as the payload. Then use the MessageChannel created by Spring Cloud Stream (binding name springCloudBusOutput) to send the message to the broker.

4. The @StreamListener annotation subscribes this method to the MessageChannel created by Spring Cloud Stream (binding name springCloudBusInput). Every message received here is a remote event.

5. If the remote event is an AckRemoteApplicationEvent, the trace feature is enabled, and the event was not sent by the application itself (that is, it was sent by another application), publish the AckRemoteApplicationEvent inside the application so that it can see the acknowledgement sent by the other application. Processing of an AckRemoteApplicationEvent ends here in either case.

6. If the remote event is addressed to the application (isForSelf), perform Step 7 and Step 8. Step 9 is performed in either case.

7. If the remote event was sent by another application, publish it inside the application. If the application sent the event itself, the event has already been processed locally, so it does not have to be published again.

8. If acknowledgements are enabled, create an AckRemoteApplicationEvent, send it to the broker, and publish it inside the application. It is published locally because Step 5 skips any AckRemoteApplicationEvent that the application sent itself, so the application would otherwise never observe its own acknowledgement. It is sent to the broker because the application needs to inform other applications that it has received the message.

9. If Trace is enabled, create a SentApplicationEvent and publish it inside the application.
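The branching in the steps above can be traced with a plain-Java model. This is a sketch under stated assumptions: it has no Spring or broker dependency, it returns action labels instead of sending anything, it omits the null checks on the event publisher, and its destination check is a simplified stand-in for the real service matcher. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class BusFlowSketch {
    static final class Event {
        final String origin;
        final String destination;
        final boolean ack;

        Event(String origin, String destination, boolean ack) {
            this.origin = origin;
            this.destination = destination;
            this.ack = ack;
        }
    }

    private final String selfId;
    private final boolean ackEnabled;
    private final boolean traceEnabled;

    BusFlowSketch(String selfId, boolean ackEnabled, boolean traceEnabled) {
        this.selfId = selfId;
        this.ackEnabled = ackEnabled;
        this.traceEnabled = traceEnabled;
    }

    boolean isFromSelf(Event e) {
        return selfId.equals(e.origin);
    }

    // Simplified destination check (assumption): "**" or an ID prefix match.
    boolean isForSelf(Event e) {
        return e.destination.equals("**") || selfId.equals(e.destination)
                || selfId.startsWith(e.destination + ":");
    }

    // Steps 1-3: forward our own locally published, non-ack events to the broker.
    List<String> acceptLocal(Event e) {
        List<String> actions = new ArrayList<>();
        if (isFromSelf(e) && !e.ack) {
            actions.add("send-to-broker");
        }
        return actions;
    }

    // Steps 4-9: decide what to do with an event received from the broker.
    List<String> acceptRemote(Event e) {
        List<String> actions = new ArrayList<>();
        if (e.ack) {
            if (traceEnabled && !isFromSelf(e)) {
                actions.add("publish-ack-locally"); // step 5
            }
            return actions;
        }
        if (isForSelf(e)) {
            if (!isFromSelf(e)) {
                actions.add("publish-event-locally"); // step 7
            }
            if (ackEnabled) {
                actions.add("send-ack-to-broker"); // step 8
                actions.add("publish-ack-locally");
            }
        }
        if (traceEnabled) {
            actions.add("publish-sent-event-locally"); // step 9
        }
        return actions;
    }
}
```

For a broadcast event from node1, a node2 instance with acknowledgements enabled and trace disabled publishes the event locally and then emits its acknowledgement, whereas a node3 instance that receives an event addressed to rocketmq-bus-node2 takes no action.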

After the bus-env endpoint is triggered, the EnvironmentChangeListener on every node detects the configuration change, and the console of each node prints the following information:

o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

If you listen for AckRemoteApplicationEvent on the current node, you will receive acknowledgements from all nodes. For example, the AckRemoteApplicationEvent instances received on node5 are as follows:

ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
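All five log lines share the same ackId, which identifies the original event. A minimal sketch of how a listener could use that to track which nodes have acknowledged an event, assuming the ackId and originService values have already been extracted from each acknowledgement. The AckTracker class is hypothetical and not part of the Bus.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Groups acknowledgements by the ID of the event they acknowledge.
// Not thread-safe; a real listener would need synchronization.
class AckTracker {
    private final Map<String, Set<String>> originsByAckId = new HashMap<>();

    void record(String ackId, String originService) {
        originsByAckId.computeIfAbsent(ackId, id -> new LinkedHashSet<>()).add(originService);
    }

    Set<String> acknowledgedBy(String ackId) {
        return Collections.unmodifiableSet(
                originsByAckId.getOrDefault(ackId, Collections.emptySet()));
    }

    boolean allAcknowledged(String ackId, Set<String> expectedNodes) {
        return acknowledgedBy(ackId).containsAll(expectedNodes);
    }
}
```

Feeding the five acknowledgements above into record would make allAcknowledged return true for the set of all five service IDs.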

Now, let us answer the four questions listed at the beginning of this section:

  • How does the Bus send messages? The Bus sends an event to the topic through Spring Cloud Stream by using the acceptLocal method of BusAutoConfiguration.
  • How does the Bus receive messages? The Bus receives messages from the topic through Spring Cloud Stream by using the acceptRemote method of BusAutoConfiguration.
  • How does the Bus match the destination? The Bus matches the destination inside the remote-event-receiving method, acceptRemote, by calling serviceMatcher.isForSelf.
  • How does the Bus trigger the next action after receiving a remote event? The Bus performs the next action after receiving the RemoteApplicationEvent on the current node based on the Spring event mechanism. For example, EnvironmentChangeListener receives the EnvironmentChangeRemoteApplicationEvent, and the refresh listener receives the RefreshRemoteApplicationEvent.
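The last answer relies on type-based dispatch: each listener only sees the event types it registered for. A minimal plain-Java sketch of that idea follows; it illustrates the principle only, is not Spring's event mechanism, and TypedEventPublisher is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TypedEventPublisher {
    // Pairs an event type with the handler interested in it.
    private static final class Registration<T> {
        private final Class<T> type;
        private final Consumer<T> handler;

        Registration(Class<T> type, Consumer<T> handler) {
            this.type = type;
            this.handler = handler;
        }

        void deliverIfMatching(Object event) {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    <T> void register(Class<T> type, Consumer<T> handler) {
        registrations.add(new Registration<>(type, handler));
    }

    // Delivers the event to every handler whose registered type matches.
    void publish(Object event) {
        for (Registration<?> registration : registrations) {
            registration.deliverIfMatching(event);
        }
    }
}
```

An event whose type has no registered handler is simply ignored, which is also why publishing a remote event locally is harmless on nodes that have no listener for it.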

Summary

Spring Cloud Bus is a small component. However, you need to understand the Spring Cloud Stream system and the Spring event mechanism before you can fully understand how the Bus processes local and remote events.

Currently, the Bus provides only a few built-in remote events, most of which are configuration related. We can define our own RemoteApplicationEvent subclasses and register them through the Bus's event-scanning annotation to build our own microservice message system.

Author

Fang Jian (nickname: Luoye), GitHub ID @fangjian0423, open-source fan, Alibaba Cloud senior development engineer, developer of Alibaba Cloud EDAS. Fang Jian is one of the owners of the open-source Spring Cloud Alibaba project.

Reference: https://www.alibabacloud.com/blog/knowledge-sharing---introduction-to-the-spring-cloud-bus-message-bus_594823?spm=a2c41.12911926.0.0
