HashedWheelTimer

Use case

Netty ships a timing-wheel-based Timer for managing large numbers of scheduled tasks, such as heartbeat (idle) detection.

Usage

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.4.Final</version>
</dependency>
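import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

public class HashedWheelTimerDemo {
    public static void main(String[] args) throws InterruptedException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // tickDuration = 10 s, default ticksPerWheel (512)
        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(10, TimeUnit.SECONDS);
        System.out.println("start:" + LocalDateTime.now().format(formatter));
        // timeout1: due 20 s from now
        Timeout t1 = hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("task1 :" + LocalDateTime.now().format(formatter));
        }, 20, TimeUnit.SECONDS);
        // timeout2: due 20 s from now
        Timeout t2 = hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("task2 :" + LocalDateTime.now().format(formatter));
        }, 20, TimeUnit.SECONDS);
        Thread.sleep(15000);
        // Cancel task1 before it fires; task2 still runs
        t1.cancel();
        System.out.println("task1 cancel :" + LocalDateTime.now().format(formatter));
        // Wait for task2, then stop the timer so its (non-daemon) worker thread lets the JVM exit
        Thread.sleep(20000);
        hashedWheelTimer.stop();
    }
}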

output
start:2018-06-01 18:33:41
task1 cancel :2018-06-01 18:33:56
task2 :2018-06-01 18:34:11
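task2 ran 30 s after start, not 20 s. The worker only processes a bucket once its tick has ended. A timeout whose deadline falls in tick k therefore runs when tick k ends, at (k + 1) × tickDuration after startTime. With tickDuration = 10 s, a 20 s deadline lands in tick 2 and runs at 30 s.

The hypothetical `TickMath` helper below (not a Netty class) reproduces this arithmetic from transferTimeoutsToBuckets() and waitForNextTick(). It assumes the worker is not running behind and that the timeout is transferred on tick 0.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Netty: models which tick picks up a timeout
// and when that tick is processed, using the same integer arithmetic as
// transferTimeoutsToBuckets() and waitForNextTick().
final class TickMath {
    private TickMath() {}

    /** Tick whose bucket receives the timeout: deadline / tickDuration, never in the past. */
    static long bucketTick(long deadlineNanos, long tickDurationNanos, long currentTick) {
        long calculated = deadlineNanos / tickDurationNanos;
        return Math.max(calculated, currentTick);
    }

    /** Time (relative to startTime) at which that tick is processed: tickDuration * (tick + 1). */
    static long firesAtNanos(long deadlineNanos, long tickDurationNanos, long currentTick) {
        return tickDurationNanos * (bucketTick(deadlineNanos, tickDurationNanos, currentTick) + 1);
    }

    public static void main(String[] args) {
        long tickDuration = TimeUnit.SECONDS.toNanos(10);
        long deadline = TimeUnit.SECONDS.toNanos(20);
        // The timeout is transferred on tick 0, the first tick after newTimeout()
        long fires = firesAtNanos(deadline, tickDuration, 0);
        System.out.println(TimeUnit.NANOSECONDS.toSeconds(fires)); // prints 30
    }
}
```

In other words, a HashedWheelTimer can fire up to roughly one tickDuration late. A shorter tickDuration reduces that delay, at the cost of more frequent wake-ups.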

The timing wheel algorithm

  • Classic diagram
    [Figure: timingWheel]

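    One turn of the wheel is divided into slots (buckets). Each slot covers a fixed span of time, and that span determines the timer's precision. A slot holds the tasks that fall into it, and each task carries a round count. When the pointer reaches a slot, it checks every task's round: tasks whose round is 0 are run, and the rest have their round decremented. To add a task, its deadline is computed and mapped to a slot by some rule, for example modulo arithmetic on the ticks elapsed since startTime. Netty's timer does not insert tasks into slots directly. newTimeout appends them to a queue, and on every tick the worker moves queued tasks into slots incrementally, according to their deadlines.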
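The classic single-level algorithm described above can be sketched as a small, deliberately simplified class. `SimpleTimingWheel` is a hypothetical name, not Netty's code. It assumes time advances only when tick() is called, that delays are whole ticks, and that tasks go straight into buckets (Netty buffers them in a queue first and has no such API). The round count is the number of extra laps the pointer must make before the slot counts.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, single-threaded hashed timing wheel for illustration only (not Netty's code).
// Assumptions: time advances only via tick(), delays are whole ticks,
// and tasks are placed directly into buckets.
final class SimpleTimingWheel {
    private static final class Entry {
        long remainingRounds;   // full laps to skip before the task is due
        final Runnable task;

        Entry(long remainingRounds, Runnable task) {
            this.remainingRounds = remainingRounds;
            this.task = task;
        }
    }

    private final List<List<Entry>> buckets = new ArrayList<>();
    private long currentTick = 0; // tick the pointer is currently on

    SimpleTimingWheel(int wheelSize) {
        for (int i = 0; i < wheelSize; i++) {
            buckets.add(new ArrayList<>());
        }
    }

    /** Runs task when the pointer reaches currentTick + delayTicks (delayTicks >= 1). */
    void schedule(long delayTicks, Runnable task) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int size = buckets.size();
        int slot = (int) ((currentTick + delayTicks) % size);
        // The pointer first reaches this slot within `size` ticks; every further lap is one round.
        long rounds = (delayTicks - 1) / size;
        buckets.get(slot).add(new Entry(rounds, task));
    }

    /** Moves the pointer one slot forward and processes that slot. */
    void tick() {
        currentTick++;
        List<Entry> bucket = buckets.get((int) (currentTick % buckets.size()));
        List<Runnable> due = new ArrayList<>();
        for (Iterator<Entry> it = bucket.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();        // due now: unlink it
                due.add(e.task);
            } else {
                e.remainingRounds--; // not due yet: one lap closer
            }
        }
        // Run after unlinking, so a task may safely schedule new work
        for (Runnable r : due) {
            r.run();
        }
    }

    public static void main(String[] args) {
        SimpleTimingWheel wheel = new SimpleTimingWheel(8);
        wheel.schedule(3, () -> System.out.println("due after 3 ticks"));
        wheel.schedule(11, () -> System.out.println("due after 11 ticks (one extra round)"));
        for (int i = 0; i < 12; i++) {
            wheel.tick();
        }
    }
}
```

With 8 slots, delays of 3 and 11 ticks land in the same slot. The 11-tick task simply carries one round and is skipped on the pointer's first visit.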

  • Hierarchical timing wheels
    [Figure: timingWheels]

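    In a single-level wheel, the linked list in one slot can grow very long in extreme cases, which motivated hierarchical (multi-level) timing wheel designs. FIXME
    Kafka's hierarchical timing wheel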

Performance evaluation

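Add a task: O(1)
Cancel a task: O(1)
Expire tasks: worst case O(n) per tick, where n is the length of the current slot's list. Fewer slots mean longer lists and more work per tick; more slots mean shorter lists but more memory.
Benchmark FIXME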

Class diagram and sequence diagram

[Figure: UML class diagram]

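  • Timer interface:
    newTimeout schedules a new task; stop stops the timer.
  • HashedWheelTimer, the core class:
    The Timer implementation. It builds the wheel: a queue of new timeouts, a queue of cancelled timeouts, the bucket array, and the slot length tickDuration. It also creates the worker thread. Each newTimeout call checks the worker state and starts the worker thread on the first call.
  • HashedWheelBucket, a slot:
    Holds its timeouts in a linked list. It provides the method that appends timeouts transferred from the queue, the method the worker calls to expire due timeouts, and the method that removes a timeout after it has run or been cancelled.
  • HashedWheelTimeout, the task wrapper:
    A node of that linked list. It implements cancellation and removal, tracks the task's state, and provides the entry point that runs the task.
  • Worker, the scheduling thread:
    Drives the pointer. On each tick it first unlinks cancelled timeouts, then moves queued timeouts into their buckets, then runs the timeouts in the current bucket whose round has reached 0.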

Sequence diagram FIXME

Code walkthrough

Creating a HashedWheelTimer

Initializing the wheel

// Constructor
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
// Argument checks
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// Create the bucket array. Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
// tick only ever increases; because wheel.length is a power of two, tick & mask == tick % wheel.length gives the bucket index
mask = wheel.length - 1;

// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);

// Reject a tickDuration so long that tickDuration * wheel.length would overflow. Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));
}
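// Create the worker thread; whether it is a daemon depends on threadFactory (Executors.defaultThreadFactory() creates non-daemon threads)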
workerThread = threadFactory.newThread(worker);

// Resource-leak detection: track this timer if leakDetection is true or the worker thread is not a daemon FIXME
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

this.maxPendingTimeouts = maxPendingTimeouts;

if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { // CAS ensures the warning is reported only once FIXME
reportTooManyInstances();
}
}

// Demonstration of mask: prints the bucket indexes 0..7 over and over, i.e. tick % 8
int mask = 8 - 1;
long tick = 0;
for (int i = 0; i < 100; i++) {
System.out.println(tick & mask);
tick++;
}

create HashedWheelBucket[]

// createWheel
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) { // must not exceed 2^30
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
// Round up to the next power of two (the smallest 2^n >= ticksPerWheel)
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// Create the bucket array
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}

Normalizing the number of slots

// normalizeTicksPerWheel
// Algorithm: start at 1 and shift left one bit at a time until the value is >= ticksPerWheel
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
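// Alternative to the method above (not meant to coexist with it): the algorithm of Java 8 HashMap.tableSizeFor, which works the other way round.
// Unsigned right shifts by 1, 2, 4, 8 and 16 copy the highest set bit of (ticksPerWheel - 1) into every lower bit,
// giving 2^n - 1; adding 1 yields the power of two. It takes a fixed number of steps instead of a loop.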
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int n = ticksPerWheel - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 1073741824 = 2^30: cap the result to prevent overflow
return (n < 0) ? 1 : (n >= 1073741824) ? 1073741824 : n + 1;
}
For example, normalizeTicksPerWheel(513): since 2^9 < 513 < 2^10, the result is 1024. Start from n = 513 - 1 = 512:
1000000000  n = 512
0100000000  n >>> 1 = 256
1100000000  n = 768
0011000000  n >>> 2 = 192
1111000000  n = 960
0000111100  n >>> 4 = 60
1111111100  n = 1020
0000000011  n >>> 8 = 3
1111111111  n = 1023
0000000000  n >>> 16 = 0
1111111111  n = 1023
n + 1 = 1023 + 1 = 1024
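The two variants above are copied side by side into a hypothetical `NormalizeDemo` class (the method names are illustrative) so they can be compared. Both return the smallest power of two that is >= the input, for every legal input from 1 to 2^30. Subtracting 1 first is what keeps an exact power of two such as 512 unchanged.

```java
// Side-by-side copies of the two normalizeTicksPerWheel variants (names are illustrative).
final class NormalizeDemo {
    /** Loop version: shift left until the value reaches ticksPerWheel. */
    static int byShifting(int ticksPerWheel) {
        int normalized = 1;
        while (normalized < ticksPerWheel) {
            normalized <<= 1;
        }
        return normalized;
    }

    /** HashMap.tableSizeFor-style version: smear the top bit of (n - 1) downward, then add 1. */
    static int bySmearing(int ticksPerWheel) {
        int n = ticksPerWheel - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= (1 << 30)) ? (1 << 30) : n + 1;
    }

    public static void main(String[] args) {
        System.out.println(bySmearing(513)); // 1024
        System.out.println(bySmearing(512)); // 512: an exact power of two is kept
        System.out.println(bySmearing(1));   // 1
    }
}
```

Since the wheel is created only once per timer, the speed difference between the two hardly matters here. The practical point is that both make `tick & mask` a valid replacement for `tick % wheel.length`.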

Adding a timeout and starting the timer

newTimeout

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// Reject the task if the number of pending timeouts would exceed maxPendingTimeouts
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// The worker thread is started lazily, on the first newTimeout call
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
// Deadline relative to startTime (the moment the worker began ticking)
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// Wrap the task in a HashedWheelTimeout
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// Enqueue it on a JCTools-based multi-producer, single-consumer concurrent queue FIXME
timeouts.add(timeout);
// Return the HashedWheelTimeout so the caller can cancel it
return timeout;
}
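newTimeout() applies two guards before enqueuing. The sketch below isolates them in a hypothetical `NewTimeoutGuards` class, not part of Netty:
- an optimistic increment of the pending counter, rolled back when the limit is exceeded;
- a clamp that turns an overflowed relative deadline into Long.MAX_VALUE.

In HashedWheelTimer itself the counter is decremented again when a timeout is removed from its bucket (see HashedWheelBucket.remove).

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone version of the two guards in newTimeout().
final class NewTimeoutGuards {
    private final AtomicLong pending = new AtomicLong();
    private final long maxPending; // <= 0 means "no limit", as in HashedWheelTimer

    NewTimeoutGuards(long maxPending) {
        this.maxPending = maxPending;
    }

    /** Reserves one pending slot or throws, undoing the optimistic increment. */
    void reserveSlot() {
        long count = pending.incrementAndGet();
        if (maxPending > 0 && count > maxPending) {
            pending.decrementAndGet(); // roll back before rejecting
            throw new RejectedExecutionException(
                    "pending timeouts (" + count + ") exceed the maximum (" + maxPending + ")");
        }
    }

    /** Called when a timeout leaves the wheel (expired or cancelled). */
    void release() {
        pending.decrementAndGet();
    }

    /** Deadline relative to startTime, clamped to Long.MAX_VALUE if the addition overflowed. */
    static long relativeDeadline(long nowNanos, long delayNanos, long startTime) {
        long deadline = nowNanos + delayNanos - startTime;
        if (delayNanos > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        return deadline;
    }
}
```

Incrementing first and rolling back on failure avoids a lock. Between the increment and the rollback, another thread may briefly see a count one too high, which at worst causes a spurious rejection near the limit.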

Starting the wheel: start()

public void start() {
// The worker state is read and updated through an AtomicIntegerFieldUpdater FIXME
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
// Lock-free CAS: only the thread that wins the INIT -> STARTED transition starts the worker FIXME
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
// CAS succeeded: start the thread
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

// Wait until the startTime is initialized by the worker.
// The worker's first step is to set startTime; block until it has, because task deadlines are computed relative to startTime
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
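The combination used by start() and Worker.run() (CAS on a state field plus a latch that publishes startTime) can be reproduced in isolation. `LazyStarter` and its fields are illustrative names, not Netty's. The sketch mirrors Netty's choices:
- a `volatile int` state updated through AtomicIntegerFieldUpdater, so that only one caller starts the thread;
- 0 reserved as the "startTime not yet set" marker;
- a CountDownLatch that callers wait on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Illustrative sketch of the lazy-start pattern (names are not Netty's).
final class LazyStarter {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;
    private static final AtomicIntegerFieldUpdater<LazyStarter> STATE =
            AtomicIntegerFieldUpdater.newUpdater(LazyStarter.class, "state");

    private volatile int state = INIT; // the updater requires a volatile int field
    private volatile long startTime;   // 0 means "not initialized yet"
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final Thread worker = new Thread(() -> {
        long t = System.nanoTime();
        startTime = (t == 0) ? 1 : t;  // never publish the reserved value 0
        startTimeInitialized.countDown();
        // ... a real worker would now loop over ticks ...
    });

    /** Starts the worker at most once and returns startTime once it is known. */
    long start() {
        switch (STATE.get(this)) {
            case INIT:
                if (STATE.compareAndSet(this, INIT, STARTED)) {
                    worker.start(); // only the CAS winner gets here
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Mirrors the original: keep waiting, startTime will be set very soon
            }
        }
        return startTime;
    }
}
```

Calling start() repeatedly is safe. Later callers lose the CAS or see STARTED, so Thread.start() is never invoked twice, which would throw IllegalThreadStateException.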

Stopping the wheel: stop()

@Override
public Set<Timeout> stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
// CAS the state from STARTED to SHUTDOWN
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}

return Collections.emptySet();
}

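// Interrupt the worker thread and join it (100 ms at a time) until it exits; re-set this thread's interrupt flag if it was interrupted while joining FIXME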
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}

if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// Return the timeouts that were never processed
return worker.unprocessedTimeouts();
}

The worker's scheduling loop

@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// start() waits while startTime == 0, so 0 is reserved as the "uninitialized" marker and must not be a real value
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}

// Notify the other threads waiting for the initialization at start().
// CountDownLatch: wake up the threads blocked in start() FIXME
startTimeInitialized.countDown();

do {
// Sleep until the current tick ends, then return the current time relative to startTime
final long deadline = waitForNextTick();
if (deadline > 0) { // <= 0 means shutdown (Long.MIN_VALUE) or the -Long.MAX_VALUE overflow sentinel
// Index of the current bucket
int idx = (int) (tick & mask);
// Unlink the timeouts queued in cancelledTimeouts
processCancelledTasks();
// The bucket for this tick
HashedWheelBucket bucket = wheel[idx];
// Move newly queued timeouts into their buckets (at most 100,000 per tick)
transferTimeoutsToBuckets();
// Run the due timeouts in this bucket
bucket.expireTimeouts(deadline);
// Advance to the next tick
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// Process anything left in the cancelled queue
processCancelledTasks();
}

// waitForNextTick
private long waitForNextTick() {
// End of the current tick, relative to startTime
long deadline = tickDuration * (tick + 1);
for (;;) {
final long currentTime = System.nanoTime() - startTime;
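// Milliseconds to sleep, rounded up: adding 999999 before dividing by 1000000 makes it a ceiling, so the sleep never ends before the deadline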
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
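if (currentTime == Long.MIN_VALUE) { // Long.MIN_VALUE is reserved as the shutdown sentinel (see the catch below), so return -Long.MAX_VALUE instead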
return -Long.MAX_VALUE;
} else {
return currentTime; // current time relative to startTime
}
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
// Sleep until (approximately) the end of the tick
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
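The sleep computation in waitForNextTick() is easiest to check with concrete numbers. `SleepMath` is a hypothetical class that copies the two expressions. Adding 999999 before dividing by 1000000 turns integer division into a ceiling, so the worker wakes at or up to about 1 ms after the tick deadline, never before it.

```java
// Hypothetical copy of the two expressions in waitForNextTick().
final class SleepMath {
    /** Remaining time until the deadline, rounded UP to whole milliseconds. */
    static long sleepTimeMs(long deadlineNanos, long currentNanos) {
        return (deadlineNanos - currentNanos + 999_999) / 1_000_000;
    }

    /** The Windows workaround: round the sleep down to a multiple of 10 ms. */
    static long windowsRounded(long sleepTimeMs) {
        return sleepTimeMs / 10 * 10;
    }

    public static void main(String[] args) {
        System.out.println(sleepTimeMs(2_000_001, 1_000_000)); // 1,000,001 ns left -> 2 ms
        System.out.println(sleepTimeMs(5, 5));                 // nothing left -> 0, tick is due
        System.out.println(windowsRounded(15));                // 10
    }
}
```

As written above, the Windows rounding is applied after the `sleepTimeMs <= 0` check. When fewer than 10 ms remain, the loop therefore calls Thread.sleep(0) repeatedly until the deadline passes.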

// processCancelledTasks
private void processCancelledTasks() {
for (;;) {
// poll() removes and returns the head of the queue, or null when it is empty
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
// Unlink the timeout from its bucket
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}

// transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() {
// Transfer at most 100,000 timeouts per tick; stop earlier if the queue runs empty
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}

// Number of ticks from startTime until the deadline
long calculated = timeout.deadline / tickDuration;
// Subtract the ticks already elapsed and divide by the wheel size to get the number of full rounds left
timeout.remainingRounds = (calculated - tick) / wheel.length;

// If the timeout sat in the queue past its deadline, put it in the current tick's bucket
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
// Index of the target bucket
int stopIndex = (int) (ticks & mask);
// Append the timeout to that bucket
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
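The slot/round arithmetic in transferTimeoutsToBuckets() can be checked for a single timeout with a hypothetical `Placement` helper (not a Netty type). It assumes the wheel length is a power of two, as createWheel guarantees.

```java
// Hypothetical helper restating the arithmetic of transferTimeoutsToBuckets() for one timeout.
final class Placement {
    final long remainingRounds; // full laps before the timeout is due
    final int stopIndex;        // bucket the timeout is appended to

    private Placement(long remainingRounds, int stopIndex) {
        this.remainingRounds = remainingRounds;
        this.stopIndex = stopIndex;
    }

    /**
     * @param deadline     deadline relative to startTime
     * @param tickDuration tick length, same unit as deadline
     * @param tick         the worker's current tick
     * @param wheelLength  number of buckets; assumed to be a power of two
     */
    static Placement of(long deadline, long tickDuration, long tick, int wheelLength) {
        long mask = wheelLength - 1;
        long calculated = deadline / tickDuration;                // absolute tick of the deadline
        long remainingRounds = (calculated - tick) / wheelLength; // negative if already overdue
        long ticks = Math.max(calculated, tick);                  // never schedule into the past
        return new Placement(remainingRounds, (int) (ticks & mask));
    }

    public static void main(String[] args) {
        Placement p = Placement.of(2050, 100, 3, 8);
        System.out.println(p.remainingRounds + " rounds, bucket " + p.stopIndex);
    }
}
```

Take 8 buckets, tickDuration = 100, and a deadline of 2050 seen at tick 3:
- calculated = 20, remainingRounds = 17 / 8 = 2, and stopIndex = 20 & 7 = 4;
- bucket 4 is visited at ticks 4 and 12, and each visit decrements the round;
- the timeout runs at tick 20.

An overdue timeout can end up with a negative remainingRounds. expireTimeouts still treats it as due because it tests `remainingRounds <= 0`.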

The linked list inside HashedWheelBucket

private static final class HashedWheelBucket {
// Used for the linked-list datastructure
// Head of the list
private HashedWheelTimeout head;
// Tail of the list
private HashedWheelTimeout tail;

// Append: insert at the tail
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}

// Remove: point the previous node's next at timeout.next and the next node's prev at timeout.prev
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}

if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}

// Walk the list from head: run timeouts whose remainingRounds <= 0, drop cancelled ones, decrement the rest
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds --;
}
timeout = next;
}
}

public void clearTimeouts(Set<Timeout> set) {
// Omitted: removes timeouts one by one from the head of the list and adds them to set
}
}
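HashedWheelBucket's list is intrusive: every HashedWheelTimeout carries its own prev, next and bucket fields. That is why cancellation can unlink a node in O(1), without searching the bucket. The same idea is sketched below as a small generic list; `Bucket` and `Node` are illustrative names, not Netty's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative intrusive doubly linked list in the style of HashedWheelBucket.
final class Bucket<T> {
    static final class Node<T> {
        final T value;
        Node<T> prev, next;
        Bucket<T> owner; // like HashedWheelTimeout.bucket

        Node(T value) {
            this.value = value;
        }
    }

    private Node<T> head, tail;

    /** Appends at the tail and returns the node, which the caller keeps for O(1) removal. */
    Node<T> add(T value) {
        Node<T> node = new Node<>(value);
        node.owner = this;
        if (head == null) {
            head = tail = node;
        } else {
            tail.next = node;
            node.prev = tail;
            tail = node;
        }
        return node;
    }

    /** Unlinks node in O(1) and returns its successor, like HashedWheelBucket.remove. */
    Node<T> remove(Node<T> node) {
        Node<T> next = node.next;
        if (node.prev != null) {
            node.prev.next = next;
        }
        if (next != null) {
            next.prev = node.prev;
        }
        if (node == head) {
            head = next;      // also covers the single-element case
        }
        if (node == tail) {
            tail = node.prev;
        }
        node.prev = null;     // clear links so the node can be garbage-collected
        node.next = null;
        node.owner = null;
        return next;
    }

    List<T> toList() {
        List<T> out = new ArrayList<>();
        for (Node<T> n = head; n != null; n = n.next) {
            out.add(n.value);
        }
        return out;
    }
}
```

Returning the successor from remove() lets a loop like expireTimeouts() keep walking the list after unlinking the current node.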

Further topics FIXME

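Finding the smallest power of two not less than n
Linked-list design
Sleep-time precision
JCTools concurrent queues
Resource leak detection
Numeric overflow and precision checks
Lock-free CAS updates with Atomic* classes
CAS via AtomicIntegerFieldUpdater
Waiting between threads: startTime initialization with CountDownLatch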