JDK1.8的LongAdder分析

文档地址在这里 LongAdder Api

直接看代码

add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// cells非null或者在base上cas递增失败
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()是个随机数,与m求与运算,能获得一个最小0最大m的数
(a = as[getProbe() & m]) == null ||
// 利用随机数,去cells上随机取一个Cell做CAS递增
!(uncontended = a.cas(v = a.value, v + x)))
// 如果递增失败,即出现竞争,调longAccumulate方法
longAccumulate(x, null, uncontended);
}
}
  • cells:是一个 Cell 数组,竞争时随机取其中一个 Cell 去做递增,以降低并发的竞争问题。
  • base:无竞争时在 base 上面递增。
longAccumulate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 如果当前的随机数probe未初始化则去初始化
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
// 随机数重新选取以后,视为未竞争过,新的随机数会可能会导致递增的Cell变更
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
// 利用随机数从cells中取一个Cell
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 进入临界区,加锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 双重检测,如果通过就去将cells对应位置设置为新创建的Cell对象
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 解锁
}
if (created) // 新Cell创建成功即递增成功,返回
break;
// 未通过双重检测,有其他线程在同一个cells位置新增了cell
continue; // Slot is now non-empty
}
}
collide = false;
}
// 本方法被调用前已经在Cell上CAS递增过的话,就用wasUncontended作为标识,本次不再尝试去做CAS递增
// 而是在调用advanceProbe以后再去重新尝试,advanceProbe会修改probe的值,下次循环会选取不同的Cell去尝试CAS递增
// 只要之前执行过CAS递增,那么当前调用进来,取得的Cell还是之前方法获取到的Cell,
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 尝试去递增,成功就退出循环,返回
break;
else if (n >= NCPU || cells != as)
// 限制cells不得大于或等于cpu核心数,超过了就不再对cells扩容
// cells!=as表明最近被扩容过
// 以上满足任何一个都去标记collide为false,避免扩容
collide = false; // At max size or stale
else if (!collide)
// 执行到这里证明满足扩容需要的条件,设置collide为true,在下次循环中去扩容,如果那是还满足扩容条件的话
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
// 进入临界区,去对cells扩容
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // 容量乘以2
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; // 解锁
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 当cells为null或为空时,竞争设置cellsBusy为1,去初始化cells,初始化完再设回0,相当于加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) { // 双重检测
Cell[] rs = new Cell[2]; // 第一次初始化容量为2
rs[h & 1] = new Cell(x); // 根据随机数选取一个Cell
cells = rs;
init = true;
}
} finally {
cellsBusy = 0; // 走出临界区,相当于释放锁
}
if (init) // 初始化的同时也完成了递增,直接返回
break;
}
// 竞争初始化cells失败的去尝试在base上做递增,如果成功就直接返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
advanceProbe
1
2
3
4
5
6
7
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}

伪随机算法 xorshift,将产生的随机数放进当前的运行线程。

伪共享问题

1
2
3
4
5
6
7
8
9
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
// 省略部分代码
}

有一点需要关注的是 sun.misc.Contended 注解,他是为了解决伪共享问题(False sharing)。

1
2
3
4
5
6
7
8
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Contended {
// 指定对象所属的group,不同group是相互隔离的,不会发生false sharing
// 相同的group是可能没有被隔离的
// 如果使用默认的空字符串,那么每个都相互隔离
String value() default "";
}

如果把 Contended 注在类上,那么类里所有未注 Contended 的属性都会进入一个匿名的 group,详细见代码注释。

资料

What is @Contended and False Sharing ?
false sharing

问题

  • writeReplace 方法是干啥的
Author

张阿力

Posted on

2018-03-21

Updated on

2018-03-21

Licensed under