Spring Cloud Stream
本章节的代码:https://github.com/wicksonZhang/Spring-Cloud
我们只需要聚焦在如下服务当中:
1. 基础概念
- 消息驱动:是一种编程模型,其中组件之间通过异步消息传递来实现松耦合、分布式的通信和协作,提高系统的可伸缩性和弹性。
1.1. Spring Cloud Stream 解决了什么问题?
Spring Cloud Stream 解决了消息驱动微服务架构中消息生产者和消息消费者的解耦、消息传递、以及不同消息代理系统的适配问题。
假设有一个电商系统,其中订单服务负责处理订单相关的业务,当订单服务产生一个新的订单时,需要将这条订单信息发送到消息通道,而不需要关心消息是如何被处理、传递到哪里的。订单服务产生订单信息之后,库存服务需要减少相应库存,那么库存服务只需要通过订阅相应的消息通道,处理订单创建的消息。
再举一个场景的例子,类似于微信公众号的消息推送。当公众号推送消息之后,只有订阅了这个公众号的人才能收到消息。
这种方式下,消息生产者和消费者之间是松耦合的,它们可以独立部署和演化,更好地支持微服务架构的原则。
1.2. Spring Cloud Stream 是什么?
Spring Cloud Stream 是基于 Spring Boot 的一个用于构建消息驱动微服务的框架。具体的核心概念和特定如下:
- Binder(绑定器): Spring Cloud Stream 引入了 Binder 的概念,它是与消息代理系统交互的适配器。通过 Binder,可以方便地切换消息代理系统,比如从 RabbitMQ 切换到 Kafka,而不用修改应用程序代码。
- 消息通道(Message Channels): Spring Cloud Stream 使用消息通道来实现消息的传递。应用程序可以将消息发送到通道,并从通道接收消息。
- 消息处理(Message Processing): Spring Cloud Stream 提供了一组注解,如
@StreamListener
,使得消息的处理逻辑变得简单明了。

1.3. Spring Cloud Stream 的优缺点
优点
- 简化配置和开发: Spring Cloud Stream 简化了消息驱动微服务的配置和开发,通过声明式的方式,开发者只需要关注业务逻辑而不用过多考虑底层的消息传递细节。
- 适配多种消息代理系统: Spring Cloud Stream 支持多种消息代理系统,包括 RabbitMQ、Kafka、Redis 等,这使得系统更具灵活性。
- 整合 Spring 生态系统: Spring Cloud Stream 是 Spring Cloud 生态系统的一部分,可以与其他 Spring Cloud 组件无缝集成。
缺点
- 过度抽象可能导致不灵活: 尽管高度的抽象使得开发变得简单,但在一些特定场景下,过度的抽象可能会导致不够灵活。一些复杂的消息处理需求可能需要更详细的配置和定制。
2. 核心注解
如下图中是 Spring Cloud Stream 的基本原理,其中Binder 层负责和MQ中间件的通信,应用程序 Application Core 通过 inputs 接收 Binder 包装后的 Message,相当于是消费者Consumer;通过 outputs 投递 Message给 Binder,然后由 Binder 转换后投递给MQ中间件,相当于是生产者Producer。

组成 |
说明 |
Middleware |
中间件,目前支持 RabbitMQ 和 KafKa |
Binder |
负责和MQ中间件进行连接和通信,可以动态的改变消息类型(对应 Kafka 的 topic,RabbitMQ 的 exchange) |
@Input |
注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output |
注解标识输出通达,发布的消息将通过该通道离开应用程序 |
@StreamListener |
监听队列,用于消费者的队列的消息接收 |
@EnableBinding |
将信道 change 和 exchange 绑定在一起 |
3. 具体操作
准备工作
实现需求
我们创建消息生产者和消息消费者。当生产者产生消息之后,两个消费者会通过消息通道监听到对应的消息。具体大流程如下图

实现思路
- Step-1:创建消息生产者
11-spring-cloud-stream-producer
- Step-2:创建消息消费者1
11-spring-cloud-stream-consumer1-11200
- Step-3:创建消息消费者2
11-spring-cloud-stream-consumer2-11300
代码结构

