SpringBoot整合RabbitMQ(Topic交换器)

2022-06-19 10:49:56

Topic交换器特点:

各个队列与Topic交换器之间的路由键配置模糊,发送者发送一条消息,只要路由键符合规则的队列就能接收到消息。

创建消息生产者module:配置交换器名称和路由键信息。消息发送时,需要指定交换器和路由键。

pom.xml导入RabbitMQ坐标

<dependency>
?? ?<groupId>org.springframework.boot</groupId>
?? ?<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties配置RabbitMQ信息

#RabbitMQ基本信息
spring.rabbitmq.host=182.61.40.184
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbitmq
spring.rabbitmq.password=rabbitmq

#自定义配置
#设置交换器名称
spring.rabbitmq.exchange=topicExchange
#设置路由键 user.log.info
spring.rabbitmq.routingkey.user.info=user.log.info
#设置路由键 user.log.error
spring.rabbitmq.routingkey.user.error=user.log.error
#设置路由键 user.log.warn
spring.rabbitmq.routingkey.user.warn=user.log.warn
#设置路由键 order.log.info
spring.rabbitmq.routingkey.order.info=order.log.info
#设置路由键 order.log.error
spring.rabbitmq.routingkey.order.error=order.log.error
#设置路由键 order.log.warn
spring.rabbitmq.routingkey.order.warn=order.log.warn

创建Controller,通过浏览器请求,发送消息到RabbitMQ

@Controller
public class RabbitMQController {
	
	@Autowired
	AmqpTemplate amqpTemplat;
	
	@Value("${spring.rabbitmq.exchange}")
	String exchangeName;			//交换器名称
	
	@Value("${spring.rabbitmq.routingkey.user.info}")
	String routingKeyUserInfo;		//路由键user.log.info
	
	@Value("${spring.rabbitmq.routingkey.user.error}")
	String routingKeyUserError;		//路由键user.log.error
	
	@Value("${spring.rabbitmq.routingkey.user.warn}")
	String routingKeyUserWarn;		//路由键user.log.warn
	
	@Value("${spring.rabbitmq.routingkey.order.info}")
	String routingKeyOrderInfo;		//路由键order.log.info
	
	@Value("${spring.rabbitmq.routingkey.order.error}")
	String routingKeyOrderError;	//路由键order.log.error
	
	@Value("${spring.rabbitmq.routingkey.order.warn}")
	String routingKeyOrderWarn;		//路由键order.log.warn

	
	
	
	@RequestMapping(value="/userLogInfo",method=RequestMethod.GET)
	@ResponseBody
	public void userLogInfo() throws InterruptedException {
		while (true) {
			Thread.sleep(1000);
			//参数1:交换器       参数2:路由键       参数3:要发送的消息
			amqpTemplat.convertAndSend(exchangeName,routingKeyUserInfo,"user:log.info");  
		}
	}
	
	@RequestMapping(value="/userLogError",method=RequestMethod.GET)
	@ResponseBody
	public void userLogError() throws InterruptedException {
		while (true) {
			Thread.sleep(1000);
			//参数1:交换器       参数2:路由键       参数3:要发送的消息
			amqpTemplat.convertAndSend(exchangeName,routingKeyUserError,"user:log.error");  
		}
	}
	
	@RequestMapping(value="/userLogWarn",method=RequestMethod.GET)
	@ResponseBody
	public void userLogWarn() throws InterruptedException {
		while (true) {
			Thread.sleep(1000);
			//参数1:交换器       参数2:路由键       参数3:要发送的消息
			amqpTemplat.convertAndSend(exchangeName,routingKeyUserWarn,"user:log.warn");  
		}
	}
	
	
	
	@RequestMapping(value="/orderLogInfo",method=RequestMethod.GET)
	@ResponseBody
	public void orderLogInfo() throws InterruptedException {
		while (true) {
			Thread.sleep(1000);
			//参数1:交换器       参数2:路由键       参数3:要发送的消息
			amqpTemplat.convertAndSend(exchangeName,routingKeyOrderInfo,"order:log.info");  
		}
	}
	
