spring-boot 2.3.x 整合rocketmq

2022-07-19 09:26:21

spring-boot 2.3.x 整合rocketmq


本地项目的基础环境
环境版本
jdk1.8.0_201
maven3.6.0
Spring-boot2.3.3.RELEASE

1、rocketMq的安装(docker形式)

这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料或者官网地址

https://github.com/apache/rocketmq-docker

《docker环境下安装rockermq以及rockermq-console》

1.1、docker-compose.yml

version:'3'
services:
  namesrv:
    image: apacherocketmq/rocketmq
    container_name: namesrv
    ports:
    - 9876:9876
    volumes:
    - ./data/namesrv/logs:/home/rocketmq/logs
    command: sh mqnamesrv
  broker:
    image: apacherocketmq/rocketmq
    container_name: rmqbroker
    ports:
    - 10909:10909
    - 10911:10911
    - 10912:10912
    volumes:
    - ./data/broker/logs:/home/rocketmq/logs
    - ./data/broker/store:/home/rocketmq/store
    - ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c../conf/broker.conf
    depends_on:
    - namesrv
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
    - 8080:8080
    environment:
      JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    depends_on:
    - namesrv

2、构建一个rocketMq的项目

主要是导入rocketmq-spring-boot-starter的包,以及spring-boot-starter-web的包;

导入web的包,是再消费的时候,没有守护线程,程序启动后,就会自动退出,导入web包后,tomcat容器启动,消费的线程就不退出了;

<?xml version="1.0"?><projectxsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.3.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.badger</groupId><artifactId>badger-spring-boot-rocketmq</artifactId><version>0.0.1-SNAPSHOT</version><name>badger-spring-boot-rocketmq</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><maven-jar-plugin.version>3.1.1</maven-jar-plugin.version></properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.1、定义yml配置文件

rocketmq:name-server: localhost:9876producer:group: test-producer-group

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.class具体更可以参看配置类

3.2、生产者测试代码

@RunWith(SpringRunner.class)@SpringBootTest(classes={ RocketmqApplicaltion.class})publicclassTestApp{@Autowiredprivate RocketMQTemplate rocketMQTemplate;@TestpublicvoidsyncSend(){for(int i=0; i<100; i++){// 同步消息
            Message<String> bashMessage=newGenericMessage<String>("test_producer"+ i);
            SendResult syncSend= rocketMQTemplate.syncSend("test_producer", bashMessage);
            System.out.println(syncSend);}}@TestpublicvoidasyncSend(){for(int i=0; i<100; i++){// 异步消息
            Message<String> message=newGenericMessage<String>("test_producer"+ i);
            rocketMQTemplate.asyncSend("test_producer", message,newSendCallback(){@OverridepublicvoidonSuccess(SendResult sendResult){
                    System.out.println(sendResult);}@OverridepublicvoidonException(Throwable e){
                    System.out.println("发送失败");}});}}@TestpublicvoidsendOneWay(){for(int i=0; i<100; i++){// 单向发送消息
            Message<String> message=newGenericMessage<String>("test_producer"+ i);
            rocketMQTemplate.sendOneWay("test_producer", message);
            System.out.println("只发送一次");}}@TestpublicvoidsyncSendOrder(){// 发送有序消息
        String[] tags=newString[]{"TagA","TagC","TagD"};for(int i=0; i<10; i++){// 加个时间前缀
            Message<String> message=newGenericMessage<String>("我是顺序消息"+ i);
            SendResult sendResult= rocketMQTemplate.syncSendOrderly("test_producer:"+ tags[i% tags.length], message,
                    i+"");
            System.out.println(sendResult);}}}

注意:

1、跟原生rocketmq Api对比,rocketMQ start 的做了二次的封装,把同步异步发送消息,用方法名称做了区别;相同的是,无论是原生的api还是二次封装的api,异步调用的时候,回调是在参数体里的,毕竟异步发送需要等待回调,而同步发送可以只有回调。

2、顺序消息:严格顺序消息模式下,消费者收到的所有消息均是有顺序的

发送消息的时候,消息被存储在MessageQueue队列里的,默认的时候,是4个队列;为了保证消息的顺序,是需要把相同业务的数据按照顺序写入对应的队列中,单个队列下,数据是严格有序的;

rocketMQ start 对原生api做了二次封装,提供了默认的MessageQueue选择器,用的字符串的hash算法实现的,如果不满足实际需求,需要重写选择器。

3.3、定义消费端代码

@Component@RocketMQMessageListener(topic="test_producer", consumerGroup="test_consumer-group")publicclassDemoConsumerimplementsRocketMQListener<String>{private Logger logger= LoggerFactory.getLogger(getClass());@OverridepublicvoidonMessage(String message){
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}}

注意:

1、消费端的注解@RocketMQMessageListener属性中consumerGroup,多个消费端(消费集群)消费同一个topic的时候,需要定义成一致;

2、消费端消费的时候,是会多线程的形式消费topic里的4个MessageQueue的,如果要消费顺序消息,需要指定属性consumeModeConsumeMode.ORDERLY,表示同步消费;

3.4、主启动类

@SpringBootApplicationpublicclassRocketmqApplicaltion{publicstaticvoidmain(String[] args)throws Exception{
        SpringApplication.run(RocketmqApplicaltion.class, args);}}

《官方文档github》

《官方文档》

详细代码也可以参看《码云》

  • 作者:葵花下的獾
  • 原文链接:https://blog.csdn.net/qq_28410283/article/details/116377839
    更新时间:2022-07-19 09:26:21