溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單

發布時間:2020-08-12 16:19:58 來源:ITPUB博客 閱讀:263 作者:程序員內點事 欄目:編程語言

本文收錄在個人博客: www.chengxy-nds.top,共享技術資源,共同進步

前一段有幸參與到一個智能家居項目的開發,由于之前都沒有過這方面的開發經驗,所以對智能硬件的開發模式和技術棧都頗為好奇。

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單智能可燃氣體報警器

產品是一款可燃氣體報警器,如果家中燃氣泄露濃度到達一定閾值,報警器檢測到并上傳氣體濃度值給后臺,后臺以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。

用戶還可能向報警器發一些關閉報警、調整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示: springboot + rabbitmq 做智能家居,我也沒想到會這么簡單

但當我真正的參與其中開發時,其實有一點小小的失望,因為在整個研發過程中,并沒用到什么新的技術,還是常規的幾種中間件,只不過換個用法而已。

技術選型用 rabbitmq 來做核心的組件,主要考慮到運維成本低,組內成員使用的熟練度比較高。


下面和小伙伴分享一下如何用 springboot + rabbitmq 搭建物聯網( IOT)平臺,其實智能硬件也沒想象的那么高不可攀!

很多小伙伴可能有點懵? rabbitmq 不是消息隊列嗎? 怎么又能做智能硬件了?

其實 rabbitmq有兩種協議,我們平時接觸的消息隊列是用的 AMQP協議,而用在智能硬件中的是 MQTT協議。

MQTT 全稱(Message Queue Telemetry Transport):一種基于發布/訂閱( publish/ subscribe)模式的 輕量級通訊協議,通過訂閱相應的主題來獲取消息,是物聯網( Internet of Thing)中的一個標準傳輸協議。

該協議將消息的發布者( publisher)與訂閱者( subscriber)進行分離,因此可以在不可靠的網絡環境中,為遠程連接的設備提供可靠的消息服務,使用方式與傳統的MQ有點類似。 springboot + rabbitmq 做智能家居,我也沒想到會這么簡單

TCP協議位于傳輸層, MQTT 協議位于應用層, MQTT 協議構建于 TCP/IP協議上,也就是說只要支持 TCP/IP協議棧的地方,都可以使用 MQTT協議。

MQTT協議為什么在物聯網(IOT)中如此受偏愛?而不是其它協議,比如我們更為熟悉的 HTTP協議呢?

  • 首先 HTTP協議它是一種同步協議,客戶端請求后需要等待服務器的響應。而在物聯網(IOT)環境中,設備會很受制于環境的影響,比如帶寬低、網絡延遲高、網絡通信不穩定等,顯然異步消息協議更為適合 IOT應用程序。

  • HTTP是單向的,如果要獲取消息客戶端必須發起連接,而在物聯網(IOT)應用程序中,設備或傳感器往往都是客戶端,這意味著它們無法被動地接收來自網絡的命令。

  • 通常需要將一條命令或者消息,發送到網絡上的所有設備上。 HTTP要實現這樣的功能不但很困難,而且成本極高。

前邊說過 MQTT是一種輕量級的協議,它只專注于發消息, 所以此協議的結構也非常簡單。

MQTT協議中,一個 MQTT數據包由: 固定頭(Fixed header)、 可變頭(Variable header)、 消息體(payload)三部分構成。

  • 固定頭(Fixed header),所有數據包中都有固定頭,包含數據包類型及數據包的分組標識。
  • 可變頭(Variable header),部分數據包類型中有可變頭。
  • 內容消息體(Payload),存在于部分數據包類,是客戶端收到的具體消息內容。

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單 1、固定頭

固定頭部,使用兩個字節,共16位: springboot + rabbitmq 做智能家居,我也沒想到會這么簡單(4-7)位表示消息類型,使用4位二進制表示,可代表如下的16種消息類型,不過 0 和 15位置屬于保留待用,所以共14種消息事件類型。 springboot + rabbitmq 做智能家居,我也沒想到會這么簡單 DUP Flag(重試標識)

