回顾
前2篇分析了SocketServer
的启动以及Acceptor
/Processor
,对配置listener
的网络地址,都会创建1个Acceptor
和N个Processor
,其中N为配置num.network.thread
。每个Acceptor
会创建1个默认的NIO Selector
,每个Processor
则都会创建1个Kafka自行实现Selector
接口的KSelector
。
socket都会被封装成Channel
,即通道,代表socket两端的连接。Acceptor
会创建一个Channel
监听网络地址,并在其Selector
注册读事件,然后负责将所有客户端的连接转换成Channel
均衡地发送给Processor
。
Processor
则在其Selector
上注册从Acceptor
收到的Channel
的读/写/关闭连接等事件,并分别处理。但是Processor
只负责读取请求(Request)和写入响应(Response),对于已完成的请求,会将其作为参数传给requestChannel
调用sendRequest()
。另一方面,Processor
内部的响应队列,则是由requestChannel
调用sendResponse()
得到的。
而RequestChannel
包含requestQueue
字段缓存请求,另外它也和SocketServer
一样保存了所有Processor
的id和自身组成的ConcurrentHashMap
。
处理请求
发送请求
在Processor.processCompletedReceives()
方法中,会将封装了网络地址信息的请求传递给sendRequest()
方法:
1 | val header = RequestHeader.parse(receive.payload) |
sendRequest()
方法的实现:
1 | // 发送待处理的请求, requestQueue 有容量上限, 由 queue.max.requests 配置, 若达到了上限则会阻塞。 |
“发送”只是将请求放到了内部的请求队列中,而出队方法是在server.KafkaRequestHandler
中调用的,不属于网络层的事情,暂时不管。因此看看Request
类型的构造,传入了header
和context
。
请求头的解析
1 | public static RequestHeader parse(ByteBuffer buffer) { |
Api Key的类型参考Api Key,消息协议类型参考消息协议:
1 | Request Header => api_key api_version correlation_id client_id |
这个头部即Schema
类,其read()
方法会把后面的correlation id和client id给读入,构造消息头。
请求上下文的构造
1 | val context = new RequestContext(header, receive.source, channel.socketAddress, channel.principal, |
其实就是简单地将对应参数赋值给内部字段:
1 | public final RequestHeader header; // 消息头 |
消息头上一小节刚看完,连接id也是阅读源码至今一直见到的用于标识一条TCP连接的字符串,最后2个字段均为解析listener
配置时解析出的EndPoint
类的字段。
请求对象的创建
RequestChannel.Request
字段比较多就不一一解释了(很多都是metric相关的),核心是请求body(0.10版本是字段现在是方法):
1 | def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { |
该方法仅仅是检查请求类型T是否合法,若不合法则抛出异常。
关键部分是bodyAndSize
进行类型匹配,该字段初始化:
1 | private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) |
context
解析请求的方法:
1 | public RequestAndSize parseRequest(ByteBuffer buffer) { |
1 | // 类 AbstractRequest 的方法 |
这部分代码都是Java实现的,为了能根据不同请求类型/API版本得到对应的请求类型的示例,实现得较为复杂,细节也不深入去看,总之,在Kafka中可以像这样调用body()
方法得到实际的请求对象:
1 | // 将请求的 ByteBuffer 解析成 MetadataRequest 对象 |
取出请求
之前介绍了Processor
仅仅是将请求加入阻塞队列requestQueue
中,那么何时取出呢?找到其poll()
方法的调用处:
1 | // 取得下个请求, 或者阻塞直到超时 |
再看看上述方法的调用处:
1 | // 类 KafkaRequestHandler 的方法 |
由于是API层的代码,所以略去了其他代码,只保留了req
相关的。可以看到请求Handler
线程会反复地从请求队列中取出请求,然后根据请求地类型进行不同处理。
多线程取出请求安全吗?
由于ArrayBlockingQueue
是线程安全的,所以多个Handler
线程从中取出请求是线程安全的。另一方面,关于顺序性,即来自同一个客户端的多个请求,必须保证取出的顺序也一致。《Apache Kafka源码剖析》书上给出了解释:Processor.run()
方法通过多处注册/取消 读/写事件 来保证每个连接上只有一个请求和一个对应的响应来实现的。
具体而言,可以回顾我上一篇源码分析中Processor
的部分:
- 在
processCompletedReceives()
中,一旦接收到完整的请求req
,在调用sendRequest(req)
后会取消监听该Channel
的读事件; - 在
processCompleteSends()
中,只有当响应成功返回客户端(将响应从缓存的inflightResponses
移除)后,才会重新注册该Channel
的读事件; - 在
processNewResponses()
中判断请求类型是SendAction
时,会注册Channel
的写事件; - 在
poll()
中向底层socket发送数据时,如果判断数据完毕,则会取消注册Channel
的写事件。
处理响应
Processor
自己维护了响应队列,并在processNewResponses()
中调用dequeueResponse()
方法依次出队, 那么,可以找到其对应方法enqueueResponse()
的调用处:
1 | def sendResponse(response: RequestChannel.Response) { |
值得注意的是这里对processor != null
的判断,Kafka 1.1.0的SocketServer
支持resizeThreadPoll()
方法来改变网络线程数量(也就是Acceptor
对应Processor
的数量),如果网络线程数减少的话,那么多出的Processor
会调用shutdown()
方法关闭,并通过connectionId
将其从Acceptor
和RequestChannel
中的processors
字段移除。
继续找到该方法的调用处,位于KafkaApis
的sendResponse()
方法中:
1 | private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { |
此时RequestChannel
只是将响应转发给了Processor
,它本身并不维护响应队列(在0.10.0.1版本中则是维护了多个响应队列),真正维护响应队列的是Processor
本身。
总结
关于Kafka网络层的阅读至此就告一段落,不得不说但作为Kafka的基础设施的Java NIO实现的部分更为复杂(KafkaChannel
和KafkaSelector
),但阅读源码不应太陷入细节(感觉我已经有些陷进去了……)。对于Kafka,最重要的还是它的业务层,也就是对Kafka协议的实现。
RequestChannel
的作用很简单:
- 维护请求队列,接收来自
Processor
的请求,并转发给KafkaRequestHandler
进行处理; - 从
KafkaApis
获取响应,发送给Processor
。
加上文章开始总结的Acceptor
/Processor
,以及统筹全局的SocketServer
,构成了Kafka的网络层,简单描述:
1 | | Network Layer | API Layer | |
可以发现不管是什么Channel
,都是起到了连接的作用。Acceptor
通过最简单的SocketChannel
与监听套接字连接,监听连接事件,并将接受的连接转发给Processor
,之后Processor
通过比较复杂的KafkaChannel
与客户端连接,监听读/写事件并和客户端进行数据的交互。
数据分为来自客户端的请求和来自服务端的响应,Processor
不负责这部分,而是通过RequestChannel
将请求发送给KafkaRequestHandler
,再从RequestChannel
接收响应。
问题来了,而实际发送响应给RequestChannel
的却是KafkaApis
,因此请求=>响应的过程是由它们共同完成的,也就是接下来要阅读的API层。