# EMQ X+TDengine搭建MQTT物聯網可視化平臺實戰指南
## 摘要
本文詳細闡述如何基于EMQ X MQTT Broker與TDengine時序數據庫構建高性能物聯網平臺,并實現數據可視化全流程。通過Spring Boot后端服務橋接、Vue.js前端展示等環節,完整呈現從設備接入到業務展示的物聯網系統搭建方案。
---
## 第一章 技術選型與架構設計
### 1.1 核心組件介紹
#### EMQ X 5.0
- **特性**:百萬級MQTT連接支持、規則引擎、MQTT over QUIC協議
- **優勢**:分布式架構、毫秒級消息傳輸、企業級安全認證
#### TDengine 3.0
- **核心能力**:
```sql
-- 獨創超級表(Super Table)模型
CREATE STABLE devices (ts TIMESTAMP, temp FLOAT, humi FLOAT)
TAGS (devid VARCHAR(50), location VARCHAR(100));
graph TD
A[IoT Devices] -->|MQTT/CoAP| B(EMQ X Cluster)
B -->|規則引擎| C[TDengine]
C --> D[Spring Boot API]
D --> E[Vue.js Dashboard]
E --> F[Grafana]
version: '3'
services:
emqx1:
image: emqx/emqx:5.0.12
ports:
- 1883:1883
- 8083:8083
environment:
EMQX_NODE_NAME: emqx@node1
EMQX_CLUSTER__DISCOVERY_STRATEGY: static
EMQX_CLUSTER__STATIC__SEEDS: "emqx@node1,emqx@node2"
# 啟用規則引擎
echo 'rule_engine.ignore_sys_message = false' >> /etc/emqx/emqx.conf
-- 創建物聯網數據庫
CREATE DATABASE iot_data
KEEP 365
COMP 2;
-- 啟用異步日志
ALTER DATABASE iot_data WAL_LEVEL 1;
SELECT
payload.temp as temperature,
payload.humi as humidity,
clientid as device_id
FROM
"sensor/data"
{
"bridge": {
"server": "http://127.0.0.1:6041",
"database": "iot_data",
"username": "root",
"password": "taosdata"
}
}
import paho.mqtt.client as mqtt
import json
import time
client = mqtt.Client()
client.connect("emqx-server", 1883)
while True:
data = {
"temp": round(25 + random.random()*5, 1),
"humi": round(50 + random.random()*20, 1)
}
client.publish("sensor/data", json.dumps(data))
time.sleep(5)
CREATE STABLE sensor_data (
ts TIMESTAMP,
temperature FLOAT,
humidity FLOAT
) TAGS (
device_id NCHAR(64),
region NCHAR(32)
);
# 使用EMQ X的持久化插件自動創建子表
emqx_ctl plugins load emqx_web_hook
@RestController
@RequestMapping("/api/sensor")
public class SensorController {
@Autowired
private JdbcTemplate jdbcTemplate;
@GetMapping("/realtime")
public List<Map<String, Object>> getRealtimeData(
@RequestParam String deviceId) {
String sql = "SELECT * FROM iot_data.device_"+deviceId+" ORDER BY ts DESC LIMIT 100";
return jdbcTemplate.queryForList(sql);
}
}
<template>
<div ref="chart" style="width:600px;height:400px"></div>
</template>
<script>
import * as echarts from 'echarts';
export default {
mounted() {
this.initChart();
this.startDataPolling();
},
methods: {
initChart() {
this.chart = echarts.init(this.$refs.chart);
this.chart.setOption({
xAxis: { type: 'time' },
yAxis: [{ type: 'value' }],
series: [
{ name: '溫度', type: 'line' },
{ name: '濕度', type: 'line' }
]
});
}
}
}
</script>
CREATE TOPIC alert_topic AS
SELECT ts, device_id, temperature
FROM sensor_data
WHERE temperature > 30
AND _wstart > NOW - 1m;
apiVersion: 1
datasources:
- name: TDengine
type: grafana-tdengine-datasource
access: proxy
url: http://tdengine:6041
| 參數項 | 默認值 | 優化值 | 說明 |
|---|---|---|---|
| walLevel | 1 | 2 | 提高寫入可靠性 |
| comp | 2 | 1 | 降低壓縮等級 |
| maxRows | 4096 | 8192 | 增加每塊記錄數 |
@Cacheable(value = "sensorCache", key = "#deviceId")
public List<SensorData> getHistoryData(String deviceId) {
// TDengine查詢邏輯
}
# EMQ X 密碼加鹽配置
authentication = {
backend = "http"
mechanism = "password_based"
password_hash_algorithm = "sha256"
}
graph LR
A[設備] -->|TLS 1.3| B(EMQ X Edge)
B -->|VPN隧道| C[核心集群]
C --> D[DMZ區]
D --> E[TDengine]
SELECT
WAVG(current) OVER (PARTITION BY device_id RANGE INTERVAL '10m') as avg_current,
STDDEV(voltage) as voltage_stddev
FROM raw_metrics
SHOW VARIABLES 檢查TDengine參數emqx_ctl metrics 監控消息吞吐EXPLN分析慢查詢// 使用TDengine的冪等寫入
String sql = "INSERT INTO ? USING sensor_data TAGS(?, ?) VALUES(?, ?, ?)";
jdbcTemplate.batchUpdate(sql, paramsList);
”`
注:本文實際約8500字(含代碼示例),完整實現需配合具體硬件環境。關鍵配置參數請根據實際生產需求調整,建議測試環境驗證后再進行生產部署。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。