@RocketMQMessageListener
@RocketMQMessageListener 标记消息消费者,可以简化代码。
consumerGroup:消费组,全局唯一
topic:主题
selectorExpression : 标签,默认" *" 全部
consumeMode :
- ConsumeMode.CONCURRENTLY 多线程并发竞争接受消费,不能保证消息的有效性,
- ConsumeMode.ORDERLY 一个队列一个线程,有序的接受消息
messageModel : 消息模式: MessageModel.CLUSTERING 点对点(默认)MessageModel.BROADCASTING 广播
RocketMQ事务
1、Producer 发送事务消息
rocketMQTemplate.sendMessageInTransaction
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
2、Producer 实现接口 RocketMQLocalTransactionListener
2.1、重写方法 RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg)
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
若Producer 本地事务执行成功则自动向MQServer发送COMMIT消息,此时MQ订阅方才可以正常消费消息;
若Producer 本地事务执行失败则自动向MQServer发送ROLLBACK消息,MQ Server接收到ROLLBACK消息后 将删除此消息 。
2.2、重写方法 RocketMQLocalTransactionState checkLocalTransaction(Message message)
如果执行Producer端本地事务过程中,执行端挂掉,网络异常,或者超时,导致消息的状态一直是Prepared(预备状态),MQ Server有一定时器会不停的询问 Producer的checkLocalTransaction方法来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
3,消费者消费
MQ订阅消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
1,引用jar包
build.gradle文件添加jar包引用
compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.1'
2,配置
application.properties 配置文件
###rocketmq###
rocketmq.name-server=192.168.1.3:9876
rocketmq.producer.group=app-demp
rocketmq.producer.timeout=4000
3,生产者
MsgSender 消息发送接口
public interface MsgSender {
/**
* 发送消息
*
* @param data 消息信息
* @param topic 主题
*/
void sendMessage(String topic, Object data);
/**
* 发送消息
*
* @param data 消息信息
* @param topic 主题
* @param tags 主题的标签
*/
void sendMessage(String topic, String tags, Object data);
/**
* 发送消息(支持分布式事务)
*
* @param data 消息信息
* @param topic 主题
*/
void sendMessageInTransaction(String topic, Object data);
/**
* 发送消息(支持分布式事务)
*
* @param data 消息信息
* @param topic 主题
* @param tags 主题的标签
*/
void sendMessageInTransaction(String topic, String tags, Object data);
}
MsgSenderTemplateService 消息发送实现
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class MsgSenderTemplateService implements MsgSender {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String topic, Object data) {
rocketMQTemplate.convertAndSend(topic, data);
log.info("发送MQ成功:message={}", JSON.toJSONString(data));
}
@Override
public void sendMessage(String topic, String tags, Object data) {
rocketMQTemplate.convertAndSend(String.format("%s:%s", topic, tags), data);
log.info("发送MQ成功:message={}", JSON.toJSONString(data));
}
@Override
public void sendMessageInTransaction(String topic, Object data) {
Message<?> message = MessageBuilder.withPayload(data).build();
rocketMQTemplate.sendMessageInTransaction(topic, message, null);
log.info("发送MQ成功:message={}", JSON.toJSONString(data));
}
@Override
public void sendMessageInTransaction(String topic, String tags, Object data) {
MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);
messageBuilder.setHeader("msg", JSON.toJSONString(data));
rocketMQTemplate.sendMessageInTransaction(String.format("%s:%s", topic, tags), messageBuilder.build(), null);
log.info("发送MQ成功:message={}", JSON.toJSONString(data));
}
}
OrderProducer 订单发送普通消息
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author qizenan
* @date 2020-9-9
**/
@Service
public class OrderProducer {
@Resource
private MsgSender msgSender;
public void createOrder() {
Map<String, Object> orderInfo = new HashMap<>();
orderInfo.put("orderId", UUID.randomUUID().toString());
orderInfo.put("price", 10000);
orderInfo.put("description", "我是注册订单,请尽快处理");
msgSender.sendMessage("TEMP", "order", orderInfo);
}
}
4,消费者
OrderConsumer 消费订单消息
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author qizenan
* @date 2020-9-9
**/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP",selectorExpression ="order" )
public class OrderConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String message = new String(messageExt.getBody());
log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
messageExt.getMsgId(), message);
Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);
log.info("订单信息 orderInfo = {} ", orderInfo.toString());
}
}
发生者运行结果
消费者运行结果
5,RocketMQ的分布式事务
OrderProducer 订单发送事件消息
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author qizenan
* @date 2020-9-9
**/
@Service
public class OrderProducer {
@Resource(name = "msgSenderTemplateService")
private MsgSender msgSender;
public void createOrder() {
Map<String, Object> orderInfo = new HashMap<>();
orderInfo.put("orderId", UUID.randomUUID().toString());
orderInfo.put("price", 10000);
orderInfo.put("description", "我是注册订单,请尽快处理");
msgSender.sendMessageInTransaction("TEMP", "order", orderInfo);
}
}
实现RocketMQLocalTransactionListener
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author qizenan
* @date 2020-9-9
**/
@Slf4j
@Component
@RocketMQTransactionListener()
public class ProducerListener implements RocketMQLocalTransactionListener {
/**
* RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
* 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
Map orderInfo = checkDestination(message, "TEMP", "order", Map.class);
if (orderInfo != null) {
try {
log.info("执行本地订单业务逻辑 orderInfo={} ", orderInfo.toString());
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 根据 topic,tag 获取发送者的信息
*
* @param message RockeRtMQ的消息
* @param topic 主题
* @param tag 标签
* @param tClass 生产者发生的消息的class
* @return 生产者发生的消息的
*/
private <T> T checkDestination(Message message, String topic, String tag, Class<T> tClass) {
String destination = topic;
if (StringUtils.isNotBlank(tag)) {
destination += ":" + tag;
}
MessageHeaders headers = message.getHeaders();
String msgDestination = headers.get("rocketmq_TOPIC", String.class);
String msgTag = headers.get("rocketmq_TAGS", String.class);
if (StringUtils.isNotBlank(msgTag)) {
msgDestination += ":" + msgTag;
}
if (!destination.equals(msgDestination)) {
return null;
}
Object msg = headers.get("msg");
if (msg == null) {
return null;
}
try {
return JSON.parseObject((String) msg, tClass);
} catch (Exception e) {
log.error("msgDestination = {} 转化 {} 异常", msgDestination, tClass.getName(), e);
}
return null;
}
/**
* 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地事务是否已经执行成功,
* 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
Map orderInfo = checkDestination(message, "TEMP", "order", Map.class);
if (orderInfo != null) {
boolean isSuccess = true;
log.info("查询本地订单是否已经执行成功 orderInfo={},isSuccess={} ", orderInfo.toString(), isSuccess);
if (isSuccess) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.COMMIT;
}
}
生产者执行结果
消费者执行结果