Kafka源码阅读03: 网络层阅读之RequestChannel

回顾

前2篇分析了SocketServer的启动以及Acceptor/Processor,对配置listener的网络地址,都会创建1个Acceptor和N个Processor,其中N为配置num.network.thread。每个Acceptor会创建1个默认的NIO Selector,每个Processor则都会创建1个Kafka自行实现Selector接口的KSelector

socket都会被封装成Channel,即通道,代表socket两端的连接。Acceptor会创建一个Channel监听网络地址,并在其Selector注册读事件,然后负责将所有客户端的连接转换成Channel均衡地发送给Processor

Processor则在其Selector上注册从Acceptor收到的Channel的读/写/关闭连接等事件,并分别处理。但是Processor只负责读取请求(Request)和写入响应(Response),对于已完成的请求,会将其作为参数传给requestChannel调用sendRequest()。另一方面,Processor内部的响应队列,则是由requestChannel调用sendResponse()得到的。

RequestChannel包含requestQueue字段缓存请求,另外它也和SocketServer一样保存了所有Processor的id和自身组成的ConcurrentHashMap

处理请求

发送请求

Processor.processCompletedReceives()方法中,会将封装了网络地址信息的请求传递给sendRequest()方法:

1
2
3
4
5
6
val header = RequestHeader.parse(receive.payload)
val context = new RequestContext(header, receive.source, channel.socketAddress,
channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)

sendRequest()方法的实现:

1
2
3
4
// 发送待处理的请求, requestQueue 有容量上限, 由 queue.max.requests 配置, 若达到了上限则会阻塞。
def sendRequest(request: RequestChannel.Request) {
requestQueue.put(request)
}

“发送”只是将请求放到了内部的请求队列中,而出队方法是在server.KafkaRequestHandler中调用的,不属于网络层的事情,暂时不管。因此看看Request类型的构造,传入了headercontext

请求头的解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static RequestHeader parse(ByteBuffer buffer) {
try {
// 前2个字节为 Api Key, 代表请求的类型
short apiKey = buffer.getShort();
// 后2个字节为 Api Version, 即客户端使用的API版本
short apiVersion = buffer.getShort();
// 通过上述字段创建 Schema 对象, 即完整的消息头
Schema schema = schema(apiKey, apiVersion);
// ByteBuffer.getXXX() 会修改内部偏移量, 因此需要将偏移量重置为最开始以便从头读取 buffer
// 从头读取 buffer, 进而用 scheme.read() 构造请求头
buffer.rewind();
return new RequestHeader(schema.read(buffer));
} // 异常处理(略)
}

Api Key的类型参考Api Key,消息协议类型参考消息协议

1
2
3
4
5
Request Header => api_key api_version correlation_id client_id 
api_key => INT16
api_version => INT16
correlation_id => INT32
client_id => NULLABLE_STRING

这个头部即Schema类,其read()方法会把后面的correlation id和client id给读入,构造消息头。

请求上下文的构造

1
2
val context = new RequestContext(header, receive.source, channel.socketAddress,  channel.principal,
listenerName, securityProtocol)

其实就是简单地将对应参数赋值给内部字段:

1
2
3
4
5
6
public final RequestHeader header;  // 消息头
public final String connectionId; // 连接id,包含本地和远程的地址和表示连接的index
public final InetAddress clientAddress; // 客户端地址
public final KafkaPrincipal principal; // Channel的principal字段,用于信息认证
public final ListenerName listenerName; // 配置 listeners 的名字部分(比如PLAINTEXT)
public final SecurityProtocol securityProtocol; // 安全协议, 根据listenerName解析的

消息头上一小节刚看完,连接id也是阅读源码至今一直见到的用于标识一条TCP连接的字符串,最后2个字段均为解析listener配置时解析出的EndPoint类的字段。

请求对象的创建

RequestChannel.Request字段比较多就不一一解释了(很多都是metric相关的),核心是请求body(0.10版本是字段现在是方法):

1
2
3
4
5
6
7
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
bodyAndSize.request match {
case r: T => r
case r =>
throw new ClassCastException(s"Expected request with type ${classTag.runtimeClass}, but found ${r.getClass}")
}
}

该方法仅仅是检查请求类型T是否合法,若不合法则抛出异常。

关键部分是bodyAndSize进行类型匹配,该字段初始化:

1
private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)

