前文回顾
前一篇阅读了appendToLocalLog
的部分,服务端在收到Produce请求时,会首先将消息写入本地消息日志:
1 | val sTime = time.milliseconds // 取得当前毫秒级时间戳 |
返回的localProduceResults
类型是Map[TopicPartition, LogAppendResult]
。
1 | // 正常结果: info有效,exception为None |
info
字段来自于Log.appendAsLeader
的返回值,即实际添加到本地日志的消息,包含消息集的第1条消息和最后1条消息的offset(生产者在发送消息集时是不知道最后写入日志文件时消息的offset,只有在服务端写入日志时才会设置)。
接下来阅读ReplicaManager.appendRecords
中的后续处理。
ProducePartitionStatus的处理
1 | // 将分区对应的处理结果转换成 ProducePartitionStatus 对象 |
1 | public static final class PartitionResponse { |
回顾一下,在使用Kafka客户端时,生产者可以通过回调取得消息的元数据,像主题和分区,是在生产者发送前就已知的,但offset和时间戳则是由服务端在此处设置的。见Kafka 1.1 Producer API的RecordMetadata
。
接下来是一个if-else分支
1 | if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { |
如果delayedProduceRequestRequired
返回false
,则可以立刻发送响应,而且忽略了offset字段,因为该字段代表了下一批消息的第1个offset,而PartitionStatus
本身就包含当前消息集的baseOffset
。
那么为何else分支就意味着可以立刻发送响应呢?
1 | private def delayedProduceRequestRequired(requiredAcks: Short, |
可见,if分支意味着以下条件满足:
requiredAcks
为-1,即生产者要等待分区的所有ISR收到消息后才会返回;entriesPerPartition
不为空,即存在需要添加消息的分区;localProduceResults
中至少存在1条成功的结果。
相应地,else分支对应的是:
requiredAcks
为0或1,即客户端无需等待服务端的响应或者只需要等待leader收到消息;- 没有消息需要写入(无论是没有可写入的分区还是全部消息写入出现异常),那么ISR也没必要去从leader复制数据,因此也可以直接返回响应。
PS:第2个条件在处理Produce请求是是多余的判断,因为之前在KafkaApis.handleProduceRequest
中已经判断过了:
1 | if (authorizedRequestInfo.isEmpty) |
也就是说if分支里会等待所有ISR收到消息才会返回,查看if分支:
1 | // 构造 DelayedProduce 对象, 注意 timeout 仅在此处使用 |
还是利用了purgatory,先不研究其实现细节,大致可以理解为,创建一个DelayedProduce
对象,传入带offset和时间戳的消息集,设置timeout和响应回调,就能完成延迟生产。而purgatory只是用来确认是否完成,若没完成则将其扔进purgatory中。
也就是说,响应回调不再是像else分支(以及之前的错误处理分支)中一样由当前线程去调用,而是由DelayedProduce
对象去调用,从而实现了异步的方式等待所有ISR收到最新的消息,避免leader的Handler
线程阻塞在KafkaApis
对请求的处理中。
另外,值得注意的是timeout
是在构造这个DelayedProduce
对象时才使用,也就是之前的写入本地日志的时间是不计算在内的,当然网络传输时间也是,可以回顾上一篇阅读笔记 的2.1 请求格式中翻译的官网对timeout
的说明。
sendResponseCallback
1 | def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { |
由于是接着之前的进行阅读,所以用到了一些之前创建的对象,见上一篇阅读笔记的handleProduceRequest:
unauthorizedTopicResponses
:对调用KafkaApis.authorize
方法认证失败的请求生成的错误响应;nonExistingTopicResponses
:对目标主题不在KafkaApis.metadataCache
中的请求生产的错误响应;numBytesAppended
:请求的总字节数,包含header部分。
检测出是否有错误响应是为了传给produceResponseCallback
,从而在acks为0时,关闭与客户端的socket连接来通知其更新元数据。而该回调被传入了ClientQuotaManager.maybeRecordAndThrottle
方法,在未启用quotas的情况下会直接调用produceResponseCallback
,分为以下3种情况:
acks为0,且存在错误响应:关闭与客户端的连接,会引起客户端更新元数据;
acks为0,且不存在错误响应:
1
sendNoOpResponseExemptThrottle(request)
1
2
3
4private def sendNoOpResponseExemptThrottle(request: RequestChannel.Request): Unit = {
quotas.request.maybeRecordExempt(request)
sendResponse(request, None)
}会进入
KafkaApis.sendResponse
的None
分支:1
requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
回顾网络层阅读的之Acceptor和Processor的4.2 processNewResponses,如果响应的类型是
NoOpAction
,只会给Processor
与客户端的连接Channel
重新注册读事件,并不会发送响应给客户端。acks不为0:
1
2sendResponseMaybeThrottle(request,
requestThrottleMs => new ProduceResponse(mergedResponseStatus.asJava, bandwidthThrottleTimeMs + requestThrottleMs))创建
ProduceResponse
的throttleMs
为bandwidthThrottleTimeMs
和requestThrottleMs
之和,这两者都有各自对应的quotas对象,若未启用则为0。最终也会进入KafkaApis.sendResponse
中:1
2
3
4
5val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))将
SendAction
类型的响应通过RequestChannel
交给Processor
,进一步发送给客户端。
总结
本篇阅读了处理Produce请求的流程,接着写入本地日志后的代码继续阅读:
写入本地日志后会返回处理结果,包含了每个请求写入的分区的相关状态,新增了消息集的baseOffset
和写入日志的时间戳。对于acks字段为-1的情况,将timeout字段/消息集以及发送响应的回调丢给DelayedOperation
对象进行异步的延迟操作,并通过purgatory字段检查异步处理的结果。
无论是KafkaApis
本身,还是DelayedOperation
,处理完后都会调用sendResponseCallback
,acks不为0则根据Produce响应协议构造响应发送给客户端,acks为0则根据是否有错误响应而有不同的行为,若不包含错误响应则不进行操作,否则关闭socket,触发客户端重新获取元数据。
至此,完成了服务端对Produce请求的阅读,但是有不少细节没有深入,有待进一步阅读:
DelayedOperation
与DelayedOperationPurgatory
:延迟操作的实现;Log
类,对本地日志目录和日志片段(segment)文件直接操作;Partition
类,管理了分区的副本broker,还有leader epoch等。