RabbitMQ Linux客戶端使用指南
在使用Linux客戶端前,需先確保Linux系統已安裝并運行RabbitMQ服務端。以下是通用安裝步驟(以Ubuntu為例):
sudo apt update
sudo apt install -y erlang
sudo apt install -y rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo systemctl status rabbitmq-server # 查看狀態(顯示"active (running)"即為正常)
以上步驟確保Linux系統具備RabbitMQ服務端能力,可作為客戶端連接自身服務,或供其他機器的客戶端連接。
Linux環境下,RabbitMQ客戶端主要通過命令行工具(如rabbitmqadmin
)或編程語言客戶端庫(如Python的pika
、Java的amqp-client
)實現。以下是常用工具的安裝方法:
rabbitmqadmin
是RabbitMQ官方提供的命令行工具,用于管理隊列、交換機、發送/接收消息等。
# 下載rabbitmqadmin(替換為你的RabbitMQ服務器IP)
wget http://<服務器IP>:15672/cli/rabbitmqadmin
# 添加執行權限
chmod +x rabbitmqadmin
# 移動到系統路徑(可選)
sudo mv rabbitmqadmin /usr/local/bin/
使用前需配置認證信息(默認用戶guest
,密碼guest
,僅限本地訪問):
./rabbitmqadmin -u admin -p admin123 list queues # 替換為你的用戶名密碼
以Python為例,使用pika
庫實現消息收發:
pip install pika # 安裝pika庫
Java項目需在pom.xml
中添加依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version> <!-- 使用最新版本 -->
</dependency>
使用rabbitmqadmin
向指定隊列發送消息:
./rabbitmqadmin -u admin -p admin123 publish exchange= amqp.default routing_key=test_queue payload="Hello, RabbitMQ!"
exchange=
:使用默認交換機(空字符串)。routing_key=test_queue
:目標隊列名稱。payload
:消息內容。使用rabbitmqadmin
監聽隊列并接收消息:
./rabbitmqadmin -u admin -p admin123 get queue=test_queue no_ack=true
no_ack=true
:表示無需手動確認消息(簡化示例,生產環境建議設置為false
)。./rabbitmqadmin -u admin -p admin123 declare queue name=test_queue durable=true
durable=true
:隊列持久化(服務器重啟后仍存在)。./rabbitmqadmin -u admin -p admin123 delete queue name=test_queue
./rabbitmqadmin -u admin -p admin123 list queues
以上命令適用于快速測試,生產環境建議使用編程語言客戶端實現更復雜的邏輯(如消息確認、持久化、錯誤處理)。
以下是Python客戶端完整的“生產者-消費者”模型代碼:
import pika
# 連接RabbitMQ服務器(替換為你的服務器IP、用戶名、密碼)
credentials = pika.PlainCredentials('admin', 'admin123')
parameters = pika.ConnectionParameters('192.168.1.100', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 聲明隊列(確保存在)
channel.queue_declare(queue='test_queue', durable=True)
# 發送消息
message = "Hello from Python producer!"
channel.basic_publish(
exchange='',
routing_key='test_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化(1為非持久化,2為持久化)
)
)
print(f" [x] Sent '{message}'")
# 關閉連接
connection.close()
import pika
# 連接RabbitMQ服務器
credentials = pika.PlainCredentials('admin', 'admin123')
parameters = pika.ConnectionParameters('192.168.1.100', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 聲明隊列(確保存在)
channel.queue_declare(queue='test_queue', durable=True)
# 定義消息處理回調函數
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# 手動確認消息(避免消息丟失)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 消費消息(自動應答設置為False)
channel.basic_consume(
queue='test_queue',
on_message_callback=callback,
auto_ack=False # 關鍵:手動確認消息
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 進入阻塞監聽狀態
關鍵說明:
delivery_mode=2
(消息持久化)、durable=True
(隊列持久化)確保服務器重啟后消息不丟失。auto_ack=False
配合basic_ack
避免消息未處理完就被刪除(生產環境必用)。try-except
捕獲連接異常、通道異常等。sudo systemctl status rabbitmq-server
。sudo firewall-cmd --list-ports
(確保包含5672
、15672
)。guest
用戶僅限本地訪問,遠程連接需創建新用戶并授權。queue_declare
名稱需相同)。sudo tail -f /var/log/rabbitmq/rabbitmq.log
(定位具體錯誤)。