目录
目标
本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。
实战
简单消费
/**
* 指定一个消费者组,一个主题主题。
* @param record
*/
@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
public void simpleConsumer(ConsumerRecord<String, String> record) {
System.out.println("进入simpleConsumer方法");
System.out.printf(
"分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
监听多个主题
/**
* 指定多个主题。
*
* @param record
*/
@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
public void topics(ConsumerRecord<String, String> record) {
System.out.println("进入topics方法");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
监听一个主题,指定分区消费消息
/**
* 监听一个主题,且指定消费主题的哪些分区。
* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
* @param record
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
},
concurrency = "2"
)
public void consumeByPattern(ConsumerRecord<String, String> record) {
System.out.println("consumeByPattern");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
指定多个分区,指定起始偏移量消费消息
/**
* 指定多个分区从哪个偏移量开始消费。
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(
topic = IPAD_TOPIC,
partitions = {"0","1"},
partitionOffsets = {
@PartitionOffset(partition = "2", initialOffset = "10"),
@PartitionOffset(partition = "3", initialOffset = "0"),
}
)
},
concurrency = "10"
)
public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {
System.out.println("consumeByPartitionOffsets");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
监听多个主题,指定多个分区,指定起始偏移量消费消息
/**
* 指定多个主题。参数详解如上面的方法。
* @param record
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
@TopicPartition(topic = IPAD_TOPIC, partitions = "1",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
},
concurrency = "4"
)
public void topics2(ConsumerRecord<String, String> record) {
System.out.println("topics2");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
指定多个kafka监听器
/**
* 指定多个消费者组。参数详解如上面的方法。
*
* @param record
*/
@KafkaListeners({
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
@TopicPartition(topic = IPAD_TOPIC, partitions = "1",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
},
concurrency = "3"
),
@KafkaListener(
groupId = XM_GROUP,
topicPartitions = {
@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
},
concurrency = "3"
)
}
)
public void groupIds(ConsumerRecord<String, String> record) {
System.out.println("groupIds");
System.out.println("内容:" + record.value());
System.out.println("分区:" + record.partition());
System.out.println("偏移量:" + record.offset());
System.out.println("创建消息的时间戳:" + record.timestamp());
}
手动提交偏移量(需要配置手动提交偏移量配置)
/**
* 设置手动提交偏移量
*
* @param record
*/
@KafkaListener(
topics = IPHONE_TOPIC,
groupId = APPLE_GROUP,
//3个消费者
concurrency = "3"
)
public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println("setCommitType");
System.out.println("内容:" + record.value());
System.out.println("分区:" + record.partition());
System.out.println("偏移量:" + record.offset());
System.out.println("创建消息的时间戳:" + record.timestamp());
ack.acknowledge();
}