回顾
前2篇分析了SocketServer的启动以及Acceptor/Processor,对配置listener的网络地址,都会创建1个Acceptor和N个Processor,其中N为配置num.network.thread。每个Acceptor会创建1个默认的NIO Selector,每个Processor则都会创建1个Kafka自行实现Selector接口的KSelector。
socket都会被封装成Channel,即通道,代表socket两端的连接。Acceptor会创建一个Channel监听网络地址,并在其Selector注册读事件,然后负责将所有客户端的连接转换成Channel均衡地发送给Processor。
Processor则在其Selector上注册从Acceptor收到的Channel的读/写/关闭连接等事件,并分别处理。但是Processor只负责读取请求(Request)和写入响应(Response),对于已完成的请求,会将其作为参数传给requestChannel调用sendRequest()。另一方面,Processor内部的响应队列,则是由requestChannel调用sendResponse()得到的。
而RequestChannel包含requestQueue字段缓存请求,另外它也和SocketServer一样保存了所有Processor的id和自身组成的ConcurrentHashMap。
处理请求
发送请求
在Processor.processCompletedReceives()方法中,会将封装了网络地址信息的请求传递给sendRequest()方法:
1 | val header = RequestHeader.parse(receive.payload) |
sendRequest()方法的实现:
1 | // 发送待处理的请求, requestQueue 有容量上限, 由 queue.max.requests 配置, 若达到了上限则会阻塞。 |
“发送”只是将请求放到了内部的请求队列中,而出队方法是在server.KafkaRequestHandler中调用的,不属于网络层的事情,暂时不管。因此看看Request类型的构造,传入了header和context。
请求头的解析
1 | public static RequestHeader parse(ByteBuffer buffer) { |
Api Key的类型参考Api Key,消息协议类型参考消息协议:
1 | Request Header => api_key api_version correlation_id client_id |
这个头部即Schema类,其read()方法会把后面的correlation id和client id给读入,构造消息头。
请求上下文的构造
1 | val context = new RequestContext(header, receive.source, channel.socketAddress, channel.principal, |
其实就是简单地将对应参数赋值给内部字段:
1 | public final RequestHeader header; // 消息头 |
消息头上一小节刚看完,连接id也是阅读源码至今一直见到的用于标识一条TCP连接的字符串,最后2个字段均为解析listener配置时解析出的EndPoint类的字段。
请求对象的创建
RequestChannel.Request字段比较多就不一一解释了(很多都是metric相关的),核心是请求body(0.10版本是字段现在是方法):
1 | def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { |
该方法仅仅是检查请求类型T是否合法,若不合法则抛出异常。
关键部分是bodyAndSize进行类型匹配,该字段初始化:
1 | private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) |
context解析请求的方法:
1 | public RequestAndSize parseRequest(ByteBuffer buffer) { |
1 | // 类 AbstractRequest 的方法 |
这部分代码都是Java实现的,为了能根据不同请求类型/API版本得到对应的请求类型的示例,实现得较为复杂,细节也不深入去看,总之,在Kafka中可以像这样调用body()方法得到实际的请求对象:
1 | // 将请求的 ByteBuffer 解析成 MetadataRequest 对象 |
取出请求
之前介绍了Processor仅仅是将请求加入阻塞队列requestQueue中,那么何时取出呢?找到其poll()方法的调用处:
1 | // 取得下个请求, 或者阻塞直到超时 |
再看看上述方法的调用处:
1 | // 类 KafkaRequestHandler 的方法 |
由于是API层的代码,所以略去了其他代码,只保留了req相关的。可以看到请求Handler线程会反复地从请求队列中取出请求,然后根据请求地类型进行不同处理。
多线程取出请求安全吗?
由于ArrayBlockingQueue是线程安全的,所以多个Handler线程从中取出请求是线程安全的。另一方面,关于顺序性,即来自同一个客户端的多个请求,必须保证取出的顺序也一致。《Apache Kafka源码剖析》书上给出了解释:Processor.run()方法通过多处注册/取消 读/写事件 来保证每个连接上只有一个请求和一个对应的响应来实现的。
具体而言,可以回顾我上一篇源码分析中Processor的部分:
- 在
processCompletedReceives()中,一旦接收到完整的请求req,在调用sendRequest(req)后会取消监听该Channel的读事件; - 在
processCompleteSends()中,只有当响应成功返回客户端(将响应从缓存的inflightResponses移除)后,才会重新注册该Channel的读事件; - 在
processNewResponses()中判断请求类型是SendAction时,会注册Channel的写事件; - 在
poll()中向底层socket发送数据时,如果判断数据完毕,则会取消注册Channel的写事件。
处理响应
Processor自己维护了响应队列,并在processNewResponses()中调用dequeueResponse()方法依次出队, 那么,可以找到其对应方法enqueueResponse()的调用处:
1 | def sendResponse(response: RequestChannel.Response) { |
值得注意的是这里对processor != null的判断,Kafka 1.1.0的SocketServer支持resizeThreadPoll()方法来改变网络线程数量(也就是Acceptor对应Processor的数量),如果网络线程数减少的话,那么多出的Processor会调用shutdown()方法关闭,并通过connectionId将其从Acceptor和RequestChannel中的processors字段移除。
继续找到该方法的调用处,位于KafkaApis的sendResponse()方法中:
1 | private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = { |
此时RequestChannel只是将响应转发给了Processor,它本身并不维护响应队列(在0.10.0.1版本中则是维护了多个响应队列),真正维护响应队列的是Processor本身。
总结
关于Kafka网络层的阅读至此就告一段落,不得不说但作为Kafka的基础设施的Java NIO实现的部分更为复杂(KafkaChannel和KafkaSelector),但阅读源码不应太陷入细节(感觉我已经有些陷进去了……)。对于Kafka,最重要的还是它的业务层,也就是对Kafka协议的实现。
RequestChannel的作用很简单:
- 维护请求队列,接收来自
Processor的请求,并转发给KafkaRequestHandler进行处理; - 从
KafkaApis获取响应,发送给Processor。
加上文章开始总结的Acceptor/Processor,以及统筹全局的SocketServer,构成了Kafka的网络层,简单描述:
1 | | Network Layer | API Layer | |
可以发现不管是什么Channel,都是起到了连接的作用。Acceptor通过最简单的SocketChannel与监听套接字连接,监听连接事件,并将接受的连接转发给Processor,之后Processor通过比较复杂的KafkaChannel与客户端连接,监听读/写事件并和客户端进行数据的交互。
数据分为来自客户端的请求和来自服务端的响应,Processor不负责这部分,而是通过RequestChannel将请求发送给KafkaRequestHandler,再从RequestChannel接收响应。
问题来了,而实际发送响应给RequestChannel的却是KafkaApis,因此请求=>响应的过程是由它们共同完成的,也就是接下来要阅读的API层。