日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

多線程并發(fā)是JAVA語言中非常重要的一塊內(nèi)容,同時,也是Java基礎(chǔ)的一個難點(diǎn)。說它重要是因為多線程是日常開發(fā)中頻繁用到的知識,說它難是因為多線程并發(fā)涉及到的知識點(diǎn)非常之多,想要完全掌握J(rèn)ava的并發(fā)相關(guān)知識并非易事。也正因此,Java并發(fā)成了Java面試中最高頻的知識點(diǎn)之一。本系列文章將從Java內(nèi)存模型、volatile關(guān)鍵字、synchronized關(guān)鍵字、ReetrantLock、Atomic并發(fā)類以及線程池等方面來系統(tǒng)的認(rèn)識Java的并發(fā)知識。通過本系列文章的學(xué)習(xí)你將深入理解volatile關(guān)鍵字的作用,了解到synchronized實現(xiàn)原理、AQS和CLH隊列鎖,清晰的認(rèn)識自旋鎖、偏向鎖、樂觀鎖、悲觀鎖...等等一系列讓人眼花繚亂的并發(fā)知識。

一、線程池基礎(chǔ)知識

在Java語言中,雖然創(chuàng)建并啟動一個線程非常方便,但是由于創(chuàng)建線程需要占用一定的操作系統(tǒng)資源,在高并發(fā)的情況下,頻繁的創(chuàng)建和銷毀線程會大量消耗CPU和內(nèi)存資源,對程序性能造成很大的影響。為了避免這一問題,Java給我們提供了線程池。

線程池是一種基于池化技術(shù)思想來管理線程的工具。在線程池中維護(hù)了多個線程,由線程池統(tǒng)一的管理調(diào)配線程來執(zhí)行任務(wù)。通過線程復(fù)用,減少了頻繁創(chuàng)建和銷毀線程的開銷。

本章內(nèi)容我們先來了解一下線程池的一些基礎(chǔ)知識,學(xué)習(xí)如何使用線程池以及了解線程池的生命周期。

1.線程池的使用

線程池的使用和創(chuàng)建可以說非常的簡單,這得益于JDK提供給我們良好封裝的API。線程池的實現(xiàn)被封裝到了ThreadPoolExecutor中,我們可以通過ThreadPoolExecutor的構(gòu)造方法來實例化出一個線程池,代碼如下:

// 實例化一個線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 60,
        TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
// 使用線程池執(zhí)行一個任務(wù)        
executor.execute(() -> {
    // Do something
});
// 關(guān)閉線程池,會阻止新任務(wù)提交,但不影響已提交的任務(wù)
executor.shutdown();
// 關(guān)閉線程池,阻止新任務(wù)提交,并且中斷當(dāng)前正在運(yùn)行的線程
executor.showdownNow();

創(chuàng)建好線程池后直接調(diào)用execute方法并傳入一個Runnable參數(shù)即可將任務(wù)交給線程池執(zhí)行,通過shutdown/shutdownNow方法可以關(guān)閉線程池。

ThreadPoolExecutor的構(gòu)造方法中參數(shù)眾多,對于初學(xué)者而言在沒有了解各個參數(shù)的作用的情況下很難去配置合適的線程池。因此Java還為我們提供了一個線程池工具類Executors來快捷的創(chuàng)建線程池。Executors提供了很多簡便的創(chuàng)建線程池的方法,舉兩個例子,代碼如下:

// 實例化一個單線程的線程池
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
// 創(chuàng)建固定線程個數(shù)的線程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);
// 創(chuàng)建一個可重用固定線程數(shù)的線程池
ExecutorService executorService2 = Executors.newCachedThreadPool();

但是,通常來說在實際開發(fā)中并不推薦直接使用Executors來創(chuàng)建線程池,而是需要根據(jù)項目實際情況配置適合自己項目的線程池,關(guān)于如何配置合適的線程池這是后話,需要我們理解線程池的各個參數(shù)以及線程池的工作原理之后才能有答案。

2.線程池的生命周期

