Kafka源码阅读04-API层之Handler和Apis

回顾

之前通过网络层的阅读,我们知道了和客户端直接进行读写的是Processor,但是它会将请求通过RequestChannel发送给KafkaRequestHandler,同时也会接收KafkaApis通过RequestChannel回复的响应。因此从本篇开始阅读API层,也就是Handler和Apis,它们都是位于server包内。

Handler线程的创建

回顾请求的调用链

1
2
3
Processor.processCompleteReceives
\-- requestChannel.sendRequest(request)
\-- requestQueue.put(request)

ProcessorrequestChannel字段调用sendRequest()方法,该方法将请求放入其requestQueue字段中:

1
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)

既然有入队,就肯定有出队,找到其poll()方法的调用处:

1
2
3
// 取得下一个请求,或者阻塞直到超时  
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

继续找到该方法的调用处,在KafkaRequestHandler.run()方法中:

1
val req = requestChannel.receiveRequest(300) // 300ms超时

KafkaRequestHandler实现了Runnable接口,也就是说,它在调用start()方法时就会启动线程,执行run()方法,查找使用它的地方,为KafkaRequestHandlerPool的字段:

1
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)

Poll类管理了Handler线程,它默认创建了numThreads个Handler线程。

PS:这里使用了ArrayBuffer而非固定大小的Array,和之前提到的ProcessorAcceptor使用ConcurrentHashMap保存一样,都是为了支持resize操作。

再看看Pool类的使用处,它是KafkaServer的字段

1
var requestHandlerPool: KafkaRequestHandlerPool = null

KafkaServer才是真正的Kafka服务器的类,而之前介绍的SocketServer类只是它用来管理网络的部分,也是其中的一个字段:

1
var socketServer: SocketServer = null

再看看requestHandlerPool的使用处,位于KafkaServerstartup()方法:

1
2
3
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
socketServer.requestChannel,
apis, time, config.numIoThreads)

第1个参数为配置的broker.id,第2个参数为RequestChannel对象,第3个参数为KafkaApis对象,第4个参数为KafkaServer的构造参数,为SystemTime对象,位于common.util包内。

第5个参数为Handler线程的数量,为config.numIoThreads,对应配置文件的num.io.threads,默认值为8(见KafkaConfig.scalaDefaults伴生对象)。

而Pool类中创建Handler线程代码如下:

1
2
3
4
5
6
7
8
for (i <- 0 until numThreads) {
createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}

id为Handler线程的编号(从0开始),aggregateIdleMeter为度量指标相关(配合time字段计算Handler线程闲置的时间,我们依旧忽略之),threadPoolSize为线程池大小(即线程数量)。剩余参数都是Pool类的构造参数。

至此,我们知道了KafkaServer管理了Handler线程池,会根据配置的num.io.threads创建对应数量的Handler线程,并且多个Handler线程共享了KafkaApis对象和RequestChannel对象。

Handler线程实现

忽略了日志和度量指标的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def run() {
while (!stopped) {
// 从requestChannel中取得请求,timeout为300ms
val req = requestChannel.receiveRequest(300)

req match {
// Shutdown类型的请求,直接退出Handler线程
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
shutdownComplete.countDown()
return

// 正常请求
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
// 交给apis处理
apis.handle(request)
} catch { // 处理api.handle()可能抛出的异常
case e: FatalExitError =>
shutdownComplete.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
request.releaseBuffer()
}

case null => // continue
}
}
shutdownComplete.countDown()
}

逻辑很简单,Handler线程反复地从requestChannel中取得请求,交由apis进行处理,如果处理出错会捕获异常,并以异常中包含的错误码退出当前线程。这也解释了前一篇的疑问:为何请求的发送对象是Handler线程,而响应却来自于Apis。

值得注意的地方是,这里还有个ShutdownRequest,是用来退出Handler线程的,找到它的调用处:

1
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)

RequestChannel对象调用该方法将Shutdown请求加入队列中,而该方法的调用处位于KafkaRequestHandler内:

1
def initiateShutdown(): Unit = requestChannel.sendShutdownRequest()

进一步往上找,会发现位于KafkaRequestHandlershutdown()方法内,用于Handler线程的正常退出(相对而言,apis.handle()抛出异常则是异常退出,退出码不为0):

1
2
3
4
5
6
7
def shutdown(): Unit = synchronized {
info("shutting down")
for (handler <- runnables)
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
}

至于awaitShutdown()方法,则是Java线程退出的惯用法,即调用了CountDownLatch对象的await()方法,等待计数归0,可以看到run()方法中不同的退出分支都会调用shutdownComplete.countDown()方法,即将CountDownLatch对象shutdownComplete的计数减1,而其初始计数为1:

1
private val shutdownComplete = new CountDownLatch(1)

Apis

KafkaApis对象的创建在KafkaRequestHandlerPool创建之前,其构造参数有18个,因此暂时不详细列出,最为关键的还是requestChannel。看看关键的handle()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def handle(request: RequestChannel.Request) {
try {
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
// 其他类型的的ApiKeys...
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
} finally {
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}

取得请求头的apiKey(见前一篇的请求头的解析一节),根据类型调用不同的handle*()方法进行处理,这里看一个比较简单的例子,是对LIST_OFFSETS请求的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
def handleListOffsetRequest(request: RequestChannel.Request) {
val version = request.header.apiVersion()

// 根据请求头的API版本进行不同的处理
val mergedResponseMap = if (version == 0)
handleListOffsetRequestV0(request)
else
handleListOffsetRequestV1AndAbove(request)

// 参数2为将requestThrottleMs作为输入的函数,会根据该输入创建ListOffsetResponse对象
// 并且在sendResponseMaybeThrottle类根据requestThrottleMs判断是否创建该对象并发送
sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
}

进一步的分析略过,总之Apis在处理完请求后,如果判断需要发送,则会创建响应的响应(*Response)类型,并调用sendResponse()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
// 对响应的每个非0错误码更新度量指标
responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

responseOpt match {
case Some(response) =>
// 若响应存在,则构造响应的Send,并调用requestChannel的sendResponse()方法发送
val responseSend = request.context.buildResponse(response)
// 如果RequestChannel伴生对象的isRequestloggingEnabled则构造该字符串
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
case None =>
requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
}
}

可见实际上,还是利用requestChannel.sendResponse()方法发送响应(参见前一篇的处理响应一节,会将响应加入Processor的响应队列中)。这里的Send接口表示的是待发送的数据,而String则是用于调试的:

1
2
3
4
5
6
7
object RequestChannel extends Logging {
private val requestLogger = Logger("kafka.request.logger")
// ...

def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
// ...
}

这里需要额外说明下,Kafka的日志系统使用的是SLF4J,它本身只是日志的抽象层,而没有具体的实现,因此在编译运行Kafka时会提示警告:

SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

只有将具体的日志jar包放入classpath中,才会成功打印日志,因此从Kafka源码中是无法确定isRequestLoggingEnabled在哪里设置为true,取决于实际日志包的配置。

总结

API层其实还是很简单的,创建num.io.threads个Handler线程,从共享的RequestChannel中取出请求(使用ArrayBlockingQueue请求队列保证线程安全并且限制最大请求数),如果不是Handler调用shutdown()方法加入的关闭请求,则将其交给Apis对象进行处理,处理完请求后会构造响应对象,通过RequestChannel加入到Processor内部的响应队列(使用LinkedBlockingQueue响应队列保证线程安全,并且不限制最大响应数量)。

实际的请求的解析和响应的构造则集中于KafkaApis类中,接下来则是通过不同的ApiKey类依次看看Kafka支持哪些请求,并且内部是怎样处理这些请求。