一、阻塞隊列簡介
阻塞隊列常用于生產者和消費者場景,生產者往往是往隊列里添加元素的線程,消費者
是從隊列里拿元素的線程嗎,阻塞隊列就是生產者存放元素的容器,是消費者拿元素的容器
1.常見阻塞場景
當前隊列中沒有數據的情況下,消費端的所有線程都會被自動阻塞(掛起),直到有數據放入隊列
當隊列種數據填充滿的情況下,生產者端的所有線程都會被自動阻塞(掛起),直到隊列中
有空的位置,線程被自動喚醒
2.BlockingQueue
2.1.放入數據:
- offer(anobject):表示如果可以將anobject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false(本方法不阻塞當前執行方法的線程)
- offer(e o,long timeout,TimeUnit unit):可以設定等待的時間,如果在指定的時間還不能往隊列中加入blockQuene,則返回失敗
- put(anobject):將anobject加到BlockingQueue里,如果BlockQueue沒有控件,則調用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續
2.2.獲取數據:
- poll(long timeout,TimeUnit unit):從BlockQueue中取出一個隊首的對象,如果在指定時間內隊列一旦有時間可以取,則立即返回隊列中的數據,否則直到時間超過還沒有數據可取,返回失敗
- take():取走BlockingQueue排在位首的對象,若BlockingQueue為空,則阻塞進入等待狀態,直到BlockingQueue有新的數據加入
- drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數)通過該方法,可以提升獲取數據的效率,無需分多次分批加鎖或釋放鎖。
二、JAVA中的阻塞隊列
1.ArrayBlockingQueue:由數組結構組成的有界阻塞隊列
他是用數組實現的有界阻塞隊列,并按照先進先出的原則對元素進行排序,默認情況下不保證線程公平的訪問隊列,公平的訪問隊列就是指阻塞的所有生產者線程或消費者線程當隊列不可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者先生產,先阻塞的消費者線程可以先從隊列里獲取元素,通常情況下為了保證公平性會降低吞吐量。
2.LinkedBlockingQueue:由鏈表結構組成的有限阻塞隊列
他是基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,此隊列按照先進先出的原則對元素進行排序,其內部也會維持著一個數據緩沖隊列,當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據并緩存在隊列內部,而生產者立即返回,只有當隊列緩沖區達到緩存容量的最大值時(可以指定該值),才會阻塞生產者線程,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之,對于消費者這段的處理也基于同樣的原理,而LinkedBlockingQueue之所以能夠高效的處理處理并發數據,還因為其對于生產者端和消費者端分別采用獨立的鎖來控制數據同步。
以上兩個常用的阻塞隊列,還有五種不再詳細介紹。
下面分析ArrayBlockingQueue的源代碼:
private static final long serialVersionUID = -817911632652898426L; final Object[] items;//阻塞隊列維護的一個object類型的數組 int takeIndex;//隊首元素 int putIndex;//隊尾元素 int count;//隊列中的元素 final ReentrantLock lock;//重入鎖 private final Condition notEmpty;//條件對象判斷數組不是滿的 private final Condition notFull;//條件對象判斷數組不是空的 transient Itrs itrs; final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } /** * Returns item at index i. */ @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; } /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } //取元素 public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock;//鎖 lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//阻塞線程,等待notFull.signalAll()喚醒 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//阻塞線程,等待notEmpty.await()喚醒 return dequeue(); } finally { lock.unlock(); } }
使用阻塞隊列就無需考慮同步和線程間通信的問題。
public class VolatikeDemo { private int queueSize=10; private ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(queueSize); public static void main(String args[]){ VolatikeDemo demo=new VolatikeDemo(); Consumer consumer=demo.new Consumer(); Producer producer=demo.new Producer(); consumer.start(); producer.start(); } class Consumer extends Thread{ @Override public void run() { while(true){ try { int res=queue.take(); System.out.println(res); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { while(true){ try { this.sleep(200); queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } }

三、線程池
在編程中經常會使用線程來異步處理任務,但是每個線程的創建和銷毀都需要一定的 開銷。如果每次執行一個任務都需要打開一個新線程去執行,則這些線程的創建和銷毀 將消耗大量的資源,并且線程都是各自為政的,很難對其進行控制,更何況還有一堆的 線程在執行,這時就需要線程池來對線程進行管理,在java 1.5中提供了Executor框架用于 把任務的提交和執行解耦,任務的提交交給Runnable或者Callable,而Executor框架用來 處理任務,Executor框架中的核心成員就是ThreadPoolExecutor,他是線程池的核心類。
1.ThreadPoolExecutor
可以通過創建ThreadPoolExecutor來創建一個線程池,ThreadPoolExecutor類一共有四個構造方法
其中擁有最多參數的構造方法:
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(?,?,?,?,?);
- corePoolSize:核心線程數,默認情況下線程池是空的,只有任務提交是才會線程,如果當前運行的線程數少于corePoolSize,則創建新線程來處理任務,如果等于或者多余corepoolsize則不會創建新線程,如果調用prestartAllcoreThread()方法:線程池會提前創建并啟動所有的核心線程來等待任務。
- maximumPoolSize:線程池允許創建的最大線程數,如果任務隊列滿了,并且線程數小于maximumPoolSize,則線程仍會創建新線程來處理任務。
- keepAliveTime:非核心線程閑置的超時時間,超過這個時間則回收,如果任務很多,并且每個任務的執行時間的時間很短,則可以調大keepAliveTime來提高線程的利用率。
- TimeUnit:keepAliveTime參數的時間單位,可選的單位有天,小時,分鐘,秒,毫秒等
- workQueue:任務隊列,如果當前線程數大與corePoolSize則將任務添加到此任務隊列中,該任務隊列是BlockingQuenu類型的,也就是阻塞隊列
- ThreadFactory:線程工廠,可以線程工廠給每個創建出來的線程池設置名字,一般情況下無需要設置此參數
- RejectedExecutionHandler:飽和策略,這是當任務隊列和線程池都滿了時所采取的應對策略默認是無法處理新任務(AbordPolicy)并拋出RejectedExecutionException異常,此外還有三種策略,分別如下:1.CallersRunsPolicy:用調查者所在的線程來處理任務,此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度(簡言之,降低提交速度)
- DiscardPolicy:不能執行的任務,并將該任務刪除
- DiscardOldestPolicy:丟棄隊列最近的任務,并執行當前的任務。
2.線程池的處理流程和原理

2.1.提交任務后,線程池先判斷線程數是否達到了核心線程數(corepoolSize)
如果還沒有達到核心線程數,則創建核心線程處理任務,否則執行下一步。
2.2.線程池判斷任務隊列是否滿了,如果沒滿,將任務加入任務隊列,否則執行
下一步。
2.3.線程池判斷線程數是否達到最大線程數,如果未達到,則創建非核心線程處理任務,
否則就執行飽和策略,默認會拋出RejectedExecutionExeception異常。

通過線程池的執行示意圖我們可以看出,如果我們執行ThreadPoolExecutor的execute方法,
會遇到各種情況
- 如果線程池中的線程數沒有達到核心線程數,則創建核心線程執行任務。
- 如果線程池中的線程數大于或等于核心線程數,則加入任務隊列,線程池中的空閑線程會不斷的從任務隊列中取任務執行。
- 如果任務隊列滿了,并且線程數沒有達到最大線程數,則創建非核心線程去處理任務。
- 如果線程數超過了最大線程數,則執行飽和策略。
3.四種常用的線程池
3.1.FixedThreadPool:他是可重用固定線程數的線程池,他的主要特點如下:
- 只有核心線程,沒有非核心線程,并且數量是固定的,keepAliveTime設置為0意味著多余的線程會被立即終止,因此不會產生多余的線程,采用了無界阻塞隊列LinkedBlockingQueue。
- 當執行execute方法時:如果當前運行的線程數未達到核心線程數,則創建一個新線程來處理任務,如果運行線程數等于核心線程數,則將任務添加到阻塞隊列中,FixThread就是擁有固定數量核心線程的線程池,并且這些核心線程不會被回收,當線程池中有空閑線程就會去任務隊列取任務

4.CacheThreadPool
4.1.CacheThreadPool:他是一個根據需要創建線程的線程池,他的主要特點如下:
- CacheThreadPool的corePoolSize為0,maximumPoolSize設置為最大值,他沒有核心線程,非核心線程是無界的,keepAliveTime設置為60s,并使用了阻塞隊列SynchronousQueue,他是一個不存儲元素的阻塞隊列,每個插入操作必須要等待另外一個線程的移除操作,同樣一個移除操作也需要等待插入操作。

- 當執行execute方法時:首先會執行synchroniusQueue的offer方法來提交任務,并且查詢線程池中是否有空閑的線程執行SynchronousQueue的poll方法來移除任務,如果有則配對成功,將任務交給這個線程去處理,如果沒有則配對失敗,創建新的線程去執行任務當線程池中的線程空閑時,他會執行SynchronusQueue的poll方法,等待synchronoudQueue提交的新任務,如果60s沒有新任務提交到synchronousQueue,則這個線程就會終止。cacheThreadPool適合大量需要立即處理并且耗時少的任務。
- SingleThreadExecutor:他是使用單個工作線程的線程池,corePoolSize和maximumPoolSize都為1,意味著SingleThreadExecutor只有一個核心線程,其他核心參數都和FixThreadPool一樣,SingleThreadExecutor執行execute方法時,如果當前運行的線程數未達到核心線程數,也就是當前沒有運行的線程,則創建一個新線程來處理任務,如果當前有運行的線程,則將任務添加到阻塞隊列中,因此SingleThreadExecutor能確保所有的任務都在一個線程中按照順序逐一執行

最后
如果你看到了這里,覺得文章寫得不錯就給個贊唄!歡迎大家評論討論!如果你覺得那里值得改進的,請給我留言。一定會認真查詢,修正不足,定期免費分享技術干貨。喜歡的小伙伴可以關注一下哦。謝謝!