線程池從誕生到死亡,中間會經(jīng)歷RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五個生命周期狀態(tài)。

    • RUNNING 表示線程池處于運(yùn)行狀態(tài),能夠接受新提交的任務(wù)且能對已添加的任務(wù)進(jìn)行處理。RUNNING狀態(tài)是線程池的初始化狀態(tài),線程池一旦被創(chuàng)建就處于RUNNING狀態(tài)。
    • SHUTDOWN 線程處于關(guān)閉狀態(tài),不接受新任務(wù),但可以處理已添加的任務(wù)。RUNNING狀態(tài)的線程池調(diào)用shutdown后會進(jìn)入SHUTDOWN狀態(tài)。
    • STOP 線程池處于停止?fàn)顟B(tài),不接收任務(wù),不處理已添加的任務(wù),且會中斷正在執(zhí)行任務(wù)的線程。RUNNING狀態(tài)的線程池調(diào)用了shutdownNow后會進(jìn)入STOP狀態(tài)。
    • TIDYING 當(dāng)所有任務(wù)已終止,且任務(wù)數(shù)量為0時,線程池會進(jìn)入TIDYING。當(dāng)線程池處于SHUTDOWN狀態(tài)時,阻塞隊列中的任務(wù)被執(zhí)行完了,且線程池中沒有正在執(zhí)行的任務(wù)了,狀態(tài)會由SHUTDOWN變?yōu)門IDYING。當(dāng)線程處于STOP狀態(tài)時,線程池中沒有正在執(zhí)行的任務(wù)時則會由STOP變?yōu)門IDYING。
    • TERMINATED 線程終止?fàn)顟B(tài)。處于TIDYING狀態(tài)的線程執(zhí)行terminated()后進(jìn)入TERMINATED狀態(tài)。

根據(jù)上述線程池生命周期狀態(tài)的描述,可以畫出如下所示的線程池生命周期狀態(tài)流程示意圖。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

1.ThreadPoolExecutor中的參數(shù)

上一小節(jié)中,我們使用ThreadPoolExecutor的構(gòu)造方法來創(chuàng)建了一個線程池。其實在ThreadPoolExecutor中有多個構(gòu)造方法,但是最終都調(diào)用到了下邊代碼中的這一個構(gòu)造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // ...省略校驗相關(guān)代碼
        
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    // ...    

}

這個構(gòu)造方法中有7個參數(shù)之多,我們逐個來看每個參數(shù)所代表的含義:

  • corePoolSize 表示線程池的核心線程數(shù)。當(dāng)有任務(wù)提交到線程池時,如果線程池中的線程數(shù)小于corePoolSize,那么則直接創(chuàng)建新的線程來執(zhí)行任務(wù)。
  • workQueue 任務(wù)隊列,它是一個阻塞隊列,用于存儲來不及執(zhí)行的任務(wù)的隊列。當(dāng)有任務(wù)提交到線程池的時候,如果線程池中的線程數(shù)大于等于corePoolSize,那么這個任務(wù)則會先被放到這個隊列中,等待執(zhí)行。
  • maximumPoolSize 表示線程池支持的最大線程數(shù)量。當(dāng)一個任務(wù)提交到線程池時,線程池中的線程數(shù)大于corePoolSize,并且workQueue已滿,那么則會創(chuàng)建新的線程執(zhí)行任務(wù),但是線程數(shù)要小于等于maximumPoolSize。
  • keepAliveTime 非核心線程空閑時保持存活的時間。非核心線程即workQueue滿了之后,再提交任務(wù)時創(chuàng)建的線程,因為這些線程不是核心線程,所以它空閑時間超過keepAliveTime后則會被回收。
  • unit 非核心線程空閑時保持存活的時間的單位
  • threadFactory 創(chuàng)建線程的工廠,可以在這里統(tǒng)一處理創(chuàng)建線程的屬性
  • handler 拒絕策略,當(dāng)線程池中的線程達(dá)到maximumPoolSize線程數(shù)后且workQueue已滿的情況下,再向線程池提交任務(wù)則執(zhí)行對應(yīng)的拒絕策略

2.線程池工作流程

線程池提交任務(wù)是從execute方法開始的,我們可以從execute方法來分析線程池的工作流程。

(1)當(dāng)execute方法提交一個任務(wù)時,如果線程池中線程數(shù)小于corePoolSize,那么不管線程池中是否有空閑的線程,都會創(chuàng)建一個新的線程來執(zhí)行任務(wù)。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

(2)當(dāng)execute方法提交一個任務(wù)時,線程池中的線程數(shù)已經(jīng)達(dá)到了corePoolSize,且此時沒有空閑的線程,那么則會將任務(wù)存儲到workQueue中。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

