Kafka Producer Ack 是一種機制,用于確保消息被成功發送到 Kafka 集群。在 Kafka Producer 中,有兩種確認方式:同步(synchronous)和異步(asynchronous)。
在同步確認模式下,Producer 在發送消息后,會等待 Kafka Broker 返回確認消息(Ack)。只有收到確認消息后,Producer 才會認為該消息已成功發送。這種方式可以確保消息的可靠傳輸,但會降低 Producer 的吞吐量。
要使用同步確認,需要在 Kafka Producer 配置中設置 acks
參數為 all
。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
在異步確認模式下,Producer 發送消息后,不會等待 Kafka Broker 返回確認消息。而是在后臺啟動一個線程,通過輪詢的方式檢查消息是否已成功發送。這種方式可以提高 Producer 的吞吐量,但可能導致消息丟失或重復發送。
要使用異步確認,需要在 Kafka Producer 配置中設置 acks
參數為 1
,并實現 org.apache.kafka.clients.producer.Callback
接口。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
}
}
});
總之,Kafka Producer Ack 提供了同步和異步兩種消息確認方式,可以根據實際需求選擇合適的確認方式。