How Kafka Works with Other Services on CentOS and How to Implement the Integrations
As a distributed message queue, Kafka integrates with other services running on CentOS (data stores, log collectors, NoSQL databases, monitoring systems, and so on) to build real-time data pipelines. The sections below cover the most common integration scenarios and how to implement them.
EFK is the classic log-processing stack on CentOS; Kafka serves as an intermediate buffer layer that decouples log collection from log storage.
① Install the required components, Kafka, Filebeat, Elasticsearch, and Logstash (via yum or from source); ② Create a Kafka Topic (e.g. nginx-logs) with an appropriate number of partitions (e.g. 3) and replicas (e.g. 2); ③ Configure Filebeat (/etc/filebeat/filebeat.yml) and add the Kafka output plugin:
output.kafka:
  enabled: true
  hosts: ["kafka1.centos:9092", "kafka2.centos:9092"]
  topic: "nginx-logs"
④ Start the services: systemctl start filebeat kafka elasticsearch; ⑤ Configure Logstash to consume the Kafka Topic and write it to Elasticsearch, then start it: bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf (a sketch of this pipeline file follows the steps below);
⑥ Verify: view the Nginx logs in Kibana (the visualization front end for Elasticsearch).
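The kafka-to-es.conf pipeline referenced in step ⑤ is not spelled out above; a minimal sketch is shown here, assuming the standard Logstash kafka input and elasticsearch output plugins. The consumer group id, Elasticsearch address, and index name are illustrative placeholders:
input {
  kafka {
    bootstrap_servers => "kafka1.centos:9092,kafka2.centos:9092"
    topics            => ["nginx-logs"]
    group_id          => "logstash-es"          # assumed consumer group name
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]          # assumes Elasticsearch on the local host
    index => "nginx-logs-%{+YYYY.MM.dd}"        # one index per day, illustrative
  }
}
With Kafka buffering in the middle, Logstash and Elasticsearch can be restarted or temporarily fall behind without losing log data.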
Kafka combined with HDFS allows data to be written to HDFS in real time, supporting offline analysis and historical data storage. ① Install Spark and Hadoop (HDFS); ② Create a Topic (e.g. order-data) with an appropriate number of partitions; ③ Write a Spark Streaming program that consumes the Topic and writes the records to HDFS:
package com.example

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaToHDFS {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaToHDFS")
    val ssc = new StreamingContext(sparkConf, Seconds(10))  // 10-second micro-batches
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka1.centos:9092,kafka2.centos:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "hdfs-writer",
      "auto.offset.reset" -> "latest"
    )
    val topics = Array("order-data")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Each batch is written under the given prefix as order-data-<batch timestamp>
    stream.map(record => record.value())
      .saveAsTextFiles("hdfs://namenode:8020/user/hadoop/order-data")
    ssc.start()
    ssc.awaitTermination()
  }
}
④ Launch the Spark Streaming program and verify that data is being written to HDFS; the topic-creation and verification commands are sketched below.
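For reference, creating the Topic and checking the output might look like the following. The partition and replication counts are example values, and --bootstrap-server assumes Kafka 2.2 or newer (older releases use --zookeeper instead):
bin/kafka-topics.sh --create --topic order-data \
  --bootstrap-server kafka1.centos:9092 --partitions 3 --replication-factor 2
hdfs dfs -ls hdfs://namenode:8020/user/hadoop/   # each batch appears as an order-data-<timestamp> directory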
Kafka combined with HBase enables real-time writes into HBase, supporting low-latency random reads and writes. ① Install HBase; ② Configure HBase (hbase-site.xml), setting the ZooKeeper quorum and the HDFS root directory:
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://namenode:8020/hbase</value>
</property>
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>zookeeper1.centos:2181,zookeeper2.centos:2181</value>
</property>
③ Write a Kafka producer program that sends data to the Topic (e.g. user-data), a sketch of which is given after step ⑤ below; ④ Write a consumer that reads from Kafka and writes each record into HBase (Java):
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.*;

// Kafka consumer configuration (the group id is an example value)
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "kafka1.centos:9092,kafka2.centos:9092");
kafkaProps.put("group.id", "hbase-writer");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Initialize the HBase connection
Configuration config = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(config);
     Table table = connection.getTable(TableName.valueOf("user_table"));
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
    // Consume the Kafka Topic
    consumer.subscribe(Arrays.asList("user-data"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // The message key is used as the row key (assumes producers always set a key)
            Put put = new Put(Bytes.toBytes(record.key()));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("info"), Bytes.toBytes(record.value()));
            table.put(put);
        }
    }
}
⑤ Start the HBase and Kafka services, run the producer and consumer programs, and verify that the data shows up in HBase.
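The producer from step ③ is not shown above; here is a minimal sketch using the standard Kafka Java client, assuming string keys and values. The class name, key scheme, and JSON payload are illustrative; because the consumer uses the record key as the HBase row key, the producer should always set a key:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1.centos:9092,kafka2.centos:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The key ends up as the HBase row key in the consumer above
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("user-data", "user-" + i, "{\"id\":" + i + "}"));
            }
            producer.flush();
        }
    }
}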
Stable operation of Kafka on CentOS also requires solid monitoring; commonly used tools include Kafka Exporter + Prometheus + Grafana, Kafka Manager, and Burrow. ① Deploy Kafka Exporter and start it so that broker metrics are exposed over HTTP:
./kafka_exporter --kafka.server=kafka1.centos:9092 --web.listen-address=:9308
② Configure Prometheus to scrape the Kafka metrics: edit prometheus.yml and add a scrape job:
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka1.centos:9308']
③ Configure Grafana for visualization: add Prometheus as a data source and import a Kafka dashboard (e.g. dashboard ID 3955); ④ Key metrics to watch include the incoming message rate (kafka_server_brokertopicmetrics_messages_in_total), consumer lag (kafka_consumer_fetch_manager_metrics_records_lag), and offline partitions (kafka_controller_kafkacontroller_offline_partitions_count).
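Building on these metrics, an alerting rule can flag broker problems automatically. The following is a minimal sketch of a Prometheus rule file; the alert name, duration, and severity label are chosen for illustration:
groups:
  - name: kafka-alerts
    rules:
      - alert: KafkaOfflinePartitions
        # Fires when any partition has been without an active leader for more than 5 minutes
        expr: kafka_controller_kafkacontroller_offline_partitions_count > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka reports offline partitions on {{ $labels.instance }}"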
With Kafka as the data source and Spark Streaming as the stream-processing engine, real-time processing such as live statistics and ETL can be implemented. The application reads the Kafka Topic with the createDirectStream method, processes the data, and writes the results to HDFS, a database, or back to Kafka (see the Scala example in the Kafka-to-HDFS section above); it is then submitted to the cluster with: spark-submit --class com.example.KafkaToHDFS --master yarn --deploy-mode cluster kafka-to-hdfs.jar