(3)如果execute提交任務(wù)時線程池中的線程數(shù)已經(jīng)到達(dá)了corePoolSize,并且workQueue已滿,那么則會創(chuàng)建新的線程來執(zhí)行任務(wù),但總線程數(shù)應(yīng)該小于maximumPoolSize。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

(4)如果線程池中的線程執(zhí)行完了當(dāng)前的任務(wù),則會嘗試從workQueue中取出第一個任務(wù)來執(zhí)行。如果workQueue為空則會阻塞線程。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

(5)如果execute提交任務(wù)時,線程池中的線程數(shù)達(dá)到了maximumPoolSize,且workQueue已滿,此時會執(zhí)行拒絕策略來拒絕接受任務(wù)。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

(6)如果線程池中的線程數(shù)超過了corePoolSize,那么空閑時間超過keepAliveTime的線程會被銷毀,但程池中線程個數(shù)會保持為corePoolSize。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 

(7)如果線程池存在空閑的線程,并且設(shè)置了allowCoreThreadTimeOut為true。那么空閑時間超過keepAliveTime的線程都會被銷毀。

Java并發(fā)系列終結(jié)篇:徹底搞懂Java線程池的工作原理

 


3.線程池的拒絕策略

如果線程池中的線程數(shù)達(dá)到了maximumPoolSize,并且workQueue隊列存儲滿的情況下,線程池會執(zhí)行對應(yīng)的拒絕策略。在JDK中提供了RejectedExecutionHandler接口來執(zhí)行拒絕操作。實現(xiàn)RejectedExecutionHandler的類有四個,對應(yīng)了四種拒絕策略。分別如下:

  • DiscardPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時,線程池會丟棄這個被拒絕的任務(wù)
  • DiscardOldestPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時,線程池會丟棄等待隊列中最老的任務(wù)。
  • CallerRunsPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時,會在線程池當(dāng)前正在運(yùn)行的Thread線程中處理被拒絕額任務(wù)。即哪個線程提交的任務(wù)哪個線程去執(zhí)行。
  • AbortPolicy 當(dāng)提交任務(wù)到線程池中被拒絕時,直接拋出RejectedExecutionException異常。

三、線程池源碼分析

從上一章對線程池的工作流程解讀來看,線程池的原理似乎并沒有很難。但是開篇時我說過想要讀懂線程池的源碼并不難,主要原因是線程池內(nèi)部運(yùn)用到了大量并發(fā)相關(guān)知識,另外還與線程池中用到的位運(yùn)算有關(guān)。

1.線程池中的位運(yùn)算(了解內(nèi)容)

在向線程池提交任務(wù)時有兩個比較重要的參數(shù)會決定任務(wù)的去向,這兩個參數(shù)分別是線程池的狀態(tài)和線程池中的線程數(shù)。在ThreadPoolExecutor內(nèi)部使用了一個AtomicInteger類型的整數(shù)ctl來表示這兩個參數(shù),代碼如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // Integer.SIZE = 32.所以 COUNT_BITS= 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 00001111 11111111 11111111 11111111 這個值可以表示線程池的最大線程容量
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // 將-1左移29位得到RUNNING狀態(tài)的值
    private static final int RUNNING    = -1 << COUNT_BITS;    
    // 線程池運(yùn)行狀態(tài)和線程數(shù)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    // ...
}    

因為涉及多線程的操作,這里為了保證原子性,ctl參數(shù)使用了AtomicInteger類型,并且通過ctlOf方法來計算出了ctl的初始值。如果你不了解位運(yùn)算大概很難理解上述代碼的用意。

我們知道,int類型在Java中占用4byte的內(nèi)存,一個byte占用8bit,所以Java中的int類型共占用32bit。對于這個32bit,我們可以進(jìn)行高低位的拆分。做Android開發(fā)的同學(xué)應(yīng)該都了解View測量流程中的MeasureSpec參數(shù),這個參數(shù)將32bit的int拆分成了高2位和低30位,分別表示View的測量模式和測量值。而這里的ctl與MeasureSpec類似,ctl將32位的int拆分成了高3位和低29位,分別表示線程池的運(yùn)行狀態(tài)和線程池中的線程個數(shù)。

下面我們通過位運(yùn)算來驗證一下ctl是如何工作的,當(dāng)然,如果你不理解這個位運(yùn)算的過程對理解線程池的源碼影響并不大,所以對以下驗證內(nèi)容不感興趣的同學(xué)可以直接略過。

