上文中咱們簡單提到了JDK9中Flow接口中的靜態內部類實現了響應式流的JAVA API,并且提供了一個一個Publisher的實現類SubmissionPublisher。本文將先梳理一下接口中具體的處理流程,然后再以幾個調用者的例子來幫助大家理解。
JDK9中的實現
再放上一下上文中的響應式流的交互流程:
- 訂閱者向發布者發送訂閱請求。
- 發布者根據訂閱請求生成令牌發送給訂閱者。
- 訂閱者根據令牌向發布者發送請求N個數據。
- 發送者根據訂閱者的請求數量返回M(M<=N)個數據
- 重復3,4
- 數據發送完畢后由發布者發送給訂閱者結束信號
該流程的角度是以接口調用的交互來說的,而考慮實際的coding工作中,我們的調用流程其實為:
- 創建發布者
- 創建訂閱者
- 訂閱令牌交互
- 發送信息
接下來我們按照這個流程來梳理一下代碼細節。
創建發布者
對于實現響應流的最開始的步驟,便是創建一個發布者。之前提到在JDK9中提供了一個發布者的簡單實現SubmissionPublisher。SubmissionPublisher繼承自Flow.Publisher,他有三種構造函數:
public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
this(executor, maxBufferCapacity, null);
}
public SubmissionPublisher(Executor executor, int maxBufferCapacity,
BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)
SubmissionPublisher將使用Executor作為“線程池”向訂閱者發送信息。如果需要需要設置線程池的話可以自己傳入,否則的話再無參的構造函數中將默認使用ForkJoinPool類的commonPool()方法獲取,即無餐構造方法中的ASYNC_POOL靜態變量。
SubmissionPublisher會為每一個訂閱者單獨的建立一個緩沖空間,其大小由入參maxBufferCapacity決定。默認情況下直接使用Flow.defaultBufferSize()來設置,默認為256。如果緩沖區滿了之后會根據發送信息時候的策略確定是阻塞等待還是拋棄數據。
SubmissionPublisher會在訂閱者發生異常的時候(onNext處理中),會調用最后一個參數handler方法,然后才會取消訂閱。默認的時候為null,也就是不會處理異常。
最簡單的創建SubmissionPublisher的方法就是直接使用無參構造方法:
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
上文書說到,因為SubmissionPublisher實現了AutoCloseable接口,所以可以用try來進行資源回收可以省略close()的調用:
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
}
但是也可以手動的調用close()方法來顯示的關閉發布者,關閉后再發送數據就會拋出異常:
if (complete)
throw new IllegalStateException("Closed");
創建訂閱者
上文中咱們沒有手動創建訂閱者,而是直接調用SubmissionPublisher中的consume方法使用其內部的訂閱者來消費消息。在本節可以實現接口Flow.Subscriber<T>創建一個SimpleSubscriber類:
public class SimpleSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
/**
* 訂閱者名稱
*/
private String name;
/**
* 定義最大消費數量
*/
private final long maxCount;
/**
* 計數器
*/
private long counter;
public SimpleSubscriber(String name, long maxCount) {
this.name = name;
this.maxCount = maxCount <= 0 ? 1 : maxCount;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.printf("訂閱者:%s,最大消費數據: %d。%n", name, maxCount);
// 實際上是等于消費全部數據
subscription.request(maxCount);
}
@Override
public void onNext(Integer item) {
counter++;
System.out.printf("訂閱者:%s 接收到數據:%d.%n", name, item);
if (counter >= maxCount) {
System.out.printf("準備取消訂閱者: %s。已處理數據個數:%d。%n", name, counter);
// 處理完畢,取消訂閱
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
System.out.printf("訂閱者: %s,出現異常: %s。%n", name, t.getMessage());
}
@Override
public void onComplete() {
System.out.printf("訂閱者: %s 處理完成。%n", name);
}
}
SimpleSubscriber是一個簡單訂閱者類,其邏輯是根據構造參數可以定義其名稱name與最大處理數據值maxCount,最少處理一個數據。
當發布者進行一個訂閱的時候會生成一個令牌Subscription作為參數調用onSubscribe方法。在訂閱者需要捕獲該令牌作為后續與發布者交互的紐帶。一般來說在onSubscribe中至少調用一次request且參數需要>0,否則發布者將無法向訂閱者發送任何信息,這也是為什么maxCount需要大于0。
當發布者開始發送數據后,會異步的調用onNext方法并將數據傳入。該類中使用了一個計數器對數據數量進行校驗,當達到最大值的時候,則會通過令牌(subscription)異步通知發布者訂閱結束,然后發送者再異步的調用發訂閱者的onComplete方法,以處理完成流程。
其中的onError和onComplete方法只進行打印,這里就不再說了。
以上的這個訂閱者可以看作是一個push模型的實現,因為當開始訂閱時訂閱者就約定了需要接受的數量,然后在后續的處理(onNext)中不再請求新數據。
我們可以用以下的代碼創建一個名稱為S1,消費2個元素的訂閱者:
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
訂閱令牌交互
當我們可以創建了發送者和訂閱者之后,我們需要確認一下進行交互的順序,由于響應流的處理就是對于事件的處理,所以事件的順序十分重要,具體順序如下:
- 我們創建一個發布者publisher一個訂閱者subscriber
- 訂閱者subscriber通過調用發布者的subscribe()方法進行信息訂閱。如果訂閱成功,則發布者將生成一個令牌(Subscription)并作為入參調用訂閱者的訂閱事件方法onSubscribe()。如果調用異常則會直接調用訂閱者的onError錯誤處理方法,并拋出IllegalStateException異常然后結束訂閱。
- 在onSubscribe()中,訂閱者需要通過調用令牌(Subscription)的請求方法request(long)來異步的向發布者請求數據。
- 當發布者有數據可以發布的時候,則會異步的調用訂閱者的onNext()方法,直到所有消息的總數已經滿足了訂閱者調用request的數據請求上限。所以當訂閱者請求訂閱的消息數為Long.MAX_VALUE時,實際上是消費所有數據,即push模式。如果發布者沒有數據要發布了,則可以會調用發布者自己的close()方法并異步的調用所有訂閱者的onComplete()方法來通知訂閱結束。
- 發布者可以隨時向發布者請求更多的元素請求(一般在onNext里),而不用等到之前的處理完畢,一般是與之前的數據數量進行累加。
- 放發布者遇到異常的時候會調用訂閱者的onError()方法。
上面的描述中是只使用的一個訂閱者來進行描述的,后面的例子中將說明發布者可以擁有多個訂閱者(甚至0個訂閱者)。
發送信息
當發布者需要推送消息的時候會調用submit方法或者offer方法,上文中我們提到submit實際上是offer的一種簡單實現,本節咱們自己比較一下。
首先他們的方法簽名為:
int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)
而submit 和 offer的直接方法為:
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
public int offer(T item,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
return doOffer(item, 0L, onDrop);
可以看到他們的底層調用的都是 doOffer 方法,而doOffer的方法簽名為:
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop)
所以我們可以直接看doOffer()方法。doOffer()方法是可選阻塞時長的,而時長根據入參數nanos來決定。而onDrop()是一個刪除判斷器,如果調用BiPredicate的test()方法結果為true則會再次重試(根據令牌中的nextRetry屬性與發布器中的retryOffer()方法組合判斷,但是具體實現還沒梳理明白);如果結果為flase則直接刪除內容。doOffer()返回的結果為正負兩種,正數的結果為發送了數據,但是訂閱者還未消費的數據(估計值,因為是異步多線程的);如果為負數,則返回的是重拾次數。
所以,根據submit()的參數我們可以發現,submit會一直阻塞直到數據可以被消費(因為不會阻塞超時,所以不需要傳入onDrop()方法)。而我們可以根據需要配置offer()選擇器。如果必須要求數據都要被消費的話,那就可以直接選擇submit(),如果要設置重試次數的話就可以選擇使用offer()
異步調用的例子
下面看一個具體的程序例子,程序將以3秒為周期進行數據發布:
public class PeriodicPublisher {
public static final int WAIT_TIME = 2;
public static final int SLEEP_TIME = 3;
public static void main(String[] args) {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
// 創建4訂閱者
SimpleSubscriber subscriber1 = new SimpleSubscriber("S1", 2);
SimpleSubscriber subscriber2 = new SimpleSubscriber("S2", 4);
SimpleSubscriber subscriber3 = new SimpleSubscriber("S3", 6);
SimpleSubscriber subscriber4 = new SimpleSubscriber("S4", 10);
// 前三個訂閱者直接進行訂閱
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
publisher.subscribe(subscriber3);
// 第四個方法延遲訂閱
delaySubscribeWithWaitTime(publisher, subscriber4);
// 開始發送消息
Thread pubThread = publish(publisher, 5);
try {
// 等待處理完成
pubThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Thread publish(SubmissionPublisher<Integer> publisher, int count) {
Thread t = new Thread(() -> {
IntStream.range(1,count)
.forEach(item ->{
publisher.submit(item);
sleep(item);
});
publisher.close();
});
t.start();
return t;
}
private static void sleep(Integer item) {
try {
System.out.printf("推送數據:%d。休眠 3 秒。%n", item);
TimeUnit.SECONDS.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void delaySubscribeWithWaitTime(SubmissionPublisher<Integer> publisher, Flow.Subscriber<Integer> sub) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(WAIT_TIME);
publisher.subscribe(sub);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
代碼后是運行結果如下:
訂閱者:S1,最大消費數據: 2。
推送數據:1。休眠 3 秒。
訂閱者:S3,最大消費數據: 6。
訂閱者:S2,最大消費數據: 4。
訂閱者:S2 接收到數據:1.
訂閱者:S3 接收到數據:1.
訂閱者:S1 接收到數據:1.
訂閱者:S4,最大消費數據: 10。
推送數據:2。休眠 3 秒。
訂閱者:S2 接收到數據:2.
訂閱者:S3 接收到數據:2.
訂閱者:S1 接收到數據:2.
訂閱者:S4 接收到數據:2.
準備取消訂閱者: S1。已處理數據個數:2。
推送數據:3。休眠 3 秒。
訂閱者:S4 接收到數據:3.
訂閱者:S2 接收到數據:3.
訂閱者:S3 接收到數據:3.
推送數據:4。休眠 3 秒。
訂閱者:S4 接收到數據:4.
訂閱者:S3 接收到數據:4.
訂閱者:S2 接收到數據:4.
準備取消訂閱者: S2。已處理數據個數:4。
推送數據:5。休眠 3 秒。
訂閱者:S3 接收到數據:5.
訂閱者:S4 接收到數據:5.
訂閱者: S3 處理完成。
訂閱者: S4 處理完成。
由于是異步執行,所以在“接收數據”部分的順序可能不同。
我們分析一下程序的執行流程。
- 創建一個發布者實例
- 創建四個訂閱者實例S1、S2、S3、S4,可以接收數據的數量分別為:2、4、6、10。
- 前三個訂閱者立即訂閱消息。
- S4的訂閱者單獨創建一個線程等待WAIT_TIME秒(2秒)之后進行數據的訂閱。
- 新建一個線程來以SLEEP_TIME秒(3秒)為間隔發布5個數據。
- 將publish線程join()住等待流程結束。
執行的日志滿足上述流程而針對一些關鍵點為:
- S4在發送者推送數據"1"的時候還未訂閱,所以S4沒有接收到數據"1"。
- 當發送數據"2"的時候S1已經接收夠了預期數據2個,所以取消了訂閱。之后只剩下S2、S3、S4。
- 當發送數據"4"的時候S2已經接收夠了預期數據4個,所以取消了訂閱。之后只剩下S3、S4。
- 當發送數據"5"的時候只剩下S3、S4,當發送完畢后publisher調用close()方法,通知S3、S4數據處理完成。
需要注意的是,如果在最后submit完畢之后直接close()然后結束進行的話可能訂閱者并不能執行完畢。但是由于在任意一次submit()之后都有一次3秒的等待,所以本程序是可以執行完畢的。
最后
本文中的例子是是簡單的實現,可以通過調整訂閱者中的request的參數,與在onNext中添加request調用來測試背壓的效果,還可以將submit調整為offer并添加onDrop方法以觀察拋棄信息時的流程。同時本文沒有提供Processor的例子,各位也可以自行學習。
總結一下流程: 訂閱者向發布者進行訂閱,然后發布者向訂閱者發送令牌。訂閱者使用令牌請求消息,發送者根據請求消息的數量推送消息。訂閱者可以隨時異步追加需要的更多信息。
JDK9中在Flow接口中實現了Java API的4個接口,并提供了SubmissionPublisher<T>作為Publisher<T>接口的簡單實現。