DUP Flag:保證消息可靠傳輸,消息是否已送達的標識。默認為0,只占用一個字節,表示第一次發送,當值為1時,表示當前消息先前已經被傳送過。

QoS Level(消息質量等級)

QoS Level:消息的質量等級,后邊會詳細介紹

RETAIN(持久化)

  • 值為 1:表示發送的消息需要一直持久保存,而且不受服務器重啟影響,不但要發送給當前的訂閱者,且以后新加入的客戶端訂閱了此 Topic,訂閱者也會馬上得到推送。 注意:新加入的訂閱者,只會取出最新的一個 RETAIN flag = 1的消息推送。

  • 值為 0:僅為當前訂閱者推送此消息。

Remaining Length(剩余長度)

在當前消息中剩余的 byte(字節)數,包含可變頭部和消息體payload。

2、可變頭

固定頭部僅定義了消息類型和一些標志位,一些消息的元數據需要放入可變頭部中??勺冾^部內容字節長度 + 消息體payload = 剩余長度。

可變頭部居于固定頭部和payload中間,包含了協議名稱,版本號,連接標志,用戶授權,心跳時間等內容。

可變頭存在于這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。

3、消息體payload

消息體payload只存在于 CONNECT、 PUBLISH、 SUBSCRIBE、 SUBACK、 UNSUBSCRIBE這幾種類型的消息:

  • CONNECT:包含客戶端的 ClientId、訂閱的 Topic、 Message以及 用戶名密碼。
  • PUBLISH:向對應主題發送消息。
  • SUBSCRIBE:要訂閱的主題以及 QoS。
  • SUBACK:服務器對于 SUBSCRIBE所申請的主題及 QoS進行確認和回復。
  • UNSUBSCRIBE:取消要訂閱的主題。

消息質量(Quality of Service),即消息的發送質量,發布者( publisher)和訂閱者( subscriber)都可以指定 qos等級,有 QoS 0、 QoS 1、 QoS 2三個等級。

下邊分別說明一下這三個等級的區別。

1、Qos 0At most once(至多一次),只發送一次消息,不保證消息是否成功送達,沒有確認機制,消息可能會丟失或重復。

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單 2、Qos 1At least once(至少一次),相對于 QoS 0而言 Qos 1增加了 ack確認機制,發送者( publisher)推送消息到MQTT代理( broker)時,兩者自身都會先持久化消息,只有當 publisher 或者 Broker分別收到 PUBACK確認時,才會刪除自身持久化的消息,否則就會重發。

但有個問題,盡管我們可以通過確認來保證一定收到客戶端 或 服務器的 message,可我們卻不能保證僅收到一次 message,也就是當客戶端 publisher沒收到 Brokerpuback或者 Broker沒有收到 subscriberpuback,那么就會一直重發。

publisher -> broker 大致流程:

  1. publisher store msg -> publish ->broker (傳遞message)
  2. broker -> puback -> publisher delete msg (確認傳遞成功)

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單 3、Qos 2Exactly once(只有一次),相對于 QoS 1, QoS 2升級實現了僅接受一次 message, publisherbroker 同樣對消息進行持久化,其中 publisher 緩存了 message和 對應的 msgID,而 broker 緩存了 msgID,可以保證消息不重復,由于又增加了一個 confirm 機制,整個流程變得復雜很多。

publisher -> broker 大致流程:

  1. publisher store msg -> publish ->broker -> broker store
  2. msgID(傳遞message) broker -> puberc (確認傳遞成功)
  3. publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
  4. broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)
springboot + rabbitmq 做智能家居,我也沒想到會這么簡單圖片源于網絡,如有侵權聯系刪除

LWT 全稱為 Last Will and Testament,其實遺囑是一個由客戶端預先定義好的主題和對應消息,附加在 CONNECT的數據包中,包括 遺愿主題、 遺愿 QoS、 遺愿消息等。

當MQTT代理 Broker 檢測到有客戶端 client非正常斷開連接時,再由服務器主動發布此消息,然后相關的訂閱者會收到消息。

