## Review

In 06: Produce请求之写入本地日志, the reading of `ReplicaManager.appendToLocalLog` focused mainly on the error paths:

- a non-admin client writes to an internal topic such as `__consumer_offsets`;
- the requested topic-partition cannot be found;
- the requested partition is offline;
- the current broker is not the leader of the requested partition;
- the `acks` field of the request is invalid, or it is -1 (all) while the ISR size is smaller than the `min.insync.replicas` configuration.

In all of these cases an exception is thrown, caught, and turned into a `LogAppendResult` object (see server/ReplicaManager.scala):
```scala
case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
  def error: Errors = exception match {
    case None => Errors.NONE
    case Some(e) => Errors.forException(e)
  }
}
```
For these error scenarios, `LogAppendResult.info` is set to an invalid placeholder value:
```scala
object LogAppendInfo {
  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
    RecordsProcessingStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1,
    offsetsMonotonic = false)
}
```
The `LogAppendResult` returned by `appendToLocalLog` is used in 07: Produce请求之发送响应 to build a `PartitionResponse` object for each topic-partition; the resulting `Map` is then passed to the callback that sends the response back to the client.

In other words, the most important part was skipped earlier: how, in the normal case, the records are actually written to the local log file and a `LogAppendInfo` is produced.
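For reference, the mapping from `LogAppendResult` to the per-partition response is roughly the following. This is only a minimal sketch, assuming the 1.x-era `ProduceResponse.PartitionResponse` constructor that takes the error, base offset, log-append time and log start offset, and the `LogAppendResult` case class shown above:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse

// Sketch only: turn each per-partition LogAppendResult into the PartitionResponse
// handed to the response callback.
def toResponse(results: Map[TopicPartition, LogAppendResult]): Map[TopicPartition, PartitionResponse] =
  results.map { case (tp, result) =>
    // result.error is Errors.NONE on success, otherwise derived from the captured exception
    tp -> new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)
  }
```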
## Log.append code analysis

In Partition.scala under the `cluster` package, the partition's current `leaderEpoch` field is passed into `appendAsLeader`:
```scala
val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
```
`log` is a `Log` object, defined in Log.scala under the `log` package. `appendAsLeader` delegates to `append`:
```scala
def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
  append(records, isFromClient, assignOffsets = true, leaderEpoch)
}
```
Only requests coming from clients are considered here, so in the reading below `isFromClient` and `assignOffsets` are assumed to be true.
```scala
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    // ...
  }
}
```
```scala
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
  try {
    fun
  } catch {
    case e: IOException =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
      throw new KafkaStorageException(msg, e)
  }
}
```
`maybeHandleIOException` catches any `IOException` thrown by `fun`, marks the log directory as offline, and rethrows it as a `KafkaStorageException`, which is caught at a higher level and turned into a `LogAppendResult`.
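The by-name parameters (`msg: => String`, `fun: => T`) mean neither the message nor the body is evaluated until it is actually needed, which is what lets the call site read like a control structure. A self-contained sketch of the same pattern, with names made up purely for illustration:

```scala
import java.io.IOException

// Hypothetical stand-alone version of the wrapper pattern used by maybeHandleIOException.
def guardingIO[T](msg: => String)(body: => T): T =
  try body
  catch {
    case e: IOException =>
      // In Log.append this is the point where the log dir is flagged offline
      // and the error is rethrown as a KafkaStorageException.
      throw new RuntimeException(msg, e)
  }

// Call site: the second parameter list is passed as a block, and the message
// is only built if an IOException actually occurs.
val result = guardingIO(s"Error while appending to some-topic-0") {
  42 // placeholder for the actual I/O work
}
```

With that pattern in mind, the body that `append` wraps inside `maybeHandleIOException` is the following: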
```scala
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

// return early if there are no valid message sets
if (appendInfo.shallowCount == 0)
  return appendInfo

// trim any trailing invalid bytes
var validRecords = trimInvalidBytes(records, appendInfo)

lock synchronized {
  checkIfMemoryMappedBufferClosed()
  if (assignOffsets) {
    // assign absolute offsets starting from the current LEO, validate timestamps
    // and (if necessary) recompress the batches
    val offset = new LongRef(nextOffsetMetadata.messageOffset)
    appendInfo.firstOffset = offset.value
    val now = time.milliseconds
    val validateAndOffsetAssignResult = try {
      LogValidator.validateMessagesAndAssignOffsets(validRecords,
        offset,
        time,
        now,
        appendInfo.sourceCodec,
        appendInfo.targetCodec,
        config.compact,
        config.messageFormatVersion.messageFormatVersion.value,
        config.messageTimestampType,
        config.messageTimestampDifferenceMaxMs,
        leaderEpoch,
        isFromClient)
    } catch {
      case e: IOException =>
        throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
    }
    validRecords = validateAndOffsetAssignResult.validatedRecords
    appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
    appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
    appendInfo.lastOffset = offset.value - 1
    appendInfo.recordsProcessingStats = validateAndOffsetAssignResult.recordsProcessingStats
    if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
      appendInfo.logAppendTime = now

    // if recompression changed the batch sizes, re-check max.message.bytes per batch
    if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
      for (batch <- validRecords.batches.asScala) {
        if (batch.sizeInBytes > config.maxMessageSize) {
          throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
            .format(batch.sizeInBytes, config.maxMessageSize))
        }
      }
    }
  } else {
    // (the assignOffsets = false path is not covered here)
  }

  // cache the leader epoch carried by v2 batches
  validRecords.batches.asScala.foreach { batch =>
    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
      _leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
  }

  // the whole message set must fit into a single segment
  if (validRecords.sizeInBytes > config.segmentSize) {
    throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
      .format(validRecords.sizeInBytes, config.segmentSize))
  }

  // producer/transaction state bookkeeping; a duplicate returns the previously appended result
  val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
  maybeDuplicate.foreach { duplicate =>
    appendInfo.firstOffset = duplicate.firstOffset
    appendInfo.lastOffset = duplicate.lastOffset
    appendInfo.logAppendTime = duplicate.timestamp
    appendInfo.logStartOffset = logStartOffset
    return appendInfo
  }

  // roll a new segment if needed, then do the actual write
  val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
    maxTimestampInMessages = appendInfo.maxTimestamp,
    maxOffsetInMessages = appendInfo.lastOffset)

  val logOffsetMetadata = LogOffsetMetadata(
    messageOffset = appendInfo.firstOffset,
    segmentBaseOffset = segment.baseOffset,
    relativePositionInSegment = segment.size)

  segment.append(firstOffset = appendInfo.firstOffset,
    largestOffset = appendInfo.lastOffset,
    largestTimestamp = appendInfo.maxTimestamp,
    shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
    records = validRecords)

  for ((producerId, producerAppendInfo) <- updatedProducers) {
    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
    producerStateManager.update(producerAppendInfo)
  }

  for (completedTxn <- completedTxns) {
    val lastStableOffset = producerStateManager.completeTxn(completedTxn)
    segment.updateTxnIndex(completedTxn, lastStableOffset)
  }

  producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

  // advance the LEO past the last appended offset
  updateLogEndOffset(appendInfo.lastOffset + 1)

  updateFirstUnstableOffset()

  // flush to disk once enough unflushed messages have accumulated (flush.messages)
  if (unflushedMessages >= config.flushInterval)
    flush()

  appendInfo
}
```
Several mechanisms here I don't yet understand (marked as TODO for now), including but not limited to:

- leader epoch;
- transaction support;
- stable offset (apparently also transaction-related?)
## Flow summary

First, the overall flow of the code:

1. `records.batches` is a set of record batches; for each batch, check that its CRC is valid and that its size does not exceed the `max.message.bytes` configuration (see the sketch after this list);
2. if any batch is invalid, an exception is thrown and eventually caught by `ReplicaManager.appendToLocalLog` (in the Produce-request path, at least), producing a response that carries the error;
3. from the records, compute the offsets of the first and last messages, the number of message sets, the total byte size of the valid batches, whether the message offsets are monotonically increasing, and the codec of the message set; these are used to build the `LogAppendInfo` object to be returned, referred to as `info` below;
4. verify that there is at least one valid message set and trim the trailing invalid bytes, producing `validRecords`; (TODO: this looks redundant, since an invalid batch already causes an exception, but the latest Kafka source, 2.3 at the time of writing, handles it the same way)
5. check whether the memory-mapped buffer has been closed;
6. assign the LEO to `info.firstOffset` and take the current timestamp `now`;
7. rewrite the offsets in `validRecords` as absolute offsets, recompressing the batches if they are compressed; assign the offset of the last message to `info.lastOffset`, and set `info`'s maximum message-set timestamp and the offset of the message that carries it;
8. if the timestamp type is `LOG_APPEND_TIME`, assign `now` to `info.logAppendTime` (default -1);
9. if recompression changed the byte size of `validRecords`, re-check each batch against `max.message.bytes`;
10. check whether the byte size of `validRecords` exceeds the `log.segment.bytes` configuration;
11. get the current `LogSegment` object and append `validRecords` to it;
12. update the LEO to the offset of the last message in `validRecords` plus 1;
13. if the number of unflushed messages exceeds the `flush.messages` configuration, flush all `LogSegment`s to local disk.
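To make steps 1-4 a bit more concrete, here is a rough sketch of the kind of per-batch validation `analyzeAndValidateRecords` performs. It is simplified and not the actual source; it only gathers the statistics discussed above, and the exception types are assumptions based on the client-side error classes:

```scala
import org.apache.kafka.common.errors.{CorruptRecordException, RecordTooLargeException}
import org.apache.kafka.common.record.MemoryRecords
import scala.collection.JavaConverters._

// Rough sketch of the per-batch checks done by Log.analyzeAndValidateRecords (not the real code).
def sketchAnalyzeBatches(records: MemoryRecords, maxMessageSize: Int): (Int, Int, Boolean) = {
  var shallowCount = 0     // number of batches ("shallow" messages)
  var validBytes = 0       // total bytes of the batches that passed validation
  var monotonic = true     // whether batch offsets are monotonically increasing
  var lastOffsetSeen = -1L

  for (batch <- records.batches.asScala) {
    if (!batch.isValid)    // CRC check; the real code surfaces this as a CORRUPT_MESSAGE error
      throw new CorruptRecordException("Record batch failed its CRC check")

    if (batch.sizeInBytes > maxMessageSize)   // max.message.bytes check
      throw new RecordTooLargeException(
        s"Batch is ${batch.sizeInBytes} bytes, which exceeds the configured maximum of $maxMessageSize")

    if (batch.lastOffset <= lastOffsetSeen) monotonic = false
    lastOffsetSeen = batch.lastOffset

    shallowCount += 1
    validBytes += batch.sizeInBytes
  }
  (shallowCount, validBytes, monotonic)       // these feed the returned LogAppendInfo
}
```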
The core of it is replacing relative offsets with absolute offsets. When the producer sends a request, it does not know the offsets at which the message set will land on disk, so it can only set the offsets to 0, 1, 2, …, n-1, i.e. relative offsets. The partition's leader broker, on the other hand, maintains its LEO, so on receiving the request it rewrites the offsets to LEO, LEO+1, LEO+2, …, LEO+n-1, and finally advances the LEO to LEO+n. The rewritten offsets are included in the response sent back to the producer, so the client can learn the absolute offsets of its messages through the delivery callback.
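A toy illustration of this rewriting (purely illustrative, not Kafka code):

```scala
// Toy model: the leader rewrites the producer's relative offsets 0..n-1 to absolute offsets.
final case class ToyRecord(relativeOffset: Long, value: String)

def assignAbsoluteOffsets(leo: Long, batch: Seq[ToyRecord]): (Seq[(Long, String)], Long) = {
  // Each record's absolute offset is LEO + its relative offset.
  val withAbsolute = batch.map(r => (leo + r.relativeOffset, r.value))
  val newLeo = leo + batch.size // the LEO advances by the number of records appended
  (withAbsolute, newLeo)
}

// Example: LEO = 42 and a batch of 3 records => offsets 42, 43, 44; new LEO = 45.
val (assigned, newLeo) =
  assignAbsoluteOffsets(42L, Seq(ToyRecord(0, "a"), ToyRecord(1, "b"), ToyRecord(2, "c")))
// The base offset (42 here) is what the producer gets back in the response metadata.
```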
Each `Log` corresponds to the message log of one partition; that log is split across multiple files (log segments), each backed by a `LogSegment` object, which does the actual writing to disk.
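As a simplified model (assuming only that segments are keyed by their base offset, as the real `Log` does with a concurrent skip-list map), the bookkeeping looks roughly like this:

```scala
import java.util.concurrent.ConcurrentSkipListMap

// Simplified model of how a Log tracks its segments: base offset -> segment.
final case class ToySegment(baseOffset: Long)

val segments = new ConcurrentSkipListMap[java.lang.Long, ToySegment]()
segments.put(0L, ToySegment(0L))        // first segment covers offsets [0, 1000)
segments.put(1000L, ToySegment(1000L))  // rolled segment starting at offset 1000

// Appends always go to the segment with the highest base offset (the "active" segment).
def activeSegment: ToySegment = segments.lastEntry.getValue

// Reads locate the segment containing an offset: the greatest base offset <= the target.
def segmentFor(offset: Long): ToySegment = segments.floorEntry(offset).getValue
```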
A recap of the three Kafka server-side configurations involved:

- `max.message.bytes`: the maximum byte size of a message set (this is its meaning since 0.11, see upgrade 0.11 message format);
- `log.segment.bytes`: the maximum byte size of a log segment (which is why the message-set size is checked against it: otherwise even rolling a new segment file could not hold the whole message set);
- `flush.messages` (topic level): the number of messages a `LogSegment` is allowed to buffer; once the number of buffered messages exceeds this value, `fsync` is called to write them to disk.
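A toy version of the flush decision in the last step (illustrative only; in the real `Log`, the unflushed count is the gap between the log end offset and the recovery point, and `flush.messages` surfaces as `config.flushInterval`):

```scala
// Toy flush decision: flush once the number of messages appended since the last flush
// reaches the flush.messages threshold.
final case class ToyLogState(logEndOffset: Long, recoveryPoint: Long)

def shouldFlush(state: ToyLogState, flushMessages: Long): Boolean =
  (state.logEndOffset - state.recoveryPoint) >= flushMessages

// 12 000 messages appended since the last flush, threshold 10 000 => flush now.
val flushNow = shouldFlush(ToyLogState(logEndOffset = 112000L, recoveryPoint = 100000L), flushMessages = 10000L)
```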
Also, a record batch is what used to be called a message set, and a record is a message. The distinction is worth making here because the notion of a message set changed in Kafka 0.11. Before that, a message set was simply a sequence of messages, each prefixed by the log overhead (offset and message size). Kafka 0.11 introduced the v2 message set format, which adds a header of its own: for example, the offsets of the first and last messages can be computed directly from the header, and there are further fields for the leader epoch and for transactions. In addition, each message (record) encodes the lengths of its key and value as varints rather than fixed 4-byte ints, and records now carry headers of their own.

For details see: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
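To poke at those v2 batch-level header fields, a small sketch using the `org.apache.kafka.common.record` classes from kafka-clients (method names follow the 1.x-era API, so treat the exact set as an assumption):

```scala
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import scala.collection.JavaConverters._

// Build an in-memory v2 message set with three records.
val records = MemoryRecords.withRecords(CompressionType.NONE,
  new SimpleRecord("k1".getBytes, "v1".getBytes),
  new SimpleRecord("k2".getBytes, "v2".getBytes),
  new SimpleRecord("k3".getBytes, "v3".getBytes))

// The batch header exposes the fields added by the v2 format directly.
records.batches.asScala.foreach { batch =>
  println(s"magic=${batch.magic} baseOffset=${batch.baseOffset} lastOffset=${batch.lastOffset} " +
    s"partitionLeaderEpoch=${batch.partitionLeaderEpoch} isTransactional=${batch.isTransactional} " +
    s"producerId=${batch.producerId}")
}
```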