一、消费失败重试
- 默认的队列监听是自动确认的,但是如果出现异常不会自动确认
- 默认的失败机制是不断重试,这样会影响mq性能
- 可以在配置文件中指定失败重试次数和重试间隔
pring:rabbitmq:...listener:simple:retry:enabled:truemax-attempts:3max-interval:1000
- 配置之后将会按照间隔时间重试三次,重试之后如果消息依然没有发送的话消息将会直接丢弃
- 但是这样直接丢弃消息在某些场景下并不合适,所以需要使用到死信队列,当消息不可达时由死信处理
二、死信队列应用
2.1 绑定死信交换器
- 死信队列指的是当消息消费失败或失败超过一定次数时可以让其进入死信队列,然后再让监听死信队列的线程处理,可以保证消息的可靠性
- 死信队列需要绑定到正常的队列上,所以如果原来交换器和队列已经有绑定需要先解绑才能正常绑定
- 声明死信队列和正常队列绑定,死信队列、交换器等通常使用dl/dlx进行标识,标识为死信
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;import java.util.HashMap;import java.util.Hashtable;import java.util.Map;@ConfigurationpublicclassSubscribeExchangeConfig{@Beanpublic DirectExchangeemailDlxDirectExchange(){return ExchangeBuilder.directExchange("exchange.direct.dlx.springboot.email").build();}@Beanpublic QueueemailDlxQueue(){return QueueBuilder.durable("queue.direct.dlx.springboot.email").build();}@Bean@Resourcepublic BindingemailDlxBiding(Queue emailDlxQueue, DirectExchange emailDlxDirectExchange){return BindingBuilder.bind(emailDlxQueue).to(emailDlxDirectExchange).with("springboot.email.dlk.routing.key");}@Beanpublic DirectExchangeemailDirectExchange(){return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();}@Beanpublic QueueemailQueue(){
Map<String, Object> params=newHashtable<>(4);
params.put("x-dead-letter-exchange","exchange.direct.dlx.springboot.email");
params.put("x-dead-letter-routing-key","springboot.email.dlk.routing.key");return QueueBuilder.durable("queue.direct.springboot.email").withArguments(params).build();}@Bean@Resourcepublic BindingemailBiding(Queue emailQueue, DirectExchange emailDirectExchange){return BindingBuilder.bind(emailQueue).to(emailDirectExchange).with("springboot.email.routing.key");}}
- 绑定之后可以在管理界面看到正常队列上多了DLX(死信交换器)和DLK(死信路由键)参数

2.1 监听死信队列
- 死信队列监听,按照正常监听队列监听即可
@RabbitListener(queues="queue.direct.dlx.springboot.email")publicvoiddlxReceiver(String msg){
System.out.println("dlxReceiver = "+ msg);}
2.2 死信队列测试
@RabbitListener(queues="queue.direct.springboot.email")publicvoidreceiver01(String msg){
Integer.parseInt("a");
System.out.println("receiver01 message = "+ msg);}@RabbitListener(queues="queue.direct.dlx.springboot.email")publicvoiddlxReceiver(String msg){
System.out.println("dlxReceiver = "+ msg);}
- 在正常队列中模拟异常,然后往队列中发送消息,当三次之后(配置文件配置的重试)就会从正常队列中移除然后加入到死信队列中,然后死信队列中将会监听到信息
三、消息消费确认
- 当消费消息的时候需要回应消息是否确认消费或者需要拒绝
- 如果消费一条消息如果不确认,将会是unacked状态,然后过期后将会又重新入队
- 确认或拒绝需要在监听器处指定Channel参数,Message参数可选
- org.springframework.amqp.core.Message
- deliveryTag获取:message.getMessageProperties().getDeliveryTag()
- com.rabbitmq.client.Channel
- 消息确认:basicAck(long deliveryTag, boolean multiple) throws IOException
- multiple:是否批量确认,将会确认当前tag及之前的全部,谨慎false
- 消息拒绝:basicReject(long deliveryTag, boolean requeue) throws IOException
- requeue:是否重新入队,false从队列中直接丢弃,true将会重新入队,谨慎false
- 批量消息确认:basicAck(long deliveryTag, boolean multiple) throws IOException
- 批量消息拒绝:basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException
@RabbitListener(queues="queue.direct.dlx.springboot.email")publicvoiddlxReceiver(String msg, Message message, Channel channel){
System.out.println("dlxReceiver = "+ msg);try{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息确认成功");}catch(IOException e){
System.out.println(e.getMessage());}}
四、重复消费解决
- 为了避免因为网络导致消息确认失败二导致消息重复消费,需要处理此种情况
- 推荐解决方法之一就是使用redis来保证消息的幂等,可以将消费和放入redis中作为一个事务
- 需要考虑:消费成功了但是放入redis失败
- 将消费和放入缓存放入同一个事务,但是也需要注意事务提交失败,缓存存入成功
- 如果缓存放入失败则事务异常
- 缓存有效期保证,例如多少时间前的应该删除等
- redis成功,但是事务保存失败
- 事务失败清除key,但是可能清除key失败,所以此时需要对此种情况必须要确认key清除成功
@RabbitListener(queues="queue.direct.springboot.email")publicvoidreceiver(Message message)throws Exception{
String messageId= message.getMessageProperties().getMessageId();if(messageId!= null&&!cacheUtil.exists(messageId)){
String msg=newString(message.getBody());
cacheUtil.set(messageId,true);}else{
System.out.println("已经消费过了");}}