3.1. 创建消息生产者
实现步骤
1 2 3 4 5 6
| Step-1: 创建消息生产者服务 11-spring-cloud-stream-producer-11100 Step-2: 导入 pom.xml 依赖 Step-3: 创建 bootstrap.yml Step-4: 创建启动类 SpringCloudStreamProducerApplication Step-5: 创建控制类 ProducerController Step-6: 创建消息生产者 IMessageProvider、MessageProviderImpl
|
Step-2: 导入 pom.xml 依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependencies> <dependency> <groupId>cn.wickson.cloud</groupId> <artifactId>01-spring-cloud-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
|
Step-3: 创建 bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| server: port: 11100
spring: application: name: spring-cloud-stream-producer cloud:
stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: output: destination: stream-exchange content-type: application/json binder: defaultRabbit
eureka: instance: hostname: spring-cloud-stream-producer instance-id: spring-cloud-stream-producer:11100 prefer-ip-address: true lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 client: service-url: register-with-eureka: true fetch-registry: true defaultZone: http://eureka3300.com:3300/eureka,http://eureka3400.com:3400/eureka
|
Step-4: 创建启动类 SpringCloudStreamProducerApplication
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@EnableEurekaClient @SpringBootApplication(scanBasePackages = "cn.wickson.cloud") public class SpringCloudStreamProducerApplication {
public static void main(String[] args) { SpringApplication.run(SpringCloudStreamProducerApplication.class, args); }
}
|
Step-5: 创建控制类 ProducerController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
@Slf4j @Validated @RestController @RequestMapping("/producer") public class ProducerController {
@Resource private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.sendMessage(); }
}
|
Step-6: 创建消息生产者 IMessageProvider、MessageProviderImpl
1 2 3 4 5 6 7 8 9 10 11
|
public interface IMessageProvider {
String sendMessage();
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Slf4j
@EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider {
@Resource private MessageChannel output;
@Override public String sendMessage() { String uuid = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(uuid).build()); return "生产者生产一条消息:" + uuid; } }
|
3.2. 创建消息消费者1
实现步骤
1 2 3 4 5
| Step-1: 创建消息消费者1服务 11-spring-cloud-stream-consumer1-11200 Step-2: 导入 pom.xml 依赖 Step-3: 创建 bootstrap.yml Step-4: 创建启动类 SpringCloudStreamConsumer1Application Step-5: 创建监听类 Consumer1MessageListener
|
Step-2: 导入 pom.xml 依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependencies> <dependency> <groupId>cn.wickson.cloud</groupId> <artifactId>01-spring-cloud-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
|
Step-3: 创建 bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| server: port: 11200
spring: application: name: spring-cloud-stream-consumer1 cloud: stream: binders: defaultRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: input: destination: stream-exchange content-type: application/json binder: defaultRabbit group: consumer1
eureka: instance: hostname: spring-cloud-stream-consumer1 instance-id: spring-cloud-stream-consumer1:11200 prefer-ip-address: true lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 client: service-url: register-with-eureka: true fetch-registry: true defaultZone: http://eureka3300.com:3300/eureka,http://eureka3400.com:3400/eureka
|
Step-4: 创建启动类 SpringCloudStreamConsumer1Application
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@EnableEurekaClient @SpringBootApplication public class SpringCloudStreamConsumer1Application {
public static void main(String[] args) { SpringApplication.run(SpringCloudStreamConsumer1Application.class, args); } }
|
Step-5: 创建监听类 Consumer1MessageListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Slf4j @Component @EnableBinding(Sink.class) public class Consumer1MessageListener {
@Value("${server.port}") private String serverPort;
@StreamListener(Sink.INPUT) public void input(Message<String> message) { log.info("Server.Port:{} , Consumer1MessageListener receive message :{}", serverPort, message.getPayload()); }
}
|
3.3. 创建消息消费者2
4. 单元测试
我们在上面的基础上,重新新增了两个服务进行单元测试,分别是 WebSocket
和 Gateway
两个微服务 和 前端项目。
服务信息如下:

Eureka 服务信息如下:

4.1. 消费者处于同一个组
当两个消费者处于同一个组:group: consumer1
,这两个组中只会有其中一个服务会接收到消息,不会重复消费。
bootstrap.yml
1 2 3 4 5 6 7 8 9 10 11
| bindings: input: destination: stream-exchange content-type: application/json binder: defaultRabbit group: consumer1
|
- 当两个消费者处于同一个组:
group: consumer1

4.2. 消费者处于不同的组
当两个消费者处于不同的个组
- consumer1 处于
group:consumer1
- consumer2 处于
group:consumer2
- 这两个组中只都会接收到消息,不会重复消费。
当两个消费者处于不同的组
