SpringBoot 整合实现ActiveMQ

2022年11月22日13:26:15

一、ActiveMQ介绍

ActiveMQ是ASF(Apache Software Foundation)的一款消息中间件(middle-ware),消息中间件主要完成的是消息的接收、存储和转发

主要实现的模式是生产消费模式、订阅发布模式

其主要区别是:
生产消费模式中,生产完消息,消息一经消费,便不再存在。
发布订阅模式中,一条消息可以有多个订阅者,即一条消息的消费者可以有多个。

消息中间件的主要作用:流量削峰、异步处理、应用解耦、日志处理。

合理利用消息中间件,可以大大提升网站的并发量,增强网站的稳定性。类似于ActiveMQ这样的消息中间件产品还有很多:RocketMQ、RabbitMQ和Kafka。其中Kafka是构建微服务系统首选的产品。

消息形式:
1、点对点(queue)
2、一对多(topic)

二、ActiveMQ安装、服务端开启

2.1 安装

安装非常简单,官网下载,选择适合的版本,解压,安装启动即可。
需开启8161和61616端口,8161是用于后台管理的端口,61616是Java连接使用。

后台登录地址http://ip:8161/admin,用户名密码都是admin。

2.2 服务端启动

在本地下载好ActiveMQ,进入bin目录,执行./activemq start即可。这一步需要本机上有可用的JRE环境。
可以通过ActiveMQ可视化管理界面进行队列创建和消息管理,同时也可以验证我们的ActiveMQ是否正常工作。(访问地址:localhost:8161,初始用户名和密码都为admin)。
ActiveMQ管理端界面如图所示。

SpringBoot 整合实现ActiveMQ
上面的启动是一次性的,如果想一直开启服务,可以将Activemq安装为系统服务:
win10下将Activemq安装为系统服务

三、SpringBoot 整合实现ActiveMQ

3.1添加依赖

<!--activemq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!--消息队列连接池:提升效率的连接池 queue方式可以不添加--><!--<dependency>--><!--   <groupId>org.apache.activemq</groupId>--><!--    <artifactId>activemq-pool</artifactId>--><!--</dependency>--><!-- 如果配置线程池则加入 --><dependency><groupId>org.messaginghub</groupId><artifactId>pooled-jms</artifactId></dependency>

3.2添加配置文件(yml文件)

server:#Spring boot项目访问端口port:8080spring:activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: admin# 如果是true,则是Topic;如果是false或者默认,则是queue。jms:pub-sub-domain:falsepool:enabled:false#连接池启动  默认false#      max-connections: 10 # 最大连接数 默认1# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true

ActiveMQ,有两种形式,分别为Queue(生产消费),Topic(发布订阅)。

Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息。

Topic为一对多形式,当订阅者订阅后,发布者发布消息所有订阅者都会接受到消息。

3.3 Queue

3.3.1 Queue配置

packagecom.example.activemqjava.common.activemq.config;importorg.apache.activemq.command.ActiveMQQueue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.jms.Queue;/**
 * Queue配置
 * Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息
 * @author qzz
 */@ConfigurationpublicclassQueueConfig{/**
     * 定义存放消息的队列
     * @return
     */@BeanpublicQueuequeue(){returnnewActiveMQQueue("my-test");}}

3.3.2 创建生产者

packagecom.example.activemqjava.common.activemq;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;importorg.springframework.stereotype.Service;importjavax.jms.Queue;/**
 * 生产者
 * @author qzz
 */@ComponentpublicclassProducer{@AutowiredprivateQueue queue;@AutowiredprivateJmsMessagingTemplate jmsMessagingTemplate;/**
     * 发送消息
     * @param msg
     */publicvoidsendMessage(String msg){//方法一:添加消息对消息队列
        jmsMessagingTemplate.convertAndSend(queue,msg);//方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列//        jmsMessagingTemplate.convertAndSend("test",msg);}/**
     * 发送消息
     * @param msg
     */publicvoidsendMessage(String destination,String msg){//方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列if(destination!=null){
            jmsMessagingTemplate.convertAndSend(destination,msg);}}}

3.3.3 创建消费者

packagecom.example.activemqjava.common.activemq;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;/**
 * 消费者
 * @author qzz
 */@ComponentpublicclassConsumer{/**
     * 消费消息
     * 使用JmsListener配置消费者监听的队列,其中message是接收到的消息
     *
     * @SendTo("Squeue"):SendTo会将此方法返回的数据,写入到OutQueue中去
     * @param message
     */@JmsListener(destination="my-test")publicvoidreceiveQueue(String message){System.out.println("Consumer接收的消息是:"+message);}}

3.3.4 启动类添加@EnableJms 注解,启动消息队列

/**
 * @author qzz
 * @EnableJms 会启动 jms 的注解扫描即发现 @JmsListener 注释的方法创建消息监听容器,相当于 <jms:annotation-d riven/>
 */@SpringBootApplication@EnableJms//启动消息队列publicclassActivemqJavaApplication{publicstaticvoidmain(String[] args){SpringApplication.run(ActivemqJavaApplication.class, args);}}

3.3.5 测试

