這篇文章將為大家詳細講解有關java中的響應式編程,文章內容質量較高,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
響應式編程
作為響應式編程方向上的第一步,微軟在.NET生態系統中創建了Rx庫(Reactive Extensions)。RxJava是在JVM上對它的實現。
響應式編程是一個異步編程范式,通常出現在面向對象的語言中,作為觀察者模式的一個擴展。
它關注數據的流動、變化的傳播。這意味著可以輕易地使用編程語言表示靜態(如數組)或動態(如事件發射源)數據流。
響應式流
隨著時間的推移,一個專門為Java的標準化出現了。它是一個規范,定義了一些接口和交互規則,用于JVM平臺上的響應式庫。它就是響應式流(Reactive Streams),它的這些接口已經被集成到Java 9里,在java.util.concurrent.Flow這個父類里。響應式流和迭代器較相似,不過迭代器是基于“拉”(pull)的,而響應式流是基于“推”(push)的。迭代器的使用其實是命令式編程,因為由開發者決定什么時候調用next()獲取下一個元素。在響應式流中,與上面等價的是發布者-訂閱者。但當有新的可用元素時,是由發布者推給訂閱者的。這個“推”就是響應式的關鍵所在。
另外,對被推過來元素的操作也是以聲明的方式進行的,程序員只需表達做什么就行了,不需要管怎么做。
發布者使用onNext方法向訂閱者推送新元素,使用onError方法告知一個錯誤,使用onComplete方法告知已經結束。
可見,錯誤處理和完成(結束)也是以一個良好的方式被處理。錯誤和結束都可以終止序列。
這種方式非常靈活。這種模式支持0個(沒有)元素/1個元素/n(多)個元素(包括無限序列,如果滴答的鐘表)這些情況。
Reactor粉墨登場
Reactor是第四代響應式庫,是一個響應式編程范式的實現,用于在JVM平臺上基于響應式流規范構建非阻塞異步應用。
它極大地實現了JVM上響應式流的規范(http://www.reactive-streams.org/)。
它是一個完全非阻塞響應式編程的基石,帶有高效需求管理(以管理“后壓”的形式)。
它直接集成Java函數式API,特別是CompletableFuture,Stream和Duration。
它支持使用reactor-netty工程實現非阻塞跨進程通信,適合微服務架構,支持HTTP(包括Websockets),TCP和UDP。
注:Reactor要求Java 8+
講了這么多,是不是要首先思考下,為什么我們需要這樣一個異步的響應式庫?
阻塞就是浪費
現代的應用能達到非常多的并發用戶,即使現代硬件的能力被持續改進,現代軟件的性能仍然是一個關鍵的關注點
大體上有兩種方式可以改進一個程序的性能:
1、并行化,使用更多的線程和更多的硬件資源
2、提高效率,在當前資源用量的情況下尋求更高效率
通常,Java開發者使用阻塞代碼來寫程序。這種實踐性很好,直到遇到性能瓶頸。
此時會引入額外線程,運行相似的阻塞代碼。但是這種擴展方法在資源利用方面會引起爭論和導致并發問題。
更糟糕的是,阻塞浪費資源。如果你仔細看,一旦一個程序涉及到一些延遲(特別是I/O,像數據庫請求或網絡調用),資源就被浪費,因為線程現在是空閑的,在等待數據。
所以并行化方式不是銀彈。我們有必要讓硬件發揮完全的力量,但是關于資源浪費的影響和原因也是非常復雜的。
異步性來營救
前面提到的第二種方式是尋求更高效率,可以作為資源浪費問題的一個解決方案。
通過寫異步非阻塞代碼,你能讓執行切換到其它活動的任務,使用相同的底層資源,稍后再回到當前的處理上。
但是如何產生異步代碼到JVM上呢?Java提供兩種異步編程模型:
1、Callbacks,異步方法沒有返回值,但是會帶一個回調,當結果可用時回調會被調用。
2、Futures,異步方法立即返回一個Future<T>,異步處理過程就是計算一個T值,使用Future對象包裝了對它的訪問。這個值不是立即可用的,該對象可以被輪詢來查看T值是否可用。
這兩種技術都足夠好嗎?并不是對每種情況都是的,兩種方式都有局限性。
回調比較難于組合在一起,很快就會導致代碼難以閱讀和維護(眾所周知的“回調地獄”)。
與回調相比,Futures稍微好一點,但是仍然在組合方面做得不好。組合多個Futures對象到一起是可行的但是并不容易。
Future
也有其它問題,很容易因為調用了get()方法造成了另一個阻塞。
另外,它不支持延遲計算,缺乏對多個值的支持,缺乏高級錯誤處理。
從命令式到響應式編程
像Reactor這樣的響應式庫的目標就是解決在JVM上“傳統”異步方式的弊端,同時也關注一些額外方面:
可組合性和可讀性。
數據作為流,被豐富的操作符操作,什么都不會發生,直到你訂閱,后壓,消費者通知生產者發射的速率太快了,高級別而不是高數值抽象。
可組合性和可讀性
可組合性,其實就是編排多個異步任務的能力,使前一個任務的結果作為后續任務的輸入,或以fork-join(分叉-合并)的方式執行若干個任務,或在更高的級別重復利用這些異步任務。
任務編排的能力和代碼的可讀性和可維護性緊密地耦合在一起。隨著異步處理在數量和復雜度上的增加,組合和閱讀代碼變得更加困難。
就像我們看到的,回調模型雖然簡單,但是當回調里嵌套回調,達到多層時就會變成回調地獄。
Reactor提供豐富的組合選項,使嵌套級別最小,讓代碼的組織結構能反映出在進行什么樣的抽象處理,且通常保持在同級別上。
裝配線類比
你可以認為響應式應用處理數據就像通過一個裝配(生產)線。Reactor既是傳送帶又是工作站。
原材料從一個源(原始發布者)持續不斷地獲取,以一個完成的產品被推送給消費者(訂閱者)結束。
原材料可以經過許多不同的轉換,如其它的中間步驟,或者是一個更大裝配線的一部分。
如果在某個地方出現一個小故障或阻塞了,出問題的工作站可以向上游發出通知來限制原材料的流動(速率)。
操作符
在Reactor里,操作符就是裝配線類比中的工作站。每一個操作符都向一個發布者添加某些行為,把上一步的發布者包裝到一個新的實例里。整個鏈就是這樣被鏈接起來的。
所以數據一開始從第一個發布者出來,然后沿著鏈往下游移動,且被每一個鏈接轉換。最后,一個訂閱者結束了這個處理。
響應式流規范并沒有明確規定操作符,不過Reactor就提供了豐富的操作符,它們涉及到很多方面,從簡單的轉換、過濾到復雜的編排、錯誤處理。
只要不訂閱,就什么都不發生
當你寫一個發布者鏈時,默認,數據是不會開始進入鏈中的。相反,你只是創建了異步處理的一個抽象描述。
通過訂閱這個行為(動作),才把發布者和訂閱者連接起來,然后才會觸發數據在鏈里流動。
這是在內部實現好的,通過來自于訂閱者的request信號往上游傳播,一路逆流而上直到最開始的發布者那里。
Reactor核心特性
Reactor引入可組合響應式的類型,實現了發布者接口,但也提供了豐富的操作符,就是Flux和Mono。
Flux
,流動,表示0到N個元素。
Mono
,單個,表示0或1個元素。
它們之間的不同主要在語義上,表示異步處理的粗略基數。
如一個http請求只會產生一個響應,把它表示為Mono<HttpResponse>顯然更有意義,且它只提供相對于0/1這樣上下文的操作符,因為此時count操作顯然沒有太大意義。
操作符可以改變處理的最大基數,也會切換到相關類型上。如count操作符雖然存在于Flux<T>上,但它的返回值卻是一個Mono<Long>。
Flux<T>
一個Flux<T>是一個標準的Publisher<T>,表示一個異步序列,可以發射0到N個元素,可以通過一個完成信號或錯誤信號終止。
就像在響應式流規范里那樣,這3種類型的信號轉化為對一個下游訂閱者的onNext,onComplete,onError3個方法的調用。
這3個方法也可以理解為事件/回調,且它們都是可選的。
如沒有onNext但有onComplete,表示一個空的有限序列。既沒有onNext也沒有onComplete,表示一個空的無限序列(沒有什么實際用途,可用于測試)。
無限序列也沒有必要是空的,如Flux.interval(Duration)產生一個Flux<Long> ,它是無限的,從鐘表里發射出的規則的“嘀嗒”。
Mono<T>
一個Mono<T>是一個特殊的Publisher<T>,最多發射一個元素,可以使用onComplete信號或onError信號來終止。
它提供的操作符只是Flux提供的一個子集,同樣,一些操作符(如把Mono和Publisher結合起來)可以把它切換到一個Flux。
如Mono#concatWith(Publisher)返回一個Flux,然而Mono#then(Mono)返回的是另一個Mono。
Mono可以用于表示沒有返回值的異步處理(與Runnable相似),用Mono<Void>表示。
創建Flux或Mono,并訂閱它們
最容易的方式就是使用它們各自的工廠方法:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo");
當談到訂閱時,可以使用Java 8的lambda表達式,訂閱方法有多種不同的變體,帶有不同的回調。
下面是方法簽名:
//訂閱并觸發序列 subscribe(); //可以對每一個產生的值進行處理 subscribe(Consumer<? super T> consumer); //還可以響應一個錯誤 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //還可以在成功結束后執行一些代碼 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //還可以對Subscription執行一些操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
使用Disposable取消訂閱
這些基于lambda的訂閱方法都返回一個Disposable類型,通過調用它的dispose()來取消這個訂閱。
對于Flux和Mono,取消就是一個信號,表明源應該停止生產元素。然而,不保證立即生效,一些源可能生產元素非???,以致于還沒有收到取消信號就已經生產完了。
以上就是java中的響應式編程介紹,看完之后是否有所收獲呢?如果想了解更多相關內容,歡迎關注億速云行業資訊,感謝各位的閱讀。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。