	@RequestMapping(value="/orderLogError",method=RequestMethod.GET)
	@ResponseBody
	public void orderLogError() throws InterruptedException {
		while (true) {
			Thread.sleep(1000);
			//参数1:交换器       参数2:路由键       参数3:要发送的消息
			amqpTemplat.convertAndSend(exchangeName,routingKeyOrderError,"order:log.error");  
		}
	}
	
	@RequestMapping(value="/orderLogWarn",method=RequestMethod.GET)
	@ResponseBody
	public void orderLogWarn() throws InterruptedException {
		while (true) {
			Thread.sleep(1000);
			//参数1:交换器       参数2:路由键       参数3:要发送的消息
			amqpTemplat.convertAndSend(exchangeName,routingKeyOrderWarn,"order:log.warn");  
		}
	}
	
}

创建消息消费者module:配置队列名称,交换器名称和类型,路由键信息和它们之间绑定信息,编写队列消息处理组件。

pom.xml导入RabbitMQ坐标

<dependency>
?? ?<groupId>org.springframework.boot</groupId>
?? ?<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties配置RabbitMQ信息

#RabbitMQ基本信息
spring.rabbitmq.host=182.61.40.184
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbitmq
spring.rabbitmq.password=rabbitmq

#自定义配置
#设置交换器名称
spring.rabbitmq.exchange=topicExchange
#设置队列A名称 log.info
spring.rabbitmq.queueA=log.info
#设置队列A的路由键
spring.rabbitmq.routingkeyA=*.log.info
#设置队列B的名称 log.error
spring.rabbitmq.queueB=log.error
#设置队列B的路由键
spring.rabbitmq.routingkeyB=*.log.error
#设置队列C名称 log
spring.rabbitmq.queueC=log
#设置队列C的路由键
spring.rabbitmq.routingkeyC=*.log.*

队列A处理消息组件

@Component
@RabbitListener(
	bindings = @QueueBinding(
		value = @Queue(value = "${spring.rabbitmq.queueA}",autoDelete = "true"),
		exchange = @Exchange(value="${spring.rabbitmq.exchange}",type = ExchangeTypes.TOPIC),
		key = "${spring.rabbitmq.routingkeyA}"
	)
)
public class RabbitMQReciveA {
	
	@RabbitHandler
	public void reciveMsg(String msg) {
		System.out.println("A:log.info   " + msg+".........");
	}
}

队列B处理消息组件

@Component
@RabbitListener(
	bindings = @QueueBinding(
		value = @Queue(value = "${spring.rabbitmq.queueB}",autoDelete = "true"),
		exchange = @Exchange(value="${spring.rabbitmq.exchange}",type = ExchangeTypes.TOPIC),
		key = "${spring.rabbitmq.routingkeyB}"
	)
)
public class RabbitMQReciveB {
	
	@RabbitHandler
	public void reciveMsg(String msg) {
		System.out.println("B:log.error   " + msg+".........");
	}
}

队列C处理消息组件

@Component
@RabbitListener(
	bindings = @QueueBinding(
		value = @Queue(value = "${spring.rabbitmq.queueC}",autoDelete = "true"),
		exchange = @Exchange(value="${spring.rabbitmq.exchange}",type = ExchangeTypes.TOPIC),
		key = "${spring.rabbitmq.routingkeyC}"
	)
)
public class RabbitMQReciveC {
	@RabbitHandler
	public void reciveMsg(String msg) {
		System.out.println("C:log   " + msg+".........");
	}
}

测试:

启动两个module,访问Controller

  • 作者:m0_67393828
  • 原文链接:https://blog.csdn.net/m0_67393828/article/details/123789265
    更新时间:2022-06-19 10:49:56