溫馨提示×

Kafka消息隊列Linux如何實現

小樊
53
2025-09-28 02:33:54
欄目: 智能運維

一、安裝Java環境(Kafka前置依賴)
Kafka基于Java開發,需先安裝Java運行環境(JRE或JDK)。推薦使用OpenJDK 11及以上版本(兼容性更好):

  • Ubuntu/Debian系統:執行sudo apt update && sudo apt install openjdk-11-jdk -y;
  • CentOS/RHEL系統:執行sudo yum install java-11-openjdk -y。
    安裝完成后,通過java -version驗證是否成功(需顯示Java版本信息)。

二、下載并解壓Kafka
從Apache Kafka官網下載最新穩定版本(如3.6.1),使用wget命令獲取安裝包,再用tar解壓到指定目錄(如/opt/kafka):

wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
mv kafka_2.13-3.6.1 /opt/kafka  # 可選:移動到目標目錄

建議將Kafka目錄加入系統環境變量(編輯/etc/profile,添加export KAFKA_HOME=/opt/kafkaexport PATH=$PATH:$KAFKA_HOME/bin,然后執行source /etc/profile),方便后續命令調用。

三、配置Kafka(修改server.properties)
進入Kafka配置目錄($KAFKA_HOME/config),編輯server.properties文件,調整以下關鍵參數:

  • broker.id:集群中Broker的唯一標識(如broker.id=0);
  • listeners:Broker監聽的地址和端口(如listeners=PLAINTEXT://your_server_ip:9092,需替換為實際IP);
  • log.dirs:日志存儲目錄(如log.dirs=/data/kafka/logs,需提前創建目錄并賦予權限mkdir -p /data/kafka/logs && chown -R $USER:$USER /data/kafka/logs);
  • zookeeper.connect:ZooKeeper集群地址(如zookeeper.connect=localhost:2181,單節點ZooKeeper用此配置;集群需列出所有節點,如host1:2181,host2:2181,host3:2181)。

四、啟動ZooKeeper(Kafka依賴的協調服務)
Kafka通過ZooKeeper管理集群元數據(如Topic、分區信息),需先啟動ZooKeeper:

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

默認情況下,ZooKeeper會在2181端口啟動??赏ㄟ^netstat -an | grep 2181驗證是否運行正常。

五、啟動Kafka服務器
在另一個終端窗口中,啟動Kafka服務(后臺運行模式,避免占用當前終端):

$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

啟動后,可通過netstat -an | grep 9092驗證Kafka是否在9092端口監聽(客戶端通信端口)。

六、創建Topic(消息存儲的主題)
使用Kafka命令行工具創建Topic,指定Topic名稱、分區數(partitions,決定并行處理能力)和副本因子(replication-factor,數據冗余備份數量,單節點集群設為1):

$KAFKA_HOME/bin/kafka-topics.sh --create \
  --topic my_topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

創建成功后,可通過$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server localhost:9092查看所有Topic。

七、測試Kafka功能(發送/接收消息)

  • 發送消息:啟動控制臺生產者,向指定Topic發送消息(輸入消息后按回車鍵):
    $KAFKA_HOME/bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
    
  • 接收消息:啟動控制臺消費者,從指定Topic讀取消息(--from-beginning表示從最早的消息開始讀?。?pre class="hljs">$KAFKA_HOME/bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092

在生產者終端輸入消息(如Hello Kafka),消費者終端會同步顯示該消息,驗證Kafka消息收發功能正常。

八、停止Kafka服務(可選)
測試完成后,可通過以下命令停止服務:

  • 停止消費者:在消費者終端按Ctrl+C;
  • 停止生產者:在生產者終端按Ctrl+C;
  • 停止Kafka服務器:執行$KAFKA_HOME/bin/kafka-server-stop.sh;
  • 停止ZooKeeper:執行$KAFKA_HOME/bin/zookeeper-server-stop.sh。

注意事項

  • 生產環境中,需配置多節點ZooKeeper集群(提高可靠性)、調整Kafka內存參數(如log.retention.hours控制日志保留時間)、啟用數據加密(SSL/TLS)和認證機制(SASL)以提升安全性;
  • 若不想手動安裝ZooKeeper,可使用Kafka KRaft模式(無需ZooKeeper,自3.0版本支持),簡化部署流程。

0
亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女