Kafka消費順序可以在消費者端進行調整。具體來說,可以通過設置消費者的max.poll.records
屬性來控制每次 poll() 操作返回的最大記錄數,從而影響消費順序。當max.poll.records
設置為1時,每次 poll() 只返回一條記錄,這樣可以確保按照消息到達 Kafka 的順序進行消費。
以下是一個示例代碼,展示了如何在 Kafka 消費者中設置max.poll.records
屬性:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // 設置每次 poll() 返回的最大記錄數為 1
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
在這個示例中,我們將max.poll.records
設置為1,這樣消費者就會按照消息到達 Kafka 的順序進行消費。請注意,這種方法可能會降低消費者的吞吐量,因為每次 poll() 操作都會等待直到有新的消息可用。如果需要處理大量消息,可以考慮增加max.poll.records
的值,或者使用其他方法來確保消費順序,例如使用單個分區或者使用事務。