/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc */ privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING, 0));
ctlof 方法是执行两个参数的或运算:
1 2 3
privatestaticintctlOf(int rs, int wc) { return rs | wc; }
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); intc= ctl.get(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 如果当前工作线程数少于corePoolSize就增加工作线程(核心线程),并且任务直接在新工作线程中跑 if (workerCountOf(c) < corePoolSize) { // 如果增加成功,直接提交运行并返回 if (addWorker(command, true)) return; // 如果增加失败(失败原因见下文的addWorker方法注释),就去将任务放进队列中 c = ctl.get(); } // 如果线程池处于running状态,且任务入队成功 if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); // 重新检查ctl的runState,如果此时线程池已经被关闭,就去从队列删除任务 // 此处其实没有这段代码应该也可以,加了这段可以有助于在线程池被请求关闭后 // 可以减少一些新的任务进入队列,算是种优化,可以让线程池尽快退出,不被新的任务拖累 if (!isRunning(recheck) && remove(command)) reject(command); // 判断如果工作线程数为0,就去新增一个非核心线程,任务已经进入队列,就不需要设定初始任务参数了 elseif (workerCountOf(recheck) == 0) // (1) addWorker(null, false); } // 如果线程池不是running状态的,进入addWorker最终也会返回false,一样会拒绝任务 // 如果任务入队失败(比如对列满了),就去尝试新增一个非核心线程,并在这个线程上执行入队失败的任务 // 如果还是失败,证明池满了,那就只能拒绝了 elseif (!addWorker(command, false)) reject(command); }
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) returnfalse;
for (;;) { intwc= workerCountOf(c); // 工作线程数超过容量就直接返回失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; // 利用AtomicInteger去尝试给ctl中的线程数+1,如果成功就退出retry循环 if (compareAndIncrementWorkerCount(c)) break retry; // compareAndIncrementWorkerCount 失败就一定是有其他线程增加或减少了工作线程数 // 这里重新获得ctl的值,判断运行状态是不是有变化,如果有变化重新执行retry循环 // 如果没变化,重新利用AtomicInteger的CAS机制,尝试去给工作线程数+1 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 以上其实是利用了AtomicInteger实现并发下,无锁递增ctl中的workCount // 只要AtomicInteger的CAS执行成功,就能跳出retry循环,否则就无限尝试,或者runState状态变换就 // 重新检测runState来判断是否继续执行
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; // 在mainLock上加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intrs= runStateOf(ctl.get());
// 在线程池为running或者shutdown的时候才去增加工作线程,后者还要求任务对象firstTask为null // 因为shutDown状态是不允许新的任务进队列,但是允许队列内还留存未处理任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程的创建是交给ThreadFactory的,ThreadFactory可能由用户提供,这里检查下线程的状态 if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); workers.add(w); ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 出现异常,或者线程创建出问题,或导致workerStarted为false if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
* 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set.
publicvoidsetCorePoolSize(int corePoolSize) { if (corePoolSize < 0) thrownewIllegalArgumentException(); intdelta= corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); elseif (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. intk= Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } }