Knowledge sharing — Introduction to the Spring Cloud Bus message bus

The Bus Demo

Before we analyze how the Bus is implemented, let us take a look at two simple examples that uses the Bus.

Add a New Configuration Item to All Nodes

The example of the Bus is relatively simple, because the default configuration is already available in the AutoConfiguration layer of the Bus. You only need to introduce the Spring Cloud Stream framework corresponding to the message middleware and dependencies of the Bus. After that, all running applications will use the same topic to receive and send messages.

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown%
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba%

Modify the configuration of some nodes

For example, set the destination to rocketmq-bus-node2 on node1 (spring.cloud.bus.id of node2 is set to rocketmq-bus-node2:10002, which matches the setting on node1) to modify the configuration:

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
~ ⌚
$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
~ ⌚
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba%
~ ⌚
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu%
~ ⌚
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu%

Implementation of the Bus

Concepts

  • EnvironmentChangeRemoteApplicationEvent: The remote environment change event. This event mainly receives the Map<String, String> data, and inserts the data into the context environment of Spring. The Bus demo is completed by using this event in conjunction with EnvironmentBusEndpoint and EnvironmentChangeListener.
  • AckRemoteApplicationEvent: The remote acknowledgement event. The Bus will send an AckRemoteApplicationEvent event upon successfully receiving a remote event.
  • RefreshRemoteApplicationEvent: The remote configuration refresh event. This event is used in conjunction with the @RefreshScope annotation and all @ConfigurationProperties annotations to dynamically refresh the @Configuration class that is annotated by these annotations.
  • UnknownRemoteApplicationEvent: The remote unknown event. If an exception is thrown when you convert internal messages of the Bus into a remote event, the exception will be encapsulated as this event.
public class EnvironmentChangeListener
implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
private static Log log = LogFactory.getLog(EnvironmentChangeListener.class); @Autowired
private EnvironmentManager env;
@Override
public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
Map<String, String> values = event.getValues();
log.info("Received remote environment change request. Keys/values to update "
+ values);
for (Map.Entry<String, String> entry : values.entrySet()) {
env.setProperty(entry.getKey(), entry.getValue());
}
}
}
  • spring.cloud.bus.refresh.enabled is used to enable or disable the listener that listens for the global refresh event.
  • spring.cloud.bus.env.enabled is used to enable or disable the endpoint for adding or updating the configuration.
  • spring.cloud.bus.ack.enabled is used to enable or disable the sending of the AckRemoteApplicationEvent.
  • spring.cloud.bus.trace.enabled is used to enable or disable the listener that listens for the Trace.
${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
spring.cloud.bus.id=${spring.application.name}-${server.port}
  • How does the Bus send messages?
  • How does the Bus receive messages?
  • How does the Bus match the destination?
  • How does the Bus trigger the next action after receiving a remote event?
public interface SpringCloudBusClient {    String INPUT = "springCloudBusInput";    String OUTPUT = "springCloudBusOutput";    @Output(SpringCloudBusClient.OUTPUT)
MessageChannel springCloudBusOutput();
@Input(SpringCloudBusClient.INPUT)
SubscribableChannel springCloudBusInput();
}
spring.cloud.stream.bindings:
springCloudBusInput:
destination: my-bus-topic
springCloudBusOutput:
destination: my-bus-topic
// BusAutoConfiguration@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !( event instanceof AckRemoteApplicationEvent)) { // 2
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
}
}
@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
if (event instanceof AckRemoteApplicationEvent) {
if (this.bus.getTrace().isEnabled() && ! this.serviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher ! = null) { // 5
this.applicationEventPublisher.publishEvent(event);
}
// If it's an ACK we are finished processing at this point
return;
}
if (this.serviceMatcher.isForSelf(event)
&& this.applicationEventPublisher ! = null) { // 6
if (! this.serviceMatcher.isFromSelf(event)) { // 7
this.applicationEventPublisher.publishEvent(event);
}
if (this.bus.getAck().isEnabled()) { // 8
AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
this.serviceMatcher.getServiceId(),
this.bus.getAck().getDestinationService(),
event.getDestinationService(), event.getId(), event.getClass());
this.cloudBusOutboundChannel
.send(MessageBuilder.withPayload(ack).build());
this.applicationEventPublisher.publishEvent(ack);
}
}
if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher ! = null) { // 9
// We are set to register sent events so publish it for local consumption,
// irrespective of the origin
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
event.getOriginService(), event.getDestinationService(),
event.getId(), event.getClass()));
}
}
o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
  • How does Bus send messages? The Bus sends an event to the springCloudBus topic by using the BusAutoConfiguration#acceptLocal method through Spring Cloud Stream.
  • How does Bus receive messages? The Bus receives messages from the springCloudBus topic by using the BusAutoConfiguration#acceptRemote method through Spring Cloud Stream.
  • How does the Bus match the destination? The Bus matches the destination by using the remote-event-receiving method of the BusAutoConfiguration#acceptRemote method.
  • How does the Bus trigger the next action after receiving a remote event? The Bus performs the next action after receiving the RemoteApplicationEvent on the current node based on the Spring event mechanism. For example, the EnvironmentChangeListener receives the EnvironmentChangeRemoteApplicationEvent, and the RefreshListener receives the RefreshRemoteApplicationEvent.

Summary

The Spring Cloud Bus does not have too much content. However, you need to first understand the Spring Cloud Stream system and the Spring event mechanism before you can sufficiently understand how the Bus processes local and remote events.

Author

Fang Jian (nickname: Luoye), GitHub ID @fangjian0423, open-source fan, Alibaba Cloud senior development engineer, developer of Alibaba Cloud EDAS. Fang Jian is one of the owners of the open-source Spring Cloud Alibaba project.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com