总结一下,上面的方法主要包括如下几点:
工作线程数小于corePoolSize时,添加新的worker线程;如果达到了corePoolSize,此时一个任务如果能成功入队列(也就是说队列没有满时),需要再进一步来二次确认是否需要添加worker;如果任务不能入队列,将尝试添加一个worker直到worker数量达到maxPoolSize;如果线程池中线程数量 > core ,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的。java.util.concurrent.ThreadPoolExecutor.Workerworker的结构图:代码部分:private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread;// 对应worker的线程 /** Initial task to run. Possibly null. */ Runnable firstTask;// 初始时要运行的task /** Per-thread task counter */ volatile long completedTasks;// 运行完成的任务数 /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // 设置AQS的state值为-1 setState(-1); // inhibit interrupts until runWorker // 初始要运行的task this.firstTask = firstTask; // 使用threadFactory创建一个线程 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() {// 是否处理独占状态 return getState() != 0; } protected boolean tryAcquire(int unused) {// 尝试获取许可 if (compareAndSetState(0, 1)) {//cas设置state值从0到1 // 设置成功时将当前线程设置为独占线程 setExclusiveOwnerThread(Thread.currentThread()); // 获取成功 return true; } // 获取失败 return false; } // 尝试释放占有的许可 protected boolean tryRelease(int unused) { // 重置Worker的独占线程为null setExclusiveOwnerThread(null); // 状态置为0 setState(0); return true; } // 调用AQS的acquire方法 public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } // 调用AQS的release方法 public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }tryLock的使用主要在interruptIdleWorkers方法中:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }它的主要作用是在对worker进行interrupt操作时需要先获取worker的独占锁。
而lock和unlock方法的调用:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { // 在每次运行一个任务之前要先对worker锁定,然后在执行完之后进行解锁 w.lock(); ---------------省略部分代码----------------- try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); -------------省略部分代码------------------ } finally { task = null; w.completedTasks++; // 进行解锁 w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }在每次运行一个任务之前要先对worker进行锁定,然后在执行完之后进行解锁。
另外线程池中还有一个锁:
private final ReentrantLock mainLock = new ReentrantLock(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition();它的作用主要是在停止线程池时来控制用来做停止操作的线程:
final void tryTerminate() { for (;;) { ----------省略部分代码---------------------- final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); // 唤醒 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }它主要为了客户端可以调用的java.util.concurrent.ThreadPoolExecutor#awaitTermination方法服务的,awaitTermination会调用conditon的await方法进行阻塞。在这里就不进行详细地分析了,后续分析线程池源码时再具体分析。
总结关于worker的部分我们就简要地介绍这么多。它继承AQS的主要目的是在每次运行一个任务之前要先对worker进行锁定,然后在执行完之后进行解锁,这样方便管理。
---来自腾讯云社区的---开发架构二三事
微信扫一扫打赏
支付宝扫一扫打赏