springboot整合kafka随记

2022年7月5日08:13:13

springboot项目整合kafka

除了网上说的使用@KafkaListener注解之外,还可以通过实现CommandLineRunner接口的方式来配置kafka的消费者。

@Component@Slf4jpublicclassKafkaConsumerimplementsCommandLineRunner{@Value("${spring.kafka.topic}")privateString kafkaTopic;@Value("${spring.kafka.consumer.group-id}")privateString group_id;@Value("${spring.kafka.bootstrap-servers}")privateString bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")privateString consumerDk;@Value("${spring.kafka.consumer.value-deserializer}")privateString consumerDv;@Value("${spring.kafka.consumer.username}")privateString username;@Value("${spring.kafka.consumer.pwd}")privateString pwd;@Overridepublicvoidrun(String... args)throwsException{Properties properties=newProperties();Password saslJaasConfig=newPassword(PlainLoginModule.class.getName()+" required username=\""+
                username+"\" pass"+"word=\""+ pwd+"\";");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
        properties.put(SaslConfigs.SASL_MECHANISM,"PLAIN");//kafka集群的ip和端口,用户在集群申请完成后会获得该信息。建议在配置bootstrap.servers配置进所有kafka节点信息
        properties.put("bootstrap.servers", bootstrapServers);//序列化和反序列化类型
        properties.put("key.deserializer", consumerDk);
        properties.put("value.deserializer", consumerDv);//消费者组
        properties.put("group.id", group_id);//自定义配置信息,非必须,开发人员可根据自身业务需求进行合理配置,更多配置信息请参考kafka官方文档// kafka自动提交关闭,这样保证数据不丢// 常量值或者可配置
        properties.put("enable.auto.commit","true");
        properties.put("auto.commit.interval.ms","1000");
        properties.put("auto.offset.reset","latest");
        properties.put("max.poll.records",100);try{org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer=neworg.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singleton(kafkaTopic));while(true){ConsumerRecords<String,String> records= consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record: records){String msg=record.value();
                    log.info("offset = "+record.offset()+", value = "+ msg);}}}catch(Exception e){}}}

springboot项目中kafka正常启动,但是无法正常消费数据

在springboot项目中整合kafka,遇到项目启动可以正常启动并加载kafka配置,但是无法消费消息(已确认消息是已经发出来的),试了网上很多办法,但是都未能解决我遇到的问题,最后尝试修改了kafka消费者中的group-id,发现可以消费到消息了。

  • 作者:Xb0rd
  • 原文链接:https://blog.csdn.net/m0_50851720/article/details/120033022
    更新时间:2022年7月5日08:13:13 ,共 2171 字。