如下所示:
安裝kafka支持庫pip install kafka-python
from kafka import KafkaProducer
import json
'''
生產者demo
向test_lyl2主題中循環寫入10條json數據
注意事項:要寫入json數據需加上value_serializer參數,如下代碼
'''
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667']
)
for i in range(10):
data={
"name":"李四",
"age":23,
"gender":"男",
"id":i
}
producer.send('test_lyl2', data)
producer.close()
from kafka import KafkaConsumer
import json
'''
消費者demo
消費test_lyl2主題中的數據
注意事項:如需以json格式讀取數據需加上value_deserializer參數
'''
consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1",
bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'],
auto_offset_reset='earliest',value_deserializer=json.loads
)
for message in consumer:
print(message.value)
以上這篇對python操作kafka寫入json數據的簡單demo分享就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。