溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring Boot 二三事:WEB 應用消息推送的那點事

發布時間:2020-07-17 16:43:10 來源:網絡 閱讀:894 作者:levenchen 欄目:編程語言

閱讀對象:本文適合SpringBoot 初學者及對SpringBoot感興趣的童鞋閱讀。

背景介紹:在企業級 WEB 應用開發中,為了更好的用戶體驗&提升響應速度,往往會將一些耗時費力的請求 (Excel導入or導出,復雜計算, etc.) 進行異步化處理。 由此帶來的一個重要的問題是如何通知用戶任務狀態,常見的方法大致分為2類4種:

  • HTTP Polling client pull
  • HTTP Long-Polling client pull
  • Server-Sent Events (SSE) server push
  • WebSocket server push
1. Polling 短輪詢

是一種非常簡單的實現方式。就是client通過定時任務不斷得重復請求服務器,從而獲取新消息,而server按時間順序提供自上次請求以后發生的單個或多個消息。

Spring Boot 二三事:WEB 應用消息推送的那點事

短輪詢的優點非常明顯,就是實現簡單。當兩個方向上的數據都非常少,并且請求間隔不是非常密集時,這種方法就會非常有效。例如,新聞評論信息可以每半分鐘更新一次,這對用戶來說是可以的。

它得缺點也是非常明顯,一旦我們對數據實時性要求非常高時,為了保證消息的及時送達,請求間隔必須縮短,在這種情況下,會加劇服務器資源的浪費,降低服務的可用性。另一個缺點就是在消息的數量較少時,將會有大量的 request做無用功,進而也導致服務器資源的浪費。

2. Long-Polling 長輪詢

長輪詢的官方定義是:

The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.

如果與Polling的方式相比,會發現Long-Polling的優點是通過hold open HTTP request 從而減少了無用的請求。

大致步驟為:

  1. client向server請求并等待響應。
  2. 服務端將請求阻塞,并不斷檢查是否有新消息。如果在這個期間有新消息產生時就立即返回。否則一直等待至請求超時。
  3. 當client 獲取到新消息請求超時,進行消息處理并發起下一次請求。

Spring Boot 二三事:WEB 應用消息推送的那點事

Long-Polling的缺點之一也是服務器資源的浪費,因為它和Polling的一樣都屬于被動獲取,都需要不斷的向服務器請求。在并發很高的情況下,對服務器性能是個嚴峻的考驗。

Note:因為以上2兩種方式的實現都比較簡單,所以我們這里就不做代碼演示了。接下來我們重點介紹一下Server-Sent EventsWebSocket。

3. Demo概要

下面我們將通過一個下載文件的案例進行演示SSEWebSocket的消息推送,在這之前,我們先簡單說一下我們項目的結構,整個項目基于SpringBoot 構建。

首先我們定義一個供前端訪問的APIDownloadController

@RestController
public class DownloadController {
    private static final Logger log = getLogger(DownloadController.class);
    @Autowired
    private MockDownloadComponent downloadComponent;  

    @GetMapping("/api/download/{type}")
    public String download(@PathVariable String type, HttpServletRequest request) {  // (A)
        HttpSession session = request.getSession();
        String sessionid = session.getId();
        log.info("sessionid=[{}]", sessionid);
        downloadComponent.mockDownload(type, sessionid);  // (B)
        return "success"; // (C)
    }
}
  • (A) type參數用于區分使用哪種推送方式,這里為sse,ws,stomp這三種類型。
  • (B) MockDownloadComponent用于異步模擬下載文件的過程。
  • (C) 因為下載過程為異步化,所以該方法不會被阻塞并立即向客戶端返回success,用于表明下載開始。

DownloadController中我們調用MockDownloadComponentmockDownload()的方法進行模擬真正的下載邏輯。

@Component
public class MockDownloadComponent {
    private static final Logger log = LoggerFactory.getLogger(DownloadController.class);

