前言
之前阅读了 Produce 和 Fetch 请求的实现,对于需要耗时处理的网络请求,都是利用 DelayedOperation
和 DelayedOperationPurgatory
来进行异步延迟操作,防止阻塞 KafkaRequestHandler
线程。
比如处理 Produce 请求时,ReplicaManager.appendRecords
方法在 ack 为 -1,有数据发送且有至少有一个分区的 append 操作成功时:
1 | val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock) |
再比如,处理 Fetch 请求时,ReplicaManager.fetchMessages
方法在 timeout 大于 0,读取本地数据没出错且响应积攒的字节数足够多时:
1 | val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, isolationLevel, responseCallback) |
用法很类似,首先创建 DelayedXXX 对象,然后对每个分区都创建 TopicPartitionOperationKey
对象组成 Seq
,将两者传入 purgatory 中进行 tryXXX 操作,从命名和注释可以猜到,这个操作是尝试完全请求,就像 Scala 的 Promise
类的 tryComplete
方法一样,异步操作的常见模式就是下面两个非阻塞操作:
- 启动一个任务异步执行,然后当前线程该干嘛干嘛;
- 想要确认任务是否执行结束时,看一眼,如果结束了就取得结果。
不过除了确认操作(Operation)是否完成外,还会在没有完成的时候,监控(Watch)相应的延迟操作。
1 | def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean |
1 | /* used by delayed-produce and delayed-fetch operations */ |
实际上 Key 可以是任意类型,只要实现了 keyLabel
方法,主题和分区组成的 Key 是用于延迟的 Produce 和 Fetch 操作,而对于其它请求/操作则用的其它类型的 Key,比如 JoinGroup 操作用 group id 和 consumer id 组成 Key。
通过前面的源码阅读可知,Produce 和 Fetch 请求全程都是按照分区去处理,也就是每个分区对应一个类型,然后对这个类型进行 map, filter 等等,所以这里传入 purgatory 的 key 可以唯一标识延迟处理的数据,比如 Fetch 操作中需要处理的 FetchMetadata
:
1 | case class FetchMetadata(/* 其它字段... */ |
DelayOperation
主要方法
首先看看 DelayedOperationPurgatory
类及其 tryCompleteElseWatch
方法的完整签名
1 | final class DelayedOperationPurgatory[T <: DelayedOperation](/* ... */) |
参数 1 是泛型参数 T
,该类型必须继承自 DelayedOperation
,该类型是抽象类
1 | abstract class DelayedOperation(override val delayMs: Long, |
设置了毫秒级延时 delayMs
以及可选的锁 lockOpt
。要实现一个延迟操作,也就是继承 DelayedOperation
类并重写(override)以下抽象方法:
1 | // 回调: 当延迟操作过期时执行, 因此 delayMs 到期时会强制完成 |
此外提供了原子 Boolean
类型的字段来标识是否完成
1 | private val completed = new AtomicBoolean(false) |
该原子变量是在 forceComplete
中设置为 true 的,可能有多个线程尝试完成同一个任务,由于是执行原子 Boolean
的 CAS 操作,只有第一个线程会返回 true,onComplete()
回调只会被调用一次。
前面的注释也提过,简单查看 DelayedProduce
和 DelayedFetch
的 tryComplete()
实现也能看到,每个代表任务已完成的分支,都会将 forceComplete()
的返回值作为 tryComplete()
的返回值。
此外,基类 TimerTask
继承自 Runnable
接口,因此 DelayedOperation
可以作为线程被启动,执行 run()
方法:
1 | override def run(): Unit = { |
如果任务未完成,则强制完成(期间会执行 onComplete
回调),并执行 onExpiration
回调。相当于多了个对超时的处理,因此可以猜测会在过期时启动线程来执行超时回调。
maybeTryComplete
该方法是 server 包私有的
1 | private[server] def maybeTryComplete(): Boolean = { |
看起来有点绕,分情况讨论。
- 单线程:调用
tryComplete
后退出循环,因为自己将tryCompletePending
置为了 false,解锁后retry
为false,此时和直接tryComplete
无异; - 双线程:记为 A 和 B,假设 A 上锁成功,且在
tryComplete
检查完成状态的时候 B 上锁失败,那么 B 将tryCompletePending
置为 true,这会导致两种情况:- A
tryComplete
成功,代表onComplete
已被调用,isCompleted
为 true,A 和 B 都会退出循环; - A
tryComplete
失败,isCompleted
为 false,由于tryCompletePending
被 B 置为 false,A 的retry
为 true,而由于不存在其它等待线程,所以 B 在getAndSet
时得到的值(赋给retry
)也是 true,A 和 B 重新争夺锁,也就是说至少会再调用一次tryComplete
。
- A
- 三个以上线程:存在 1 个持锁线程和 N 个等待线程(N > 1),
getAndSet
是原子操作,也就是说 N 个等待线程只有 1 个等待线程的retry
会被置为 true,其它线程都因为第一个调用getAndSet
的等待线程将tryCompletePending
置为 false 时退出,此时和双线程的情况无异。
核心就是双线程的情况,这种做法是为了针对这种场景:线程 A 检查完成状态的时候,此时还未完成,而线程 B 检查完成状态的时候,虽然实际已经完成了,但由于线程 A 正持有锁,B 不会检查状态。这种做法能让无论线程 A 还是 B,都会再调用一次 tryComplete
检查是否完成了。
举个例子,假设状态由 1 个 Boolean
表示,只有都为 true
时才算完成。如果不这么设计,那就可能出现下面的情况:
时刻 | 状态 | 线程 A | 线程 B |
---|---|---|---|
1 | false | 上锁 | |
2 | false | 取得状态 false | |
3 | true | 判断状态(已经是旧的状态)是否为 true | 上锁失败,不检查状态 |
4 | true | 返回 false | 返回 false |
而现在的做法就是在第 4 步,让线程 A 和 B 再次竞争 tryComplete
的机会,至少有一个线程能检查新的状态。
TimerTask
DelayedOperation.forceComplete
有一个关键的 cancel()
调用来自于基类 TimerTask
,从源码注释可知,这个方法是取消 timeout 计时器,即强行停止耗时超过 delayedMs
的延时任务。这个方法是在基类 TimerTask
中实现的。
1 | private[this] var timerTaskEntry: TimerTaskEntry = null |
实际上是调用了 TimerTaskEntry.remove
方法
1 | private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] { |
TimerTaskEntry
是 TimerTaskList
(定时任务链表)上的一个节点(Entry),内部维护了链表的引用,调用 remove
即将当前节点从链表上移除。
1 | def remove(timerTaskEntry: TimerTaskEntry): Unit = { |
可见 TimerTaskList
是双向链表,移除节点时会锁住该节点,然后修改其 prev
和 next
,并维护了原子变量的 taskCounter
记录任务节点的数量。除了 remove
外只有 add
方法会在多线程下造成 race condition,因此要加锁。
1 | def add(timerTaskEntry: TimerTaskEntry): Unit = { |
PS:这里似乎锁住单个节点没必要,因为 add
和 remove
本身都已经上锁了,大概是防止将来有其它方法直接修改节点的字段吧,毕竟节点的字段(这里主要指list
)都不是私有的。(虽然我看到的对 list
的修改也就只在 add
/remove
方法中)。
总之,这里单独实现的 TimerTaskList
,除了实现了线程安全的增删操作外,主要是保证了链表上的节点(对应一个定时任务)是对应唯一的链表的,主要是为了保证节点在从链表 A 迁移到链表 B 时,不会继续留存在 A 中。
TimerTask
会绑定一个 TimerTaskEntry
类型的节点,该节点位于 TimerTaskList
类型的双向链表上,链表包含一个字段:expirationMs
,即任务的毫秒级 timeout。任务链表在源码注释中也被称为 bucket(桶),其本身也有一个原子类型的 expiration
字段代表任务链表本身的 timeout,提供了 setter 和 getter,需要注意的是 setter 返回的是 Boolean
而非 Unit
,为 true 时则代表 expiration
发生了变化。
TimerTask
在绑定新的 TimerTaskEntry
时,如果和之前的节点不一样,也会将其移除:
1 | private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = { |
而在创建定时任务节点时,会自动和构造参数的 TimerTask
对象绑定:
1 | private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] { |
总结
延时操作(DelayedOperation
)是一个抽象基类,继承自定时任务(TimerTask
),由派生类实现以下方法:
- 任务完成的回调;
- 任务超时的回调;
- 非阻塞地确认任务是否完成。
每个延时操作都是一个定时任务(TimerTask
),对应一个定时任务节点(TimerTaskEntry
),而每个任务节点都是存在一个 bucket(定时任务链表,TimerTaskList
)上的。通过线程安全的 remove
和 add
操作可以让节点从一个 bucket 移动到另一个 bucket,整个过程中节点都始终对应唯一的 bucket,不可能被多个 bucket 共享。这使得任务能安全地在多个 bucket 之间迁移。也就是接下来要阅读的时间轮 TimeWheel
。