回顾
之前通过网络层的阅读,我们知道了和客户端直接进行读写的是Processor
,但是它会将请求通过RequestChannel
发送给KafkaRequestHandler
,同时也会接收KafkaApis
通过RequestChannel
回复的响应。因此从本篇开始阅读API层,也就是Handler和Apis,它们都是位于server
包内。
Handler线程的创建
回顾请求的调用链
1 | Processor.processCompleteReceives |
Processor
的requestChannel
字段调用sendRequest()
方法,该方法将请求放入其requestQueue
字段中:
1 | private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) |
既然有入队,就肯定有出队,找到其poll()
方法的调用处:
1 | // 取得下一个请求,或者阻塞直到超时 |
继续找到该方法的调用处,在KafkaRequestHandler.run()
方法中:
1 | val req = requestChannel.receiveRequest(300) // 300ms超时 |
KafkaRequestHandler
实现了Runnable
接口,也就是说,它在调用start()
方法时就会启动线程,执行run()
方法,查找使用它的地方,为KafkaRequestHandlerPool
的字段:
1 | val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) |
Poll类管理了Handler线程,它默认创建了numThreads
个Handler线程。
PS:这里使用了ArrayBuffer
而非固定大小的Array
,和之前提到的Processor
和Acceptor
使用ConcurrentHashMap
保存一样,都是为了支持resize
操作。
再看看Pool
类的使用处,它是KafkaServer
的字段
1 | var requestHandlerPool: KafkaRequestHandlerPool = null |
KafkaServer
才是真正的Kafka服务器的类,而之前介绍的SocketServer
类只是它用来管理网络的部分,也是其中的一个字段:
1 | var socketServer: SocketServer = null |
再看看requestHandlerPool
的使用处,位于KafkaServer
的startup()
方法:
1 | requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, |
第1个参数为配置的broker.id
,第2个参数为RequestChannel
对象,第3个参数为KafkaApis
对象,第4个参数为KafkaServer
的构造参数,为SystemTime
对象,位于common.util
包内。
第5个参数为Handler线程的数量,为config.numIoThreads
,对应配置文件的num.io.threads
,默认值为8(见KafkaConfig.scala
的Defaults
伴生对象)。
而Pool类中创建Handler线程代码如下:
1 | for (i <- 0 until numThreads) { |
id
为Handler线程的编号(从0开始),aggregateIdleMeter
为度量指标相关(配合time
字段计算Handler线程闲置的时间,我们依旧忽略之),threadPoolSize
为线程池大小(即线程数量)。剩余参数都是Pool类的构造参数。
至此,我们知道了KafkaServer
管理了Handler线程池,会根据配置的num.io.threads
创建对应数量的Handler线程,并且多个Handler线程共享了KafkaApis
对象和RequestChannel
对象。
Handler线程实现
忽略了日志和度量指标的部分:
1 | def run() { |
逻辑很简单,Handler线程反复地从requestChannel
中取得请求,交由apis
进行处理,如果处理出错会捕获异常,并以异常中包含的错误码退出当前线程。这也解释了前一篇的疑问:为何请求的发送对象是Handler线程,而响应却来自于Apis。
值得注意的地方是,这里还有个ShutdownRequest
,是用来退出Handler线程的,找到它的调用处:
1 | def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) |
RequestChannel
对象调用该方法将Shutdown请求加入队列中,而该方法的调用处位于KafkaRequestHandler
内:
1 | def initiateShutdown(): Unit = requestChannel.sendShutdownRequest() |
进一步往上找,会发现位于KafkaRequestHandler
的shutdown()
方法内,用于Handler线程的正常退出(相对而言,apis.handle()
抛出异常则是异常退出,退出码不为0):
1 | def shutdown(): Unit = synchronized { |
至于awaitShutdown()
方法,则是Java线程退出的惯用法,即调用了CountDownLatch
对象的await()
方法,等待计数归0,可以看到run()
方法中不同的退出分支都会调用shutdownComplete.countDown()
方法,即将CountDownLatch
对象shutdownComplete
的计数减1,而其初始计数为1:
1 | private val shutdownComplete = new CountDownLatch(1) |
Apis
KafkaApis
对象的创建在KafkaRequestHandlerPool
创建之前,其构造参数有18个,因此暂时不详细列出,最为关键的还是requestChannel
。看看关键的handle()
方法:
1 | def handle(request: RequestChannel.Request) { |
取得请求头的apiKey
(见前一篇的请求头的解析一节),根据类型调用不同的handle*()
方法进行处理,这里看一个比较简单的例子,是对LIST_OFFSETS
请求的处理:
1 | def handleListOffsetRequest(request: RequestChannel.Request) { |
进一步的分析略过,总之Apis在处理完请求后,如果判断需要发送,则会创建响应的响应(*Response
)类型,并调用sendResponse()
方法:
1 | private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { |
可见实际上,还是利用requestChannel.sendResponse()
方法发送响应(参见前一篇的处理响应一节,会将响应加入Processor
的响应队列中)。这里的Send
接口表示的是待发送的数据,而String
则是用于调试的:
1 | object RequestChannel extends Logging { |
这里需要额外说明下,Kafka的日志系统使用的是SLF4J
,它本身只是日志的抽象层,而没有具体的实现,因此在编译运行Kafka时会提示警告:
SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
只有将具体的日志jar包放入classpath中,才会成功打印日志,因此从Kafka源码中是无法确定isRequestLoggingEnabled
在哪里设置为true
,取决于实际日志包的配置。
总结
API层其实还是很简单的,创建num.io.threads
个Handler线程,从共享的RequestChannel
中取出请求(使用ArrayBlockingQueue
请求队列保证线程安全并且限制最大请求数),如果不是Handler调用shutdown()
方法加入的关闭请求,则将其交给Apis对象进行处理,处理完请求后会构造响应对象,通过RequestChannel
加入到Processor
内部的响应队列(使用LinkedBlockingQueue
响应队列保证线程安全,并且不限制最大响应数量)。
实际的请求的解析和响应的构造则集中于KafkaApis
类中,接下来则是通过不同的ApiKey
类依次看看Kafka支持哪些请求,并且内部是怎样处理这些请求。