SpringBoot整合kafka消费者注解详解

2023-03-13 10:38:20

目录

目标

实战

简单消费

监听多个主题

监听一个主题,指定分区消费消息

指定多个分区,指定起始偏移量消费消息

监听多个主题,指定多个分区,指定起始偏移量消费消息

指定多个kafka监听器

手动提交偏移量(需要配置手动提交偏移量配置)


目标

本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。


实战

简单消费

    /**
     * 指定一个消费者组,一个主题主题。
     * @param record
     */
    @KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
    public void simpleConsumer(ConsumerRecord<String, String> record) {
        System.out.println("进入simpleConsumer方法");
        System.out.printf(
                "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }

监听多个主题

    /**
     * 指定多个主题。
     *
     * @param record
     */
    @KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
    public void topics(ConsumerRecord<String, String> record) {
        System.out.println("进入topics方法");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }

监听一个主题,指定分区消费消息

    /**
     * 监听一个主题,且指定消费主题的哪些分区。
     * 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
            },
            concurrency = "2"
    )
    public void consumeByPattern(ConsumerRecord<String, String> record) {
        System.out.println("consumeByPattern");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }

指定多个分区,指定起始偏移量消费消息

    /**
     * 指定多个分区从哪个偏移量开始消费。
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(
                            topic = IPAD_TOPIC,
                            partitions = {"0","1"},
                            partitionOffsets = {
                                    @PartitionOffset(partition = "2", initialOffset = "10"),
                                    @PartitionOffset(partition = "3", initialOffset = "0"),
                            }
                    )
            },
            concurrency = "10"
    )
    public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {
        System.out.println("consumeByPartitionOffsets");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }

监听多个主题,指定多个分区,指定起始偏移量消费消息

    /**
     * 指定多个主题。参数详解如上面的方法。
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
                    @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
                            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
            },
            concurrency = "4"
    )
    public void topics2(ConsumerRecord<String, String> record) {
        System.out.println("topics2");
        System.out.printf(
                "主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }

指定多个kafka监听器

    /**
     * 指定多个消费者组。参数详解如上面的方法。
     *
     * @param record
     */
    @KafkaListeners({
            @KafkaListener(
                    groupId = APPLE_GROUP,
                    topicPartitions = {
                            @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
                            @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
                    },
                    concurrency = "3"
            ),
            @KafkaListener(
                    groupId = XM_GROUP,
                    topicPartitions = {
                            @TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
                            @TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
                    },
                    concurrency = "3"
            )
    }
    )
    public void groupIds(ConsumerRecord<String, String> record) {
        System.out.println("groupIds");
        System.out.println("内容:" + record.value());
        System.out.println("分区:" + record.partition());
        System.out.println("偏移量:" + record.offset());
        System.out.println("创建消息的时间戳:" + record.timestamp());
    }

手动提交偏移量(需要配置手动提交偏移量配置)

    /**
     * 设置手动提交偏移量
     *
     * @param record
     */
    @KafkaListener(
            topics = IPHONE_TOPIC,
            groupId = APPLE_GROUP,
            //3个消费者
            concurrency = "3"
    )
    public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("setCommitType");
        System.out.println("内容:" + record.value());
        System.out.println("分区:" + record.partition());
        System.out.println("偏移量:" + record.offset());
        System.out.println("创建消息的时间戳:" + record.timestamp());
        ack.acknowledge();
    }
  • 作者:我的身前一尺是我的世界
  • 原文链接:https://blog.csdn.net/qq_39706570/article/details/127872644
    更新时间:2023-03-13 10:38:20