rocketMQ事物消息与springboot整合与springcloudstream整合

2022-07-22 10:45:31

事务消息

在rocketMQ中事务消息 首先假设一个场景 其中有 模块1 发送者 Broker(可以说是rocketMQ的队列) 事物消息就是 当模块1中出现异常 发送者给broket发送消息就会发送失败 大体流程就是当发送者给broker发送消息会发送一个half消息这个消息消费者是看不到的 然后broker返回一个已接收half消息的通知给发送者 然后发送者执行相关的业务代码 让业务代码中返回 return LocalTransactionState.ROLLBACK_MESSAGE、 return LocalTransactionState.COMMIT_MESSAGE、 return LocalTransactionState.UNKNOW 这三种消息
ROLLBACK_MESSAGE 表示提交事物 然后发送者把消息发送
COMMIT_MESSAGE 表示 回滚事物把要发送的消息丢弃
UNKNOW 表示等会在重新检测这个状态 默认检测15次
这三种状态是在事物监听中做的rocketMQ为了完成这个功能实现了一个事物监听的接口

事物消息的在rocketMQ官网下载的源码中的事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer这个类中

事物监听代码 事物监听接口TransactionListener

public class TransactionListenerImpl implements TransactionListener {
	//在提交完事务消息后执行 当发送了一条消息就会提交 在这里可以判断这条消息到底给不给broker。
	//返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
	//返回ROLLBACK_MESSAGE状态的消息会被丢弃。
	//返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        //TagA的消息会立即被消费者消费到
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagB的消息会被丢弃
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //其他消息会等待Broker进行事务状态回查。
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
	//在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
		String tags = msg.getTags();
        //TagC的消息过一段时间会被消费者消费到
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        //TagD的消息也会在状态回查时被丢弃掉
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        //剩下TagE的消息会在多次状态回查后最终丢弃
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}

ACL权限控制

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-acl</artifactId>
	<version>4.7.1</version>
</dependency>


需要以上依赖 就是使用账号登录rocketMQ   在rocketMQ官网上的源码中docs/cn/acl/user_guide.md这个文件中可以看到详细信息

主要是将broker.conf这个配置文件中写上aclEnable=true 然后在plain_acl.yml中进行权限配置 这个文件是热加载的

#全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
#第一个账户
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false 
  defaultTopicPerm: DENY #默认Topic访问策略是拒绝
  defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
  topicPerms:
  - topicA=DENY #topicA拒绝
  - topicB=PUB|SUB #topicB允许发布和订阅消息
  - topicC=SUB #topicC只允许订阅
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
#第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

SpringBoot整合RocketMQ

SpringBoot集成的RocketMQ的starter依赖是由Spring社区提供 目前正在迭代的过程中 不同的版本之间差距非常大 甚至基础的底层对象都会经常有改动

整合步骤

首先创建maven工程 导入依赖

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>

rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,这里把SpringBoot的依赖包升级了一下。

然后以SpringBoot方式创建简单的例子

启动类

@SpringBootApplication
public class RocketMQScApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQScApplication.class,args);
    }
}

配置文件 application.properties

#NameServer地址
rocketmq.name-server=192.168.232.128:9876
#默认的消息生产者组
rocketmq.producer.group=springBootGroup

消息生产者

package com.roy.rocket.basic;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;


@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;
	//发送普通消息的示例
    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
    }
	//发送事务消息的示例
    public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination =topic+":"+tags[i % tags.length];
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        }
    }
}

消费者

package com.roy.rocket.basic;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

事物消息的监听器

package com.roy.rocket.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

import java.util.concurrent.ConcurrentHashMap;


@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object id = msg.getHeaders().get("id");
        String destination = arg.toString();
        localTrans.put(id,destination);
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
        String tags = message.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //SpringBoot的消息对象中,并没有transactionId这个属性。跟原生API不一样。
//        String destination = localTrans.get(msg.getTransactionId());
        return RocketMQLocalTransactionState.COMMIT;
    }
}

SpringCloudStream整合RocketMQ

SpringCloudStream是一个把所有中间件全部进行抽象 就是它把所有的mq全部进一步封装 我们只需要给它一个地址告诉它是什么mq 然后我们使用注解就能完成各个mq的功能 我们不需要关心mq的代码 我们只需要写业务逻辑代码

简单例子

创建maven 导入依赖

<dependencies>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.7.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-acl</artifactId>
			<version>4.7.1</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
			<version>2.2.3.RELEASE</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-client</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-acl</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<version>2.3.3.RELEASE</version>
		</dependency>
	</dependencies>

启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;


@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScRocketMQApplication.class,args);
    }
}

@EnableBinding这个注解是SpringCloudStream封装的Source为发送者、Sink是消费者

配置文件application.properties

#ScStream通用的配置以spring.cloud.stream开头
spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.bindings.input.group=scGroup
spring.cloud.stream.bindings.output.destination=TestTopic
#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头
#spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876;192.168.232.129:9876;192.168.232.130:9876
spring.cloud.stream.rocketmq.binder.name-server=192.168.232.128:9876

在SpringCloudStream中一个bindings代表跟mq的一个连接 binder代表一个mq只需要这样配置

消费者

package com.roy.scrocket.basic;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;


@Component
public class ScConsumer {

@StreamListener(Sink.INPUT)
public void onMessage(String messsage){
    System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT);
}
}

生产者

package com.roy.scrocket.basic;

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;


@Component
public class ScProducer {

@Resource
private Source source;

public void sendMessage(String msg){
    Map<String, Object> headers = new HashMap<>();
    headers.put(MessageConst.PROPERTY_TAGS, "testTag");
    MessageHeaders messageHeaders = new MessageHeaders(headers);
    Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
    this.source.output().send(message);
}
}

加一个controller测试

package com.roy.scrocket.controller;

import com.roy.scrocket.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


@RestController
@RequestMapping("/MQTest")
public class MQTestController {

@Resource
private ScProducer producer;
@RequestMapping("/sendMessage")
public String sendMessage(String message){
    producer.sendMessage(message);
    return "消息发送完成";
}
}

SpringCloudStream对于rabbitmq、kafka支持很好对于rocketmq支持不是很好 集成的rocketmq版本很低 相关文档很少

  • 作者:染指1110
  • 原文链接:https://blog.csdn.net/qq_36301061/article/details/119303903
    更新时间:2022-07-22 10:45:31