回顾
前一节介绍了Message
的格式及其实现,本来是继续阅读MessageSet
,但后来发现在Kafka 0.11.0之后Message
和MessageSet
(消息集)发生了较大改变,详细参考Kafka Protocol - Messagesets,实际和生产者消费者交互的是RecordBatch
而非MessageSet
,原来的MessageSet
只是简单地在若干Message
之前加入offset字段和消息数量字段,现在的RecordBatch
多了不少字段,比如ProducerId
/ProducerEpoch
等。目前脱离了对API协议的实际处理过程去看这些数据结构的实现很难明白其实际意义,因此先阅读API请求。
本文就先阅读生产者的请求,其类型为Produce,对应KafkaApis.handle()
中的下列分支:
1 | case ApiKeys.PRODUCE => handleProduceRequest(request) |
Produce协议
本节参考Produce API。Produce API使用通用的消息集格式,由于在发送时无法确定消息的offset,因此生产者可以随意填充该字段。
请求格式
Kafka 1.1使用的Produce请求是v2(实际上和v0及v1相同):
1 | ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] |
这种描述格式是Kafka wiki的标准请求格式,field => type
代表字段field
是type
类型,field => [type]
代表field
字段包含若干个type
类型,也就是[]
代表数组。
因此这里的消息请求的格式,可以看作包含1个2字节整型RequiredAcks
,1个4字节整型Timeout
,接下来是N个结构,每个结构都有1个TopicName
,以及若干个子结构,每个子结构由1个Partition
/MessageSetSize
/MessageSet
组成。
然后介绍官方对上述参数的定义:
RequiredAcks
(下文简称acks)指定服务端在响应请求之前应该受到多少确认(ack):
- 0:服务器不发送任何响应(这是服务器不回复请求的唯一情况);
- 1:服务器在等待数据写入本地日志后才会发送响应;
- -1:服务器在等待所有同步副本提交消息之后才发送响应。
0和1很好理解,0就是生产者发完就不管了,1就是等待消息被写入本地日志之后再返回,这里涉及到同步副本(isr,in-sync replicas)这个概念。这里简单介绍下。用Kafka自带脚本创建topic时会指定
--replication-factor
,也就是消息日志的复制数量,此时会创建多个副本(replicas)来保存消息日志,在Leader写入消息日志到本地时,副本也会从Leader取得消息,写入到自己的消息日志。暂且不提其同步过程,可以认为目前存活且消息写入跟上Leader的副本就是同步副本。Timeout
服务器可以等待
RequiredAcks
指定数量的确认所用的最长时间,单位:ms。这个参数并不是请求时间的确切限制,因为:- 网络传输延迟不包含在内;
- 计时器在处理请求时才开始,因此如果很多请求正在排队等待处理,那么这个等待时间不包含在内;
- 我们不会终止本地写操作,因此如果本地写入时间超时,将不予考虑,要获得这种类型的超时,客户端应该使用socket的超时。
TopicName
:发布数据的目标主题;Partition
:发布数据的目标分区;MessageSetSize
:紧接着的MessageSet
字段的字节数;MessageSet
:消息集的标准格式,参考Protocol - Messagesets,注意Kafka 1.1使用的是v2版本的RecordBatch。
响应格式
Kafka 1.1使用的是0.10.0后支持的v2版本,因此v0版本和0.9.0后支持的v1版本就先无视了。
1 | ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime |
Topic
:响应对应的主题;Partition
:响应对应的分区;ErrorCode
:当前分区的错误码;错误码是基于分区的,因为指定分区可能不可用或者无法在其他主机上维护而其他分区可能成功接受了Produce请求;
Offset
:赋值给消息集中第1条消息的offset;Timestamp
:从UTC epoch至今的毫秒数,根据时间戳类型有不同的设定:- 时间戳类型为
LogAppendTime
,则为broker赋值给该消息集的时间戳,消息集内的所有内部消息都拥有同一个时间戳; - 时间戳类型为
CreateTime
,则该字段总是-1。
如果没有错误码返回,那么生产者可以认为Produce请求的时间戳已被broker接受。
- 时间戳类型为
ThrottleTime
:由于超过了quota(限额)而导致请求被限流的时间间隔,单位:毫秒。
handleProduceRequest
1 | def handleProduceRequest(request: RequestChannel.Request) { |
其中header
和sizeofBodyInBytes
在network.RequestChannel
类中定义
1 | class Request(/* ... */ |
请求头之前在网络层阅读之RequestChannel中提过,这里简单回顾下。RequestHeader
为Java类,定义在org.apache.kafka.common.requests
包中,包含以下字段
1 | private final ApiKeys apiKey; // 请求类型 |
对应消息协议的Headers:
1 | Request Header => api_key api_version correlation_id client_id |
在Processor
处理客户端的请求字节序列时,会调用RequestHeader.parse
方法构造请求头,然后和字节序列buffer
一起发送给RequestChannel
,Handler
线程从中取得请求发送给KafkaApis
处理。
后面是一些认证相关的代码,调用了authorize
方法,由于不影响主要流程,所以暂且跳过,最后会进入以下分支:
1 | val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId |
留意最后的操作,提到了purgatory这个概念:如果请求被放入purgatory,那么就会被(purgatory)持有引用,因此将其置为null
防止被垃圾收集。也是之后涉及再看。
其中,entriesPerPartition
是之前认证过程得到的信息:
1 | val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() |
Idea调试
追根刨底去看metadataCache
的构造和读取略麻烦,而且偏离了我们这篇文章的核心目的(了解Kafka怎么处理Produce请求)这里就利用Intellij Idea调试先看看里面到底是什么,也是阅读源码以来第1次调试。
首先zkServer
命令启动Zookeeper服务端,然后在Idea中在定义authorizedRequestInfo
处设断点,调试模式启动Kafka的core模块(即Kafka服务端),然后启动Kafka客户端,向test
主题发送字符串hello
,此时可以看到metadataCache
的结构:
brokerId
= 0cache
= “HashMap” size = 2- 0 = …
- _1 = “__consumer_offsets”
value
= {char[18]@5303}hash
= -970371369
- _2 = “HashMap” size = 50
- _1 = “__consumer_offsets”
- 1 = …
- _1 = “test”
value
= {char[4]@5410}hash
= 3556498
- _2 = “HashMap” size = 1
- _1 = “test”
- 0 = …
可见其cache
字段为HashMap
类型,包含了所有的topic,一个是我们创建的test
主题,一个是用来管理消费者提交的offset的__consumer_offsets
。
因此保证了authorizedRequestInfo
,也就是传入appendRecords
的entriesPerPartition
参数,它的topic都是目前现有的。
ReplicaManager.appendRecords
将消息添加到分区的首领副本,等待它们被复制到其他副本。无论是timeout或者acks的条件被满足,都会触发回调函数。如果回调函数本身已经在某个对象上被同步,那么传递这个对象来避免死锁。
1 | def appendRecords(timeout: Long, |
先看else分支,可以得知,传入的entriesPerPartition
为TopicPartition
到MemoryRecords
(消息)的Map
而传入的responseCallback
为发送响应给客户端的回调函数,响应类型也是Map
,key也是TopicPartition
,只不过value变成了PartitionResponse
。也就是说,无论是请求还是响应,都是以分区为单位的,对于错误的响应,只有error
字段起作用,而正确的响应是包含baseOffset
,logAppendTime
和logStartOffset
等字段,前2个字段在上一篇消息协议阅读中简单提过,分别是消息日志中第1个offset以及发送的消息被写入消息日志的时间戳,现在具体阅读acks合法时的处理流程。
time字段
首先取得毫秒级的time
:
1 | val sTime = time.milliseconds |
其中time
为replicaManager
的构造参数,而replicaManager
也是KafkaApis
的构造参数:
1 | class ReplicaManager(val config: KafkaConfig, |
1 | class KafkaApis(val requestChannel: RequestChannel, |
KafkaApis
对象是在KafkaServer
的startup
方法中创建的,层层追溯如下:
1 | apis = new KafkaApis(socketServer.requestChannel, replicaManager, /* ... */) |
1 | replicaManager = createReplicaManager(isShuttingDown) |
1 | protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = |
1 | class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, |
1 | public interface Time { |
可见time
的SystemTime
对象,作为计时器,包含以下常用方法:
milliseconds
:取得毫秒级时间戳;nanoseconds
:取得纳秒级时间戳;sleep(long ms)
:当前线程休眠指定毫秒数。
因此Kafka中一切用到计时器的类都会使用该对象,回过头看appendRecords
代码:
1 | val sTime = time.milliseconds // 取得当前毫秒级时间戳 |
也就是说首先会调用appendToLocalLog
方法
appendToLocalLog
将消息添加到本地副本日志中
1 | private def appendToLocalLog(internalTopicsAllowed: Boolean, |
首先是区分了消费主题是否为内部主题,比如__consumer_offsets
,这种主题并不是存储生产/消费的消息的,因此只允许Admin客户端读写。至于brokerTopicStats
也是度量指标相关的,暂且略过。
1 | // 从内部的 allPartitions 中找到 topicPartition, PS: allPartitions 是从本地消息日志中读取的 |
处理了2种错误:分区是离线的(Offline)和分区是未知,而对于已知分区,则是将appendRecordsToLeader
方法返回的info
来构造该分区对应的LogAppendResult
作为返回结果。
这里通过getPartition
返回的partition
类型是Partition
,位于cluster
包中:
1 | class Partition(val topic: String, |
除了主题名topic
和分区号partitionId
外,还会引用replicaManager
用于将信息写入副本中。还通过isOffline
来区分分区是否在副本broker上。
Partition.appendRecordsToLeader
1 | def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = { |
这里有几个方法暂时没看细节,将其列出(对于server
包之外的标注包名),之后有可能的话单独阅读:
log.Log.appendAsLeader
:将消息集,分配的offset,leader副本的epoch写入本地消息日志;DelayedOperation.checkAndComplete(key: Any)
:检查某些**延迟操作(delayed operations)**用给定的key能否完成,若能则完成;cluster.Partition.maybeIncrementLeaderHW
:检查并且可能增加分区的HW,仅当分区ISR改变或者任意副本的LEO改变时才更新。
由于本小节涉及到分区的操作,来回顾一些基本概念,每个分区都有多个broker来保存,实现消息的冗余备份,这些broker称为该分区的副本(replica)。对每个分区,存在唯一的leader副本(通过选举产生),与客户端进行直接读写,而其他副本为follower,不断地从leader复制最新的消息。与leader保持同步的follower被称为**ISR(in-sync replica)**,而某些follower会因为某些原因复制速度较慢或者和leader断开连接(通过某种规则判断),此时会从ISR中移除,直到重新跟上进度会重新加入ISR。
HW(high watermark, 高水位)即最新已提交的(committed)消息的offset,即所有ISR的分区日志上都写入了该消息,消费者无法拉取比HW更大的offset,从而保证leader一旦不可用,消费者之前消费的消息存在于任意ISR的消息日志中。
**LEO(log end offset)**是所有副本都会维护的offset,即当前副本最后一个消息的offset+1,也就是如果有新的消息写入,那么它的offset即之前的LEO,而副本将消息写入消息日志后,LEO会递增。
至于epoch这个概念是Kafka 0.11引入的,暂时还不清楚具体功能,之后再提。
appendToLocalLog总结
在之前将客户端发送的请求解析成了分区到消息集的映射,而返回值是分区到LogAppendResult
的映射,因此只对遍历整个Map
,对每对分区消息集进行处理得到LogAppendResult
即可:
- 对
__consumer_offsets
这样的内部主题,验证请求头的client id是否为管理员(admin)的id,否则返回Cannot append to internal topic的错误; - 在
ReplicaManager
维护的当前broker上的分区列表中找到对应的分区; - 若查找失败则返回*Partition … doesn’t exist on …*的错误;
- 若分区不可用,则返回*Partition … is in an offline log directory on broker …*的错误;
- 若当前broker不是分区的leader,则返回*Leader not local for partition … on broker …*的错误;
- 若acks字段为-1,且ISR数量小于
min.insync.replicas
配置的数量,则返回Number of insync replicas for partition … is … below required minimum的错误; - 将消息集写入本地日志,并给当前分区分配offsets和leader epoch;
- 处理延后处理的Fetch请求,可能更新HW;
- 若更新HW,则处理延后处理的请求。
前面的流程都是一些合法性判断,主要是7~9这几步,待深入阅读的内容:
- 对指定分区,写入日志后如何分配offsets和leader epoch?
- 延后处理是怎么实现的?
关于延后处理,主要是ReplicaManager
的以下字段
1 | val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], |
都是Purgatory
(炼狱),在辅助构造器中进行默认构造:
1 | DelayedOperationPurgatory[DelayedProduce]( |
都是泛型类DelayedOperationPurgatory
,类型参数不同。
总结
本篇开始阅读Produce请求的处理,首先从官网阅读了Kafka 1.1对应的Produce请求和响应协议,然后阅读KafkaApis
类的处理方法handleProduceRequest
。
跳过了加密/认证的部分,实际上是由ReplicaManager
来处理,调用appendRecords
方法,接受了客户端Produce请求中的acks和timeout两个关键字段。
首先验证acks是否合法(-1, 0 or 1),对不合法acks发送INVALID_REQUIRED_ACKS
响应。
然后调用appendToLocalLog
方法,也是本篇主要阅读的部分。
之后的处理,以及appendRecords
接收的回调函数(比如如何发送响应)的实现,日志的写入,分区的offsets和leader epoch的更新,以及如何延迟处理将在之后进行阅读。