Kafka 消費者通過 Group ID 來將來自一個主題的消息分發給多個消費者
使用命令行工具創建消費者組:
你可以使用 kafka-consumer-groups.sh
工具來查看已存在的消費者組,或者使用 --new-consumer-group
參數創建一個新的消費者組。例如:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --create --group my-new-group
這將創建一個名為 my-new-group
的新消費者組。
使用 Kafka 管理工具(如 Confluent Control Center、Kafka Manager 等)創建消費者組:
這些工具通常提供了一個 Web 界面,你可以在其中管理 Kafka 集群、主題和消費者組。在這些工具中,你可以找到創建消費者組的功能,并按照提示操作即可。
使用編程語言(如 Java、Python、Go 等)創建消費者組:
在你的應用程序中,你需要使用 Kafka 客戶端庫來創建一個消費者,并指定消費者組 ID。以下是一個使用 Python 的 confluent_kafka
庫創建消費者組的示例:
from confluent_kafka import Consumer, KafkaError
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-new-group'
}
consumer = Consumer(conf)
consumer.subscribe(['my-topic'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
這段代碼創建了一個名為 my-new-group
的消費者組,并訂閱了名為 my-topic
的主題。當運行此代碼時,消費者將從該主題中讀取消息。