Kafka 消息發送的消息確認機制是通過生產者客戶端來實現的。生產者客戶端在發送消息到 Kafka 集群時,可以配置不同的確認策略來確保消息的可靠傳輸。以下是 Kafka 生產者提供的兩種主要確認方式:
同步確認(Synchronous Acknowledgment):
異步確認(Asynchronous Acknowledgment):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 設置為 all 表示需要所有副本都確認
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset());
}
}
});
producer.close();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1"); // 設置為 1 表示只需要 leader 確認
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset());
}
}
});
producer.close();
根據具體的應用場景和需求,可以選擇合適的確認策略來確保消息的可靠傳輸。