    @Async // (A)
    public void mockDownload(String type, String sessionid) {
        for (int i = 0; i < 100; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // (B)

                int percent = i + 1;
                String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
                log.info("username={}'s file has been finished [{}]% ", sessionid, percent);

                switch (type) { // (D)
                    case "sse":
                        SseNotificationController.usesSsePush(sessionid, content);
                        break;
                    case "ws":
                        WebSocketNotificationHandler.usesWSPush(sessionid, content);
                        break;
                    case "stomp":
                        this.usesStompPush(sessionid, content);
                        break;
                    default:
                        throw new UnsupportedOperationException("");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • (A) 我們使用@Async讓使其異步化。
  • (B) 模擬下載耗時。
  • (C) 消息的格式為{"username":"abc","percent":1}。
  • (D) 根據不同的type選擇消息推送方式。
4. Server-Sent Events

SSE 是W3C定義的一組API規范,這使服務器能夠通過HTTP將數據推送到Web頁面,它具有如下特點:

  • 單向半雙工:只能由server向client推送消息
  • 基于http:數據被編碼為“text/event-stream”內容并使用HTTP流機制進行傳輸
  • 數據格式無限制:消息只是遵循規范定義的一組key-value格式&UTF-8編碼的文本數據流,我們可以在消息payload中可以使用JSON或者XML或自定義數據格式。
  • http 長連接: 消息的實際傳遞是通過一個長期存在的HTTP連接完成的,消耗資源更少
  • 簡單易用的API

Spring Boot 二三事:WEB 應用消息推送的那點事

瀏覽器支持情況:
Spring Boot 二三事:WEB 應用消息推送的那點事

Note:IE 瀏覽器可通過第三方JS庫進行支持SSE

4.1 SpringBoot 中使用SSE

從Spring 4.2開始支持SSE規范,我們只需要在Controller中返回SseEmitter對象即可。

Note:Spring 5 中提供了Spring Webflux 可以更加方便的使用SSE,但是為更貼近我們的實際項目,所以文本僅演示使用Spring MVC SSE。

我們在服務器端定義一個SseNotificationController用于和客戶端處理和保存SSE連接. 其endpoint/api/sse-notification。

@RestController
public class SseNotificationController {

    public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)

    @GetMapping("/api/sse-notification")
    public SseEmitter files(HttpServletRequest request) {
        long millis = TimeUnit.SECONDS.toMillis(60);
        SseEmitter sseEmitter = new SseEmitter(millis); // (B)

        HttpSession session = request.getSession();
        String sessionid = session.getId();

        SSE_HOLDER.put(sessionid, sseEmitter); 
        return sseEmitter;
    }

    /**
     * 通過sessionId獲取對應的客戶端進行推送消息
     */
    public static void usesSsePush(String sessionid, String content) {  // (C)
        SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
        if (Objects.nonNull(emitter)) {
            try {
                emitter.send(content);
            } catch (IOException | IllegalStateException e) {
                log.warn("sse send error", e);
                SseNotificationController.SSE_HOLDER.remove(sessionid);
            }
        }
    }

}
  • (A) SSE_HOLDER保存了所有客戶端的SseEmitter,用于后續通知對應客戶端。
  • (B) 根據指定超時時間創建一個SseEmitter對象, 它是SpringMVC提供用于操作SSE的類。
  • (C) usesSsePush()提供根據sessionId向對應客戶端發送消息。發送只需要調用SseEmittersend()方法即可。

至此服務端已經完成,我們使用Vue編寫客戶端Download.html進行測試。核心代碼如下:

     usesSSENotification: function () {
                var tt = this;
                var url = "/api/sse-notification";
                var sseClient = new EventSource(url);  // (A)
                sseClient.onopen = function () {...}; // (B)

                sseClient.onmessage = function (msg) {   // (C)
                    var jsonStr = msg.data;
                    console.log('message', jsonStr);
                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.sseMsg += 'SSE 通知您:已下載完成' + percent + "%\r\n";
                    if (percent === 100) {
                        sseClient.close();  // (D)
                    }
                };
                sseClient.onerror = function () {
                    console.log("EventSource failed.");
                };
            }
  • (A) 開啟一個新的 SSE connection 并訪問 /api/sse-notification。
  • (B) 當連接成功時的callback。
  • (C) 當有新消息時的callback。
  • (D) 當下載進度為100%時,關閉連接。

效果演示
Spring Boot 二三事:WEB 應用消息推送的那點事

4. WebSocket

WebSocket 類似于標準的TCP連接,它是IETF(RFC 6455)定義的通過TCP進行實時全雙工通信一種通信方式,這意味這它的功能更強大,常用于如股票報價器,聊天應用。

相比于SSE,它不僅可以雙向通信,而且甚至還能處理音頻/視頻等二進制內容。

Note:使用WebSocket,在高并發情況下,服務器將擁有許多長連接。這對網絡代理層組件及WebSocket服務器都是一個不小的性能挑戰,我們需要考慮其負載均衡方案。同時連接安全等問題也不容忽視。

4.1 Spring WebSocket (低級API)

Spring 4提供了一個新的Spring-WebSocket模塊,用于適應各種WebSocket引擎,它與Java WebSocket API標準(JSR-356)兼容,并且提供了額外的增強功能。

Note: 對于應用程序來說,直接使用WebSocket API會大大增加開發難度,所以Spring為我們提供了 STOMP over WebSocket 更高級別的API使用WebSocket。在本文中將會分別演示通過low level API及higher level API進行演示。

如果想在SpringBoot中使用WebSocket,首先需要引入spring-boot-starter-websocket依賴

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

然后就可以配置相關信息,我們先通過low level API進行演示。

首先需要自定義一個WebSocketNotificationHandler用于處理WebSocket 的連接及消息處理。我們只需要實現WebSocketHandler或子類TextWebSocketHandler BinaryWebSocketHandler。

public class WebSocketNotificationHandler extends TextWebSocketHandler {

    private static final Logger log = getLogger(WebSocketNotificationHandler.class);

    public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>();  // (A)

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {   // (B)
        String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        WS_HOLDER.put(httpSessionId, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("handleTextMessage={}", message.getPayload()); 
    }

    public static void usesWSPush(String sessionid, String content) {    // (C)
        WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
        if (Objects.nonNull(wssession)) {
            TextMessage textMessage = new TextMessage(content);
            try {
                wssession.sendMessage(textMessage);
            } catch (IOException | IllegalStateException e) {
                WebSocketNotificationHandler.SESSIONS.remove(sessionid);
            }
        }
    }
}
  • (A) WS_HOLDER用于保存客戶端的WebSocket Session
  • (B) 重寫afterConnectionEstablished()方法,當連接建立之后,按sessionIdWebSocket Session保存至WS_HOLDER,用于后續向client推送消息。
  • (C) 根據sessionId獲取對應WebSocket Session,并調用WebSocket SessionsendMessage(textMessage)方法向client發送消息。

使用@EnableWebSocket開啟WebSocket,并實現WebSocketConfigurer進行配置。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

        WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler(); 

        registry.addHandler(notificationHandler, "/ws-notification") // (A)
                .addInterceptors(new HttpSessionHandshakeInterceptor())  // (B)
                .withSockJS();  // (C)
    }
}
  • (A) 將我們自定義的WebSocketNotificationHandler注冊至WebSocketHandlerRegistry.
  • (B) HttpSessionHandshakeInterceptor是一個內置的攔截器,用于傳遞HTTP會話屬性到WebSocket會話。當然你也可以通過HandshakeInterceptor接口實現自己的攔截器。
  • (C) 開啟SockJS的支持,SockJS的目標是讓應用程序使用WebSocket API時,當發現瀏覽器不支持時,無需要更改任何代碼,即可使用非WebSocket替代方案,盡可能的模擬WebSocket。關于SockJS的更多資料,可參考https://github.com/sockjs/sockjs-client

server端至此就基本大功告成,接下來我們來完善一下client端Download.html,其核心方法如下:

usesWSNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-notification";
                var sock = new SockJS(url);   // (A)
                sock.onopen = function () {
                    console.log('open');
                    sock.send('test');
                };

                sock.onmessage = function (msg) {   // (B)
                    var jsonStr = msg.data;

                    console.log('message', jsonStr);

                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.wsMsg += 'WS 通知您:已下載完成' + percent + "%\r\n";
                    if (percent === 100) {
                        sock.close();
                    }
                };

                sock.onclose = function () { 
                    console.log('ws  close');
                };
            }
  • (A) 首先需要在項目中引入SockJS Client , 并根據指定URL創建一個SockJS對象。
  • (B) 當有新消息時的callback,我們可以在該方法中處理我們的消息。

效果演示
Spring Boot 二三事:WEB 應用消息推送的那點事

4.2 STOMP over WebSocket (高級API)

WebSocket雖然定義了兩種類型的消息,文本和二進制,但是針對消息的內容沒有定義,為了更方便的處理消息,我們希望Client和Server都需要就某種協議達成一致,以幫助處理消息。那么,有沒有已經造好的輪子呢?答案肯定是有的。這就是STOMP。

" rel="nofollow">STOMP是一種簡單的面向文本的消息傳遞協議,它其實是消息隊列的一種協議, 和AMQP,JMS是平級的。 只不過由于它的簡單性恰巧可以用于定義WS的消息體格式。雖然STOMP是面向文本的協議,但消息的內容也可以是二進制數據。同時STOMP 可已使用任何可靠的雙向流網絡協議,如TCP和WebSocket,目前很多服務端消息隊列都已經支持了STOMP, 比如RabbitMQ, ActiveMQ等。

它結構是一種基于幀的協議,一幀由一個命令,一組可選的Header和一個可選的Body組成。

COMMAND
header1:value1
header2:value2

Body^@

客戶端可以使用SENDSUBSCRIBE命令發送或訂閱消息。 通過destination標記述消息應由誰來接收處理,形成了類似于MQ的發布訂閱機制。
Spring Boot 二三事:WEB 應用消息推送的那點事

STOMP的優勢也非常明顯,即:

