Fetch协议 Fetch API用于为某些分区获取日志,逻辑上它指定主题 ,分区 和起始offset 来取得消息,消息格式参考The Messages Fetch
KafkaApis.handleFetchRequest 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 val versionId = request.header.apiVersionval clientId = request.header.clientIdval fetchRequest = request.body[FetchRequest ]val fetchContext = fetchManager.newContext(fetchRequest.metadata(), fetchRequest.fetchData(), fetchRequest.toForget(), fetchRequest.isFromFollower()) val erroneous = mutable.ArrayBuffer [(TopicPartition , FetchResponse .PartitionData )]() val interesting = mutable.ArrayBuffer [(TopicPartition , FetchRequest .PartitionData )]() if (fetchRequest.isFromFollower()) { if (authorize(request.session, ClusterAction , Resource .ClusterResource )) { fetchContext.foreachPartition((part, data) => { if (!metadataCache.contains(part.topic)) { erroneous += part -> new FetchResponse .PartitionData (Errors .UNKNOWN_TOPIC_OR_PARTITION , FetchResponse .INVALID_HIGHWATERMARK , FetchResponse .INVALID_LAST_STABLE_OFFSET , FetchResponse .INVALID_LOG_START_OFFSET , null , MemoryRecords .EMPTY ) } else { interesting += (part -> data) } }) } else { fetchContext.foreachPartition((part, data) => { erroneous += part -> new FetchResponse .PartitionData (Errors .TOPIC_AUTHORIZATION_FAILED , FetchResponse .INVALID_HIGHWATERMARK , FetchResponse .INVALID_LAST_STABLE_OFFSET , FetchResponse .INVALID_LOG_START_OFFSET , null , MemoryRecords .EMPTY ) }) } } else { fetchContext.foreachPartition((part, data) => { if (!authorize(request.session, Read , new Resource (Topic , part.topic))) erroneous += part -> new FetchResponse .PartitionData (Errors .TOPIC_AUTHORIZATION_FAILED , FetchResponse .INVALID_HIGHWATERMARK , FetchResponse .INVALID_LAST_STABLE_OFFSET , FetchResponse .INVALID_LOG_START_OFFSET , null , MemoryRecords .EMPTY ) else if (!metadataCache.contains(part.topic)) erroneous += part -> new FetchResponse .PartitionData (Errors .UNKNOWN_TOPIC_OR_PARTITION , FetchResponse .INVALID_HIGHWATERMARK , FetchResponse .INVALID_LAST_STABLE_OFFSET , FetchResponse .INVALID_LOG_START_OFFSET , null , MemoryRecords .EMPTY ) else interesting += (part -> data) }) }
可见主要是调用authorize
方法进行 ACL 认证,以及查询metadataCache
判断请求的分区是否存在。对于 follower,认证是基于整个请求的,操作是ClusterAction
;对于 consumer,认证是基于每个分区的,类型是Read
。
只有经过认证且存在于metadataCache
的分区对应的请求会加入interesting
中,其它分区会构造一个默认的不合法响应加入erroneous
中。
接下来定义了如下回调函数:
1 2 3 def convertedPartitionData (tp: TopicPartition , data: FetchResponse .PartitionData ): FetchResponse .PartitionData def processResponseCallback (responsePartitionData: Seq [(TopicPartition , FetchPartitionData )])
然后调用ReplicaManager.fetchMessages
方法对 interesting
请求进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 if (interesting.isEmpty) processResponseCallback(Seq .empty) else { replicaManager.fetchMessages( fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes, fetchRequest.maxBytes, versionId <= 2 , interesting, replicationQuota(fetchRequest), processResponseCallback, fetchRequest.isolationLevel)
ReplicaManager.fetchMessages 主要实现 方法说明:从 leader 副本取得消息,等待足够数据可以获取。一旦超时或者请求条件被满足则回调被调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 def fetchMessages (timeout: Long , replicaId: Int , fetchMinBytes: Int , fetchMaxBytes: Int , hardMaxBytesLimit: Boolean , fetchInfos: Seq [(TopicPartition , PartitionData )], quota: ReplicaQuota = UnboundedQuota , responseCallback: Seq [(TopicPartition , FetchPartitionData )] => Unit , isolationLevel: IsolationLevel ) { val isFromFollower = Request .isValidBrokerId(replicaId) val fetchOnlyFromLeader = replicaId != Request .DebuggingConsumerId && replicaId != Request .FutureLocalReplicaId val fetchOnlyCommitted = !isFromFollower && replicaId != Request .FutureLocalReplicaId def readFromLog (): Seq [(TopicPartition , LogReadResult )] = { } val logReadResults = readFromLog() val logReadResultValues = logReadResults.map { case (_, v) => v } val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum val errorReadingData = logReadResultValues.foldLeft(false ) ((errorIncurred, readResult) => errorIncurred || (readResult.error != Errors .NONE )) if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData (result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, result.lastStableOffset, result.info.abortedTransactions) } responseCallback(fetchPartitionData) } else { val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) => val fetchInfo = fetchInfos.collectFirst { case (tp, v) if tp == topicPartition => v }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos" )) (topicPartition, FetchPartitionStatus (result.info.fetchOffsetMetadata, fetchInfo)) } val fetchMetadata = FetchMetadata (fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch = new DelayedFetch (timeout, fetchMetadata, this , quota, isolationLevel, responseCallback) val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey (tp) } delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } }
从本地日志文件中读取得到请求的每个分区的结果(LogReadResult
);
若出现以下错误,则立刻将读取结果构造成 FetchPartitionData
交给回调函数处理;
timeout(对应请求的 max_wait_time
字段)小于0,即客户端不想等待;
读取结果为空,即客户端请求的任何分区都无法从本地读到结果;
读取字节数不小于 fetchMinBytes
(对应请求的 min_bytes
字段);
在读取某个请求的分区的结果时存在错误。
否则,遍历每个分区的读取结果,和请求中同一分区的请求字段一起构造 FetchPartitionStatus
;
构造 DelayedFetch
对象,尝试完成请求,否则将其放入 delayedFetchPurgatory
中延迟处理。
关键的部分就是 readFromLog()
函数和延迟处理的部分。延迟处理相关设施(purgatory,DelayedOperation
)在之后去阅读,本篇最后阅读 readFromLog()
和发送响应的回调函数的实现。
responseCallback 即 KafkaApis.handleFetchRequest
方法中定义的回调函数 processResponseCallback
,用来在处理请求完成,构造响应后将响应发送给客户端。
这部分不细看了,因为有不少逻辑是为了实现事务以及配置限额的,这不是目前我阅读源码的重点。核心处理分为两步:
通过 convertedPartitionData
将 PartitionData
转换成和兼容旧版本的响应结构;
调用 KafkaApis.sendResponse
发送响应,在之前的 Produce 请求(2): 发送响应 中都看过这个方法,简单回顾下,实际上就是把响应加入 Processor
的响应队列,之后的发送由 Processor
处理,参考 网络层阅读之 Acceptor 和 Processor 的 4.2 节。
readFromLog 1 2 3 4 5 6 7 8 9 10 11 12 13 def readFromLog (): Seq [(TopicPartition , LogReadResult )] = { val result = readFromLocalLog( replicaId = replicaId, fetchOnlyFromLeader = fetchOnlyFromLeader, readOnlyCommitted = fetchOnlyCommitted, fetchMaxBytes = fetchMaxBytes, hardMaxBytesLimit = hardMaxBytesLimit, readPartitionInfo = fetchInfos, quota = quota, isolationLevel = isolationLevel) if (isFromFollower) updateFollowerLogReadResults(replicaId, result) else result }
调用 readFromLocalLog
,如果 Fetch 请求来自 follower 则还需要调用 updateFollowerLogReadResults
更新 follower 的结果。
readFromLocalLog 首先看看内部定义的 read
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 def read (tp: TopicPartition , fetchInfo: PartitionData , limitBytes: Int , minOneMessage: Boolean ): LogReadResult = { val offset = fetchInfo.fetchOffset val partitionFetchSize = fetchInfo.maxBytes val followerLogStartOffset = fetchInfo.logStartOffset try { val localReplica = if (fetchOnlyFromLeader) getLeaderReplicaIfLocal(tp) else getReplicaOrException(tp) val initialHighWatermark = localReplica.highWatermark.messageOffset val lastStableOffset = if (isolationLevel == IsolationLevel .READ_COMMITTED ) Some (localReplica.lastStableOffset.messageOffset) else None val maxOffsetOpt = if (readOnlyCommitted) Some (lastStableOffset.getOrElse(initialHighWatermark)) else None val initialLogEndOffset = localReplica.logEndOffset.messageOffset val initialLogStartOffset = localReplica.logStartOffset val fetchTimeMs = time.milliseconds val logReadInfo = localReplica.log match { case Some (log) => val adjustedFetchSize = math.min(partitionFetchSize, limitBytes) val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel) if (shouldLeaderThrottle(quota, tp, replicaId)) FetchDataInfo (fetch.fetchOffsetMetadata, MemoryRecords .EMPTY ) else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete) FetchDataInfo (fetch.fetchOffsetMetadata, MemoryRecords .EMPTY ) else fetch case None => error(s"Leader for partition $tp does not have a local log" ) FetchDataInfo (LogOffsetMetadata .UnknownOffsetMetadata , MemoryRecords .EMPTY ) } LogReadResult (info = logReadInfo, highWatermark = initialHighWatermark, leaderLogStartOffset = initialLogStartOffset, leaderLogEndOffset = initialLogEndOffset, followerLogStartOffset = followerLogStartOffset, fetchTimeMs = fetchTimeMs, readSize = partitionFetchSize, lastStableOffset = lastStableOffset, exception = None ) } catch { } }
流程:首先取得本地副本(实际上对 Consumer 和 Follower 而言都是 Leader 副本),然后取得 HW,LEO 等字段,记录时间戳,然后通过本地副本读取本地数据。这里还利用了 V3 版本请求的 max_bytes
字段,限制读取的字节数上限,但如果第一条消息长度就超出上限的话,仍然会返回整条消息(此时读取字节数超过了 max_bytes
)。
注意 LogReadResult
的第一个字段是从本地日志读取的信息:
1 2 3 4 5 case class FetchDataInfo (fetchOffsetMetadata: LogOffsetMetadata , // offset 元数据, 包括: // offset; Segment 的基础 offset; 相对于 Segment 的物理偏移字节数 records: Records , // 消息集 firstEntryIncomplete: Boolean = false, abortedTransactions: Option [List [AbortedTransaction ]] = None
主要是前两个字段,消息集就不说了,元数据的作用是记录了 offset 对应消息相对本地 Segment 的实际偏移量。这里回顾一个基本概念,Kafka 的每个分区都用本地文件记录消息,为了防止单个文件过大,会根据文件大小和写入时间分成多个文件,单个文件被称为 Segment (对应代码中的 LogSegment
类),而 Log
类则是管理这些 Segment。因此,记录消息的物理偏移量,便于在从本地 Segment 中快速通过 offset 定位到对应消息。
接着看 readFromLocalLog
的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 var limitBytes = fetchMaxBytes val result = new mutable.ArrayBuffer [(TopicPartition , LogReadResult )]var minOneMessage = !hardMaxBytesLimitreadPartitionInfo.foreach { case (tp, fetchInfo) => val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) val recordBatchSize = readResult.info.records.sizeInBytes if (recordBatchSize > 0 ) minOneMessage = false limitBytes = math.max(0 , limitBytes - recordBatchSize) result += (tp -> readResult) } result
可见,每个分区都对应一条读取结果(LogReadResult
),包含 offset 对应消息,还有 HW/LEO 等信息 。V3 开始外部的 max_bytes
字段限制所有消息的最大字节数,而每个分区都有自己的 partition_max_bytes
限制单条消息的最大字节数。
读完这部分代码后,可以回顾 Fetch 请求的协议(V3 版本),并附上注释说明:
1 2 3 4 5 6 7 8 9 10 11 Fetch Request (Version: 3 ) => replica_id max_wait_time min_bytes max_bytes [topics] replica_id => INT32 max_wait_time => INT32 min_bytes => INT32 max_bytes => INT32 topics => topic [partitions] topic => STRING partitions => partition fetch_offset partition_max_bytes partition => INT32 fetch_offset => INT64 partition_max_bytes => INT32
其中 fetch_offset 可由 FetchContext
的相关方法取得:
1 2 3 4 5 trait FetchContext extends Logging { def getFetchOffset (part: TopicPartition ): Option [Long ]
updateFollowerLogReadResults 当 replica id 大于 0 时,代表客户端为 Follower,在从本地日志读取信息后,会调用该方法更新 Follower 的 fetch 状态,并更新读取结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private def updateFollowerLogReadResults (replicaId: Int , readResults: Seq [(TopicPartition , LogReadResult )]): Seq [(TopicPartition , LogReadResult )] = { readResults.map { case (topicPartition, readResult) => var updatedReadResult = readResult nonOfflinePartition(topicPartition) match { case Some (partition) => partition.getReplica(replicaId) match { case Some (replica) => case Some (replica) => if (partition.updateReplicaLogReadResult(replica, readResult)) partition.leaderReplicaIfLocal.foreach { leaderReplica => updatedReadResult = readResult.updateLeaderReplicaInfo(leaderReplica) } case None => updatedReadResult = readResult.withEmptyFetchInfo } case None => warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created." ) } topicPartition -> updatedReadResult } }
然后对读取结果调用 updateLeaderReplicaInfo
更新为 leader 副本的信息:
1 2 3 4 def updateLeaderReplicaInfo (leaderReplica: Replica ): LogReadResult = copy(highWatermark = leaderReplica.highWatermark.messageOffset, leaderLogStartOffset = leaderReplica.logStartOffset, leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset)
利用 Scala case 类的 copy
方法,返回更新对应字段后的对象。这里将读取结果的 HW,LogStartOffset,LEO 更新为 leader 副本维护的相应信息。因为 follower 副本发送 Fetch 请求时,leader 副本可能更新 HW(如果之前 follower 没有同步到最新),因此需要把更新后的 HW 发送给 follower。
顺带提下这里涉及到的另一个概念:低水位(LW, Low Watermark)。LW 即所有副本中最小的 LogStartOffset,一般情况下 LW 都是 0,但是如果服务端收到了 DeleteRecords
请求,删除日志文件的部分记录(消息)时,会更新 LW。
总结 本篇阅读了 Fetch 请求的处理流程,主要根据 replica id 字段分 Consumer 和 Follower 来处理:
会话认证,判断请求分区是否存在,将没有问题的分区对应的请求构成 Map 由 ReplicaManager
处理;
ReplicaManager
对每个分区,找到其 leader 副本;
leader 副本从本地读取请求的 offset 开始的若干消息(由全局的以及各分区的 max_bytes
字段来限制读取最大字节数),和维护的其它信息构成读取结果;
对 follower 副本的请求,还会将 leader 副本的 HW,LEO,LogStartOffset 更新到读取结果中;
根据读取结果和请求的相关字段判断是否立刻发送响应,比如读取没问题时,读取字节数超过了 min_bytes
即可发送;
否则,构造 DelayedFetch
对象传入 DelayedFetchPurgatory
对象中,此时 purgatory 还会判断一次处理是否完成,若已完成则不用延迟处理。
主要区别还是第 4 步,因为 follower 的 Fetch 请求是用来与 leader 同步的,因此需要将 HW 记录在结果中让 follower 更新自己的 HW。