1 問題描述
假設(shè)我們正在編寫一個(gè)簡單的應(yīng)用程序,該應(yīng)用程序從客戶端接收一些輸入,對(duì)其進(jìn)行一些CPU密集型處理,然后記錄輸出。我們編寫的代碼看起來像以下內(nèi)容:
class ProcessingLibrary {
public void process (Input userInput) {
// 一些CPU密集型邏輯,用于處理用戶輸入
userInput.process();
// 記錄結(jié)果
Logger.log(userInput.getResults());
}
}
看起來很簡單,一個(gè)普通的函數(shù),它接受用戶輸入,對(duì)其進(jìn)行一些處理,然后返回輸出,我們可以將這個(gè)庫提供給客戶端。但是,你的客戶如果正在高并發(fā)地調(diào)用該進(jìn)程函數(shù),很快他們可能會(huì)抱怨他們的請(qǐng)求處理輸入的時(shí)間太長。原因非常簡單,當(dāng)外部客戶端調(diào)用你的函數(shù)時(shí),調(diào)用線程被阻塞,因?yàn)閷?shí)際處理是由調(diào)用線程進(jìn)行的。
2 初步解決方案
添加另一個(gè)要求:
-
調(diào)用線程不應(yīng)該被阻塞。
為了解決這個(gè)問題,我們可以考慮利用客戶端的CPU核心并將多線程納入我們的代碼。我們進(jìn)行一些修改,具體如下:
class ProcessingLibrary {
public void process (Input userInput) {
Runnable runnable = () -> {
userInput.process();
Logger.log(userInput.getResults());
};
Thread thread = new Thread(runnable);
thread.start();
}
}
對(duì)于每個(gè)請(qǐng)求,最好不要阻塞調(diào)用線程,而是生成一個(gè)新線程來進(jìn)行重度處理。調(diào)用線程可以從我們的處理方法中提前返回。確實(shí),這比我們?cè)缙诘陌姹居泻艽蟮母倪M(jìn)。我們的客戶開始使用新版本,他們比以前更滿意了。但現(xiàn)在他們開始抱怨CPU使用率變得過高,并且在他們那邊發(fā)生了崩潰。發(fā)生了什么?如果我們仔細(xì)檢查代碼,我們會(huì)發(fā)現(xiàn)我們沒有限制正在生成的線程數(shù)!如果客戶以非常高的速率調(diào)用進(jìn)程函數(shù),可能會(huì)生成數(shù)百甚至數(shù)千個(gè)線程,這對(duì)CPU來說是很大的開銷。我們還必須限制生成的線程數(shù)。
3 線程池
注意!我們這里又添加了一個(gè)要求:
-
調(diào)用線程不應(yīng)該被阻塞。
-
我們的邏輯不應(yīng)生成無限多數(shù)量的線程。
我們想要的是,調(diào)用線程不應(yīng)該被阻塞,我們應(yīng)該生成一定數(shù)量的線程,由客戶根據(jù)他們的CPU資源和他們想要實(shí)現(xiàn)的并行度決定。實(shí)際數(shù)量取決于請(qǐng)求到達(dá)的速率和平均請(qǐng)求時(shí)間。線程池是解決這個(gè)問題的理想解決方案。線程池是一個(gè)簡單的概念,可以并行執(zhí)行應(yīng)用程序代碼并利用CPU核心。線程池包含一組固定數(shù)量的可重復(fù)使用的工作線程,它們執(zhí)行分配給它們的任務(wù),而不會(huì)阻塞調(diào)用線程。我們下面看看如何實(shí)現(xiàn)一個(gè)簡單的線程池。
我們可以得出幾個(gè)簡單的觀察:
-
我們的線程應(yīng)該是可重復(fù)使用的,并且應(yīng)該在請(qǐng)求到達(dá)時(shí)惰性創(chuàng)建。線程創(chuàng)建是一個(gè)昂貴的過程(至少在JAVA中是這樣)。
-
如果請(qǐng)求到達(dá)的速率遠(yuǎn)高于線程池中的線程數(shù),我們可以在其他請(qǐng)求執(zhí)行完畢時(shí)將請(qǐng)求輸入保持在等待狀態(tài),然后當(dāng)一個(gè)線程完成處理一個(gè)請(qǐng)求時(shí),它可以從請(qǐng)求行中獲取另一個(gè)請(qǐng)求并開始處理它。通過這種方式,我們?nèi)匀豢梢詫?shí)現(xiàn)相當(dāng)高的并行性并獲得更多的請(qǐng)求吞吐量。
-
隊(duì)列可以成為存儲(chǔ)我們傳入請(qǐng)求的良好數(shù)據(jù)結(jié)構(gòu),而我們的線程可以在完成先前的項(xiàng)目后不斷地從隊(duì)列中獲取項(xiàng)目。
考慮以上要求,我們?yōu)榫€程池編寫一個(gè)簡單的類。
class ThreadPool {
private BlockingQueue<Runnable> taskQueue;
private Integer poolSize;
private AtomicInteger currentPoolSize;
public ThreadPool(int poolSize) {
this.poolSize = poolSize;
this.taskQueue = new LinkedBlockingQueue<>();
this.currentPoolSize = new AtomicInteger(0);
}
public void submitTask(Runnable runnable) {
this.taskQueue.add(runnable);
if(this.currentPoolSize.get() < this.poolSize) {
// 如果有更多的池大小可用,創(chuàng)建一個(gè)新線程
// 這個(gè)線程也應(yīng)該被重新用于未來任務(wù)
// 因此,它應(yīng)該繼續(xù)從隊(duì)列中尋找更多的任務(wù)
this.currentPoolSize.incrementAndGet();
this.createSingleThreadForPool();
}
}
private void createSingleThreadForPool() {
Runnable poolRunner = () -> {
while(true) {
if(this.taskQueue.size() > 0) {
Runnable taskFromQueue = this.taskQueue.poll();
taskFromQueue.run();
}
}
};
new Thread(poolRunner).start();
}
}
從以上實(shí)現(xiàn)中可以得到以下幾點(diǎn):
-
BlockingQueue
是一個(gè)線程安全的隊(duì)列實(shí)現(xiàn)。我們需要確保線程安全,因?yàn)槎鄠€(gè)線程正在訪問共享狀態(tài)。AtomicInteger
也是如此,用于線程安全更新我們當(dāng)前的池大小。 -
池運(yùn)行者中的
while
循環(huán)是為了確保該線程保持活動(dòng)狀態(tài),以便我們?cè)谙胍邮崭嗳蝿?wù)時(shí)可以繼續(xù)運(yùn)行。
我們可以更改我們對(duì)ProcessingLibrary
的實(shí)現(xiàn),如下:
class ProcessingLibrary {
private ThreadPool threadPool;
public ProcessingLibrary(int poolSize) {
this.threadPool = new ThreadPool(poolSize);
}
public void process (Input userInput) {
Runnable runnable = () -> {
userInput.process();
Logger.log(userInput.getResults());
};
this.threadPool.submitTask(runnable);
}
}
現(xiàn)在,我們已經(jīng)滿足了對(duì)這個(gè)問題的兩個(gè)要求 :)
在Java中,concurrent庫提供了與此類似的內(nèi)容,稱為ExecutorService
。雖然我們討論的實(shí)現(xiàn)有一些注意事項(xiàng),例如我們生成的線程一直在等待,但這是一個(gè)理解線程池內(nèi)部工作原理的良好起點(diǎn)。