kafka依赖_SpringBoot整合kafka,以及多播机制的应用

2022-05-16 20:23:04

最近做一个需求需要用SpringBoot整合Kafka的多播机制,所以就顺便写篇文章介绍一下自己的对Kafka多播机制的应用吧。

先说明一下我的需求,一个分布式系统内每个节点需要处理存在该节点本地的文件,但是处理的任务必须放到队列中排队避免高并发引起的系统崩溃。当时由于需要处理的文件都是在发出消息的节点上的,所以考虑之后决定使用Kafka的多播机制来实现。

首先简单讲解一下Kafka的多播机制。Kafka的消息是由消费组Group来消费的,Kafka会把消息复制多份发给每个注册的消费组一份。每个消费组内只消费该消息一次,也就是说只要这个消费组中有人消费了这个消息,这个消息就认为被消费了,但是不影响其他消费组内的这个消息。

所以我的流程是这样的:每个节点都用自己的IP作为GroupId来注册一个消费组,当消费消息的时候判断消息中包含的ip地址是否与该节点的IP地址匹配,如果匹配就处理文件,如果不匹配就直接消费消息不做任何操作。

1.SpringBoot整合Kafka

SpringBoot整合Kafka是非常简单的,引入kafka的依赖再在application.properties文件上配置Kafka的配置参数就可以直接使用了。如下图所示:

2b1e1d360792b62936f2e27044ee8a6d.png
d19518c25a913a6494a94231dbaf0863.gif
69d16f4a2441e0587ef9fbc58ac67d7f.png
d19518c25a913a6494a94231dbaf0863.gif

2. kafka生产者发送消息

466a08d86ac67e4624cc85b399758261.png
d19518c25a913a6494a94231dbaf0863.gif

如上图所示,只需要调用kafkaTemplate对象就可以直接发送消息了。

3.kafka注册动态消费组

一般kafka的消费组groupId是在SpringBoot中的配置文件中配置的,如下:

42cbb26755ca99e2ba3682f120231961.png
d19518c25a913a6494a94231dbaf0863.gif

但是这是静态的groupId,不满足我们的需求,所以我们要创建动态的groupId来覆盖这个静态的配置。

创建一个config类,使用System.setProperty()方法将动态的groupId添加到系统配置中覆盖原来的配置。如下:

dea3b91596ad9305bf79655299d2d7ea.png
d19518c25a913a6494a94231dbaf0863.gif

这样每个节点就都会注册一个groupId了,也就实现的kafka的多播机制。

4. 创建kafka的监听类

344bc4b3751fe51266d046c08d0de0a7.png
d19518c25a913a6494a94231dbaf0863.gif

SpringBoot创建监听方法也非常简单,只要使用@KafkaListener注解就可以创建一个监听方法了,不过要注意这里的topic参数必须静态的。

文章就到这里,更多精彩文章敬请期待。

  • 作者:weixin_39522170
  • 原文链接:https://blog.csdn.net/weixin_39522170/article/details/109977322
    更新时间:2022-05-16 20:23:04