一、ActiveMQ介绍
ActiveMQ是ASF(Apache Software Foundation)的一款消息中间件(middle-ware),消息中间件主要完成的是消息的接收、存储和转发
。
主要实现的模式是生产消费模式、订阅发布模式
。
其主要区别是:
生产消费模式中,生产完消息,消息一经消费,便不再存在。
发布订阅模式中,一条消息可以有多个订阅者,即一条消息的消费者可以有多个。
消息中间件的主要作用
:流量削峰、异步处理、应用解耦、日志处理。
合理利用消息中间件,可以大大提升网站的并发量,增强网站的稳定性。类似于ActiveMQ这样的消息中间件产品还有很多:RocketMQ、RabbitMQ和Kafka。其中Kafka是构建微服务系统首选的产品。
消息形式:
1、点对点(queue)
2、一对多(topic)
二、ActiveMQ安装、服务端开启
2.1 安装
安装非常简单,官网下载,选择适合的版本,解压,安装启动即可。需开启8161和61616端口,8161是用于后台管理的端口,61616是Java连接使用。
后台登录地址http://ip:8161/admin,用户名密码都是admin。
2.2 服务端启动
在本地下载好ActiveMQ,进入bin目录,执行./activemq start即可。这一步需要本机上有可用的JRE环境。
可以通过ActiveMQ可视化管理界面进行队列创建和消息管理,同时也可以验证我们的ActiveMQ是否正常工作。(访问地址:localhost:8161,初始用户名和密码都为admin)。
ActiveMQ管理端界面如图所示。
上面的启动是一次性的,如果想一直开启服务,可以将Activemq安装为系统服务:
win10下将Activemq安装为系统服务
三、SpringBoot 整合实现ActiveMQ
3.1添加依赖
<!--activemq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!--消息队列连接池:提升效率的连接池 queue方式可以不添加--><!--<dependency>--><!-- <groupId>org.apache.activemq</groupId>--><!-- <artifactId>activemq-pool</artifactId>--><!--</dependency>--><!-- 如果配置线程池则加入 --><dependency><groupId>org.messaginghub</groupId><artifactId>pooled-jms</artifactId></dependency>
3.2添加配置文件(yml文件)
server:#Spring boot项目访问端口port:8080spring:activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: admin# 如果是true,则是Topic;如果是false或者默认,则是queue。jms:pub-sub-domain:falsepool:enabled:false#连接池启动 默认false# max-connections: 10 # 最大连接数 默认1# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true
ActiveMQ,有两种形式,分别为Queue(生产消费),Topic(发布订阅)。
Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息。
Topic为一对多形式,当订阅者订阅后,发布者发布消息所有订阅者都会接受到消息。
3.3 Queue
3.3.1 Queue配置
packagecom.example.activemqjava.common.activemq.config;importorg.apache.activemq.command.ActiveMQQueue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.jms.Queue;/**
* Queue配置
* Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息
* @author qzz
*/@ConfigurationpublicclassQueueConfig{/**
* 定义存放消息的队列
* @return
*/@BeanpublicQueuequeue(){returnnewActiveMQQueue("my-test");}}
3.3.2 创建生产者
packagecom.example.activemqjava.common.activemq;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Component;importorg.springframework.stereotype.Service;importjavax.jms.Queue;/**
* 生产者
* @author qzz
*/@ComponentpublicclassProducer{@AutowiredprivateQueue queue;@AutowiredprivateJmsMessagingTemplate jmsMessagingTemplate;/**
* 发送消息
* @param msg
*/publicvoidsendMessage(String msg){//方法一:添加消息对消息队列
jmsMessagingTemplate.convertAndSend(queue,msg);//方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列// jmsMessagingTemplate.convertAndSend("test",msg);}/**
* 发送消息
* @param msg
*/publicvoidsendMessage(String destination,String msg){//方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列if(destination!=null){
jmsMessagingTemplate.convertAndSend(destination,msg);}}}
3.3.3 创建消费者
packagecom.example.activemqjava.common.activemq;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;/**
* 消费者
* @author qzz
*/@ComponentpublicclassConsumer{/**
* 消费消息
* 使用JmsListener配置消费者监听的队列,其中message是接收到的消息
*
* @SendTo("Squeue"):SendTo会将此方法返回的数据,写入到OutQueue中去
* @param message
*/@JmsListener(destination="my-test")publicvoidreceiveQueue(String message){System.out.println("Consumer接收的消息是:"+message);}}
3.3.4 启动类添加@EnableJms 注解,启动消息队列
/**
* @author qzz
* @EnableJms 会启动 jms 的注解扫描即发现 @JmsListener 注释的方法创建消息监听容器,相当于 <jms:annotation-d riven/>
*/@SpringBootApplication@EnableJms//启动消息队列publicclassActivemqJavaApplication{publicstaticvoidmain(String[] args){SpringApplication.run(ActivemqJavaApplication.class, args);}}
3.3.5 测试
packagecom.example.activemqjava.controller;importcom.example.activemqjava.common.activemq.Producer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;/**
* 测试 activemq
* @author qzz
*/@RestController@RequestMapping("/test")publicclassJmsTestController{@AutowiredprivateProducer producer;/**
* 发送消息
*/@RequestMapping("/sendMessages")publicvoidsendMessages(){for(int i=0;i<5;i++){
producer.sendMessage("this is a queue test"+i);}}}
运行之后登陆ActiveMQ后台管理界面如下:
调用 http://localhost:8080/test/sendMessages接口,查看效果:
刷新ActiveMQ后台管理界面:
Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量
3.4 Topic
server:#Spring boot项目访问端口port:8083spring:activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: admin# 如果是true,则是Topic;如果是false或者默认,则是queue。jms:pub-sub-domain:truepool:enabled:true#连接池启动 默认falsemax-connections:10# 最大连接数 默认1# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true
3.4.1 Topic配置
packagecom.example.activemqjava1.activemq.config;importorg.apache.activemq.command.ActiveMQTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.jms.Topic;/**
* Topic配置
* Topic:发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费
* @author qzz
*/@ConfigurationpublicclassTopicConfig{/**
* 定义存放消息的topic
* @return
*/@BeanpublicTopictopic(){returnnewActiveMQTopic("my-topic");}}
3.4.2 发布者
packagecom.example.activemqjava1.activemq;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jms.core.JmsMessagingTemplate;importorg.springframework.stereotype.Service;importjavax.jms.Topic;/**
* 发布者
* @author qzz
*/@ServicepublicclassTopicProducer{@AutowiredprivateTopic topic;@AutowiredprivateJmsMessagingTemplate jmsMessagingTemplate;/**
* 发送消息
* @param msg
*/publicvoidsendTopic(String msg){this.jmsMessagingTemplate.convertAndSend(this.topic,msg);}}
3.4.3 订阅者
packagecom.example.activemqjava1.activemq;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;/**
* 订阅者
* @author qzz
*/@ComponentpublicclassTopicConsumer{/**
* 消费消息
* 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
* @param message
*/@JmsListener(destination="my-topic")publicvoidreceiveTopic(String message){System.out.println("TopicConsumer接收的消息是:"+message);}}
3.4.4 测试
packagecom.example.activemqjava1.controller;importcom.example.activemqjava1.activemq.TopicProducer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;/**
* 测试 activemq
* @author qzz
*/@RestController@RequestMapping("/test")publicclassJmsTestController{@AutowiredprivateTopicProducer topicProducer;/**
* 发送消息
*/@RequestMapping("/sendMessages")publicvoidsendMessages(){for(int i=0;i<5;i++){
topicProducer.sendTopic("this is a topic test"+i);}}}
运行之后登陆ActiveMQ后台管理界面如下:
调用 http://localhost:8083/test/sendMessages接口,查看效果:
发现,发送消息成功,但是消息并没有被 监听消费。
原因是:JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory</font>
3.4.5 创建一个配置类,在配置类中提供监听工厂配置
packagecom.example.activemqjava1.activemq.config;importorg.apache.activemq.ActiveMQConnectionFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.jms.config.DefaultJmsListenerContainerFactory;importorg.springframework.jms.config.JmsListenerContainerFactory;importjavax.jms.ConnectionFactory;/**
* axtiveMQ配置类:创建监听工厂配置
* @author qzz
*/@ConfigurationpublicclassActiveMQConfig{@Value("${spring.activemq.broker-url}")privateString brokerUrl;@Value("${spring.activemq.user}")privateString userName;@Value("${spring.activemq.password}")privateString password;@BeanpublicConnectionFactoryconnectionFactory(){returnnewActiveMQConnectionFactory(userName,password,brokerUrl);}@Bean(name="topicListener")publicJmsListenerContainerFactory<?>jmsListenerContainerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory=newDefaultJmsListenerContainerFactory();//是否支持 发布订阅
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);return factory;}}
3.4.6 修改订阅者中的@JmsListener注解的属性
packagecom.example.activemqjava1.activemq;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.stereotype.Component;/**
* 订阅者
* @author qzz
*/@ComponentpublicclassTopicConsumer{/**
* 消费消息
* 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
* @param message
*/@JmsListener(destination="my-topic",containerFactory="topicListener")publicvoidreceiveTopic(String message){System.out.println("TopicConsumer接收的消息是:"+message);}}
3.4.7 重启项目,看效果
登录ActiveMQ后台管理界面,删除对应topic,
然后重启项目,进行测试:
调用 http://localhost:8083/test/sendMessages接口,控制台结果如下:
说明 检查消费成功!
参考资料:https://blog.csdn.net/mycsdn6/article/details/106322223/
https://blog.csdn.net/qq_38403590/article/details/119773671
https://www.cnblogs.com/wuyoucao/p/10947940.html