java.util.concurrent.CountDownLatch
使用Java多线程编程时经常遇到主线程需要等待子线程执行完成以后才能继续执行,那么接下来介绍一种简单的方式使主线程等待。
- CountDownLatch是一个同步辅助工具,用于使一个或多个线程等待(即阻塞)知道一组在其他线程中的任务结束。
- CountDownLatch必须用给定的count(一个int类型的大于等于0的值)进行初始化。调用await方法将使线程阻塞,直到当前计数(count值)由于countdown方法的调用而达到零,此后所有等待的线程被释放并且任何后续调用await方法也会立即返回。CountDownLatch被设计为只触发一次,即Count值在运行过程中无法重置。如果需要重置计数的版本,可以考虑使用CyclicBarrier.
- CountDownLatch是一种通用的同步工具。 CountDownLatch可以被认为是一个简单的on/off锁存器或门:所有线程调用await方法等待开关打开,直到countDown方法被调用打开开关为止。 创建一个CountDownLatch,指定count的值为N,那么这个CountDownLatch对象可以让一个线程等待其他N个线程结束(调用countDown方法即认为结束),或者调用了这个CountDownLatch的countDown方法N次。
测试代码如下:
- public class ThreadWait {
- public static void main(String[] args) throws InterruptedException {
- ExecutorService exector = Executors.newFixedThreadPool(5);
- int threadNumber = 13;
- final CountDownLatch countDownLatch = new CountDownLatch(threadNumber);
- for (int i = 0; i < threadNumber; i++) {
- final int threadID = i;
- exector.execute(
- () -> {
- try {
- Thread.sleep(2000);
- System.out.println(String.format("threadID:[%s] finished!!", threadID));
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- countDownLatch.countDown(); //这个不管是否异常都需要数量减,否则会被堵塞无法结束
- }
- }
- );
- }
- countDownLatch.await();//保证之前的所有的线程都执行完成,才会走下面的
- System.out.println(countDownLatch.getCount());
- System.out.println("main thread finished!!");
- }
- }
结果为:
- threadID:[0]finished!!
- threadID:[1]finished!!
- threadID:[4]finished!!
- threadID:[3]finished!!
- threadID:[2]finished!!
- threadID:[9]finished!!
- threadID:[8]finished!!
- threadID:[5]finished!!
- threadID:[6]finished!!
- threadID:[7]finished!!
- threadID:[10]finished!!
- threadID:[11]finished!!
- threadID:[12]finished!!
- 0
- main thread finished!!
CountDownLatch源码解析:
1.构造器
构造器方法,必须指定count值,且count值不能小于0,注释的意思:创建CountDownLatch对象时需指定count值,count值即当前线程从调用await方法时处于阻塞状态转换到就绪状态时countDown方法必须调用的次数!
2.await()方法:
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
-
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
-
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
主要看parkAndCheckInterrupt()方法,就是是如何将主线程阻塞住的方法:
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this); //通过LockSupport.park()方法将线程交给系统阻塞;
- return Thread.interrupted();
- }
await方法使当前线程等待直到count值为0,或者当前线程被打断!如果当前的count值为0,那么await方法直接返回,当前线程不会阻塞!如果当前的count值大于0,那么当前线程阻塞(线程调度机制无法给当前线程分配CPU时间片),直到以下两种情况任意一种发生为止:
- count值通过countDown方法的调用达到0
- 其他线程打断了当前线程
3.countDown()方法,我们看看最终被countDown调用的unparkSuccessor()方法;
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
-
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
我们可以看到最终使用LockSupport.unpark()方法唤醒了主线程。
注:LockSupport类中的park与unpark方法都是使用的unsafe中的native本地方法;
当前的count值减一,如果count值为0则释放所有等待的线程!如果当前count值大于0,则减一,如果count值为0,则所有等待的线程对于线程调度机制来说都是活跃的了!
最后我们来看一段最简单的使用park与unpark方法阻塞唤醒线程代码:
- public static void main(String[] args) {
-
- Thread t = new Thread(() -> {
- System.out.println("阻塞线程1");
- LockSupport.park();
- System.out.println("线程1执行完啦");
- });
-
- t.start();
-
- try {
- Thread.sleep(2000);
- System.out.println("唤醒线程1");
- LockSupport.unpark(t);
- Thread.sleep(2000);
- System.out.println("主线程结束");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
运行结果:
- 阻塞线程1
- 唤醒线程1
- 线程1执行完啦
- 主线程结束