context解析请求的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public RequestAndSize parseRequest(ByteBuffer buffer) {
if (isUnsupportedApiVersionsRequest()) {
// 未支持的 ApiVersion 被视为v0请求并且不被处理
ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0, header.apiVersion());
return new RequestAndSize(apiVersionsRequest, 0);
} else {
ApiKeys apiKey = header.apiKey();
try {
short apiVersion = header.apiVersion();
// 根据API版本将字节缓存解析成Struct的各个字段
Struct struct = apiKey.parseRequest(apiVersion, buffer);
// 根据请求类型/API版本/Struct字段创建实际的请求类型
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
return new RequestAndSize(body, struct.sizeOf());
} catch (Throwable ex) {
// 异常处理(略)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
// 类 AbstractRequest 的方法
public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) {
switch (apiKey) {
case PRODUCE:
return new ProduceRequest(struct, apiVersion);
case FETCH:
return new FetchRequest(struct, apiVersion);
// 其他类型的请求...(略)
default:
// 异常处理(略)
}
}

这部分代码都是Java实现的,为了能根据不同请求类型/API版本得到对应的请求类型的示例,实现得较为复杂,细节也不深入去看,总之,在Kafka中可以像这样调用body()方法得到实际的请求对象:

1
2
// 将请求的 ByteBuffer 解析成 MetadataRequest 对象
val metadataRequest = request.body[MetadataRequest]

取出请求

之前介绍了Processor仅仅是将请求加入阻塞队列requestQueue中,那么何时取出呢?找到其poll()方法的调用处:

1
2
3
// 取得下个请求, 或者阻塞直到超时
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

再看看上述方法的调用处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 类 KafkaRequestHandler 的方法
def run() {
while (!stopped) {
// ...
val req = requestChannel.receiveRequest(300) // 300ms超时
// ...
req match {
case RequestChannel.ShutdownRequest =>
// ...
case request: RequestChannel.Request =>
// ...
case null => // continue
}
}
// ...
}

由于是API层的代码,所以略去了其他代码,只保留了req相关的。可以看到请求Handler线程会反复地从请求队列中取出请求,然后根据请求地类型进行不同处理。

多线程取出请求安全吗?

由于ArrayBlockingQueue是线程安全的,所以多个Handler线程从中取出请求是线程安全的。另一方面,关于顺序性,即来自同一个客户端的多个请求,必须保证取出的顺序也一致。《Apache Kafka源码剖析》书上给出了解释:Processor.run()方法通过多处注册/取消 读/写事件 来保证每个连接上只有一个请求和一个对应的响应来实现的。

具体而言,可以回顾我上一篇源码分析中Processor的部分:

  1. processCompletedReceives()中,一旦接收到完整的请求req,在调用sendRequest(req)后会取消监听该Channel的读事件;
  2. processCompleteSends()中,只有当响应成功返回客户端(将响应从缓存的inflightResponses移除)后,才会重新注册该Channel的读事件;
  3. processNewResponses()中判断请求类型是SendAction时,会注册Channel的写事件;
  4. poll()中向底层socket发送数据时,如果判断数据完毕,则会取消注册Channel的写事件。

处理响应

Processor自己维护了响应队列,并在processNewResponses()中调用dequeueResponse()方法依次出队, 那么,可以找到其对应方法enqueueResponse()的调用处:

1
2
3
4
5
6
7
8
9
10
11
def sendResponse(response: RequestChannel.Response) {
if (isTraceEnabled) {
// 判断响应类型并打印日志(略)
}

val processor = processors.get(response.processor)
// 如果 processor 已经关闭了, 可能会被移出 processors (此时返回null), 因此直接丢掉响应
if (processor != null) {
processor.enqueueResponse(response)
}
}

值得注意的是这里对processor != null的判断,Kafka 1.1.0的SocketServer支持resizeThreadPoll()方法来改变网络线程数量(也就是Acceptor对应Processor的数量),如果网络线程数减少的话,那么多出的Processor会调用shutdown()方法关闭,并通过connectionId将其从AcceptorRequestChannel中的processors字段移除。

继续找到该方法的调用处,位于KafkaApissendResponse()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
// 更新metrics(略)

responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
case None =>
requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
}
}

此时RequestChannel只是将响应转发给了Processor,它本身并不维护响应队列(在0.10.0.1版本中则是维护了多个响应队列),真正维护响应队列的是Processor本身。

总结

关于Kafka网络层的阅读至此就告一段落,不得不说但作为Kafka的基础设施的Java NIO实现的部分更为复杂(KafkaChannelKafkaSelector),但阅读源码不应太陷入细节(感觉我已经有些陷进去了……)。对于Kafka,最重要的还是它的业务层,也就是对Kafka协议的实现。

RequestChannel的作用很简单:

  • 维护请求队列,接收来自Processor的请求,并转发给KafkaRequestHandler进行处理;
  • KafkaApis获取响应,发送给Processor

加上文章开始总结的Acceptor/Processor,以及统筹全局的SocketServer,构成了Kafka的网络层,简单描述:

1
2
3
4
5
|             Network Layer                 |       API Layer        |
| Acceptor -> Processors <-> RequestChannel | -> KafkaRequestHandler |
| | <- KafkaApis |
| Client -> SocketChannel -> Acceptor | |
| Client <-> KafkaChannel <-> Processor | |

可以发现不管是什么Channel,都是起到了连接的作用。Acceptor通过最简单的SocketChannel与监听套接字连接,监听连接事件,并将接受的连接转发给Processor,之后Processor通过比较复杂的KafkaChannel与客户端连接,监听读/写事件并和客户端进行数据的交互。

数据分为来自客户端的请求和来自服务端的响应,Processor不负责这部分,而是通过RequestChannel将请求发送给KafkaRequestHandler,再从RequestChannel接收响应。

问题来了,而实际发送响应给RequestChannel的却是KafkaApis,因此请求=>响应的过程是由它们共同完成的,也就是接下来要阅读的API层。