Spring Boot整合Kafka的简单用例(@KafkaListener注解)

2022年6月17日11:15:32

第一步、启动zookeeper server和kafka server

启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties
启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
zookeeper会选举一个作为leader,另外一个作为slave

第二步、创建一个maven项目

这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:

<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.0.0.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.4.RELEASE</version></dependency>

第三步、kafka配置

@Configuration@EnableKafkapublicclassKafkaConfig {/* --------------producer configuration-----------------**/@Beanpublic Map<String, Object>producerConfigs() {
        Map<String, Object> props =new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        props.put(ProducerConfig.RETRIES_CONFIG,0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG,1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;
    }@Beanpublic ProducerFactory<String, String>producerFactory() {returnnew DefaultKafkaProducerFactory<>(producerConfigs());
    }/* --------------consumer configuration-----------------**/@Beanpublic Map<String, Object>consumerConfigs() {
        Map<String, Object> props =new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;
    }@Bean
    ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());return factory;
    }@Beanpublic ConsumerFactory<String, String>consumerFactory() {returnnew DefaultKafkaConsumerFactory<>(consumerConfigs());
    }@Bean//消息监听器public MyListenermyListener() {returnnew MyListener();
    }/* --------------kafka template configuration-----------------**/@Beanpublic KafkaTemplate<String,String>kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate =new KafkaTemplate<>(producerFactory());return kafkaTemplate;
    }

}

第四步、topic的配置

自动创建的topic分区数是1,复制因子是0

@Configuration@EnableKafkapublicclassTopicConfig {@Beanpublic KafkaAdminkafkaAdmin() {
        Map<String, Object> configs =new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,localhost:9093");returnnew KafkaAdmin(configs);
    }@Beanpublic NewTopicfoo() {
        /第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数//当broker个数为1个时会创建topic失败,//提示:replication factor: 2 larger than available brokers: 1//只有在集群中才能使用kafka的备份功能returnnew NewTopic("foo",10, (short)2);
    }@Beanpublic NewTopicbar() {returnnew NewTopic("bar",10, (short)2);
    }@Beanpublic NewTopictopic1(){returnnew NewTopic("topic1",10, (short)2);
    }@Beanpublic NewTopictopic2(){returnnew NewTopic("topic2",10, (short)2);
    }
}

第五步、使用@KafkaListener注解

topicPartitions和topics、topicPattern不能同时使用

publicclass MyListener {
    @KafkaListener(id ="myContainer1",//id是消费者监听容器
            topicPartitions =//配置topic和分区:监听两个topic,分别为topic1、topic2,topic1只接收分区0,3的消息,//topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为5
            { @TopicPartition(topic ="topic1", partitions = {"0","3" }),
                    @TopicPartition(topic ="topic2", partitions ="0",
                            partitionOffsets = @PartitionOffset(partition ="1", initialOffset ="4"))
            })publicvoidlisten(ConsumerRecord<?, ?> record) {
        System.out.println("topic" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }


    @KafkaListener(id ="myContainer2",topics = {"foo","bar"})publicvoidlisten2(ConsumerRecord<?, ?> record){
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }
}

第六步、创建发送消息的接口

@RestControllerpublicclassKafkaController {privatefinalstatic Logger logger = LoggerFactory.getLogger(KafkaController.class);@Autowiredprivate  KafkaTemplate<String,String> kafkaTemplate;@RequestMapping(value ="/{topic}/send",method = RequestMethod.GET)publicvoidsendMeessageTotopic1(@PathVariable String topic,@RequestParam(value ="partition",defaultValue ="0")int partition) {
        logger.info("start send message to {}",topic);
        kafkaTemplate.send(topic,partition,"你","好");
    }
}

第七步、启动程序、调用接口

消息监听器只监听订阅的topic的特定分区的消息
Spring Boot整合Kafka的简单用例(@KafkaListener注解)
源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2

  • 作者:原来丨
  • 原文链接:https://blog.csdn.net/qq_36027670/article/details/79488880
    更新时间:2022年6月17日11:15:32 ,共 4963 字。