前言
在实际业务场景下,为保证消息从producer能够准确无误的送达kafka的broker,kafka提供了针对消息ACK的几种级别,即broker对producer消息应答级别
kafka提供了3种ACK的应答机制
- acks=0,生产者发送过来数据就不管了,可靠性较差,效率高;
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
下面用代码演示下消息的生产段的ACK配置代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerAck {
public static void main(String[] args) throws Exception {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 设置 acks ACKS_CONFIG 可选择的值有: 0 1 all ,分别对应ACK的3种级别
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
System.out.println("开始发送数据");
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("zcy222","congge " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
}
核心代码部分和之前正常发送消息没有太大区别,只需要在属性配置里面添加下面这句代码即可
properties.put(ProducerConfig.ACKS_CONFIG, "all");
同时,建议配合消息发送重试的配置一起使用,这个也是实际场景中常用的一种处理方式
运行上面的程序,消息可以正常发送到 ”zcy222“这个topic中