Cloud-Stream
使用场景
支持rabbitMQ和kafka 两种队列
中间件的差异性导致我们实际项目开发中给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我们想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推到重新做,因为它跟我们的系统耦合了,这时候springCloud Stream给我们提供一种解耦的方式。
设计思想
Stream通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder:input对应于消费者,output对应生产者
Stream中的消息通信方式遵循了发布-订阅模式,topic主题进行广播。
在RabbitMQ中就是Exchange,在kafka中就是Topic
Stream标准流程
编码API和常用注解
以RabbitMQ为案例
生产者
pom文件依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
yml配置
server:port:8801spring:application:
name:cloud-stream-providercloud:stream:binders:#在此处配置要绑定的rabbitMQ的服务信息;defaultRabbit:#表示定义的名称,用于binding集合type: rabbit#消息组件类型environment:#设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport:5672username: guestpassword: guestbindings:# 服务的整合处理output:# 这个名字是一个通道的名称destination: studyExchange# 表示要使用的Exchange名称定义content-type: application/json# 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit# 设置要绑定的消息服务的具体设置
配置含义
1、binders: 这是一组binder的集合,这里配置了一个名为test的binder,这个binder中是包含了一个rabbit的连接信息
2、bindings:这是一组binding的集合,这里配置了一个名为testOutPut的binding,这个binding中配置了指向名test的binder下的一个交换机testRabbit。
3、扩展: 如果我们项目中不仅集成了rabbit还集成了kafka那么就可以新增一个类型为kafka的binder、如果项目中会使用多个交换机那么就使用多个binding,
消息生产业务类
publicinterfaceIMessageProvider{publicStringsend();}@EnableBinding(Spurce.class)//定义消息的推送管道publicclassIMessageProviderImplimplementsIMessageProvider{@ResourceprivateMessageChannel output;@OverridepublicStringsend(){String serial= UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build())returnnull;}}@RestControllerpublicclassSendMessageController{@ResouceprivateIMessageProvider messageProvider;@GetMapping("sendMessage")publicStringsendMessage(){return messageProvider.send();}}
消费者1
pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
yml配置
server:
port: 8802
spring:
application:
name:cloud-stream-consumer
cloud:
stream:
binders: #在此处配置要绑定的rabbitMQ的服务信息;
defaultRabbit: #表示定义的名称,用于binding集合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: rabbitA #消息分组
监听业务类
@Component@EnableBinding(Sink.class)publicclassReceiveMessageListener{@StreamListtener(Sink.INPUT)publicvoidinput(Message<String> message){System.out.println("消费者1号:"+message.getPayload+"\t")}}
消费者2
pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
yml配置
server:
port: 8803
spring:
application:
name:cloud-stream-consumer
cloud:
stream:
binders: #在此处配置要绑定的rabbitMQ的服务信息;
defaultRabbit: #表示定义的名称,用于binding集合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: rabbitA #消息分组
监听业务类
@Component@EnableBinding(Sink.class)publicclassReceiveMessageListener{@StreamListtener(Sink.INPUT)publicvoidinput(Message<String> message){System.out.println("消费者1号:"+message.getPayload+"\t")}}
消息分组(解决在集群下重复消费的问题)
注意在Stream中处于同一个group中多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),
同一组内会发生竞争关系,只有其中一个可以消费
加上消息分组后自动支持持久化
如果文章中有没有涉及到的技能点,欢迎大家在下方的评论中指出