可以看到上述代碼中RUNNING的值為-1左移29位,我們知道在計算機(jī)中**負(fù)數(shù)是以其絕對值的補(bǔ)碼來表示的,而補(bǔ)碼是由反碼加1得到。**因此-1在計算機(jī)中存儲形式為1的反碼+1

1的原碼:00000000 00000000 00000000 00000001
                                            +
1的反碼:11111111 11111111 11111111 11111110
       ---------------------------------------
-1存儲: 11111111 11111111 11111111 11111111

接下來對-1左移29位可以得到RUNNING的值為:

// 高三位表示線程狀態(tài),即高三位為111表示RUNNING
11100000 00000000 00000000 00000000

而AtomicInteger初始線程數(shù)量是0,因此ctlOf方法中的“|”運(yùn)算如下:

RUNNING:  11100000 00000000 00000000 00000000
                                               |
線程數(shù)為0:  00000000 00000000 00000000 00000000
          ---------------------------------------
得到ctl:   11100000 00000000 00000000 00000000

通過RUNNING|0(線程數(shù))即可得到ctl的初始值。同時還可以通過以下方法將ctl拆解成運(yùn)行狀態(tài)和線程數(shù):

    // 00001111 11111111 11111111 11111111
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    // 獲取線程池運(yùn)行狀態(tài)
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    // 獲取線程池中的線程數(shù)
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }

假設(shè)此時線程池為RUNNING狀態(tài),且線程數(shù)為0,驗證一下runStateOf是如何得到線程池的運(yùn)行狀態(tài)的:

 COUNT_MASK:  00001111 11111111 11111111 11111111
                                                  
 ~COUNT_MASK: 11110000 00000000 00000000 00000000
                                                   &
 ctl:         11100000 00000000 00000000 00000000
             ----------------------------------------
 RUNNING:     11100000 00000000 00000000 00000000            
 
復(fù)制代碼

如果不理解上邊的驗證流程沒有關(guān)系,只要知道通過runStateOf方法可以得到線程池的運(yùn)行狀態(tài),通過workerCountOf可以得到線程池中的線程數(shù)即可。

接下來我們進(jìn)入線程池的源碼的源碼分析環(huán)節(jié)。

2.ThreadPoolExecutor的execute

向線程池提交任務(wù)的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法,以此方法為入口來進(jìn)行剖析,execute方法的代碼如下:

   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 獲取ctl的值
        int c = ctl.get();
        // 1.線程數(shù)小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            // 線程池中線程數(shù)小于核心線程數(shù),則嘗試創(chuàng)建核心線程執(zhí)行任務(wù)
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.到此處說明線程池中線程數(shù)大于核心線程數(shù)或者創(chuàng)建線程失敗
        if (isRunning(c) && workQueue.offer(command)) {
            // 如果線程是運(yùn)行狀態(tài)并且可以使用offer將任務(wù)加入阻塞隊列未滿,offer是非阻塞操作。
            int recheck = ctl.get();
            // 重新檢查線程池狀態(tài),因為上次檢測后線程池狀態(tài)可能發(fā)生改變,如果非運(yùn)行狀態(tài)就移除任務(wù)并執(zhí)行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果是運(yùn)行狀態(tài),并且線程數(shù)是0,則創(chuàng)建線程
            else if (workerCountOf(recheck) == 0)
                // 線程數(shù)是0,則創(chuàng)建非核心線程,且不指定首次執(zhí)行任務(wù),這里的第二個參數(shù)其實沒有實際意義
                addWorker(null, false);
        }
        // 3.阻塞隊列已滿,創(chuàng)建非核心線程執(zhí)行任務(wù)
        else if (!addWorker(command, false))
            // 如果失敗,則執(zhí)行拒絕策略
            reject(command);
    }

execute方法中的邏輯可以分為三部分:

  • 1.如果線程池中的線程數(shù)小于核心線程,則直接調(diào)用addWorker方法創(chuàng)建新線程來執(zhí)行任務(wù)。
  • 2.如果線程池中的線程數(shù)大于核心線程數(shù),則將任務(wù)添加到阻塞隊列中,接著再次檢驗線程池的運(yùn)行狀態(tài),因為上次檢測過之后線程池狀態(tài)有可能發(fā)生了變化,如果線程池關(guān)閉了,那么移除任務(wù),執(zhí)行拒絕策略。如果線程依然是運(yùn)行狀態(tài),但是線程池中沒有線程,那么就調(diào)用addWorker方法創(chuàng)建線程,注意此時傳入任務(wù)參數(shù)是null,即不指定執(zhí)行任務(wù),因為任務(wù)已經(jīng)加入了阻塞隊列。創(chuàng)建完線程后從阻塞隊列中取出任務(wù)執(zhí)行。
  • 3.如果第2步將任務(wù)添加到阻塞隊列失敗了,說明阻塞隊列任務(wù)已滿,那么則會執(zhí)行第三步,即創(chuàng)建非核心線程來執(zhí)行任務(wù),如果非核心線程創(chuàng)建失敗那么就執(zhí)行拒絕策略。

