大家好,我是哪吒。
上一篇簡(jiǎn)單聊一聊公平鎖和非公平鎖,parallel并行流,提到了一個(gè)IntStream.rangeClosed并行流問題,很多小伙伴,對(duì)這個(gè)比較陌生,想用線程池的方式改造一下。
一、IntStream.rangeClosed并行流
@Data
public class LockTest1 {
public static void mAIn(String[] args) {
IntStream.rangeClosed(1, 100000).parallel().forEach(i -> new LockTest1().increase());
System.out.println(time);
}
private static int time = 0;
private static Object lock = new Object();
public void increase() {
synchronized (lock) {
time++;
}
}
}
二、線程池方式改造
不會(huì)那些新特性,還是原始的香啊,寫起代碼,得心應(yīng)手。
1、創(chuàng)建線程池
這時(shí)候,有些小伙伴,又陷入了選擇恐懼癥。用哪個(gè)線程池比較好呢?
簡(jiǎn)單回顧一下:
- 單線程池newSingleThreadExecutor(),只有一個(gè)核心線程的線程池,保證任務(wù)按FIFO順序一個(gè)個(gè)執(zhí)行;
- 固定線程數(shù)線程池newFixedThreadPool(10),固定數(shù)量的可復(fù)用的線程數(shù),來執(zhí)行任務(wù)。當(dāng)線程數(shù)達(dá)到最大核心線程數(shù),則加入隊(duì)列等待有空閑線程時(shí)再執(zhí)行;
- 可緩存線程池newCachedThreadPool(),創(chuàng)建的都是非核心線程,而且最大線程數(shù)為Interge的最大值,空閑線程存活時(shí)間是1分鐘。如果有大量耗時(shí)的任務(wù),則不適該創(chuàng)建方式,它只適用于生命周期短的任務(wù);
- 固定線程數(shù)newScheduledThreadPool(10),支持定時(shí)和周期性任務(wù)newScheduledThreadPool(10),顧名思義,在固定線程數(shù)的前提下,添加了定時(shí)任務(wù)。
最常用的還是固定線程數(shù)線程池newFixedThreadPool(10)。
@Data
public class LockTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(200);
for (int i = 0; i < 100000; i++) {
Thread0926 thread = new Thread0926();
executorService.execute(thread);
}
System.out.println(time);
}
private static int time = 0;
private static Object lock = new Object();
public void increase() {
synchronized (lock) {
time++;
}
}
}
2、線程類
public class Thread0926 implements Runnable{
@Override
public void run() {
LockTest2 lockTest = new LockTest2();
lockTest.increase();
}
}
3、信心滿滿,走起來
我草,這不對(duì)啊,不應(yīng)該是100000嘛?又把老子整不會(huì)了~
三、再次解決并發(fā)時(shí)i++原子性問題
上一篇測(cè)試過,使用synchronized代碼塊是可以解決i++線程安全問題的,這次怎么不好使了?
上面的代碼中,synchronized (lock)鎖住了time++,lock是靜態(tài)變量,所以屬于類級(jí)別的鎖。但是新建的線程是一個(gè)新的類,超出了鎖的范圍,所以失效。
那么,在當(dāng)前類中,開啟線程,是不是就可以了呢?試一下
public class LockTest4 {
private static int time = 0;
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 200; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 500; j++) {
synchronized (lock) {
time++;
}
}
});
thread.start();
}
Thread.sleep(3000);
System.out.println(time);
}
}
當(dāng)然,在synchronized代碼塊中,使用synchronized (LockTest4.class)也是可以的,效果是一樣的。
四、并行流與多線程
并行流的本質(zhì)的是并行,多線程的本質(zhì)是并發(fā)。
并行指當(dāng)多核CPU中的一個(gè)CPU執(zhí)行一個(gè)線程時(shí),其它CPU能夠同時(shí)執(zhí)行另一個(gè)線程,兩個(gè)線程之間不會(huì)搶占CPU資源,可以同時(shí)運(yùn)行。
并發(fā)指在一段時(shí)間內(nèi)CPU處理多個(gè)線程,這些線程會(huì)搶占CPU資源,CPU資源根據(jù)時(shí)間片周期在多個(gè)線程之間來回切換,多個(gè)線程在一段時(shí)間內(nèi)同時(shí)運(yùn)行,而在同一時(shí)刻不是同時(shí)運(yùn)行的。
1、并行和并發(fā)的區(qū)別?
- 并行指多個(gè)線程在一段時(shí)間的每個(gè)時(shí)刻都同時(shí)運(yùn)行,并發(fā)指多個(gè)線程在一段時(shí)間內(nèi)同時(shí)運(yùn)行(不是同一時(shí)刻,一段時(shí)間內(nèi)交叉執(zhí)行)。
- 并行的多個(gè)線程不會(huì)搶占系統(tǒng)資源,并發(fā)的多個(gè)線程會(huì)搶占系統(tǒng)資源。
- 并行是多CPU的產(chǎn)物,單核CPU中只有并發(fā),沒有并行。
2、并行和并發(fā)的使用場(chǎng)景
(1)IO密集場(chǎng)景
場(chǎng)景應(yīng)用程序開發(fā),提供http接口、數(shù)據(jù)庫查詢、微服務(wù)調(diào)用都是IO請(qǐng)求,IO請(qǐng)求時(shí)幾乎不消耗cpu,這是為了提供cup使用率,建議使用多線程并發(fā),線程數(shù)可以遠(yuǎn)大于cpu核數(shù)。
(2)cup密集場(chǎng)景
對(duì)應(yīng)大量的加減乘除運(yùn)算、md5、hash等運(yùn)算操作,需要持續(xù)使用cpu,需要讓多核cpu并行運(yùn)算,適合使用forkjoin并行計(jì)算。