Kafka 消費者(Consumer)訂閱主題(Topic)以消費消息。以下是使用 Java 客戶端庫進行訂閱的步驟:
在 Maven 項目的 pom.xml 文件中添加以下依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
創建一個 Kafka 消費者配置對象,指定 Kafka 集群的地址、消費者組 ID 等屬性。
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
使用消費者配置創建一個 Kafka 消費者實例。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
使用 subscribe
方法訂閱一個或多個主題。
consumer.subscribe(Arrays.asList("my-topic"));
使用 poll
方法輪詢消息,并使用 consume
方法處理消息。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 處理消息的邏輯
}
}
這是一個簡單的 Kafka 消費者訂閱消息的示例。實際應用中,你可能需要根據需求進行更多的配置和處理。