Spring-RabbitMQ 消费者消息确认案例实践

2022-09-01 11:57:17

Springboot 版本: 2.7.0

消费者消息确认模式分类

  1. NONE:等同于rabbitMQ客户端的自动确认,只要投递了就认为是成功的。
  2. MANUAL:需要用户通过 channel 的 ack/nack 手动确认。
  3. AUTO(默认值):自动模式,消费者正常执行结束认为成功,报错认为失败。

代码实现

配置类:

@Slf4j@ConfigurationpublicclassRabbitConfiguration{publicfinalstaticString TOPIC_EXCHANGE="myExchange";publicfinalstaticString QUEUE_NAME="myQueue";@BeanpublicRabbitAdminamqpAdmin(ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template=newRabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter());
        template.setExchange(TOPIC_EXCHANGE);
        template.setConfirmCallback((correlationData, ack, cause)->{if(ack){
                log.info("消息:{}发送成功", correlationData.getId());}else{
                log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);}});

        template.setMandatory(true);
        template.setReturnsCallback(returned->{
            log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());});return template;}@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(TOPIC_EXCHANGE,true,false);}@BeanpublicQueuequeue(){returnnewQueue(QUEUE_NAME);}@BeanpublicBindingbinding(){returnBindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");}@BeanpublicJackson2JsonMessageConverterjsonConverter(){returnnewJackson2JsonMessageConverter();}}

配置文件:

spring:rabbitmq:host: localhostport:5672username: adminpassword: adminvirtual-host: my_vhost# 消息确认(ACK)publisher-confirm-type: CORRELATED#correlated #确认消息已发送到交换机(Exchange)publisher-returns:true#确认消息已发送到队列(Queue)

生产者:

@ComponentpublicclassPublisherService{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(){CorrelationData correlationData=newCorrelationData();
        rabbitTemplate.convertAndSend("my.test.message",newUser("Kleven",18), correlationData);}}
@Data@NoArgsConstructor@AllArgsConstructor@ToStringpublicclassUserimplementsSerializable{privatestaticfinallong serialVersionUID=-5079682733940745661L;privateString name;privateInteger age;}

模式一、NONE

当确认模式设置为NONE时,只要中间件投递了消息就认为成功并将消息从队列中移除。

@RabbitListener(queues="myQueue", messageConverter="jsonConverter", ackMode="NONE")publicvoidnoneAckListener(User user){
        log.info("收到消息 -> {}", user);// 添加个错误用于测试int a=1/0;}

结果:
可以看到,即使消费者出错了,队列中的消息依然被删除了。Spring-RabbitMQ 消费者消息确认案例实践

模式二、MANUAL

channel.basicAck 确认一个或多个消息

/**
* @param deliveryTag 当前消息的投递标签,是一个自增的数字。
* @param multiple true:确认 deliveryTag <= 当前消息deliveryTag 的所有消息; false:只确认当前收到的消息。
*/voidbasicAck(long deliveryTag,boolean multiple)throwsIOException;
@AutowiredprivateJackson2JsonMessageConverter jsonConverter;@RabbitListener(queues="myQueue", ackMode="MANUAL")publicvoidmanualAckListener(Message message,Channel channel)throwsIOException{long deliveryTag= message.getMessageProperties().getDeliveryTag();

        log.info("成功消费消息 -> {}", jsonConverter.fromMessage(message));

        channel.basicAck(deliveryTag,false);}

结果:
消息消费成功,且从队列中删除。

消息:aaa9b3b7-85b4-42fb-8a12-0aad488817f1发送成功
成功消费消息 -> User(name=Kleven,age=18)

Spring-RabbitMQ 消费者消息确认案例实践

channel.basicNack 拒绝一个或多个消息

/**
     *
     * @param multiple 拒绝 deliveryTag <= 当前消息deliveryTag 的所有消息; false:只拒绝当前收到的消息。
     * @param requeue true 将拒绝对的消息重新加入队列。
     */voidbasicNack(long deliveryTag,boolean multiple,boolean requeue)throwsIOException;
@AutowiredprivateJackson2JsonMessageConverter jsonConverter;@RabbitListener(queues="myQueue", ackMode="MANUAL")publicvoidmanualAckListener(Message message,Channel channel)throwsIOException{long deliveryTag= message.getMessageProperties().getDeliveryTag();

        log.info("消费消息 -> {}", jsonConverter.fromMessage(message));

        channel.basicNack(deliveryTag,false,true);}

结果:
当 requeue 为 true时,拒绝消息后消息从重新入队,可以看到队列中任然有一条数据。
当 requeue 为 false时,拒绝消息后消息也还是从队列中删除掉了。

Spring-RabbitMQ 消费者消息确认案例实践

模式三、AUTO

默认值,消费者成功时认为成功并从队列中删除消息。消费者失败时认为失败,不会从队列中删除消息。

@RabbitListener(queues="myQueue", messageConverter="jsonConverter")publicvoidautoAckListener(User user){
        log.info("收到消息 -> {}", user);// 添加个错误用于测试int a=1/0;}

结果:
可以看到,消费者出错后,消息依然在队列中。当移除消费者中的错误代码后,成功消费消息后,队列中的数据被删除。
Spring-RabbitMQ 消费者消息确认案例实践

  • 作者:余数kl
  • 原文链接:https://blog.csdn.net/u012359704/article/details/126381251
    更新时间:2022-09-01 11:57:17