packagecom.example.activemqjava.controller;importcom.example.activemqjava.common.activemq.Producer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;/**
 * 测试  activemq
 * @author qzz
 */@RestController@RequestMapping("/test")publicclassJmsTestController{@AutowiredprivateProducer producer;/**
     * 发送消息
     */@RequestMapping("/sendMessages")publicvoidsendMessages(){for(int i=0;i<5;i++){
            producer.sendMessage("this is a queue test"+i);}}}

运行之后登陆ActiveMQ后台管理界面如下:
SpringBoot 整合实现ActiveMQ
调用 http://localhost:8080/test/sendMessages接口,查看效果:
SpringBoot 整合实现ActiveMQ
刷新ActiveMQ后台管理界面:
SpringBoot 整合实现ActiveMQ

Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量

3.4 Topic

server:#Spring boot项目访问端口port:8083spring:activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: admin# 如果是true,则是Topic;如果是false或者默认,则是queue。jms:pub-sub-domain:truepool:enabled:true#连接池启动  默认falsemax-connections:10# 最大连接数 默认1# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true

3.4.1 Topic配置

packagecom.example.activemqjava1.activemq.config;importorg.apache.activemq.command.ActiveMQTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.jms.Topic;/**
 * Topic配置
 *  Topic:发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费
 * @author qzz
 */@ConfigurationpublicclassTopicConfig{/**
     * 定义存放消息的topic
     * @return
     */@BeanpublicTopictopic(){returnnewActiveMQTopic("my-topic");}}

3.4.2 发布者

packagecom.example.activemqjava1.activemq;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Service;importjavax.jms.Topic;/**
 * 发布者
 * @author qzz
 */@ServicepublicclassTopicProducer{@AutowiredprivateTopic topic;@AutowiredprivateJmsMessagingTemplate jmsMessagingTemplate;/**
     * 发送消息
     * @param msg
     */publicvoidsendTopic(String msg){this.jmsMessagingTemplate.convertAndSend(this.topic,msg);}}

3.4.3 订阅者

packagecom.example.activemqjava1.activemq;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;/**
 * 订阅者
 * @author qzz
 */@ComponentpublicclassTopicConsumer{/**
     * 消费消息
     * 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
     * @param message
     */@JmsListener(destination="my-topic")publicvoidreceiveTopic(String message){System.out.println("TopicConsumer接收的消息是:"+message);}}

3.4.4 测试

packagecom.example.activemqjava1.controller;importcom.example.activemqjava1.activemq.TopicProducer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;/**
 * 测试  activemq
 * @author qzz
 */@RestController@RequestMapping("/test")publicclassJmsTestController{@AutowiredprivateTopicProducer topicProducer;/**
     * 发送消息
     */@RequestMapping("/sendMessages")publicvoidsendMessages(){for(int i=0;i<5;i++){
            topicProducer.sendTopic("this is a topic test"+i);}}}

运行之后登陆ActiveMQ后台管理界面如下:
SpringBoot 整合实现ActiveMQ
调用 http://localhost:8083/test/sendMessages接口,查看效果:
SpringBoot 整合实现ActiveMQ
SpringBoot 整合实现ActiveMQ
发现,发送消息成功,但是消息并没有被 监听消费。

原因是:JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory</font>

3.4.5 创建一个配置类,在配置类中提供监听工厂配置

packagecom.example.activemqjava1.activemq.config;importorg.apache.activemq.ActiveMQConnectionFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.jms.config.DefaultJmsListenerContainerFactory;importorg.springframework.jms.config.JmsListenerContainerFactory;importjavax.jms.ConnectionFactory;/**
 * axtiveMQ配置类:创建监听工厂配置
 * @author qzz
 */@ConfigurationpublicclassActiveMQConfig{@Value("${spring.activemq.broker-url}")privateString brokerUrl;@Value("${spring.activemq.user}")privateString userName;@Value("${spring.activemq.password}")privateString password;@BeanpublicConnectionFactoryconnectionFactory(){returnnewActiveMQConnectionFactory(userName,password,brokerUrl);}@Bean(name="topicListener")publicJmsListenerContainerFactory<?>jmsListenerContainerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory=newDefaultJmsListenerContainerFactory();//是否支持 发布订阅
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);return factory;}}

3.4.6 修改订阅者中的@JmsListener注解的属性

packagecom.example.activemqjava1.activemq;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;/**
 * 订阅者
 * @author qzz
 */@ComponentpublicclassTopicConsumer{/**
     * 消费消息
     * 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
     * @param message
     */@JmsListener(destination="my-topic",containerFactory="topicListener")publicvoidreceiveTopic(String message){System.out.println("TopicConsumer接收的消息是:"+message);}}

3.4.7 重启项目,看效果

登录ActiveMQ后台管理界面,删除对应topic,
SpringBoot 整合实现ActiveMQ
然后重启项目,进行测试:

调用 http://localhost:8083/test/sendMessages接口,控制台结果如下:
SpringBoot 整合实现ActiveMQ
SpringBoot 整合实现ActiveMQ

说明 检查消费成功!

参考资料:https://blog.csdn.net/mycsdn6/article/details/106322223/

https://blog.csdn.net/qq_38403590/article/details/119773671
https://www.cnblogs.com/wuyoucao/p/10947940.html

  • 作者:12程序猿
  • 原文链接:https://blog.csdn.net/qq_26383975/article/details/124943780
    更新时间:2022年11月22日13:26:15 ,共 8945 字。