可以看到,代碼的執(zhí)行邏輯和我們在第二章中分析的線程池的工作流程是一樣的。

接下來看下execute方法中創(chuàng)建線程的方法addWoker,addWoker方法承擔(dān)了核心線程和非核心線程的創(chuàng)建,通過一個boolean參數(shù)core來區(qū)分是創(chuàng)建核心線程還是非核心線程。先來看addWorker方法前半部分的代碼:

   // 返回值表示是否成功創(chuàng)建了線程
   private boolean addWorker(Runnable firstTask, boolean core) {
        // 這里做了一個retry標(biāo)記,相當(dāng)于goto.
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                // 根據(jù)core來確定創(chuàng)建最大線程數(shù),超過最大值則創(chuàng)建線程失敗,注意這里的最大值可能有s三個corePoolSize、maximumPoolSize和線程池線程的最大容量
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 通過CAS來將線程數(shù)+1,如果成功則跳出循環(huán),執(zhí)行下邊邏輯    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 線程池的狀態(tài)發(fā)生了改變,退回retry重新執(zhí)行
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
            }
        }
        
        // ...省略后半部分
       
        return workerStarted;
    }

這部分代碼會通過是否創(chuàng)建核心線程來確定線程池中線程數(shù)的值,如果是創(chuàng)建核心線程,那么最大值不能超過corePoolSize,如果是創(chuàng)建非核心線程那么線程數(shù)不能超過maximumPoolSize,另外無論是創(chuàng)建核心線程還是非核心線程,最大線程數(shù)都不能超過線程池允許的最大線程數(shù)COUNT_MASK(有可能設(shè)置的maximumPoolSize大于COUNT_MASK)。如果線程數(shù)大于最大值就返回false,創(chuàng)建線程失敗。

接下來通過CAS將線程數(shù)加1,如果成功那么就break retry結(jié)束無限循環(huán),如果CAS失敗了則就continue retry從新開始for循環(huán),注意這里的retry不是Java的關(guān)鍵字,是一個可以任意命名的字符。

