溫馨提示×

kafka subscribe怎樣進行消息訂閱

小樊
144
2024-12-18 12:41:12
欄目: 大數據

Kafka 消費者(Consumer)訂閱主題(Topic)以消費消息。以下是使用 Java 客戶端庫進行訂閱的步驟:

  1. 添加 Kafka 客戶端依賴

在 Maven 項目的 pom.xml 文件中添加以下依賴:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
  1. 創建消費者配置

創建一個 Kafka 消費者配置對象,指定 Kafka 集群的地址、消費者組 ID 等屬性。

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  1. 創建消費者

使用消費者配置創建一個 Kafka 消費者實例。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  1. 訂閱主題

使用 subscribe 方法訂閱一個或多個主題。

consumer.subscribe(Arrays.asList("my-topic"));
  1. 消費消息

使用 poll 方法輪詢消息,并使用 consume 方法處理消息。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // 處理消息的邏輯
    }
}

這是一個簡單的 Kafka 消費者訂閱消息的示例。實際應用中,你可能需要根據需求進行更多的配置和處理。

0
亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女