  1. 不需要創建自定義消息格式
  2. 我們可以使用現有的stomp.js客戶端
  3. 可以實現消息路由及廣播
  4. 可以使用第三方成熟的消息代理中間件,如RabbitMQ, ActiveMQ等

最重要的是,Spring STOMP 為我們提供了能夠像Spring MVC一樣的編程模型,減少了我們的學習成本。

下面將我們的DEMO稍作調整,使用Spring STOMP來實現消息推送,在本例中我們使用SimpleBroker模式,我們的應用將會內置一個STOMP Broker,將所有信息保存至內存中。
Spring Boot 二三事:WEB 應用消息推送的那點事

具體代碼如下:

@Configuration
@EnableWebSocketMessageBroker  // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/ws-stomp-notification")
                .addInterceptors(httpSessionHandshakeInterceptor())   // (B)
                .setHandshakeHandler(httpSessionHandshakeHandler())  // (C)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")  // (D)
                .enableSimpleBroker("/topic", "/queue");  // (E)
    }

    @Bean
    public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
        return new HttpSessionHandshakeInterceptor();
    }

    @Bean
    public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
        return new HttpSessionHandshakeHandler();
    }

}
  • (A) 使用@EnableWebSocketMessageBroker注解開啟支持STOMP
  • (B) 創建一個攔截器,用于傳遞HTTP會話屬性到WebSocket會話。
  • (C) 配置一個自定義的HttpSessionHandshakeHandler,其主要作用是按sessionId標記識別連接。
  • (D) 設置消息處理器路由前綴,當消息的destination/app開頭時,將會把該消息路由到server端的對應的消息處理方法中。(在本例中無實際意義)
  • (E) 設置客戶端訂閱消息的路徑前綴

