(1) 处,q.top - q.base 表示现在队列内的任务数,q.top - q.base < a.length - 1 表示队列内任务数小于数组容量,那么就会往下执行,在数组的 top 位置增加参数里的 task,并且给 top 加 1,最后释放锁。(2)处 am & s 的含义见下面「WorkQueue 的 top 和 base 的维护」的描述,原理一样。
(3)当队列内任务数为 0个 或者 1个 或者 2个时,去执行 signalWork。
(4)m & r & SQMASK,m & r 是随机取一个 0 - m 的数,再与 SQMASK 求与获得一个将二进制最低位设为 0,得到一个偶数。外部任务提交的时候,会取 workQueues 上偶数下标的 WorkQueue,然后将任务加到队列内。
privatevoidunlockRunState(int oldRunState, int newRunState) { // (1) if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { Objectlock= stealCounter; // (2) runState = newRunState; // clears RSIGNAL bit if (lock != null) synchronized (lock) { lock.notifyAll(); } } }
「1」处的 CAS 是为了设置 runState 为 newRunState,也比较好理解,那为什么会有失败的情况呢(要不然也不会有 if 里面的代码)。失败的情况发生在当锁被获取以后,又有新的线程去尝试加锁,这个时候会进入 awaitRunStateLock 方法,这个方法的逻辑见上述,最后在尝试了自旋重试还是无法获得锁时,会去设置 runState 的 signal 位为1,相当于标识了「当前有线程处于阻塞等待中,需要被唤醒」。这样,自然 unlockRunState 方法的 CAS 操作就失败了,然后 if 块里的代码逻辑也比较清楚了。newRunState 的 signal 位没有被设置,「2」处会清除 signal 位为 0。然后在 lock 上调用 notifyAll 去唤醒阻塞的线程。
一些理解了的机制
lockRunState 和 unlockRunState
runState 是一个整形,他是一个符合的状态变量,用他的二进制位表示一些状态:
1 2 3 4 5 6 7
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two privatestaticfinalintRSLOCK=1; privatestaticfinalintRSIGNAL=1 << 1; privatestaticfinalintSTARTED=1 << 2; privatestaticfinalintSTOP=1 << 29; privatestaticfinalintTERMINATED=1 << 30; privatestaticfinalintSHUTDOWN=1 << 31;
左侧第一位表示加锁状态,1 为加锁。lockRunState 和 unlockRunState 详细的逻辑见上面的描述。lockRunState 方法利用 runState 的一个二进制位去作为加锁标识,来实现一个全局的锁。加锁失败会进行随机次数的自旋。自旋以后仍然无法获得锁时,就去在 stealCounter 上加锁并阻塞。这里的加锁失败就阻塞是为了避免很多线程尝试加锁,但是无法立刻获得锁时会导致的频繁的自旋,过多的自旋消耗了 CPU 资源。进入阻塞就释放了 CPU 时间。
ForkJoinTask 的 status 最左侧 4 位是任务状态,最右边 2 个字节是标签,最右边的 16 位 ForkJoinTask 本身不会去使用,他提供了一系列 public 方法给用户去设置和获取,提供出这个是为了让用户去给任务打标签,用来避免一些任务分发时的问题,具体见 ForkJoinTask 。status 左侧 4 位的定义如下:
item
binary
desc
DONE_MASK
11110000 - 00000000 - 00000000 - 00000000
用于执行位运算 比如 status & DONE_MASK
NORMAL
11110000 - 00000000 - 00000000 - 00000000
任务正常完成,负数
CANCELLED
11000000 - 00000000 - 00000000 - 00000000
任务被取消,负数,小于 NORMAL
EXCEPTIONAL
10000000 - 00000000 - 00000000 - 00000000
任务出现异常,负数,小于 CANCELLED
SIGNAL
00000000 - 000000001 - 00000000 - 00000000
任务等待其他任务完成,处于等待状态,正数
SMASK
00000000 - 00000000 - 11111111 - 11111111
用来执行位运算 比如 tag & SMASK
关于 tag 多提一下,应为 tag 是 status 的前 16 位置,在设置 tag 的方法需要用一些位运算来处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicfinalshortsetForkJoinTaskTag(short tag) { for (int s;;) { // s & ~SMASK 结果的左侧16位和当前status一样,右侧16位全为0 // tag & SMASK 结果的左侧16位全为0,右侧16位和参数tag一致 // 上面两者执行或运算,正好合并tag和当前的status if (U.compareAndSwapInt(this, STATUS, s = status, (s & ~SMASK) | (tag & SMASK))) return (short)s; } } publicfinalshortgetForkJoinTaskTag() { // 获取status的右侧16位,直接强转成short,抛弃status左侧16位的二进制位 return (short)status; }
WorkQueue 的 top 和 base 的维护
WorkQueue 是一个用数组实现的双端队列,队列内存 ForkJoinTask。每个线程内部会维护一个 WorkQueue。当一个任务扩散出新的分支任务时,新的任务就会被 push 进双端队列的有段,这个操作执行的时候只会去 push 自己线程的 WorkQueue,所以这个动作不存在竞争,是线程安全的(至少队列非空时是的)。新扩散的任务进入双端队列的右边,用 top 来维护到底新增到了哪个数组下标记。任务的窃取就从队列的左边去处理,用 base 作为下标,当一个线程执行任务的 join 进入,但是任务不能立刻执行时会去其他线程盗取一个任务,也就是从那个线程的双端队列的左边,利用 base 索引去拿出一个任务,然后自己执行,这个动作是存在竞争的,详细的内容等看到了对应的代码以后再补充。
1 2 3 4 5
staticfinalclassWorkQueue { volatileint base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<?>[] array; // the elements (initially unallocated) }
push
push 方法是双端队列右侧的新增操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
finalvoidpush(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; intb= base, s = top, n; if ((a = array) != null) { // ignore if queue removed intm= a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } elseif (n >= m) growArray(); } }
poll
poll 是双端队列左侧的获取操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
final ForkJoinTask<?> poll() { ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; while ((b = base) - top < 0 && (a = array) != null) { intj= (((a.length - 1) & b) << ASHIFT) + ABASE; t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); if (base == b) { if (t != null) { if (U.compareAndSwapObject(a, j, t, null)) { base = b + 1; return t; } } elseif (b + 1 == top) // now empty break; } } returnnull; }
说说两个方法的 base 和 top 的维护
push 每次都需要在数组的 top + 1 个位置新增任务,pull 每次需要 base + 1 个位置去盗取任务。他们都用类似这样的方式去计算需要操作的下标:
1
(array.length - 1) & (base或top的值)
这里比较有意思,因为 array 被初始化的容量为 1 << n(n 是写死的 13,每次扩容的时候再左移一位),1 << n 再减去 1 的二进制表示全为 1。所以 array.length - 1 的二进制表示全为 1,那么他和 base 或者 top 执行逻辑与,在 base 或者 top 小于等于 array.length - 1 时得到的结果就是 base 或 top 本身。当 base 或者 top 大于 array.length - 1 时,从新从最小的数组下标开始(即从 0 开始)。
随着 base 和 top 的增大,( array.length - 1 ) & ( base 或 top 的值 ) 这个变量计算的结果永远不会超出数组的下标范围,并且可以循环利用数组元素。这里非常的优雅,开始看了很久没看明白,后来在 debug 的时候才看出玄机。
WorkQueue 的 top 和数组的维护为什么要用 putOrderedObject 和 putOrderedInt
// 将config从左侧17位开始的二进制位设为0 intp= config & SMASK; // ensure at least 2 slots intn= (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; //(1) n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //(2) workQueues = newWorkQueue[n];
// 因为最高位一定为1,右移1位再与原值求或运算,相当于将最高和次最高位设置为1 n |= n >>> 1; // 第一步已经将最高和次最高位设置为1,这次可以移动2位再求或 // 一次性将右边第3第4位置设置为1 n |= n >>> 2; // 依次类推 n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; // 因为入参n其实被限制了最大值为65535(与SMARK做过与运算) // 所以执行到这里,最高位右侧的二进制位全为1了,再加1就获得大于2的最小的2的幂了 n = (n + 1); // 最后还会左移1位,相当于乘以2 n = n << 1;