我承認這篇文章有標題黨的嫌疑,看完這篇文章并不會讓你月薪兩萬。如果想月薪兩萬甚至更多,并不是靠一篇文章,一本書,一個項目來實現的。
但是一個合格的程序員對響應式編程多少都應該有些了解,甚至有個清楚的認識。
希望這篇文章能夠讓你對響應式編程有個基本的認識,以及響應式編程會帶來哪些好處,解決哪些問題,或者說為什么響應式編程如此重要。
響應式編程發展過程
響應式編程的概念是微軟最開始提出并且在.NET平臺上實現的一個庫。后來這個模型被大家接受并認可,ReactiveX 就實現了很多其它語言對應的庫,大名鼎鼎的RXJAVA就是針對Java語言實現的。
后來ReactiveX 和 Reactor共同制定了Reactive Stream標準,ReactiveX和Reactor都是在這個標準下實現的框架。Spring5 正式引入Reactor 并基于該框架實現了WEB-FLUX。
此外Java8引進了Stream流以及lamada表達式,Java9引入了Flow,也是對響應式編程的一種支持。
什么是響應式編程
reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change
這是維基百科對響應式編程給出的定義。我對這個定義的評價是,懂得自然懂,不懂的還是不懂。
我提煉一下這個定義的關鍵詞 聲明式,數據流, 傳遞變化(響應),我自己再給加一個異步,因為實際上生產級別代碼都是進行異步響應的,幾乎很少進行同步響應。
在具體介紹響應式編程之前,先簡單解釋一下這幾個關鍵詞的含義。
聲明式編程
聲明式和指令式對應。指令式大家比較熟悉,就是依次寫出完成某個任務的每條指令。
比如從一個蘋果列表里,找出所有紅色的蘋果,指令式編程是這樣做的。
List<Apple> apples = new ArrayList<Apple>();
for (Apple apple : apples){
if (apple.getColor() == "red"){
System.out.println(apple);
}
}
聲明式編程,只要寫出你想要什么就OK了。
典型的聲明式語言的就是sql,對應上面的找紅色蘋果的需求,應該是這樣的 select * from apple where color = red。
簡單的講,聲明式編程就是聊天式編程,和計算機說你想要什么就OK了。
數據流
再說說數據流,其實數據流可以把它想象成水流,里面流淌的是數據,事件,信號等內容。如果大家對Java8引入的Stream流有一定了解的話,就會好理解。如果不了解的可以通過我這篇文章做個入門。
傳遞變化(響應)
傳遞變化(響應),其實就是對響應二字的體現。所謂的響應就是你和某個人打了招呼,然后某人回應你了。某人對你的回應就是響應。
將上面的場景對應到面向對象的編程里面,就是觀察者(訂閱)模式。觀察者對被觀察者的某些行為做出對應的動作。
有些前端程序員對觀察者模式可能比較陌生,那么大家比較熟悉的Ajax回調函數也是響應式編程的一種體現,比如如下JS代碼
$.ajax("example.action")
.done(function(){
console.log("success")
})
.fail(function(){
console.log("error")
})
.always(function(){
console.log("complete")
});
這就是典型的異步回調,當請求成功的時候會有一種響應動作,請求失敗的時候會有另一種響應動作。
異步
關于響應的方式,有同步響應和異步響應。實際應用中大部分都會采用異步響應。
同步:你給旅行社打電話預定一張機票,接線員接到你的電話后,開始查詢航班信息,然后進行預訂,這期間你一直拿著電話等他的結果。
異步:你給旅行社打電話預定一張機票,接線員接到電話后,記錄下你要預定的航班信息,然后就掛掉電話。等他預定好之后,把預定結果打電話告訴你。這就是異步。
很明顯異步操作對你來說效率更高,因為你不用一直等接線員的操作,你可以干其他事情。
上面的場景也被很多人稱為好萊塢規則。很多好演員去好萊塢報名拍戲,經紀公司會登記下演員的姓名,等有合適的機會的時候,經紀公司會給演員打電話,而不用演員一直在現場等,或者不斷的給經紀公司打電話詢問。 don't call me I will call you。
小試牛刀
其實介紹完上面那些東西,可能對響應式編程的理解還是模糊的。那么我們就以Reactor框架為例子做一個簡單的說明。畢竟程序員都喜歡show me the code。
上面提到了響應式編程的核心是基于觀察者(訂閱)模式的。觀察者觀察被觀察者的行為,根據不同的行為做出不同的響應行為。
在Reactor框架中用兩個類來表示Publisher,分別是Flux和Mono。Flux表示0...N個元素序列;Mono表示零或一個元素序列。
Flux/Mono可以發布三類值 正常數值,異常信號,完成信號。三類信號不會同時存在,最多同時發布兩類信號。
舉個例子,我們假設讓Flux發射一個1-6的6個整數的數字流,6個數字流發送完成后,會緊跟著發送一個完成信號,告訴訂閱者或者觀察者,數據流完成。同樣的,如果發送正常數據的過程中出現異常,也可以發送一個異常信號給訂閱者或者觀察者,表示出現異常,將停止發送。異常信號和完成信號不能同時存在,因為出現任何一個該數據流都將結束。但是信號流里面可以即沒有異常信號也沒有完成信號,這表示該流是一個無限流。
Flux.just(1,2,3,4,5,6)
上面這行代碼表示發布者發布了6條消息,下面我們訂閱者6條消息,也就是對這6條消息進行響應。
Flux.just(1,2,3,4,5,6).subscribe(System.out::print)
在控制臺將會打印出1,2,3,4,5,6。
注意,只有訂閱的時候才會對事件或者元素進行響應。
上面的例子,我們對元素或者事件沒有做任何操作,僅僅是將它們原封不動地打印了出來,這顯然不是我們想要的。接下來我們對元素做一些有意義的操作。
操作符
map
對數據流里面的每個元素執行一次map里面的函數。示意圖如下

代碼示例
Flux.range(1,6).map(i -> i*i).subscribe(System.out::println);
將會輸出 1 4 9 16 25 36
flatmap
該操作符邏輯上包含兩個操作,第一個操作是map操作,第二個是flatten,flatten類似于merge操作,將對每個元素進行映射之后,合并成一個新的流。示意圖如下。

代碼示例
Flux.just("apple-1","pear-2").flatMap(i ->
Flux.fromArray(i.split("-"))).subscribe(System.out::println);
以上代碼將會輸出 apple 1 pear 2;
filter
過濾出符合條件的元素。
代碼示例
Flux.range(1,6).filter(i -> i>3).subscribe(System.out::println)
以上代碼將會輸出 4,5,6
zip zip英文單詞有拉鏈的意思,在Reactor中,表示將兩個數據流合并到一起。示意圖如下。

示例代碼
Flux.zip(
Flux.just("A","B","C"),
Flux.just("1","2","3"),
(x,y) -> x + y
).subscribe(System.out::println);
以上代碼輸出 A1 B2 C3
還有很多操作符這里不一一介紹了,感興趣的可以看官網。
線程調度
Reactor自然也是支持多線程的。而且多線程調度很簡單。 Reactor中創建線程是通過Scheduler接口來表示的。
//創建一個線程
Scheduler single = Schedulers.single();
//創建等于CPU核心數量的線程
Scheduler parallel = Schedulers.parallel();
//創建有界限的線程池,不傳參數的默認創建10倍于CPU核心數量
Scheduler elastic = Schedulers.boundedElastic();
創建了線程,自然要分配線程,也就是線程調度。 切換線程上下文主要通過publishOn()和subscribeOn()兩個函數實現。
publishOn()會影響調用該函數之后的操作。而subscribeOn()會從源頭影響整個操作鏈,無論subscribeOn()調用發生在何處。
舉個例子:
Flux.just("hello")
.map(s -> {
System.out.println("[map] Thread name: " + Thread.currentThread().getName());
return s.concat(" world!");
})
//只改變publishOn()之后的操作的線程。
.publishOn(Schedulers.newSingle("thread-publishOn"))
.filter(s -> {
System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
return s.startsWith("h");
})
//從源頭變整個操作鏈的線程
.subscribeOn(Schedulers.newSingle("thread-subscribeOn"))
.subscribe(s -> {
System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
System.out.println(s);
});
上面的代碼輸出結果是這樣的
[map] Thread name: thread-subscribeOn-1
[filter] Thread name: thread-publishOn-2
[subscribe] Thread name: thread-publishOn-2
hello world!
建議各位把上面這段代碼復制到本地運行一下,同時可以把subscribeOn()和publishOn()分別注釋掉,感受一下區別。
這兩個函數經常用于有阻塞操作的時候,將阻塞操作調度到新的線程,以便提高效率。
響應編程解決哪些問題
響應式編程可以幫助解決兩類棘手問題,第一個問題就是大家熟悉的callback hell,第二個問題就是同步阻塞效率低的問題。
先說第一個問題,這里拿reactor官方的例子做個說明,找出某個用戶最喜愛的五個愛好。通過Callback的方式實現是這樣的。

- 基于回調的服務使用一個匿名 Callback 作為參數。后者的兩個方法分別在異步執行成功或異常時被調用。
- 獲取到Favorite ID的list后調用第一個服務的回調方法onSuccess。
- 如果 list 為空, 調用 suggestionService。
- 服務 suggestionService 傳遞 List<Favorite> 給第二個回調。
- 既然是處理 UI,我們需要確保消費代碼運行在 UI 線程。
- 使用 Java 8 Stream 來限制建議數量為5,然后在 UI 中顯示。
- 在每一層,我們都以同樣的方式處理錯誤:在一個 popup 中顯示錯誤信息。
- 回到Favorite ID這一層,如果返回 list,我們需要使用favoriteService 來獲取 Favorite對象。由于只想要5個,因此使用 stream 。
- 再一次回調。這次對每個ID,獲取 Favorite 對象在 UI 線程中推送到前端顯示。
采用Reactor 響應式編程代碼大概應該是這個樣子的

- 我們獲取到Favorite ID的流。
- 我們 異步的轉換 它們(ID)為 Favorite 對象(使用flatMap),現在我們有了Favorite流。
- 一旦 Favorite 為空,切換到 suggestionService。
- 我們只關注流中的最多5個元素。
- 最后,我們希望在 UI 線程中進行處理。
- 通過描述對數據的最終處理(在 UI 中顯示)和對錯誤的處理(顯示在 popup 中)來觸發(subscribe)。
可以看到通過采用響應式編程,大大提高了代碼的可讀性,邏輯表達也更清晰。
再來看第二個問題,同步阻塞通常被認為是低效率的。而異步非阻塞被認為是高效率的。而響應式編程,天生就是異步非阻塞的。
來舉個簡單例子說明一下,為什么同步阻塞是低效率的而異步非阻塞是高效率的。
同步和異步描述的是服務提供者提供服務的能力。當調用者向服務者發起請求后,服務提供者能夠立即返回,并且在處理完后通過某種方式通知調用者,那么就是異步的。相反如果服務提供者只在處理完之后才返回,或者要求調用者主動去查詢處理結果,就是同步。
阻塞和非阻塞描述的是調用者的狀態。當調用者向服務提供者發起請求后,一直等待處理結果返回,否則無法執行后續操作,就是阻塞狀態。如果調用后直接返回,繼續執行后續操作就是非阻塞狀態。
上面提到的打電話的例子就是異步非阻塞的例子,你給旅行社打電話,預定一張機票。旅行社接線員收到你的請求,就立刻給你回復(異步),告訴你請求已經收到,稍后會通知你。然后你就掛掉電話,去處理其他事情(非阻塞),等旅行社預定好之后,會立刻給你打電話通知你結果。
如果是同步阻塞的話,場景應該是這樣的,你給旅行社打電話預定機票,接線員接聽你的電話,然后處理訂票請求,你在電話另一端一直在等待,什么都做不了。更可怕的是,其他旅客的訂票請求一直打不進來,因為線路資源一直被你占用。這將是多么低效的處理方式。
總結
響應式編程雖好,但并不是包治百病,首先掌握起來就有一定難度,同時Debug也需要有一定的相關經驗。更主要的是,我們要根據業務場景來決定響應式編程是否能給我們帶來真正的好處。記住軟件工程里面,沒有銀彈。