Cloud-Stream实例配置和讲解

2022-06-20 08:47:43

Cloud-Stream

使用场景

支持rabbitMQ和kafka 两种队列

中间件的差异性导致我们实际项目开发中给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我们想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推到重新做,因为它跟我们的系统耦合了,这时候springCloud Stream给我们提供一种解耦的方式。

设计思想
在这里插入图片描述
Stream通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder:input对应于消费者,output对应生产者
Stream中的消息通信方式遵循了发布-订阅模式,topic主题进行广播。
在RabbitMQ中就是Exchange,在kafka中就是Topic

Stream标准流程
在这里插入图片描述

编码API和常用注解
在这里插入图片描述

以RabbitMQ为案例

生产者

pom文件依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

yml配置

server:port:8801spring:application:
    name:cloud-stream-providercloud:stream:binders:#在此处配置要绑定的rabbitMQ的服务信息;defaultRabbit:#表示定义的名称,用于binding集合type: rabbit#消息组件类型environment:#设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport:5672username: guestpassword: guestbindings:# 服务的整合处理output:# 这个名字是一个通道的名称destination: studyExchange# 表示要使用的Exchange名称定义content-type: application/json# 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit# 设置要绑定的消息服务的具体设置

配置含义

1、binders: 这是一组binder的集合,这里配置了一个名为test的binder,这个binder中是包含了一个rabbit的连接信息
2、bindings:这是一组binding的集合,这里配置了一个名为testOutPut的binding,这个binding中配置了指向名test的binder下的一个交换机testRabbit。
3、扩展: 如果我们项目中不仅集成了rabbit还集成了kafka那么就可以新增一个类型为kafka的binder、如果项目中会使用多个交换机那么就使用多个binding,

消息生产业务类

publicinterfaceIMessageProvider{publicStringsend();}@EnableBinding(Spurce.class)//定义消息的推送管道publicclassIMessageProviderImplimplementsIMessageProvider{@ResourceprivateMessageChannel output;@OverridepublicStringsend(){String serial= UUID.randomUUID().toString();
    	output.send(MessageBuilder.withPayload(serial).build())returnnull;}}@RestControllerpublicclassSendMessageController{@ResouceprivateIMessageProvider messageProvider;@GetMapping("sendMessage")publicStringsendMessage(){return messageProvider.send();}}

消费者1

pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

yml配置

server:
  port: 8802
spring:
  application:
    name:cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitMQ的服务信息;
        defaultRabbit: #表示定义的名称,用于binding集合
          type: rabbit #消息组件类型
          environment: #设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: rabbitA #消息分组

监听业务类

@Component@EnableBinding(Sink.class)publicclassReceiveMessageListener{@StreamListtener(Sink.INPUT)publicvoidinput(Message<String> message){System.out.println("消费者1号:"+message.getPayload+"\t")}}

消费者2

pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

yml配置

server:
  port: 8803
spring:
  application:
    name:cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitMQ的服务信息;
        defaultRabbit: #表示定义的名称,用于binding集合
          type: rabbit #消息组件类型
          environment: #设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: rabbitA #消息分组

监听业务类

@Component@EnableBinding(Sink.class)publicclassReceiveMessageListener{@StreamListtener(Sink.INPUT)publicvoidinput(Message<String> message){System.out.println("消费者1号:"+message.getPayload+"\t")}}

消息分组(解决在集群下重复消费的问题)

注意在Stream中处于同一个group中多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),
同一组内会发生竞争关系,只有其中一个可以消费
加上消息分组后自动支持持久化

如果文章中有没有涉及到的技能点,欢迎大家在下方的评论中指出

  • 作者:Dhjie_king
  • 原文链接:https://blog.csdn.net/Dhjie_king/article/details/117416359
    更新时间:2022-06-20 08:47:43