RocketMQ阿里云版整合springboot简单使用

2022-07-17 14:18:45

SpringBoot整合RocketMQ(商业云端版)——发送普通消息

  1. 首先去阿里云控制台获取相关的资源,如topic、groupId、以及鉴权需要的AccessKet.
  2. 在springboot项目的pom.xml中添加以来
            <!-- RocketMQ -->
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.4.Final</version>
            </dependency>
  3.  配置application配置文件
    #RocketMQ 将xxxxx替换为你的资源
    rocketmq.accessKey=xxxxx
    rocketmq.secretKey=xxxxx
    rocketmq.nameSrvAddr=http://xxxxx:8080
    rocketmq.topic=xxxxx
    rocketmq.groupId=xxxxx
    rocketmq.tag=*
  4. 封装MQ配置类
    @Configuration
    @ConfigurationProperties(prefix = "rocketmq")
    public class MqConfig {
    
        private String accessKey;
        private String secretKey;
        private String nameSrvAddr;
    
        public Properties getMqPropertie() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
            properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
            return properties;
        }
    
        public String getAccessKey() {
            return accessKey;
        }
        public void setAccessKey(String accessKey) {
            this.accessKey = accessKey;
        }
        public String getSecretKey() {
            return secretKey;
        }
        public void setSecretKey(String secretKey) {
            this.secretKey = secretKey;
        }
        public String getNameSrvAddr() {
            return nameSrvAddr;
        }
        public void setNameSrvAddr(String nameSrvAddr) {
            this.nameSrvAddr = nameSrvAddr;
        }
    }
  5. 封装 MQ发送配置
    @Component
    @RefreshScope
    public class TopicProperites {
    
        @Value("${rocketmq.groupId}")
        private String groupId;
    
        @Value("${rocketmq.topic}")
        private String topic;
    
        @Value("${rocketmq.tag}")
        private String tag;
    
        public String getGroupId() {
            return groupId;
        }
    
        public void setGroupId(String groupId) {
            this.groupId = groupId;
        }
    
        public String getTopic() {
            return topic;
        }
    
        public void setTopic(String topic) {
            this.topic = topic;
        }
    
        public String getTag() {
            return tag;
        }
    
        public void setTag(String tag) {
            this.tag = tag;
        }
    
        @Override
        public String toString() {
            return "TopicProperites{" +
                    "groupId='" + groupId + '\'' +
                    ", topic='" + topic + '\'' +
                    ", tag='" + tag + '\'' +
                    '}';
        }
    }
  6.  给消息生产者注入配置信息,ProducerBean用于将Producer集成至SpringBoot中
    @Configuration
    @RefreshScope
    public class ProducerClient {
    
        @Autowired
        private MqConfig mqConfig;
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ProducerBean buildProducer() {
            ProducerBean producer = new ProducerBean();
            producer.setProperties(mqConfig.getMqPropertie());
            return producer;
        }
    }
  7.  消息生产者Producer
    @Component
    public class Producer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
    
        @Autowired
        private ProducerBean producer;
    
        @Autowired
        private TopicProperites topicProperites;
    
        public SendResult send (String body) {
            Message msg = new Message(
                    // Message所属的Topic
                    topicProperites.getTopic(),
                    // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
                    opicProperites.getTag(),
                    // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
                    // 需要Producer与Consumer协商好一致的序列化和反序列化方式
                    body.getBytes());
            // 设置代表消息的业务关键属性,请尽可能全局唯一
            // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
            // 注意:不设置也不会影响消息正常收发
            msg.setKey(null);
            LOGGER.info("准备发送消息为:{}", body);
            // 发送消息,只要不抛异常就是成功
            try {
                SendResult sendResult = producer.send(msg);
                if (sendResult != null) {
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                } else {
                    error(msg, new Exception());
                    return null;
                }
            } catch (ONSClientException  e) {
                error(msg, e);
                //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
            }
        }
    
        private void error(Message msg, Exception e) {
            LOGGER.error("发送MQ消息失败 -- Topic:{},Key:{},tag:{},body:{}"
                    , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody()));
            LOGGER.error("Exception -- : ", e);
        }
        private void success(Message msg, String messageId) {
            LOGGER.info("发送MQ消息成功 -- Topic:{},msgId:{},Key:{},tag:{},body:{}"
                    , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody()));
        }
    
    }
  8.  消息消费者配置
    @Configuration
    @RefreshScope
    public class ConsumerClient {
    
        @Autowired
        private MqConfig mqConfig;
    
        @Autowired
        private TopicProperites topicProperites;
    
        @Autowired
        private MqMessageListener messageListener;
    
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ConsumerBean buildConsumer() {
            ConsumerBean consumerBean = new ConsumerBean();
            //配置文件
            Properties properties = mqConfig.getMqPropertie();
            properties.setProperty(PropertyKeyConst.GROUP_ID,  topicProperites.getGroupId());
            //将消费者线程数固定为20个 20为默认值
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
            //重试次数 -1代表重试16次
            properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1");
            consumerBean.setProperties(properties);
            //订阅关系
            Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
            Subscription subscription = new Subscription();
            subscription.setTopic(topicProperites.getTopic());
            subscription.setExpression(topicProperites.getTag());
            subscriptionTable.put(subscription, messageListener);
            //订阅多个topic如上面设置
    
            consumerBean.setSubscriptionTable(subscriptionTable);
            return consumerBean;
        }
    }
  9.  消息消费者
    @Component
    public class MqMessageListener implements MessageListener {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MqMessageListener.class);
    
        @Override
        public Action consume(Message message, ConsumeContext consumeContext) {
            try {
                //消费消息
                String msgTag = message.getTag();
                String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
                LOGGER.info("接收到MQ消息 -- Topic:{},tag:{},msgId:{},Key:{},body:{}",
                        message.getTopic(), msgTag, message.getMsgID(), message.getKey(),
                        new String(message.getBody()));
    
                //获取消息重试次数
                int reconsumeTimes = message.getReconsumeTimes();
                LOGGER.info("获取消息重试次数为:" + reconsumeTimes);
    
                //消费成功,继续消费下一条消息
                return Action.CommitMessage;
            } catch (Exception e) {
                LOGGER.error("消费MQ消息失败! msgId:" + message.getMsgID() + " ---Exception:" + e);
                //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
                return Action.ReconsumeLater;
            }
        }
    }

    详细的其他方式的发送可参考:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息 - 百度文库https://wenku.baidu.com/view/6669bc0e4873f242336c1eb91a37f111f1850d6b.html

  • 作者:贰拾_陆
  • 原文链接:https://blog.csdn.net/m0_46216934/article/details/125169237
    更新时间:2022-07-17 14:18:45