pom文件添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.17.1</version></dependency>
配置 application.yml
server:
port:8080#springBoot项目访问端口
spring:
activemq:
broker-url: tcp://192.168.140.xx:30005#你activeMQ的ip和端口号
user: admin#activeMq账号
password: admin#activeMq密码
pool:
enabled:true#连接池启动
max-connections:10#最大连接数
SpringBoot 的启动类添加解 @EnableJms
@SpringBootApplication@EnableJmspublicclassMqconverterApplication{publicstaticvoidmain(String[] args){SpringApplication.run(MqconverterApplication.class, args);}}
创建 activeMQ 配置类
mportorg.apache.activemq.ActiveMQConnectionFactory;importorg.apache.activemq.command.ActiveMQQueue;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.jms.config.JmsListenerContainerFactory;importorg.springframework.jms.config.SimpleJmsListenerContainerFactory;importjavax.jms.ConnectionFactory;importjavax.jms.Queue;/**
* 配置类
*/@ConfigurationpublicclassActiveMQConfig{@Value("${spring.activemq.broker-url}")privateString brokerUrl;@Value("${spring.activemq.user}")privateString userName;@Value("${spring.activemq.password}")privateString password;@BeanpublicConnectionFactoryconnectionFactory(){returnnewActiveMQConnectionFactory(userName, password, brokerUrl);}/**
* 在 Queue 模式中,对消息的监听需要对containerFactory进行配置
*/@Bean("queueListener")publicJmsListenerContainerFactory<?>queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){SimpleJmsListenerContainerFactory factory=newSimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);return factory;}/**
* 在 topic 模式中,对消息的监听需要对containerFactory进行配置
*/@Bean("topicListener")publicJmsListenerContainerFactory<?>topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){SimpleJmsListenerContainerFactory factory=newSimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);return factory;}}
pubSubDomain:为true 代表 topic 模式,false 代表队列模式,默认是false
生产者
@RestControllerpublicclassActiveMQProducerController{/**
* 队列模式
* @param msg 消息
* @return
*/@PostMapping("/queue/{msg}")publicStringsendQueue(@PathVariable("msg")String msg){ActiveMQQueue queue=newActiveMQQueue("testq");
jmsMessagingTemplate.convertAndSend(queue, msg);return"success";}/**
* topic 模式
* @param msg 消息
* @return
*/@PostMapping("/topic/{msg}")publicStringsendTopic(@PathVariable("msg")String msg){ActiveMQTopic topic=newActiveMQTopic("test");
jmsMessagingTemplate.convertAndSend(topic, msg);return"success";}}
消费者
importcom.njc.mqconverter.property.KafkaTopicProperty;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassActiveMQConsumer{@AutowiredKafkaTemplate kafkaTemplate;/**
* queue模式的消费者
*/@JmsListener(destination="testq", containerFactory="queueListener")publicvoidreadActiveQueue(String message){String topic=KafkaTopicProperty.getSap();
kafkaTemplate.send(topic, message);
log.info("activeMQ 消费者接收:{}", message);}/**
* topic 模式的消费者
*/@JmsListener(destination="test", containerFactory="topicListener")publicvoidreadActivTopic1(String message){
log.info("activeMQ topic 消费者1接收:{}", message);}/**
* topic 模式的消费者
*/@JmsListener(destination="test", containerFactory="topicListener")publicvoidreadActivTopic2(String message){
log.info("activeMQ topic 消费者2接收:{}", message);}}
监听者的队列模式可以不配 containerFactory ,但topic的需要配置