舉個栗子:聊天室中所有人都訂閱一個叫 talk的主題 ,但小富由于網絡抖動突然斷開了鏈接,這時聊天室中所有訂閱主題 talk的客戶端都會收到一個 “ 小富離開聊天室” 的遺愿消息。

遺囑的相關參數:

  • Will Flag:是否使用 LWT,1 開啟
  • Will Topic:遺愿主題名,不可使用通配符
  • Will Qos:發布遺愿消息時使用的 QoS
  • Will Retain:遺愿消息的 Retain 標識
  • Will Message:遺愿消息內容

那客戶端 Client 有哪些場景是非正常斷開連接呢?

  • Broker 檢測到底層的 I/O 異常;
  • 客戶端 未能在心跳 Keep Alive 的間隔內和 Broker 進行消息交互;
  • 客戶端 在關閉底層 TCP 連接前沒有發送 DISCONNECT 數據包;
  • 客戶端 發送錯誤格式的數據包到 Broker,導致關閉和客戶端的連接等。

注意:當客戶端通過發布 DISCONNECT 數據包斷開連接時,屬于正常斷開連接,并不會觸發 LWT 的機制,與此同時 Broker 還會丟棄掉當前客戶端在連接時指定的相關 LWT 參數。

MQTT協議廣泛應用于物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。使用的場景也是非常非常多,下邊列舉一些:

  • 物聯網M2M通信,物聯網大數據采集
  • Android消息推送,WEB消息推送
  • 移動即時消息,例如Facebook Messenger
  • 智能硬件、智能家具、智能電器
  • 車聯網通信,電動車站樁采集
  • 智慧城市、遠程醫療、遠程教育
  • 電力、石油與能源等行業市場

具體 rabbitmq 的環境搭建就不贅述了,網上教程比較多,有條件的用服務器,沒條件的像我搞個 Windows版的也很快樂嘛。

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單在這里插入圖片描述

我們先開啟 rabbitmqmqtt協議,因為默認安裝下是關閉的,命令如下:


rabbitmq-plugins enable rabbitmq_mqtt

上一步中安裝 rabbitmq環境并開啟 mqtt協議后,實際上 mqtt 消息代理服務就搭建好了,接下來要做的就是實現客戶端消息的推送和訂閱。

這里使用 spring-integration-mqtt、 org.eclipse.paho.client.mqttv3兩個工具包實現。


<!--mqtt依賴包-->

< dependency>
     < groupId>org.springframework.integration </ groupId>
     < artifactId>spring-integration-mqtt </ artifactId>
</ dependency>

< dependency>
     < groupId>org.eclipse.paho </ groupId>
        < artifactId>org.eclipse.paho.client.mqttv3 </ artifactId>
     < version>1.2.0 </ version>
</ dependency>

消息的發送比較簡單,主要是應用到 @ServiceActivator注解,需要注意 messageHandler.setAsync屬性,如果設置成 false,關閉異步模式發送消息時可能會阻塞。


@Configuration

public  class  IotMqttProducerConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory =  new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
         return factory;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
         return  new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel =  "iotMqttInputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
        messageHandler.setAsync( false);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
         return messageHandler;
    }
}

MQTT 對外提供發送消息的 API時,需要使用 @MessagingGateway 注解,去提供一個消息網關代理,參數 defaultRequestChannel 指定發送消息綁定的 channel。

可以實現三種 API接口, payload 為發送的消息, topic 發送消息的主題, qos 消息質量。


@MessagingGateway(defaultRequestChannel = 
"iotMqttInputChannel")

public interface IotMqttGateway {

     // 向默認的 topic 發送消息
     void sendMessage2Mqtt( String payload);
     // 向指定的 topic 發送消息
     void sendMessage2Mqtt( String payload,@Header(MqttHeaders.TOPIC)  String topic);
     // 向指定的 topic 發送消息,并指定服務質量參數
     void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC)  String topic, @Header(MqttHeaders.QOS) int qos,  String payload);
}

消息訂閱和我們平時用的MQ消息監聽實現思路基本相似, @ServiceActivator注解表明當前方法用于處理 MQTT消息, inputChannel 參數指定了用于接收消息的 channel。



