SpringBoot和RocketMQ的实例RocketMQTemplate

2023年2月19日10:29:15

@RocketMQMessageListener

@RocketMQMessageListener 标记消息消费者,可以简化代码。

consumerGroup:消费组,全局唯一

topic:主题

selectorExpression : 标签,默认" *" 全部

consumeMode :

  1. ConsumeMode.CONCURRENTLY 多线程并发竞争接受消费,不能保证消息的有效性,
  2. ConsumeMode.ORDERLY 一个队列一个线程,有序的接受消息

messageModel : 消息模式: MessageModel.CLUSTERING 点对点(默认)MessageModel.BROADCASTING 广播

RocketMQ事务

1、Producer 发送事务消息

rocketMQTemplate.sendMessageInTransaction

Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

2、Producer 实现接口 RocketMQLocalTransactionListener

2.1、重写方法 RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg)

Producer 端执行业务代码逻辑,通过本地数据库事务控制。

若Producer 本地事务执行成功则自动向MQServer发送COMMIT消息,此时MQ订阅方才可以正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送ROLLBACK消息,MQ Server接收到ROLLBACK消息后 将删除此消息 。

2.2、重写方法 RocketMQLocalTransactionState checkLocalTransaction(Message message)

如果执行Producer端本地事务过程中,执行端挂掉,网络异常,或者超时,导致消息的状态一直是Prepared(预备状态),MQ Server有一定时器会不停的询问 Producer的checkLocalTransaction方法来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

3,消费者消费

MQ订阅消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

1,引用jar包

build.gradle文件添加jar包引用

compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.1'

2,配置

application.properties 配置文件

###rocketmq###
rocketmq.name-server=192.168.1.3:9876
rocketmq.producer.group=app-demp
rocketmq.producer.timeout=4000

3,生产者

MsgSender 消息发送接口

public interface MsgSender {
    /**
     * 发送消息
     *
     * @param data  消息信息
     * @param topic 主题
     */
    void sendMessage(String topic, Object data);

    /**
     * 发送消息
     *
     * @param data  消息信息
     * @param topic 主题
     * @param tags  主题的标签
     */
    void sendMessage(String topic, String tags, Object data);
    
    /**
     * 发送消息(支持分布式事务)
     *
     * @param data  消息信息
     * @param topic 主题
     */
    void sendMessageInTransaction(String topic, Object data);

    /**
     * 发送消息(支持分布式事务)
     *
     * @param data  消息信息
     * @param topic 主题
     * @param tags  主题的标签
     */
    void sendMessageInTransaction(String topic, String tags, Object data);
}

MsgSenderTemplateService 消息发送实现


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class MsgSenderTemplateService implements MsgSender {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String topic, Object data) {
        rocketMQTemplate.convertAndSend(topic, data);
        log.info("发送MQ成功:message={}", JSON.toJSONString(data));
    }

    @Override
    public void sendMessage(String topic, String tags, Object data) {
        rocketMQTemplate.convertAndSend(String.format("%s:%s", topic, tags), data);
        log.info("发送MQ成功:message={}", JSON.toJSONString(data));
    }

    @Override
    public void sendMessageInTransaction(String topic, Object data) {
        Message<?> message = MessageBuilder.withPayload(data).build();
        rocketMQTemplate.sendMessageInTransaction(topic, message, null);
        log.info("发送MQ成功:message={}", JSON.toJSONString(data));
    }

    @Override
    public void sendMessageInTransaction(String topic, String tags, Object data) {
        MessageBuilder<Object> messageBuilder = MessageBuilder.withPayload(data);
        messageBuilder.setHeader("msg", JSON.toJSONString(data));

        rocketMQTemplate.sendMessageInTransaction(String.format("%s:%s", topic, tags), messageBuilder.build(), null);
        log.info("发送MQ成功:message={}", JSON.toJSONString(data));
    }
}

OrderProducer 订单发送普通消息

import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


