Kafka客戶端連接的方法主要包括以下步驟:
添加依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
implementation 'org.apache.kafka:kafka-clients:2.8.0'
創建配置對象:
Properties
對象,用于配置Kafka客戶端的連接參數。這些參數包括Kafka集群的地址、端口號、序列化方式等。例如:Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵序列化方式
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值序列化方式
創建Kafka生產者或消費者:
Producer<String, String> producer = new KafkaProducer<>(props);
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group"); // 消費者組ID
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 鍵反序列化方式
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值反序列化方式
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
連接到Kafka集群:
Producer
對象的send
方法發送消息,它會自動連接到Kafka集群。Consumer
對象的subscribe
方法訂閱一個或多個主題,然后調用poll
方法輪詢消息。在輪詢過程中,消費者會自動連接到Kafka集群并消費消息。關閉連接:
producer.close()
方法關閉生產者連接。對于消費者,可以調用consumer.close()
方法關閉消費者連接。此外,還可以調用Consumer
對象的disconnect
方法顯式斷開與Kafka集群的連接。通過以上步驟,你可以成功地使用Kafka客戶端連接到Kafka集群并進行消息的發送和消費。