SpringBoot整合RocketMQ(商业云端版)——发送普通消息
- 首先去阿里云控制台获取相关的资源,如topic、groupId、以及鉴权需要的AccessKet.
- 在springboot项目的pom.xml中添加以来
<!-- RocketMQ --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.4.Final</version> </dependency>
- 配置application配置文件
#RocketMQ 将xxxxx替换为你的资源 rocketmq.accessKey=xxxxx rocketmq.secretKey=xxxxx rocketmq.nameSrvAddr=http://xxxxx:8080 rocketmq.topic=xxxxx rocketmq.groupId=xxxxx rocketmq.tag=*
- 封装MQ配置类
@Configuration @ConfigurationProperties(prefix = "rocketmq") public class MqConfig { private String accessKey; private String secretKey; private String nameSrvAddr; public Properties getMqPropertie() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey); properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr); return properties; } public String getAccessKey() { return accessKey; } public void setAccessKey(String accessKey) { this.accessKey = accessKey; } public String getSecretKey() { return secretKey; } public void setSecretKey(String secretKey) { this.secretKey = secretKey; } public String getNameSrvAddr() { return nameSrvAddr; } public void setNameSrvAddr(String nameSrvAddr) { this.nameSrvAddr = nameSrvAddr; } }
- 封装 MQ发送配置
@Component @RefreshScope public class TopicProperites { @Value("${rocketmq.groupId}") private String groupId; @Value("${rocketmq.topic}") private String topic; @Value("${rocketmq.tag}") private String tag; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } @Override public String toString() { return "TopicProperites{" + "groupId='" + groupId + '\'' + ", topic='" + topic + '\'' + ", tag='" + tag + '\'' + '}'; } }
- 给消息生产者注入配置信息,ProducerBean用于将Producer集成至SpringBoot中
@Configuration @RefreshScope public class ProducerClient { @Autowired private MqConfig mqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean buildProducer() { ProducerBean producer = new ProducerBean(); producer.setProperties(mqConfig.getMqPropertie()); return producer; } }
- 消息生产者Producer
@Component public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); @Autowired private ProducerBean producer; @Autowired private TopicProperites topicProperites; public SendResult send (String body) { Message msg = new Message( // Message所属的Topic topicProperites.getTopic(), // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤 opicProperites.getTag(), // Message Body 可以是任何二进制形式的数据, MQ不做任何干预 // 需要Producer与Consumer协商好一致的序列化和反序列化方式 body.getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey(null); LOGGER.info("准备发送消息为:{}", body); // 发送消息,只要不抛异常就是成功 try { SendResult sendResult = producer.send(msg); if (sendResult != null) { success(msg, sendResult.getMessageId()); return sendResult; } else { error(msg, new Exception()); return null; } } catch (ONSClientException e) { error(msg, e); //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } private void error(Message msg, Exception e) { LOGGER.error("发送MQ消息失败 -- Topic:{},Key:{},tag:{},body:{}" , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody())); LOGGER.error("Exception -- : ", e); } private void success(Message msg, String messageId) { LOGGER.info("发送MQ消息成功 -- Topic:{},msgId:{},Key:{},tag:{},body:{}" , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody())); } }
- 消息消费者配置
@Configuration @RefreshScope public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private TopicProperites topicProperites; @Autowired private MqMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, topicProperites.getGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); //重试次数 -1代表重试16次 properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1"); consumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); Subscription subscription = new Subscription(); subscription.setTopic(topicProperites.getTopic()); subscription.setExpression(topicProperites.getTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }
- 消息消费者
@Component public class MqMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(MqMessageListener.class); @Override public Action consume(Message message, ConsumeContext consumeContext) { try { //消费消息 String msgTag = message.getTag(); String msgBody = new String(message.getBody(), StandardCharsets.UTF_8); LOGGER.info("接收到MQ消息 -- Topic:{},tag:{},msgId:{},Key:{},body:{}", message.getTopic(), msgTag, message.getMsgID(), message.getKey(), new String(message.getBody())); //获取消息重试次数 int reconsumeTimes = message.getReconsumeTimes(); LOGGER.info("获取消息重试次数为:" + reconsumeTimes); //消费成功,继续消费下一条消息 return Action.CommitMessage; } catch (Exception e) { LOGGER.error("消费MQ消息失败! msgId:" + message.getMsgID() + " ---Exception:" + e); //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息 return Action.ReconsumeLater; } } }
详细的其他方式的发送可参考:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息 - 百度文库
https://wenku.baidu.com/view/6669bc0e4873f242336c1eb91a37f111f1850d6b.html