創建任務并執行任務
無參創建
CompletableFuture<String> noArgsFuture = new CompletableFuture<>();
轉入相應任務,無返回值
runAsync方法可以在后臺執行異步計算,但是此時并沒有返回值。持有一個Runnable對象。
CompletableFuture noReturn = CompletableFuture.runAsync(()->{
//執行邏輯,無返回值
});
傳入相應任務,有返回值
此時我們看到返回的是CompletableFuture<T>此處的T就是你想要的返回值的類型。其中的Supplier<T>是一個簡單的函數式接口。
CompletableFuture<String> hasReturn = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hasReturn";
}});
此時可以使用lambda表達式使上面的邏輯更加清晰
CompletableFuture<String> hasReturnLambda = CompletableFuture.supplyAsync(TestFuture::get);
private static String get() {
return "hasReturnLambda";
}
獲取返回值
異步任務也是有返回值的,當我們想要用到異步任務的返回值時,我們可以調用CompletableFuture的get()阻塞,直到有異步任務執行完有返回值才往下執行。
我們將上面的get()方法改造一下,使其停頓十秒時間。
private static String get() {
System.out.println("Begin Invoke getFuntureHasReturnLambda");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
} System.out.println("End Invoke getFuntureHasReturnLambda");
return "hasReturnLambda";
}
然后進行調用
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda(); System.out.println("Main Method Is Invoking");
funtureHasReturnLambda.get();
System.out.println("Main Method End");
}
可以看到輸出如下,只有調用get()方法的時候才會阻塞當前線程。
Main Method Is Invoking
Begin Invoke getFuntureHasReturnLambdaEnd Invoke getFuntureHasReturnLambdaMain Method End
自定義返回值
除了等待異步任務返回值以外,我們也可以在任意時候調用complete()方法來自定義返回值。
CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda();
System.out.println("Main Method Is Invoking");
new Thread(()->{
System.out.println("Thread Is Invoking ");
try {
Thread.sleep(1000);
funtureHasReturnLambda.complete("custome value");
} catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("Thread End ");
}).run();String value = funtureHasReturnLambda.get();System.out.println("Main Method End value is "+ value);
我們可以發現輸出是新起線程的輸出值,當然這是因為我們的異步方法設置了等待10秒,如果此時異步方法等待1秒,新起的線程等待10秒,那么輸出的值就是異步方法中的值了。
Main Method Is Invoking
Begin Invoke getFuntureHasReturnLambdaThread Is Invoking Thread End
Main Method End value is custome value
按順序執行異步任務
如果有一個異步任務的完成需要依賴前一個異步任務的完成,那么該如何寫呢?是調用get()方法獲得返回值以后然后再執行嗎?這樣寫有些麻煩,CompletableFuture為我們提供了方法來完成我們想要順序執行一些異步任務的需求。thenApply、thenAccept、thenRun這三個方法。這三個方法的區別就是。
方法名是否可獲得前一個任務的返回值是否有返回值thenApply能獲得有thenAccept能獲得無thenRun不可獲得無
所以一般來說thenAccept、thenRun這兩個方法在調用鏈的最末端使用。接下來我們用真實的例子感受一下。
//thenApply 可獲取到前一個任務的返回值,也有返回值
CompletableFuture<String> seqFutureOne = CompletableFuture.supplyAsync(()-> "seqFutureOne");
CompletableFuture<String> seqFutureTwo = seqFutureOne.thenApply(name -> name + " seqFutureTwo");
System.out.println(seqFutureTwo.get());//thenAccept 可獲取到前一個任務的返回值,但是無返回值
CompletableFuture<Void> thenAccept = seqFutureOne .thenAccept(name -> System.out.println(name + "thenAccept"));
System.out.println("-------------");
System.out.println(thenAccept.get());//thenRun 獲取不到前一個任務的返回值,也無返回值
System.out.println("-------------");
CompletableFuture<Void> thenRun = seqFutureOne.thenRun(() -> {
System.out.println("thenRun");
});System.out.println(thenRun.get());
返回的信息如下
seqFutureOne seqFutureTwo
seqFuture.NEThenAccept-------------null
-------------thenRunnull
thenApply和thenApplyAsync的區別
我們可以發現這三個方法都帶有一個后綴為Async的方法,例如thenApplyAsync。那么帶Async的方法和不帶此后綴的方法有什么不同呢?我們就以thenApply和thenApplyAsync兩個方法進行對比,其他的和這個一樣的。
這兩個方法區別就在于誰去執行這個任務,如果使用thenApplyAsync,那么執行的線程是從ForkJoinPool.commonPool()中獲取不同的線程進行執行,如果使用thenApply,如果supplyAsync方法執行速度特別快,那么thenApply任務就是主線程進行執行,如果執行特別慢的話就是和supplyAsync執行線程一樣。接下來我們通過例子來看一下,使用sleep方法來反應supplyAsync執行速度的快慢。
//thenApply和thenApplyAsync的區別
System.out.println("-------------");
CompletableFuture<String> supplyAsyncWithSleep = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace(); } return "supplyAsyncWithSleep Thread Id : " + Thread.currentThread();
});CompletableFuture<String> thenApply = supplyAsyncWithSleep .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsync = supplyAsyncWithSleep .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApply.get());System.out.println(thenApplyAsync.get());System.out.println("-------------No Sleep");
CompletableFuture<String> supplyAsyncNoSleep = CompletableFuture.supplyAsync(()->{
return "supplyAsyncNoSleep Thread Id : " + Thread.currentThread();
});CompletableFuture<String> thenApplyNoSleep = supplyAsyncNoSleep .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsyncNoSleep = supplyAsyncNoSleep .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApplyNoSleep.get());System.out.println(thenApplyAsyncNoSleep.get());
我們可以看到輸出位
-------------
Main Thread Id: Thread[main,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApply Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
-------------No Sleep
Main Thread Id: Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApply Thread Id : Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]
可以看到supplyAsync方法執行速度慢的話thenApply方法執行線程和supplyAsync執行線程相同,如果supplyAsync方法執行速度快的話,那么thenApply方法執行線程和Main方法執行線程相同。
組合CompletableFuture
將兩個CompletableFuture組合到一起有兩個方法
- thenCompose():當第一個任務完成時才會執行第二個操作
- thenCombine():兩個異步任務全部完成時才會執行某些操作
thenCompose() 用法
我們定義兩個異步任務,假設第二個定時任務需要用到第一個定時任務的返回值。
public static CompletableFuture<String> getTastOne(){
return CompletableFuture.supplyAsync(()-> "topOne");
}public static CompletableFuture<String> getTastTwo(String s){
return CompletableFuture.supplyAsync(()-> s + " topTwo");
}
我們利用thenCompose()方法進行編寫
CompletableFuture<String> thenComposeComplet = getTastOne().thenCompose(s -> getTastTwo(s));
System.out.println(thenComposeComplet.get());
輸出就是
topOne topTwo
如果還記得前面的thenApply()方法的話,應該會想這個利用thenApply()方法也是能夠實現類似的功能的。
//thenApply
CompletableFuture<CompletableFuture<String>> thenApply = getTastOne()
.thenApply(s -> getTastTwo(s));
System.out.println(thenApply.get().get());
但是我們發現返回值是嵌套返回的一個類型,而想要獲得最終的返回值需要調用兩次get()
thenCombine() 用法
例如我們此時需要計算兩個異步方法返回值的和。求和這個操作是必須是兩個異步方法得出來值的情況下才能進行計算,因此我們可以用thenCombine()方法進行計算。
CompletableFuture<Integer> thenComposeone = CompletableFuture.supplyAsync(() -> 192);
CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
CompletableFuture<Integer> thenComposeCount = thenComposeOne .thenCombine(thenComposeTwo, (s, y) -> s + y);
System.out.println(thenComposeCount.get());
此時thenComposeOne和thenComposeTwo都完成時才會調用傳給thenCombine方法的回調函數。
組合多個CompletableFuture
在上面我們用thenCompose()和thenCombine()兩個方法將兩個CompletableFuture組裝起來,如果我們想要將任意數量的CompletableFuture組合起來呢?可以使用下面兩個方法進行組合。
- allOf():等待所有CompletableFuture完后以后才會運行回調函數
- anyOf():只要其中一個CompletableFuture完成,那么就會執行回調函數。注意此時其他的任務也就不執行了。
接下來演示一下兩個方法的用法
//allOf()
CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> four = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> six = CompletableFuture.supplyAsync(() -> 6);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(one, two, three, four, five, six);voidCompletableFuture.thenApply(v->{ return Stream.of(one,two,three,four, five, six)
.map(CompletableFuture::join) .collect(Collectors.toList());}).thenAccept(System.out::println);CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
} System.out.println("1");
});
我們定義了6個CompletableFuture等待所有的CompletableFuture等待所有任務完成以后然后將其值輸出。
anyOf()的用法
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}System.out.println("voidCompletableFuture1");
});CompletableFuture<Void> voidCompletableFutur2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}System.out.println("voidCompletableFutur2");
});CompletableFuture<Void> voidCompletableFuture3 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
}System.out.println("voidCompletableFuture3");
});CompletableFuture<Object> objectCompletableFuture = CompletableFuture .anyOf(voidCompletableFuture1, voidCompletableFutur2, voidCompletableFuture3);objectCompletableFuture.get();
這里我們定義了3個CompletableFuture進行一些耗時的任務,此時第一個CompletableFuture會率先完成。打印結果如下。
voidCompletableFuture1
異常處理
我們了解了CompletableFuture如何異步執行,如何組合不同的CompletableFuture,如何順序執行CompletableFuture。那么接下來還有一個重要的一步,就是在執行異步任務時發生異常的話該怎么辦。我們先寫個例子。
CompletableFuture.supplyAsync(()->{
//發生異常
int i = 10/0;
return "Success";
}).thenRun(()-> System.out.println("thenRun"))
.thenAccept(v -> System.out.println("thenAccept"));
CompletableFuture.runAsync(()-> System.out.println("CompletableFuture.runAsync"));
執行結果為,我們發現只要執行鏈中有一個發生了異常,那么接下來的鏈條也就不執行了,但是主流程下的其他CompletableFuture還是會運行的。
CompletableFuture.runAsync
exceptionally()
我們可以使用exceptionally進行異常的處理
//處理異常
CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> {
//發生異常
int i = 10 / 0;
return "Success";
}).exceptionally(e -> { System.out.println(e); return "Exception has Handl";
});System.out.println(exceptionally.get());
打印如下,可以發現其接收值是異常信息,也能夠返回自定義返回值。
JAVA.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception has Handl
handle()
調用handle()方法也能夠捕捉到異常并且自定義返回值,他和exceptionally()方法不同一點是handle()方法無論發沒發生異常都會被調用。例子如下
System.out.println("-------有異常-------");
CompletableFuture.supplyAsync(()->{
//發生異常
int i = 10/0;
return "Success";
}).handle((response,e)->{
System.out.println("Exception:" + e);
System.out.println("Response:" + response);
return response;
});System.out.println("-------無異常-------");
CompletableFuture.supplyAsync(()->{
return "Sucess";
}).handle((response,e)->{
System.out.println("Exception:" + e);
System.out.println("Response:" + response);
return response;
});
打印如下,我們可以看到在沒有發生異常的時候handle()方法也被調用了
-------有異常-------
Exception:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Response:null
-------無異常-------
Exception:null
Response:Sucess