/**
 * @author qizenan
 * @date 2020-9-9
 **/
@Service
public class OrderProducer {
    @Resource
    private MsgSender msgSender;

    public void createOrder() {
        Map<String, Object> orderInfo = new HashMap<>();
        orderInfo.put("orderId", UUID.randomUUID().toString());
        orderInfo.put("price", 10000);
        orderInfo.put("description", "我是注册订单,请尽快处理");

        msgSender.sendMessage("TEMP", "order", orderInfo);
    }

}

4,消费者

OrderConsumer 消费订单消息

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * @author qizenan
 * @date 2020-9-9
 **/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP",selectorExpression ="order" )
public class OrderConsumer implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        String message = new String(messageExt.getBody());
        log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
                messageExt.getMsgId(), message);

        Map<Integer, Object> orderInfo = JSON.parseObject(messageExt.getBody(), Map.class);
        log.info("订单信息 orderInfo = {} ", orderInfo.toString());
    }

}

发生者运行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

消费者运行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

5,RocketMQ的分布式事务

OrderProducer 订单发送事件消息

import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


/**
 * @author qizenan
 * @date 2020-9-9
 **/
@Service
public class OrderProducer {
    @Resource(name = "msgSenderTemplateService")
    private MsgSender msgSender;

    public void createOrder() {
        Map<String, Object> orderInfo = new HashMap<>();
        orderInfo.put("orderId", UUID.randomUUID().toString());
        orderInfo.put("price", 10000);
        orderInfo.put("description", "我是注册订单,请尽快处理");

        msgSender.sendMessageInTransaction("TEMP", "order", orderInfo);
    }

}

实现RocketMQLocalTransactionListener


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * @author qizenan
 * @date 2020-9-9
 **/
@Slf4j
@Component
@RocketMQTransactionListener()
public class ProducerListener implements RocketMQLocalTransactionListener {
    /**
     * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
     * 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        Map orderInfo = checkDestination(message, "TEMP", "order", Map.class);
        if (orderInfo != null) {
            try {
                log.info("执行本地订单业务逻辑 orderInfo={} ", orderInfo.toString());
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * 根据 topic,tag 获取发送者的信息
     *
     * @param message RockeRtMQ的消息
     * @param topic   主题
     * @param tag     标签
     * @param tClass  生产者发生的消息的class
     * @return 生产者发生的消息的
     */
    private <T> T checkDestination(Message message, String topic, String tag, Class<T> tClass) {
        String destination = topic;
        if (StringUtils.isNotBlank(tag)) {
            destination += ":" + tag;
        }
        MessageHeaders headers = message.getHeaders();
        String msgDestination = headers.get("rocketmq_TOPIC", String.class);
        String msgTag = headers.get("rocketmq_TAGS", String.class);
        if (StringUtils.isNotBlank(msgTag)) {
            msgDestination += ":" + msgTag;
        }
        if (!destination.equals(msgDestination)) {
            return null;
        }
        Object msg = headers.get("msg");
        if (msg == null) {
            return null;
        }
        try {
            return JSON.parseObject((String) msg, tClass);
        } catch (Exception e) {
            log.error("msgDestination = {} 转化 {} 异常", msgDestination, tClass.getName(), e);
        }
        return null;
    }

    /**
     * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地事务是否已经执行成功,
     * 成功返回: RocketMQLocalTransactionState.COMMIT,失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        Map orderInfo = checkDestination(message, "TEMP", "order", Map.class);
        if (orderInfo != null) {
            boolean isSuccess = true;
            log.info("查询本地订单是否已经执行成功 orderInfo={},isSuccess={} ", orderInfo.toString(), isSuccess);
            if (isSuccess) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

生产者执行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

消费者执行结果
SpringBoot和RocketMQ的实例RocketMQTemplate

  • 作者:枫叶-哈哈
  • 原文链接:https://blog.csdn.net/fangye1/article/details/108778141
    更新时间:2023年2月19日10:29:15 ,共 7715 字。