Introduction to the Spring Cloud Stream System

Spring Cloud Stream (SCS) is the Spring Cloud component for building highly scalable, event-driven microservices. Its purpose is to simplify message handling in Spring Cloud applications. SCS covers a lot of ground and has many external dependencies. To understand SCS, you first need to know Spring Messaging and Spring Integration. This article covers the following topics:

  • Spring Messaging
  • Spring Integration
  • SCS and its features

Spring Messaging

Spring Messaging is a Spring Framework module that provides a unified programming model for messaging.

  • For example, a Message has a payload (the message body) and headers:
package org.springframework.messaging;

public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}
  • The MessageChannel receives messages. You can call the send() method to send messages to this message channel:
@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;

    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }

    boolean send(Message<?> message, long timeout);
}

How is a message channel message consumed?

  • Messages in a message channel are consumed through its subinterface, SubscribableChannel, to which a MessageHandler subscribes:
public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}
  • The messages are finally consumed and processed by a MessageHandler:
@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Messaging also provides some other internal features that are derived from the messaging model:

1. Message receiving argument handler and return value handler: The argument handler, HandlerMethodArgumentResolver, is used in conjunction with annotations such as @Header and @Payload. The return value handler, HandlerMethodReturnValueHandler, is used in conjunction with the @SendTo annotation after a message is received.
2. Message body content converter: MessageConverter.
3. Unified and abstract message sending template: AbstractMessageSendingTemplate.
4. Message channel interceptor: ChannelInterceptor.

Spring Integration

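Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns. It builds on Spring Messaging and introduces many new concepts, including message routers, dispatchers, filters, transformers, aggregators, and splitters. It also provides implementations of MessageChannel (such as DirectChannel, ExecutorChannel, and PublishSubscribeChannel) and of MessageHandler (such as MessageFilter, ServiceActivatingHandler, and MethodInvokingSplitter).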

First, this article describes several message processing methods:

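  • Splitter: splits one message into several messages.
  • Aggregator: combines several related messages into one message.
  • Filter: passes on only the messages that meet a condition and discards the rest.
  • Dispatcher: distributes messages from a channel to the handlers that are subscribed to it.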
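
To make the first three patterns concrete, here is a minimal, framework-free sketch. It is only an illustration: PatternSketch and its methods are hypothetical names, payloads are plain strings, and none of this is Spring Integration API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of three message processing patterns.
// All names are hypothetical; this is not Spring Integration code.
class PatternSketch {

    // Splitter: one message becomes several smaller messages.
    static List<String> split(String payload) {
        return new ArrayList<>(Arrays.asList(payload.split(",")));
    }

    // Filter: only messages that satisfy the condition pass through.
    static List<String> filter(List<String> messages, Predicate<String> condition) {
        List<String> accepted = new ArrayList<>();
        for (String message : messages) {
            if (condition.test(message)) {
                accepted.add(message);
            }
        }
        return accepted;
    }

    // Aggregator: several related messages are combined into one.
    static String aggregate(List<String> messages) {
        return String.join("+", messages);
    }

    public static void main(String[] args) {
        List<String> parts = split("a1,b2,a3");
        List<String> kept = filter(parts, m -> m.startsWith("a"));
        System.out.println(aggregate(kept)); // prints "a1+a3"
    }
}
```

In Spring Integration these roles are played by message handlers wired between channels, so each step receives a Message and forwards its result to the next channel rather than returning a value.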

Then, let us try Spring Integration with a simple example:

SubscribableChannel messageChannel = new DirectChannel(); // 1
messageChannel.subscribe(msg -> { // 2
    System.out.println("receive: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3

1. Build a subscribable message channel, messageChannel.
2. Use a MessageHandler to consume messages of this message channel.
3. Send a message to this message channel. This message is eventually consumed by the MessageHandler of the message channel. Finally, the console prints: receive: msg from alibaba

DirectChannel has an internal UnicastingDispatcher, which dispatches each message to a MessageHandler. As its name suggests, UnicastingDispatcher is a unicasting dispatcher: it selects only one message handler for each message. How does it choose the handler? It uses an internal LoadBalancingStrategy. By default, the strategy is round-robin, so the handlers are selected in turn. The strategy can be replaced with a custom implementation.

Let us modify the preceding code to use multiple MessageHandler instances to process messages:

SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
    System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
    System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

The internal message dispatcher of DirectChannel is UnicastingDispatcher, a unicasting dispatcher that selects handlers in round-robin order by default. Therefore, the two messages are consumed by the two MessageHandler instances in turn. The console prints:

receive1: msg from alibaba
receive2: msg from alibaba
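
The round-robin behavior can be modeled without any framework. The following is a minimal sketch, not the Spring Integration implementation: RoundRobinChannel and its methods are hypothetical names, payloads are plain strings, and a send with no subscribers simply returns false (an assumption made to keep the sketch small).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of unicast dispatch with round-robin load balancing.
// RoundRobinChannel is a hypothetical class, not Spring Integration code.
class RoundRobinChannel {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private int next = 0; // index of the handler that receives the next message

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Delivers the payload to exactly one handler, rotating through the list.
    boolean send(String payload) {
        if (handlers.isEmpty()) {
            return false; // assumption: nothing to deliver to, report failure
        }
        Consumer<String> handler = handlers.get(next);
        next = (next + 1) % handlers.size();
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        RoundRobinChannel channel = new RoundRobinChannel();
        channel.subscribe(p -> System.out.println("receive1: " + p));
        channel.subscribe(p -> System.out.println("receive2: " + p));
        channel.send("msg from alibaba"); // handled by the first subscriber
        channel.send("msg from alibaba"); // handled by the second subscriber
    }
}
```

A third send would go back to the first subscriber, which is why adding handlers spreads the load instead of duplicating the work.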

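After introducing the UnicastingDispatcher, let us take a look at the BroadcastingDispatcher, which is used by the PublishSubscribeChannel message channel. BroadcastingDispatcher dispatches each message to all MessageHandler instances: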

SubscribableChannel messageChannel = new PublishSubscribeChannel();
messageChannel.subscribe(msg -> {
    System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
    System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

Send two messages, and both are consumed by all MessageHandler instances. The console prints:

receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba
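
For comparison with unicast dispatch, broadcast dispatch can be modeled in a few lines. This is a simplified sketch under the same assumptions as before: BroadcastChannel is a hypothetical class with string payloads, not the Spring Integration implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of broadcast dispatch.
// BroadcastChannel is a hypothetical class, not Spring Integration code.
class BroadcastChannel {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Delivers the payload to every handler in subscription order
    // and returns the number of handlers that received it.
    int send(String payload) {
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        BroadcastChannel channel = new BroadcastChannel();
        channel.subscribe(p -> System.out.println("receive1: " + p));
        channel.subscribe(p -> System.out.println("receive2: " + p));
        channel.send("msg from alibaba"); // both subscribers print the message
        channel.send("msg from alibaba"); // both subscribers print it again
    }
}
```

The only difference from the unicast model is the loop in send(): every subscriber is invoked instead of one being selected.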

Spring Cloud Stream

Relationships between SCS and other modules are:

  • SCS is built on top of Spring Integration, and introduces some concepts of its own, such as Binder, Binding, @EnableBinding, and @StreamListener.
  • SCS is integrated with Spring Boot Actuator, and it provides the /bindings and /channels actuator endpoints.
  • SCS is integrated with Spring Boot Externalized Configuration, and it provides external configuration classes such as BindingProperties and BinderProperties.
  • SCS enhances the processing logic in the cases of message sending failure and consumption failure.
  • SCS is an enhancement to Spring Integration. It is integrated with the Spring Boot system and is the foundation of Spring Cloud Bus. SCS shields the implementation details of the underlying message middleware so that a unified set of APIs can be used to send and consume messages. Those details are implemented by the Binder of each message middleware.

Binder is the component that connects the message channels inside the application to the external message middleware. SCS provides two methods, bindConsumer and bindProducer, which are used to bind the consumer and the producer respectively. The currently available official implementations are Rabbit Binder and Kafka Binder, and Spring Cloud Alibaba has already implemented RocketMQ Binder.

[Figure: Binder and Binding between the application and the message middleware]

You can see from the preceding figure that Binding is the bridge that connects the application and the message middleware, and is used to produce and consume messages. Let us take a look at a simple example that uses the RocketMQ Binder, and then analyze its underlying processing logic:

  • Enable the binding interfaces and send messages:
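import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 1
public class SendAndReceiveApplication {

    public static void main(String[] args) {
        SpringApplication.run(SendAndReceiveApplication.class, args);
    }

    @Bean // 2
    public CustomRunner customRunner() {
        return new CustomRunner();
    }

    public static class CustomRunner implements CommandLineRunner {

        @Autowired
        private Source source;

        @Override
        public void run(String... args) throws Exception {
            int count = 5;
            for (int index = 1; index <= count; index++) {
                source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
            }
        }
    }
}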
  • Receive the message:
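import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class StreamListenerReceiveService {

    @StreamListener(Sink.INPUT) // 4
    public void receiveByStreamListener1(String receiveMsg) {
        System.out.println("receiveByStreamListener: " + receiveMsg);
    }
}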

This snippet is straightforward, and it contains no code that relates to RocketMQ. Messages are sent and received purely through the SCS abstractions. If you want to switch the message middleware, for example from RocketMQ to Kafka, you change the binder dependency and the configuration file. You do not have to modify the code.

Let us analyze how this snippet works:

  1. The @EnableBinding annotation is configured with two interfaces, Source and Sink. SCS internally builds a BindableProxyFactory based on the Source and Sink interfaces. The MessageChannel returned by the corresponding output and input methods is a DirectChannel. The values of the @Output and @Input annotations on the output and input methods are the names of the bindings in the configuration file.
public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}

The binding names in the configuration file (output and input) are the values of the annotations on the output and input methods of the Source and Sink interfaces:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1

  2. Build the CommandLineRunner. The run method of CustomRunner is executed when the application starts.

  3. Call the output() method of the Source interface to obtain the DirectChannel, and send messages to this message channel. This works the same way as the code in the Spring Integration section.

  • After a message is sent to the message channel returned by the output() method of the Source interface, the message is processed by the AbstractMessageChannelBinder#SendingHandler message handler. This handler then passes the message to the MessageHandler created by the AbstractMessageChannelBinder#createProducerMessageHandler method, which each message middleware implements.
  • The MessageHandler created by the createProducerMessageHandler method converts the Spring Message to the message model of the middleware, and sends it to the broker of the middleware.

  4. Use the @StreamListener annotation to subscribe to messages. Note that the value of Sink.INPUT in the annotation is "input", so the configuration of the binding named input applies:

  • The AbstractMessageChannelBinder#createConsumerEndpoint method implemented by the message middleware uses the consumer of the middleware to subscribe to messages, and then converts the message model of the middleware to a Spring Message.
  • After the conversion, the Spring Message is sent to the message channel named input.
  • The StreamListenerMessageHandler that corresponds to the @StreamListener annotation subscribes to the message channel named input, and consumes the message.
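
The send and receive path described in these steps can be modeled end to end without any middleware. The sketch below is an illustration under stated assumptions, not SCS code: Channel, InMemoryBroker, and Binder are hypothetical classes, the "middleware message model" is just a byte array, and the broker lives in memory.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Framework-free model of the producer and consumer bindings.
// Every class and method here is hypothetical, not an SCS API.
class BinderFlowSketch {

    // Stands in for an application-side message channel.
    static class Channel {
        private final List<Consumer<String>> handlers = new ArrayList<>();
        void subscribe(Consumer<String> handler) { handlers.add(handler); }
        void send(String payload) { handlers.forEach(h -> h.accept(payload)); }
    }

    // Stands in for the middleware broker: topic name -> subscribed consumers.
    static class InMemoryBroker {
        private final Map<String, List<Consumer<byte[]>>> topics = new HashMap<>();
        void subscribe(String topic, Consumer<byte[]> consumer) {
            topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumer);
        }
        void publish(String topic, byte[] body) {
            List<Consumer<byte[]>> consumers = topics.get(topic);
            if (consumers != null) {
                consumers.forEach(c -> c.accept(body));
            }
        }
    }

    // Stands in for a binder: converts between application messages
    // and the middleware's own message model (byte[] in this sketch).
    static class Binder {
        private final InMemoryBroker broker;
        Binder(InMemoryBroker broker) { this.broker = broker; }

        // Producer side: output channel -> middleware message -> broker.
        void bindProducer(String destination, Channel output) {
            output.subscribe(p -> broker.publish(destination, p.getBytes(StandardCharsets.UTF_8)));
        }

        // Consumer side: broker -> application message -> input channel.
        void bindConsumer(String destination, Channel input) {
            broker.subscribe(destination, b -> input.send(new String(b, StandardCharsets.UTF_8)));
        }
    }

    // Wires both bindings to the same destination and sends three messages.
    static List<String> run() {
        Binder binder = new Binder(new InMemoryBroker());
        Channel output = new Channel();
        Channel input = new Channel();
        List<String> received = new ArrayList<>();

        binder.bindProducer("test-topic", output);
        binder.bindConsumer("test-topic", input);
        input.subscribe(received::add); // plays the role of the listener method

        for (int index = 1; index <= 3; index++) {
            output.send("msg-" + index);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [msg-1, msg-2, msg-3]
    }
}
```

The application code only touches the two channels. Replacing InMemoryBroker with another broker would change the Binder class alone, which is the property that lets SCS swap middleware through configuration.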

The text description is a little verbose. The following diagram summarizes the entire process. The yellow-highlighted section covers the Binder implementation of a message middleware and the basic subscription and publication features of a message queue (MQ).

[Figure: message flow from the Source through the Binder and the message middleware to the Sink]

Now, let us take a look at an SCS snippet for message processing:

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message<?> msg) {
    System.out.println("receive by headers['index']=='1': " + msg);
}

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
    System.out.println("receive Person: " + person);
}

@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {
    System.out.println("receive allMsg by StreamListener. content: " + msg);
}

@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message<?> msg) {
    System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}

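You may find this snippet similar to the way a Spring MVC controller receives requests. That is because the two use similar architectures. The Spring MVC classes used to process the arguments and return values of a controller are org.springframework.web.method.support.HandlerMethodArgumentResolver and org.springframework.web.method.support.HandlerMethodReturnValueHandler respectively.

The Spring Messaging classes used to process arguments and return values are org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver and org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler respectively.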

Their class names and even the internal method names are the same.
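
The condition attribute in the listener snippet can be pictured as a predicate over the message headers. The following sketch assumes that every listener whose condition matches is invoked, and that a listener without a condition always matches. ConditionalDispatcher is a hypothetical class that uses Java predicates, whereas SCS evaluates SpEL expressions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model of condition-based listener dispatch.
// ConditionalDispatcher is hypothetical, not an SCS class.
class ConditionalDispatcher {
    private final List<String> names = new ArrayList<>();
    private final List<Predicate<Map<String, String>>> conditions = new ArrayList<>();

    // Registers a listener name together with its header condition.
    void register(String name, Predicate<Map<String, String>> condition) {
        names.add(name);
        conditions.add(condition);
    }

    // Returns the names of all listeners whose condition matches the headers.
    List<String> dispatch(Map<String, String> headers) {
        List<String> invoked = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            if (conditions.get(i).test(headers)) {
                invoked.add(names.get(i));
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        ConditionalDispatcher dispatcher = new ConditionalDispatcher();
        dispatcher.register("receiveByHeader", h -> "1".equals(h.get("index")));
        dispatcher.register("receivePerson", h -> "9999".equals(h.get("index")));
        dispatcher.register("receiveAllMsg", h -> true); // no condition

        Map<String, String> headers = new HashMap<>();
        headers.put("index", "1");
        System.out.println(dispatcher.dispatch(headers)); // prints [receiveByHeader, receiveAllMsg]
    }
}
```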

Summary

[Figure: overview of the classes in the SCS system]

The preceding figure summarizes classes of the SCS system. For more demos about SCS and the RocketMQ Binder, see RocketMQ Binder Demos. These demos cover the message aggregator, splitter, and filter; the handling of abnormal messages; message tags and SQL filtering; and synchronous and asynchronous consumption.

The next article will analyze the role of the Spring Cloud Bus in the Spring Cloud system, and show you how RocketMQ Binder of Spring Cloud Alibaba implements the Spring Cloud Stream standard.

Reference:https://www.alibabacloud.com/blog/introduction-to-the-spring-cloud-stream-system_594822?spm=a2c41.12911970.0.0
