Kafka ProducerRecord 本身并不能直接確認消息是否已經被成功發送。Kafka ProducerRecord 只是將消息封裝成一個對象,包含需要發送的主題、鍵、值等信息。實際的消息發送過程是由 Kafka Producer 完成的。
Kafka Producer 在發送消息時會返回一個 Future 對象,你可以使用這個 Future 對象來檢查消息是否已經成功發送。具體方法如下:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerResult;
// ...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
// 發送消息并獲取 Future 對象
Future<RecordMetadata> future = producer.send(record);
// 檢查消息是否成功發送
if (future.isDone()) {
try {
RecordMetadata metadata = future.get();
System.out.println("Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
} else {
System.out.println("Message is still being sent.");
}
在這個示例中,我們首先創建了一個 KafkaProducer 對象,然后創建了一個 ProducerRecord 對象。接著,我們調用 producer.send()
方法發送消息,并獲取一個 Future 對象。通過調用 future.isDone()
方法,我們可以檢查消息是否已經成功發送。如果消息已經成功發送,我們可以使用 future.get()
方法獲取一個 RecordMetadata 對象,其中包含了消息發送的詳細信息。