第十节 死信队列

2023-03-28 14:28:11

一、基本功能

        场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

        场景二:转账申请提交了,预计2小时内到账。

        从上述常见的场景一开始入手。

        从下订单(未付款)开始,这个订单,可以视为一种消息,就进入死信队列。进入死信队列的特点就是消息有时间限制。

        如果这个订单30分钟后还没有付款,那么这个消息存活时间就到了。

        于是就把这个订单从死信队列转移到另外一个专门用于取消订单的队列。

        核心功能总结:投递到死信交换机的消息,将会有时间限制。时间到了以后,如果仍然没有被消费。那么这个消息将会被转发到创建它的死信交换机中。死信交换机如果没有绑定任何真实队列,那么这个消息就丢失了。否则,将这个消息投递到真实队列中。

二、流程图

        从右往左看 

        消息源:视为我们下单了这个动作。

        面向生产端的交换机:下单的消息,被此交换机 投递到 死信队列。

        死信队列:下单消息超过30分钟后,用户仍然没有付款,那么这个消息,需要被发送到死信交换机。

        死信交换机:死信交换机再将这个消息投递到消费者队列。

        消费者队列:获取到用户没有付款的订单号。

        消费端监听者:取出消费者队列中的订单号数据。根据订单号,删除数据库中此次订单记录。

        消息生产者向生产端的路由器发送消。路由器将此消息根据路由键,投递到死信队列。此消息根据死信队列配置,将会有X秒中的生存时间。X秒种后,消息生存时间TTL到了。那么,此消息就会被投递到死信交换机中。死信交换机会把此消息重新投递到真实队列。监听真实队列的消费者就能监听到消息了。

三、核心代码

#自定义参数
defineProps:
    rabbit: #MQ队列名称
        direct:
            #死信交换机名称
            dead.exchage: local::mq10:dead.exchange
            #死信路由键名称
            dead.routing.key: mq10::dead.routeKey
            #死信队列名称
            dead.queue: local::mq10:dead.queue

            #面向生产端的交换机
            produce.exchange: local::mq10:produce.exchange
            #死信队列绑定面向生产端的路由键
            produce.routing.key: mq10::routeKey
            #接收端真正的消费监听队列
            real.queue: local::mq10:real.queue
@Configuration
@Getter
public class RabbitMQConfig {


    @Value("${defineProps.rabbit.direct.dead.exchage}")
    private String deadExchange;

    @Value("${defineProps.rabbit.direct.dead.queue}")
    private String deadQueue;

    @Value("${defineProps.rabbit.direct.dead.routing.key}")
    private String deadRoutingKey;

    @Value("${defineProps.rabbit.direct.produce.exchange}")
    private String produceExchange;

    @Value("defineProps.rabbit.direct.produce.routing.key")
    private String produceRoutingKey;

    @Value("${defineProps.rabbit.direct.real.queue}")
    private String realQueue;


    //第一步:创建死信队列
    @Bean
    public Queue simpleDeadQueue() {
        Map<String, Object> map = Maps.newHashMapWithExpectedSize(5);
        /**
         * 死信队列由死信交换机创建
         * 需要指明死信路由键 以及 消息存活时间
         */
        map.put("x-dead-letter-exchange", deadExchange);
        map.put("x-dead-letter-routing-key", deadRoutingKey);
        map.put("x-message-ttl", 10000);
        return new Queue(deadQueue, true, false, false, map);
    }

    //第二步:创建生产端交换机
    @Bean
    public TopicExchange produceExchange() {
        return new TopicExchange(produceExchange, true, false);
    }

    //第三步:创建绑定:死信队列绑定到生产端
    @Bean
    public Binding simpleDeadBinding() {
        return BindingBuilder.bind(simpleDeadQueue()).to(produceExchange()).with(produceRoutingKey);
    }

    //第四步:创建实际的消费队列
    @Bean
    public Queue realQueue() {
        return new Queue(realQueue, true, false, false, null);
    }

    //第五步:创建死信交换机
    @Bean
    public TopicExchange simpleDeadRealExchange(){
        return new TopicExchange(deadExchange, true, false);
    }

    //第六步:将实际队列绑定到死信交换机.路由键使用死信路由键
    @Bean
    public Binding simpleRealDeadBinding(){
        return BindingBuilder.bind(realQueue()).to(simpleDeadRealExchange()).with(deadRoutingKey);
    }

         生产端核心代码

@RequestMapping(PREFIX + "test")
    public void test() {
        template.setExchange(config.getProduceExchange());
        template.setRoutingKey(config.getProduceRoutingKey());
        template.convertAndSend("死信队列来辣!");
    }

        接收端核心代码

@RabbitHandler
    public void handleMessage(@Payload String msg, @Headers Channel channel, Message message) throws IOException {
        try {
            LOGGER.info("接收到消息:{}", msg);
            long tag = message.getMessageProperties().getDeliveryTag();
            //确定消费
            channel.basicAck(tag, false);
        } catch (IOException e) {
            LOGGER.error("消费异常");
            long tag = message.getMessageProperties().getDeliveryTag();
            //第三个参数,是否重新入队列,让别的消费者消费
            //设置为false,那么这个消息就真的被丢弃了
            channel.basicNack(tag,false,true);
        }
    }

        效果:

         (1)首先向Controller发送一个请求。

         (2)Controller构造消息,向生产端路由器投递。路由器将此消息投递到了死信队列。

         (3)我们查看网页版的RabbitMQ服务器,查看死信队列的状态。发现有一条数据等待消费。过了10秒种后(由死信队列设置),消息没有被消费。那么消息将会被投递到死信路由器中。

         (4)死信路由器再把这个消息投递给真实的队列。就是动图中,死信队列的下面一个队列。

         (5)于是真实队列中就有了一条数据,那么监听此真实队列的消费者就能够从队列中获取消息。

         (6)最后是消费者接收到消息。最后手动提醒RabbitMQ服务器,消息已经被正确消费。真实队列的Unacked从1变成0

真实业务

        用户下单表中维护了一条下单记录,使用Id作为主键,使用status作为下单状态。

        用户下单后,首先在数据库中插入一条下单记录。Controller把此下单记录的ID作为消息发送到死信队列中。 

        消息在死信队列X秒后,寿终正寝。此时,死信队列将此消息(即订单的ID)投递到真实队列中。

        监听真实队列的消费者,发现真实队列中有消息,因此就监听到用户的下单ID。再根据ID去数据库中查询下单记录。

                  如果下单记录的status为"未支付",说明用户在规定时间内仍然没有付款。将status更新为用户已经取消。

                  如果下单记录的status为"已支付",说明用户在规定时间内已经付款。这个时候可以做"减少库存,记录日志"等异步操作。

源代码下载 

        源代码地址:https://github.com/hairdryre/Study_RabbitMQ

  • 作者:小大宇
  • 原文链接:https://zhoutianyu.blog.csdn.net/article/details/89953846
    更新时间:2023-03-28 14:28:11