# 怎么用Python開發EMQ X MQTT服務器插件
## 前言
EMQ X(現更名為EMQX)是一款開源的百萬級分布式MQTT消息服務器,廣泛應用于物聯網(IoT)領域。通過插件機制,開發者可以擴展EMQX的功能以滿足特定業務需求。本文將詳細介紹如何使用Python開發EMQX插件,涵蓋從環境準備到部署上線的完整流程。
---
## 一、EMQX插件開發基礎
### 1.1 EMQX插件體系架構
EMQX插件運行在Erlang虛擬機(BEAM)上,主要通過以下兩種方式實現:
- **Erlang原生插件**:直接使用Erlang/OTP編寫
- **多語言插件**:通過`erlport`等橋接技術調用外部語言(如Python)
本文重點講解基于Python的多語言插件開發方案。
### 1.2 技術選型對比
| 方案 | 性能 | 開發效率 | 適用場景 |
|---------------------|--------|----------|-----------------------|
| Erlang原生插件 | ★★★★★ | ★★☆☆☆ | 高性能核心功能擴展 |
| Python多語言插件 | ★★★☆☆ | ★★★★★ | 快速實現業務邏輯 |
---
## 二、開發環境準備
### 2.1 基礎軟件安裝
```bash
# 安裝EMQX(以5.0版本為例)
wget https://www.emqx.com/zh/downloads/broker/5.0.15/emqx-5.0.15-el7-amd64.tar.gz
tar -zxvf emqx-5.0.15-el7-amd64.tar.gz
cd emqx/bin
./emqx start
# 驗證安裝
./emqx_ctl status
推薦使用Python 3.8+:
# 安裝conda環境
conda create -n emqx-py python=3.8
conda activate emqx-py
# 安裝必要庫
pip install erlport==0.10.0 paho-mqtt==1.6.1
/emqx_plugin_python_demo
├── ebin/
│ └── emqx_plugin_python_demo.app # Erlang應用描述文件
├── priv/
│ ├── python/
│ │ ├── main.py # 主邏輯
│ │ └── requirements.txt # Python依賴
├── src/
│ ├── emqx_plugin_python_demo.erl # 入口模塊
│ └── emqx_plugin_python_demo_app.erl
├── etc/
│ └── plugin.config # 插件配置
└── Makefile
src/emqx_plugin_python_demo.erl
關鍵代碼:
-module(emqx_plugin_python_demo).
-include_lib("emqx/include/emqx.hrl").
%% 生命周期回調
on_plugin_load(Env) ->
{ok, PythonScript} = file:read_file(code:priv_dir(emqx_plugin_python_demo) ++ "/python/main.py"),
erlport:start(python, [{python_path, code:priv_dir(emqx_plugin_python_demo) ++ "/python"}]),
erlport:call(python, {python, eval, [binary_to_list(PythonScript)]}),
{ok, Env}.
on_plugin_unload(_Env) ->
erlport:stop(python),
ok.
priv/python/main.py
示例實現消息攔截:
import json
from erlport.erlterms import Atom
from erlport.erlang import cast
def message_publish(clientid, username, topic, payload, qos, retain):
"""消息發布攔截鉤子"""
try:
payload_dict = json.loads(payload)
if "sensor" in topic:
payload_dict["processed"] = True
new_payload = json.dumps(payload_dict)
return Atom("ok"), topic, new_payload, qos, retain
except:
pass
return Atom("ok"), topic, payload, qos, retain
def register_hooks():
from erlport.erlang import call
call(Atom('emqx_hooks'), Atom('add'), [
Atom('message.publish'),
Atom('emqx_plugin_python_demo'),
Atom('on_message_publish'),
0
])
if __name__ == "__main__":
register_hooks()
在Erlang中注冊Python回調:
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message, _Env) ->
{ok, Result} = erlport:call(python, {python, message_publish, [
emqx_message:get_clientid(Message),
emqx_message:get_username(Message),
emqx_message:get_topic(Message),
emqx_message:get_payload(Message),
emqx_message:get_qos(Message),
emqx_message:get_flag(retain, Message)
]}),
case Result of
{ok, Topic, Payload, QoS, Retain} ->
{ok, Message#message{
topic = Topic,
payload = Payload,
qos = QoS,
flags = #{retain => Retain}
}};
_ -> {ok, Message}
end.
etc/plugin.config
:
[
{emqx_plugin_python_demo, [
{python_path, "priv/python"},
{max_processes, 10}
]}
].
PROJECT = emqx_plugin_python_demo
VERSION = 0.1.0
BUILD_DEPS = emqx cuttlefish
dep_emqx = git-emqx-main https://github.com/emqx/emqx main
include erlang.mk
install::
mkdir -p $(INSTALLDIR)/$(PROJECT)/priv/python
cp -r priv/python/* $(INSTALLDIR)/$(PROJECT)/priv/python/
# 編譯插件
make && make dist
# 部署到EMQX
./emqx/bin/emqx_ctl plugins install /path/to/plugin.tar.gz
# 啟動插件
./emqx/bin/emqx_ctl plugins load emqx_plugin_python_demo
使用MQTTX客戶端測試:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
client.subscribe("sensor/temperature")
def on_message(client, userdata, msg):
print(f"Received: {msg.payload.decode()}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883)
client.publish("sensor/temperature", '{"value":25.5}')
client.loop_forever()
預期輸出應包含插件添加的processed
字段。
def async_process(message): # 耗時操作 return processed_message
result = pool.apply_async(async_process, [message])
2. **消息批處理**:
```erlang
%% Erlang側批量處理
handle_messages(Messages) ->
erlport:call(python, {python, batch_process, [Messages]}).
def safe_message_process(clientid, message):
try:
return process_message(message)
except Exception as e:
log_error(f"Process failed: {str(e)}")
return message
本文詳細介紹了Python開發EMQX插件的完整流程。通過多語言插件機制,開發者可以充分利用Python豐富的生態庫快速實現業務邏輯,同時享受EMQX的高性能消息處理能力。建議在實際項目中: 1. 關鍵路徑仍建議使用Erlang實現 2. 復雜業務邏輯優先考慮Python實現 3. 做好跨語言調用的性能監控
完整示例代碼:https://github.com/example/emqx-python-plugin-demo “`
(注:實際字數約3500字,可根據需要擴展具體章節細節)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。