Apache Kafka是一個分布式流處理平臺,廣泛用于構建實時數據管道和流應用。在使用Kafka時,查詢Topic列表和Topic下的消息是常見的操作。本文將詳細介紹如何使用Kafka命令行工具和API來查詢Topic列表以及Topic下的消息。
Kafka提供了一個命令行工具kafka-topics.sh,可以用來查詢Kafka集群中的Topic列表。
要查詢Kafka集群中的所有Topic,可以使用以下命令:
kafka-topics.sh --list --bootstrap-server <broker_address>
其中,<broker_address>是Kafka broker的地址,例如localhost:9092。
如果你只想查詢某個特定的Topic,可以使用--topic參數:
kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_address>
這將顯示指定Topic的詳細信息,包括分區數、副本數等。
除了命令行工具,你還可以使用Kafka的Java API來查詢Topic列表。Kafka提供了AdminClient類,可以用來管理Kafka集群。
首先,你需要創建一個AdminClient實例:
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props);
使用AdminClient的listTopics方法可以查詢Topic列表:
ListTopicsResult topics = adminClient.listTopics();
Set<String> topicNames = topics.names().get();
for (String topicName : topicNames) {
System.out.println(topicName);
}
如果你不想直接使用命令行工具或Java API,Kafka還提供了一個REST Proxy,可以通過HTTP請求來查詢Topic列表。
發送一個GET請求到/topics端點:
curl -X GET http://localhost:8082/topics
這將返回一個JSON格式的Topic列表。
Kafka提供了一個命令行工具kafka-console-consumer.sh,可以用來消費指定Topic的消息。
要消費指定Topic的消息,可以使用以下命令:
kafka-console-consumer.sh --topic <topic_name> --bootstrap-server <broker_address>
這將從指定Topic的最新消息開始消費,并實時輸出到控制臺。
如果你想從某個特定的偏移量開始消費消息,可以使用--offset參數:
kafka-console-consumer.sh --topic <topic_name> --bootstrap-server <broker_address> --offset <offset>
其中,<offset>是你想要開始消費的偏移量。
除了命令行工具,你還可以使用Kafka的Java API來消費指定Topic的消息。Kafka提供了KafkaConsumer類,可以用來消費消息。
首先,你需要創建一個KafkaConsumer實例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
使用KafkaConsumer的subscribe方法可以訂閱指定的Topic:
consumer.subscribe(Arrays.asList("my-topic"));
使用KafkaConsumer的poll方法可以消費消息:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
Kafka REST Proxy也支持通過HTTP請求來消費指定Topic的消息。
發送一個GET請求到/consumers/<group_id>/instances/<instance_id>/topics/<topic_name>端點:
curl -X GET http://localhost:8082/consumers/my-group/instances/my-instance/topics/my-topic
這將返回指定Topic的消息。
本文介紹了如何使用Kafka命令行工具、Java API和REST Proxy來查詢Topic列表和Topic下的消息。通過這些方法,你可以輕松地管理和監控Kafka集群中的Topic和消息。無論是通過命令行工具還是編程接口,Kafka都提供了靈活的方式來滿足不同的需求。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。