回顾
之前通过网络层的阅读,我们知道了和客户端直接进行读写的是Processor,但是它会将请求通过RequestChannel发送给KafkaRequestHandler,同时也会接收KafkaApis通过RequestChannel回复的响应。因此从本篇开始阅读API层,也就是Handler和Apis,它们都是位于server包内。
Handler线程的创建
回顾请求的调用链
1 | Processor.processCompleteReceives |
Processor的requestChannel字段调用sendRequest()方法,该方法将请求放入其requestQueue字段中:
1 | private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) |
既然有入队,就肯定有出队,找到其poll()方法的调用处:
1 | // 取得下一个请求,或者阻塞直到超时 |
继续找到该方法的调用处,在KafkaRequestHandler.run()方法中:
1 | val req = requestChannel.receiveRequest(300) // 300ms超时 |
KafkaRequestHandler实现了Runnable接口,也就是说,它在调用start()方法时就会启动线程,执行run()方法,查找使用它的地方,为KafkaRequestHandlerPool的字段:
1 | val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) |
Poll类管理了Handler线程,它默认创建了numThreads个Handler线程。
PS:这里使用了ArrayBuffer而非固定大小的Array,和之前提到的Processor和Acceptor使用ConcurrentHashMap保存一样,都是为了支持resize操作。
再看看Pool类的使用处,它是KafkaServer的字段
1 | var requestHandlerPool: KafkaRequestHandlerPool = null |
KafkaServer才是真正的Kafka服务器的类,而之前介绍的SocketServer类只是它用来管理网络的部分,也是其中的一个字段:
1 | var socketServer: SocketServer = null |
再看看requestHandlerPool的使用处,位于KafkaServer的startup()方法:
1 | requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, |
第1个参数为配置的broker.id,第2个参数为RequestChannel对象,第3个参数为KafkaApis对象,第4个参数为KafkaServer的构造参数,为SystemTime对象,位于common.util包内。
第5个参数为Handler线程的数量,为config.numIoThreads,对应配置文件的num.io.threads,默认值为8(见KafkaConfig.scala的Defaults伴生对象)。
而Pool类中创建Handler线程代码如下:
1 | for (i <- 0 until numThreads) { |
id为Handler线程的编号(从0开始),aggregateIdleMeter为度量指标相关(配合time字段计算Handler线程闲置的时间,我们依旧忽略之),threadPoolSize为线程池大小(即线程数量)。剩余参数都是Pool类的构造参数。
至此,我们知道了KafkaServer管理了Handler线程池,会根据配置的num.io.threads创建对应数量的Handler线程,并且多个Handler线程共享了KafkaApis对象和RequestChannel对象。
Handler线程实现
忽略了日志和度量指标的部分:
1 | def run() { |
逻辑很简单,Handler线程反复地从requestChannel中取得请求,交由apis进行处理,如果处理出错会捕获异常,并以异常中包含的错误码退出当前线程。这也解释了前一篇的疑问:为何请求的发送对象是Handler线程,而响应却来自于Apis。
值得注意的地方是,这里还有个ShutdownRequest,是用来退出Handler线程的,找到它的调用处:
1 | def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest) |
RequestChannel对象调用该方法将Shutdown请求加入队列中,而该方法的调用处位于KafkaRequestHandler内:
1 | def initiateShutdown(): Unit = requestChannel.sendShutdownRequest() |
进一步往上找,会发现位于KafkaRequestHandler的shutdown()方法内,用于Handler线程的正常退出(相对而言,apis.handle()抛出异常则是异常退出,退出码不为0):
1 | def shutdown(): Unit = synchronized { |
至于awaitShutdown()方法,则是Java线程退出的惯用法,即调用了CountDownLatch对象的await()方法,等待计数归0,可以看到run()方法中不同的退出分支都会调用shutdownComplete.countDown()方法,即将CountDownLatch对象shutdownComplete的计数减1,而其初始计数为1:
1 | private val shutdownComplete = new CountDownLatch(1) |
Apis
KafkaApis对象的创建在KafkaRequestHandlerPool创建之前,其构造参数有18个,因此暂时不详细列出,最为关键的还是requestChannel。看看关键的handle()方法:
1 | def handle(request: RequestChannel.Request) { |
取得请求头的apiKey(见前一篇的请求头的解析一节),根据类型调用不同的handle*()方法进行处理,这里看一个比较简单的例子,是对LIST_OFFSETS请求的处理:
1 | def handleListOffsetRequest(request: RequestChannel.Request) { |
进一步的分析略过,总之Apis在处理完请求后,如果判断需要发送,则会创建响应的响应(*Response)类型,并调用sendResponse()方法:
1 | private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { |
可见实际上,还是利用requestChannel.sendResponse()方法发送响应(参见前一篇的处理响应一节,会将响应加入Processor的响应队列中)。这里的Send接口表示的是待发送的数据,而String则是用于调试的:
1 | object RequestChannel extends Logging { |
这里需要额外说明下,Kafka的日志系统使用的是SLF4J,它本身只是日志的抽象层,而没有具体的实现,因此在编译运行Kafka时会提示警告:
SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
只有将具体的日志jar包放入classpath中,才会成功打印日志,因此从Kafka源码中是无法确定isRequestLoggingEnabled在哪里设置为true,取决于实际日志包的配置。
总结
API层其实还是很简单的,创建num.io.threads个Handler线程,从共享的RequestChannel中取出请求(使用ArrayBlockingQueue请求队列保证线程安全并且限制最大请求数),如果不是Handler调用shutdown()方法加入的关闭请求,则将其交给Apis对象进行处理,处理完请求后会构造响应对象,通过RequestChannel加入到Processor内部的响应队列(使用LinkedBlockingQueue响应队列保证线程安全,并且不限制最大响应数量)。
实际的请求的解析和响应的构造则集中于KafkaApis类中,接下来则是通过不同的ApiKey类依次看看Kafka支持哪些请求,并且内部是怎样处理这些请求。