接下來,如果能繼續(xù)向下執(zhí)行則開始執(zhí)行創(chuàng)建線程并執(zhí)行任務(wù)的工作了,看下addWorker方法的后半部分代碼:

   private boolean addWorker(Runnable firstTask, boolean core) {
        
        // ...省略前半部分

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 實例化一個Worker,內(nèi)部封裝了線程
            w = new Worker(firstTask);
            // 取出新建的線程
            final Thread t = w.thread;
            if (t != null) {
                // 這里使用ReentranLock加鎖保證線程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int c = ctl.get();
                    // 拿到鎖湖重新檢查線程池狀態(tài),只有處于RUNNING狀態(tài)或者處于SHUTDOWN并且firstTask==null時候才會創(chuàng)建線程
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 線程不是處于NEW狀態(tài),說明線程已經(jīng)啟動,拋出異常
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        // 將線程加入線程隊列,這里的worker是一個HashSet   
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 開啟線程執(zhí)行任務(wù)
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

這部分邏輯其實比較容易理解,就是創(chuàng)建Worker并開啟線程執(zhí)行任務(wù)的過程,Worker是對線程的封裝,創(chuàng)建的worker會被添加到ThreadPoolExecutor中的HashSet中。也就是線程池中的線程都維護(hù)在這個名為workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的線程可能處于正在工作的狀態(tài),也可能處于空閑狀態(tài),一旦達(dá)到指定的空閑時間,則會根據(jù)條件進(jìn)行回收線程。

我們知道,線程調(diào)用start后就會開始執(zhí)行線程的邏輯代碼,執(zhí)行完后線程的生命周期就結(jié)束了,那么線程池是如何保證Worker執(zhí)行完任務(wù)后仍然不結(jié)束的呢?當(dāng)線程空閑超時或者關(guān)閉線程池又是怎樣進(jìn)行線程回收的呢?這個實現(xiàn)邏輯其實就在Worker中。看下Worker的代碼:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // 執(zhí)行任務(wù)的線程
        final Thread thread;
        // 初始化Worker時傳進(jìn)來的任務(wù),可能為null,如果不空,則創(chuàng)建和立即執(zhí)行這個task,對應(yīng)核心線程創(chuàng)建的情況
        Runnable firstTask;

        Worker(Runnable firstTask) {
            // 初始化時設(shè)置setate為-1
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 通過線程工程創(chuàng)建線程
            this.thread = getThreadFactory().newThread(this);
        }
        
        // 線程的真正執(zhí)行邏輯
        public void run() {
            runWorker(this);
        }
        
        // 判斷線程是否是獨(dú)占狀態(tài),如果不是意味著線程處于空閑狀態(tài)
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 獲取鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 釋放鎖
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // ...
    }

Worker是位于ThreadPoolExecutor中的一個內(nèi)部類,它繼承了AQS,使用AQS來實現(xiàn)了獨(dú)占鎖的功能,但是并沒支持可重入。這里使用不可重入的特性來表示線程的執(zhí)行狀態(tài),即可以通過isHeldExclusively方法來判斷,如果是獨(dú)占狀態(tài),說明線程正在執(zhí)行任務(wù),如果非獨(dú)占狀態(tài),說明線程處于空閑狀態(tài)。關(guān)于AQS我們前邊文章中已經(jīng)詳細(xì)分析過了,不了解AQS的可以翻看前邊ReentranLock的文章。

另外,Worker還實現(xiàn)了Runnable接口,因此它的執(zhí)行邏輯就是在run方法中,run方法調(diào)用的是線程池中的runWorker(this)方法。任務(wù)的執(zhí)行邏輯就在runWorker方法中,它的代碼如下:


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 取出Worker中的任務(wù),可能為空
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // task不為null或者阻塞隊列中有任務(wù),通過循環(huán)不斷的從阻塞隊列中取出任務(wù)執(zhí)行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // ...
                try {
                    // 任務(wù)執(zhí)行前的hook點(diǎn)
                    beforeExecute(wt, task);
                    try {
                        // 執(zhí)行任務(wù)
                        task.run();
                        // 任務(wù)執(zhí)行后的hook點(diǎn)
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 超時沒有取到任務(wù),則回收空閑超時的線程
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到,runWorker的核心邏輯就是不斷通過getTask方法從阻塞隊列中獲取任務(wù)并執(zhí)行.通過這樣的方式實現(xiàn)了線程的復(fù)用,避免了創(chuàng)建線程。這里要注意的是這里是一個“生產(chǎn)者-消費(fèi)者”模式,getTask是從阻塞隊列中取任務(wù),所以如果阻塞隊列中沒有任務(wù)的時候就會處于阻塞狀態(tài)。getTask中通過判斷是否要回收線程而設(shè)置了等待超時時間,如果阻塞隊列中一直沒有任務(wù),那么在等待keepAliveTime時間后會拋出異常。最終會走到上述代碼的finally方法中,意味著有線程空閑時間超過了keepAliveTime時間,那么調(diào)用processWorkerExit方法移除Worker。processWorkerExit方法中沒有復(fù)雜難以理解的邏輯,這里就不再貼代碼了。我們重點(diǎn)看下getTask中是如何處理的,代碼如下:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // ...
           

            // Flag1. 如果配置了allowCoreThreadTimeOut==true或者線程池中的線程數(shù)大于核心線程數(shù),則timed為true,表示開啟指定線程超時后被回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // ...

            try {
                // Flag2. 取出阻塞隊列中的任務(wù),注意如果timed為true,則會調(diào)用阻塞隊列的poll方法,并設(shè)置超時時間為keepAliveTime,如果超時沒有取到任務(wù)則會拋出異常。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

重點(diǎn)看getTask是如何處理空閑超時的邏輯的。我們知道,回收線程的條件是線程大于核心線程數(shù)或者配置了allowCoreThreadTimeOut為true,當(dāng)線程空閑超時的情況下就會回收線程。上述代碼在Flag1處先判斷了如果線程池中的線程數(shù)大于核心線程數(shù),或者開啟了allowCoreThreadTimeOut,那么就需要開啟線程空閑超時回收。所以在Flag2處,timed為true的情況下調(diào)用了阻塞隊列的poll方法,并傳入了超時時間為keepAliveTime,如果在keepAliveTime時間內(nèi),阻塞隊列一直為null那么久會拋出異常,結(jié)束runWorker的循環(huán)。進(jìn)而執(zhí)行runWorker方法中回收線程的操作。

這里需要我們理解阻塞隊列poll方法的使用,poll方法接受一個時間參數(shù),是一個阻塞操作,在給定的時間內(nèi)沒有獲取到數(shù)據(jù)就會拋出異常。其實說白了,阻塞隊列就是一個使用ReentranLock實現(xiàn)的“生產(chǎn)者-消費(fèi)者”模式,我們在深入理解Java線程的等待與喚醒機(jī)制(二)這篇文章中使用ReentranLock實現(xiàn)“生產(chǎn)者-消費(fèi)者”模型其實就是一個簡單的阻塞隊列,與JDK中的BlockingQueue實現(xiàn)機(jī)制類似。感興趣的同學(xué)可以自己查看ArrayBlockingQueue等阻塞隊列的實現(xiàn),限于文章篇幅,這里就不再贅述了。

3.ThreadPoolExecutor的拒絕策略

上一小節(jié)中我們多次提到線程池的拒絕策略,它是在reject方法中實現(xiàn)的。實現(xiàn)代碼也非常簡單,代碼如下:

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

通過調(diào)用handler的rejectedExecution方法實現(xiàn)。這里其實就是運(yùn)用了策略模式,handler是一個RejectedExecutionHandler類型的成員變量,RejectedExecutionHandler是一個接口,只有一個rejectedExecution方法。在實例化線程池時構(gòu)造方法中傳入對應(yīng)的拒絕策略實例即可。前文已經(jīng)提到了Java提供的幾種默認(rèn)實現(xiàn)分別為DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy以及AbortPolicy。

以AbortPolicy直接拋出異常為例,來看下代碼實現(xiàn):

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

可以看到直接在rejectedExecution方法中拋出了
RejectedExecutionException來拒絕任務(wù)。其他的幾個策略實現(xiàn)也都比較簡單,有興趣可以自己查閱代碼。

4.ThreadPoolExecutor的shutdown

調(diào)用shutdown方法后,會將線程池標(biāo)記為SHUTDOWN狀態(tài),上邊execute的源碼可以看出,只有線程池是RUNNING狀態(tài)才接受任務(wù),因此被標(biāo)記位SHUTDOWN后,再提交任務(wù)會被線程池拒絕。shutdown的代碼如下:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //檢查是否可以關(guān)閉線程
            checkShutdownAccess();
            // 將線程池狀態(tài)置為SHUTDOWN狀態(tài)
            advanceRunState(SHUTDOWN);
            // 嘗試中斷空閑線程
            interruptIdleWorkers();
            // 空方法,線程池關(guān)閉的hook點(diǎn)
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }    

修改線程池為SHUTDOWN狀態(tài)后,會調(diào)用interruptIdleWorkers去中斷空閑線程線程,具體實現(xiàn)邏輯是在interruptIdleWorkers(boolean onlyOne)方法中,如下:

    
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 嘗試tryLock獲取鎖,如果拿鎖成功說明線程是空閑狀態(tài)
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 中斷線程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

shutdown的邏輯比較簡單,里邊做了兩件比較重要的事情,即先將線程池狀態(tài)修改為SHUTDOWN,接著遍歷所有Worker,將空閑的Worker進(jìn)行中斷。

五、總結(jié)

本文深入地探究了線程池的工作流程和實現(xiàn)原理。就線程池的工作流程而言其實并不難以理解。但是在分析線程池的源碼時,如果沒有很好的并發(fā)基礎(chǔ)的話,大概率是難以讀懂線程池的源碼的。因為線程池內(nèi)部使用了大量并發(fā)知識,對任何一點(diǎn)用到的并發(fā)知識認(rèn)識不到位都會造成理解偏差。寫這篇文章參看了很多的其他線程池的相關(guān)文章,幾乎沒有找到一篇能夠剖析清楚線程池源碼的文章。歸根結(jié)底還是沒能系統(tǒng)地理解Atomic、Lock與AQS、CAS、阻塞隊列等并發(fā)相關(guān)知識。

作者:賭一包辣條
鏈接:
https://juejin.cn/post/6983213662383112206

分享到:
標(biāo)簽:線程
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運(yùn)動步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定