/**

 * @Author: xiaofu
 * @Description: 消息訂閱配置
 * @date 2020/6/8 18:24
 */

@Configuration
public  class  IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory =  new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
         return factory;
    }

    @Bean
    public MessageChannel iotMqttInputChannel() {
         return  new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =  new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout( 5000);
        adapter.setConverter( new DefaultPahoMessageConverter());
        adapter.setQos( 1);
        adapter.setOutputChannel(iotMqttInputChannel());
         return adapter;
    }

     /**
     * @author xiaofu
     * @description 消息訂閱
     * @date 2020/6/8 18:20
     */

    @Bean
    @ServiceActivator(inputChannel =  "iotMqttInputChannel")
    public MessageHandler handlerTest() {

         return message -> {
             try {
                 String string = message.getPayload().toString();
                System.out.println( "接收到消息:" + string);
            }  catch (MessagingException ex) {
                 //logger.info(ex.getMessage());
            }
        };
    }
}

額~ 由于本渣渣對硬件一竅不通,為了模擬硬件的發送消息,只能借助一下工具,其實硬件端實現 MQTT協議,跟我們前邊的基本沒什么區別,只不過換種語言嵌入到硬件中而已。

這里選的測試工具為 mqttbox,下載地址: http://workswithweb.com/mqttbox.html

我們用先用 mqttbox模擬向主題 mqtt_test_topic發送消息,看后臺是否能成功接收到。

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單看到后臺成功拿到了向主題 mqtt_test_topic發送的消息。 springboot + rabbitmq 做智能家居,我也沒想到會這么簡單

mqttbox模擬訂閱主題 mqtt_test_topic,在后臺向主題 mqtt_test_topic發送一條消息,這里我簡單的寫了個 controller調用API發送消息。

http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是后臺向主題 mqtt_test_topic 發送的消息 springboot + rabbitmq 做智能家居,我也沒想到會這么簡單我們看 mqttbox的訂閱消息,已經成功的接收到了后臺的消息,到此我們的 MQTT通信環境就算搭建成功了。如果把 mqttbox工具換成具體硬件設備,整個流程就是我們常說的智能家居了,其實真的沒那么難。 springboot + rabbitmq 做智能家居,我也沒想到會這么簡單

在我們實際的生產環境中遇到過的問題,這里分享一下讓大家少踩坑。

在客戶端 connect連接的時,會有一個 clientId 參數,需要每個客戶端都保持唯一的。但我們在開發測試階段 clientId直接在代碼中寫死了,而且服務都是單實例部署,并沒有暴露出什么問題。


MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());

然而在生產環境內側的時候,由于服務是多實例集群部署,結果出現了下邊的奇怪問題。同一時間內只能有一個客戶端能拿到消息,其他客戶端不但不能消費消息,而且還在不斷的掉線重連: Lost connection: 已斷開連接; retrying...。

springboot + rabbitmq 做智能家居,我也沒想到會這么簡單這就是由于 clientId相同導致客戶端間相互競爭消費,最后將 clientId獲取方式換成從發號器中拿,問題就好了,所以這個地方是需要特別注意的。

平時程序在開發環境沒問題,可偏偏到了生產環境就一大堆問題,很多都是因為服務部署方式不同導致的。所以多學習分布式還是很有必要的。

MQTT它只是一種協議,支持 MQTT協議的消息中間件產品非常多,下邊的也只是其中的一部分

  • Mosquitto
  • Eclipse Paho
  • RabbitMQ
  • Apache ActiveMQ
  • HiveMQ
  • JoramMQ
  • ThingMQ
  • VerneMQ
  • Apache Apollo
  • emqttd Xively
  • IBM Websphere .....

我也是第一次做和硬件相關的項目,之前聽到智能家居都會覺得好高大上,但實際上手開發后發現,技術嘛萬變不離其宗,也只是換種用法而已。

雙手奉上項目 demo 的 github地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git

感興趣的小伙伴可以下載跑一跑,實現起來非常的簡單。

整理了幾百本各類技術電子書,送給小伙伴們,公號輸入【666】自行領取。和一些小伙伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就加入我們吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女