在 JDK1.5 后,推出了幾個(gè)并發(fā)的工具類,位于 JUC(JAVA.util.concurrent)包下。
CountDownLatch
CountDownLatch 類是使一個(gè)線程等待其他線程各自執(zhí)行完畢后再執(zhí)行。
類似于現(xiàn)實(shí)中某個(gè)活動(dòng)需要等到全部人齊了才可以開始。
實(shí)現(xiàn)原理:
- 基于 AQS 的共享模式。
從ReentrantLock的實(shí)現(xiàn)看AQS的原理及應(yīng)用
- 這個(gè)類是一個(gè)同步計(jì)數(shù)器,主要用于線程間的控制。
- 當(dāng) CountDownLatch 的 count 計(jì)數(shù) > 0 時(shí),本線程的 await() 會(huì)造成阻塞,直到 count 變?yōu)?0,開始執(zhí)行本線程。
package test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test1 {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2); // 計(jì)數(shù)器初始化為 2,要等兩個(gè)線程執(zhí)行完畢
System.out.println("主線程開始執(zhí)行");
ExecutorService es1 = Executors.newSingleThreadExecutor();
es1.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("子線程:" + Thread.currentThread().getName() + "執(zhí)行");
}catch (InterruptedException e){
e.printStackTrace();
}
latch.countDown(); // 使計(jì)數(shù)器減一
}
});
ExecutorService es2 = Executors.newSingleThreadExecutor();
es2.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("子線程:" + Thread.currentThread().getName() + "執(zhí)行");
latch.countDown();
}
});
System.out.println("等待兩個(gè)線程執(zhí)行完畢");
try {
latch.await(); // 主線程掛起,等待兩個(gè)線程執(zhí)行完
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("兩個(gè)子線程都執(zhí)行完畢,繼續(xù)執(zhí)行主線程");
}
}
主線程開始執(zhí)行
等待兩個(gè)線程執(zhí)行完畢
子線程:pool-2-thread-1執(zhí)行
子線程:pool-1-thread-1執(zhí)行
兩個(gè)子線程都執(zhí)行完畢,繼續(xù)執(zhí)行主線程
CyclicBarrier
與 CountDownLatch 功能一樣,不過(guò)它可以重復(fù)循環(huán),而 CountDownLatch 只能執(zhí)行一次。
實(shí)現(xiàn)原理:
- 基于 ReentrantLock 和 Condition
//同步操作鎖
private final ReentrantLock lock = new ReentrantLock();
//線程攔截器
private final Condition trip = lock.newCondition();
//每次攔截的線程數(shù)
private final int parties;
//換代前執(zhí)行的任務(wù)
private final Runnable barrierCommand;
//表示柵欄的當(dāng)前代
private Generation generation = new Generation();
//計(jì)數(shù)器
private int count;
//靜態(tài)內(nèi)部類Generation
private static class Generation {
boolean broken = false;
- 上面貼出了 CyclicBarrier 所有的成員變量,可以看到 CyclicBarrier 內(nèi)部使通過(guò)條件 trip 來(lái)對(duì)線程進(jìn)行阻塞。
- 并且其內(nèi)部維護(hù)了兩個(gè) int 型變量 parites 和 count,parties 表示每次攔截的線程數(shù),該值在構(gòu)造時(shí)進(jìn)行賦值。count 是內(nèi)部計(jì)數(shù)器,他的初始值和 parties 相同,以后隨著每次 await 方法的調(diào)用而減一,直到減為零將喚醒主線程。
- CyclicBarrier 有一個(gè)靜態(tài)內(nèi)部類 Generation,該類的對(duì)象代表柵欄的當(dāng)前代,就像玩游戲時(shí)代表的本局游戲,利用它可以實(shí)現(xiàn)循環(huán)等待。
- barrierCommand 表示換代前執(zhí)行的任務(wù),當(dāng) count 減為零時(shí)表示本局游戲結(jié)束,需要轉(zhuǎn)到下一局。在轉(zhuǎn)到下一局游戲之前,利用它可以實(shí)現(xiàn)循環(huán)等待。
package test;
import java.util.concurrent.CyclicBarrier;
public class Test2 {
static class TaskThread extends Thread{
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier){
this.barrier = barrier;
}
@Override
public void run(){
try{
Thread.sleep(100);
System.out.println(getName() + "到達(dá)柵欄 A");
barrier.await(); // 等待所有線程都執(zhí)行到這,才執(zhí)行主線程
System.out.println(getName() + "沖破柵欄 A"); // 主線程完成后繼續(xù)執(zhí)行
Thread.sleep(2000);
System.out.println(getName() + "到達(dá)柵欄 B");
barrier.await();
System.out.println(getName() + "沖破柵欄 B");
}catch (Exception e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "完成任務(wù)");
}
});
for (int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
Semaphore
該類用于控制信號(hào)量的個(gè)數(shù),可以控制同時(shí)訪問(wèn)資源的線程個(gè)數(shù),并提供了同步機(jī)制。例如,實(shí)現(xiàn)一個(gè)文件允許的并發(fā)訪問(wèn)數(shù)。
Semaphore 的主要方法:
- acquire():從此信號(hào)量中獲取一個(gè)許可,若已超過(guò)許可量,則阻塞此請(qǐng)求線程。
- release():釋放一個(gè)許可,將其返回給信號(hào)量。
- availablePermits():返回此信號(hào)量中當(dāng)前可用的許可數(shù)。
- hasQueuedThreads():查詢是否有線程正在等待獲取。
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3); // 創(chuàng)建 Semaphore 信號(hào)量,初始化許可大小為3
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
}catch (InterruptedException e){
e.printStackTrace();
}
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
sp.acquire(); // 請(qǐng)求獲取許可,如果有可獲取許可,則繼續(xù)往下指向,許可數(shù)減一。
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() +
"進(jìn)入,當(dāng)前已有" + (3 - sp.availablePermits()) + "個(gè)并發(fā)") ;
try{
Thread.sleep((long)(Math.random() * 10000));
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() + "即將離開");
sp.release(); // 釋放許可證,許可數(shù)+1
}
};
service.execute(runnable);
}
}
}
Exchanger
這個(gè)類用于交換數(shù)據(jù),只能用于兩個(gè)線程。當(dāng)一個(gè)線程運(yùn)行到 exchange() 方法時(shí)會(huì)阻塞,另一個(gè)線程運(yùn)行到 exchange() 時(shí),兩者交換數(shù)據(jù),然后執(zhí)行后面的程序。
package test;
import java.util.concurrent.Exchanger;
public class Test3 {
static class Producer extends Thread{ // 生產(chǎn)者線程
private Exchanger<Integer> exchanger; // 交換標(biāo)志
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger){
super("Producer-" + name);
this.exchanger = exchanger;
}
@Override
public void run(){
for (int i = 1; i < 5; i++) {
try {
Thread.sleep(1000);
data = i;
System.out.println(getName() + "交換前:" + data);
data = exchanger.exchange(data); // 將此 data 與 消費(fèi)者的 data 進(jìn)行交換
System.out.println(getName() + "交換后:" + data);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread{ // 消費(fèi)者線程
private Exchanger<Integer> exchanger; // 交換標(biāo)志
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger){
super("Consumer-" + name);
this.exchanger = exchanger;
}
@Override
public void run(){
while(true){
data = 0;
System.out.println(getName() + "交換前:" + data);
try{
data = exchanger.exchange(data); // 將此 data 與生產(chǎn)者的 data 進(jìn)行交換,因?yàn)橄葓?zhí)行到這,會(huì)阻塞知道生產(chǎn)者執(zhí)行到交換
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println(getName() + "交換后:" + data);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
Thread.sleep(7000);
}
}