Kafka Producer Ack(確認)機制用于確保消息被成功發送到 Kafka 集群。在 Kafka Producer 中,有兩種發送確認方式:同步(synchronous)和異步(asynchronous)。要實現異步發送,請按照以下步驟操作:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
這里,我們配置了 Kafka 集群的地址(bootstrap.servers
),以及消息鍵(key)和值(value)的序列化方式。
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
send()
方法發送消息,并設置回調函數以處理異步響應:producer.send(new ProducerRecord<>("your-topic", "key", "value"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 處理發送失敗的情況
exception.printStackTrace();
} else {
// 處理發送成功的情況
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
}
}
});
在上面的代碼中,我們使用 send()
方法發送消息,并傳入一個 ProducerRecord
對象,表示要發送的消息及其目標主題、分區和鍵。同時,我們設置了一個回調函數 Callback
,用于處理異步發送的結果。當消息發送成功或失敗時,回調函數會被調用。
在發送完消息后,記得關閉 KafkaProducer 實例以釋放資源:
producer.close();
通過以上步驟,我們實現了 Kafka Producer 的異步發送功能。在實際應用中,可以根據需求選擇合適的發送確認方式。