这个类使一个线程等待其他线程各自执行完毕后再执行。是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
CountDownLatch内部类java.util.concurrent.CountDownLatch.Sync private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // 设置锁的数量 setState(count); } int getCount() { // 获取锁的数量 return getState(); } // 尝试获取锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 每释放一个count减1 for (;;) {// 无限循环 // 用AQS状态值作为锁的count值 int c = getState(); // 锁数量为0时返回false if (c == 0) return false; // 不为0,减1 int nextc = c-1; // 将最新值cas设置到state中去 if (compareAndSetState(c, nextc)) return nextc == 0; } } }tryAcquireShared方法在调用时只有在state值为0时才会返回1,否则会一直返回-1。
CountDownLatch属性与构造方法 private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }内部的主要工作是通过Sync来处理。
下面来看下该类的主要的两个方法,await和countDown方法。
java.util.concurrent.CountDownLatch#await()方法 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//获取不到许可时,调用doAcquireSharedInterruptibly方法 doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 向AQS队列添加一个SHARED状态的节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {// 无限循环 // 获取前置节点 final Node p = node.predecessor(); // 如果前置节点是头节点 if (p == head) { // 尝试获取共享许可 int r = tryAcquireShared(arg); if (r >= 0) { // 成功了则设置头节点并进行传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 判断是否需要在失败时进行park(即等待) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) // 取消获取许可 cancelAcquire(node); } }这里有一点需要注意的是如果有new CountDownLatch(10),那么state的值就被设置为10,调用await方法的线程调用tryAcquireShared方法时会返回-1,然后进入shouldParkAfterFailedAcquire方法,线程最终会park,直到state被调用countDown方法的线程减少到0。
java.util.concurrent.CountDownLatch#countDown方法 public void countDown() { sync.releaseShared(1); }此时再回过头来看下releaseShared方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 每释放一个count减1 for (;;) {// 无限循环 // 用AQS状态值作为锁的count值 int c = getState(); // 锁数量为0时返回false if (c == 0) return false; // 不为0,减1 int nextc = c-1; // 将最新值cas设置到state中去 if (compareAndSetState(c, nextc)) return nextc == 0; } } // 共享模式下的释放动作-表示唤醒后继节点并确保传播 private void doReleaseShared() { for (;;) {// 无限循环 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // cas头节点的waitStatus if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }tryReleaseShared方法会减少state的值,直到为0,在此之前都是返回true;doReleaseShared方法会尝试唤醒AQS中处于SIGNAL状态的头节点和后继节点。这里唤醒SIGNAL状态的头节点操作位于doAcquireSharedInterruptibly中的无限for循环(之前讲伪唤醒时有分析过这个)。示例worker线程:public class Worker implements Runnable{ private CountDownLatch downLatch; private String name; public Worker(CountDownLatch downLatch, String name){ this.downLatch = downLatch; this.name = name; } public void run() { this.doWork(); try{ TimeUnit.SECONDS.sleep(new Random().nextInt(10)); }catch(InterruptedException ie){ } System.out.println(this.name + "工作结束!"); this.downLatch.countDown(); } private void doWork(){ System.out.println(this.name + "正在工作!"); } } wait线程,也就是boss线程:public class Boss implements Runnable { private CountDownLatch downLatch; public Boss(CountDownLatch downLatch){ this.downLatch = downLatch; } public void run() { System.out.println("老板正在等所有人工作完成......"); try { this.downLatch.await(); } catch (InterruptedException e) { } System.out.println("所有人工作完成了,老板开始检查工作!"); } } main方法: public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(3); Worker w1 = new Worker(latch,"zs"); Worker w2 = new Worker(latch,"ls"); Worker w3 = new Worker(latch,"ww"); Boss boss = new Boss(latch); executor.execute(w3); executor.execute(w2); executor.execute(w1); executor.execute(boss); executor.shutdown(); }总共三个许可,三个worker,一个boss,每个worker工作完成之后打卡,boss一直等待直到所有worker工作完成。
---来自腾讯云社区的---开发架构二三事
微信扫一扫打赏
支付宝扫一扫打赏