HttpSessionHandshakeHandler代碼如下:

public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        return new HttpSessionPrincipal(sessionId);

    }
}

當我們需要向client發送消息時,只需要注入SimpMessagingTemplate對象即可,是不是感覺非常熟悉?! 沒錯,這種Template模式和我們日常使用的RestTemplate JDBCTemplate是一樣的。
我們只需要調用SimpMessagingTemplateconvertAndSendToUser()方法即可向對應用戶發送消息了。

  private void usesStompPush(String sessionid, String content) {
        String destination = "/queue/download-notification";
        messagingTemplate.convertAndSendToUser(sessionid, destination, content);
    }

在瀏覽器端,client可以使用stomp.js和sockjs-client進行如下連接:

usesStompNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-stomp-notification";
                // 公共topic
                // var notificationTopic = "/topic/download-notification";
                // 點對點廣播
                var notificationTopic = "/user/queue/download-notification"; // (A)

                var socket = new SockJS(url);
                var stompClient = Stomp.over(socket);

                stompClient.connect({}, function (frame) {
                    console.log("STOMP connection successful");

                    stompClient.subscribe(notificationTopic, function (msg) {   // (B)
                        var jsonStr = msg.body;

                        var obj = JSON.parse(jsonStr);
                        var percent = obj.percent;
                        tt.stompMsg += 'STOMP 通知您:已下載完成' + percent + "%\r\n";
                        if (percent === 100) {
                            stompClient.disconnect()
                        }

                    });

                }, function (error) {
                    console.log("STOMP protocol error " + error)
                })
            }
  • (A) 如果想針對特定用戶接收消息,我們需要以/user/為前綴,Spring STOMP會把以/user/為前綴的消息交給UserDestinationMessageHandler進行處理并發給特定的用戶,當然這個/user/是可以通過WebSocketBrokerConfig進行個性化配置的,為了簡單起見,我們這里就使用默認配置,所以我們的topic url就是/user/queue/download-notification。
  • (B) 設置stompClient消息處理callback進行消息處理。

效果演示
Spring Boot 二三事:WEB 應用消息推送的那點事

5 總結

在文中為大家簡單講解了幾種常用的消息推送方案,并通過一個下載案例重點演示了SSEWebSocket這兩種server push模式的消息推送。當然還有很多細節并沒有在文中說明,建議大家下載源碼對照參考。
Spring Boot 二三事:WEB 應用消息推送的那點事

相比較這幾種模式,小編認為如果我們的需求僅僅是向客戶端推送消息,那么使用SSE的性價比更高一些,Long-Polling次之。使用WebSocket有一種殺雞用牛刀的感覺,并且給我們系統也帶來了更多的復雜性,得不償失,所以不太推薦。而Polling雖然實現方式最簡單且兼容性最強,但是其效率過低,所以不建議使用。當然如果您有其他見解,歡迎留言討論交流。

文中示例源碼:https://github.com/leven-space/SpringBootNotification.git

如果您覺得這篇文章有用,請留下您的小

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女