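AbstractServerThread

AbstractServerThread is the abstract base class of Acceptor and Processor. It encapsulates some helper variables and methods (I have reordered the code here):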
```scala
private val startupLatch = new CountDownLatch(1)
@volatile private var shutdownLatch = new CountDownLatch(0)
private val alive = new AtomicBoolean(true)

protected def isRunning: Boolean = alive.get

def shutdown(): Unit = {
  if (alive.getAndSet(false))
    wakeup()
  shutdownLatch.await()
}

def awaitStartup(): Unit = startupLatch.await

protected def startupComplete(): Unit = {
  shutdownLatch = new CountDownLatch(1)
  startupLatch.countDown()
}

protected def shutdownComplete(): Unit = shutdownLatch.countDown()

def close(channel: SocketChannel): Unit = {
  if (channel != null) {
    debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
    connectionQuotas.dec(channel.socket.getInetAddress)
    CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
    CoreUtils.swallow(channel.close(), this, Level.ERROR)
  }
}
```
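The following Java sketch shows how the two latches cooperate. I wrote it for this post; it is not taken from Kafka. The class name ServerThreadSketch and the doWork() hook are hypothetical. startupComplete() swaps in a fresh shutdownLatch with a count of 1, so shutdown() blocks until run() reaches its finally block. If the thread never started, the initial count-0 latch lets shutdown() return immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical Java re-creation of AbstractServerThread's latch handshake (not Kafka code).
abstract class ServerThreadSketch implements Runnable {
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    // Count 0 until run() starts, so shutdown() never blocks on a thread that never ran.
    private volatile CountDownLatch shutdownLatch = new CountDownLatch(0);
    private final AtomicBoolean alive = new AtomicBoolean(true);

    protected boolean isRunning() { return alive.get(); }

    // Hook for interrupting a blocking call, e.g. Selector.wakeup(); no-op by default.
    protected void wakeup() {}

    // One iteration of the thread's main loop (hypothetical hook).
    protected abstract void doWork() throws InterruptedException;

    public void awaitStartup() throws InterruptedException { startupLatch.await(); }

    public void shutdown() throws InterruptedException {
        if (alive.getAndSet(false)) wakeup(); // only the first caller wakes the thread
        shutdownLatch.await();                // wait for run() to reach its finally block
    }

    @Override
    public void run() {
        // startupComplete(): arm the shutdown latch before announcing startup
        shutdownLatch = new CountDownLatch(1);
        startupLatch.countDown();
        try {
            while (isRunning()) doWork();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownLatch.countDown();        // shutdownComplete()
        }
    }
}
```

A caller starts the thread, waits in awaitStartup(), and later calls shutdown(), which returns only after the worker has finished its cleanup.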
Acceptor.run()

The loops are nested fairly deeply, so let's leave out the part that handles the Channels for now:
```scala
def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          // handling of the selected keys omitted here; it is shown further below
        }
      } catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}
```
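The outer try-finally has no catch. Exceptions are therefore dealt with inside the while loop, whose body is one big try-catch. The one exception is a rethrown ControlThrowable, which escapes the loop; the finally block still runs.

Note the idiom of rethrowing ControlThrowable. Scala uses ControlThrowable to implement control flow, for example break in scala.util.control.Breaks. Code that catches Throwable should therefore let it propagate instead of swallowing it. See the Scala 2.13 ControlThrowable and Scala 2.12 ControlThrowable API docs.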
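Using the Selector works just like Linux's epoll_wait, so this part feels familiar. One difference is that there is no branch for ready <= 0. The API documentation says:
@return The number of keys, possibly zero, whose ready-operation sets were updated
select() never returns a negative value. Where epoll_wait would return -1, Selector throws an exception instead. The documentation lists three exceptions: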
- IOException: If an I/O error occurs;
- ClosedSelectorException: If this selector is closed;
- IllegalArgumentException: If the value of the timeout argument is negative.
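You can check this contract directly against java.nio. The sketch below uses a made-up class name, SelectorContract. It shows that a timed select() on an idle selector returns 0 rather than a negative value. It also shows that the negative-timeout and closed-selector cases are reported as exceptions:

```java
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Selector;

// Probes of the Selector.select(long) contract quoted above (class name is hypothetical).
final class SelectorContract {
    // No channels registered: the call times out and reports 0 ready keys, never a negative value.
    static int selectOnIdleSelector(long timeoutMs) throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.select(timeoutMs);
        }
    }

    // A negative timeout is rejected with IllegalArgumentException.
    static boolean negativeTimeoutRejected() throws IOException {
        try (Selector selector = Selector.open()) {
            selector.select(-1);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // Selecting on a closed selector throws ClosedSelectorException.
    static boolean closedSelectorRejected() throws IOException {
        Selector selector = Selector.open();
        selector.close();
        try {
            selector.select(10);
            return false;
        } catch (ClosedSelectorException e) {
            return true;
        }
    }
}
```

Do not pass 0 as the timeout here. In java.nio, select(0) means "block indefinitely", not "return immediately".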
Next comes the code for the ready > 0 case, which is the core of the processing logic:
```scala
// Body of the loop over the selected keys, roughly:
// val iter = nioSelector.selectedKeys().iterator(); while (iter.hasNext && isRunning) { ... }
try {
  val key = iter.next
  iter.remove()
  if (key.isAcceptable) {
    val processor = synchronized {
      currentProcessor = currentProcessor % processors.size
      processors(currentProcessor)
    }
    accept(key, processor)
  } else
    throw new IllegalStateException("Unrecognized key state for acceptor thread.")

  currentProcessor = currentProcessor + 1
} catch {
  case e: Throwable => error("Error while accepting connection", e)
}
```
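The idea is simple. A round-robin scheme provides basic load balancing, and accept() hands the connection behind the key to the chosen processor. The real core is therefore the accept() method.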
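Here is a minimal Java version of this round-robin choice. It assumes the processors are plain list elements; RoundRobinAssigner and resize() are hypothetical names. As in the Scala code, the counter is incremented without a modulo and normalised on the next pick. The lock stands in for the synchronized block, which presumably guards against another thread changing the processor list (for instance, when the number of network threads is reconfigured; this is an assumption).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin picker mirroring the Acceptor's currentProcessor logic.
final class RoundRobinAssigner<P> {
    private final List<P> processors;
    private int current = 0;

    RoundRobinAssigner(List<P> processors) {
        this.processors = new ArrayList<>(processors);
    }

    synchronized P next() {
        current = current % processors.size(); // normalise first, in case the list shrank
        P chosen = processors.get(current);
        current = current + 1;                  // modulo is applied on the next call
        return chosen;
    }

    // Hypothetical: replace the processor list while the picker is in use.
    synchronized void resize(List<P> newProcessors) {
        processors.clear();
        processors.addAll(newProcessors);
    }
}
```

Normalising before each pick, rather than after the increment, is what keeps a shrinking list safe: a stale index simply wraps around.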
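PS: Two details. First, the outer catch rethrows ControlThrowable, but the inner catch does not special-case it, so its case e: Throwable would swallow one. Presumably this is fine because nothing inside the inner block uses ControlThrowable-based control flow (such as break or a non-local return), so none is expected there.

Second, each key is removed from the selected-key set with iter.remove() as soon as it is taken. This has nothing to do with memory management. A Selector adds keys to its selected-key set but never removes them on its own. A key left in the set would still be there after the next select() and would be processed again. Removing it through the iterator is the supported way to do this during iteration, and it does not disturb the rest of the traversal.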
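The following Java sketch demonstrates the selected-key behaviour on a loopback connection. The class name is hypothetical. A pending connection makes the server key ready. A second selectNow() leaves the key in the set because nobody removed it. Only iter.remove() takes it out:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Shows that a Selector never clears its selected-key set by itself (class name is hypothetical).
final class SelectedKeysDemo {
    /** Returns {size after first select, size after a second select, size after iterator removal}. */
    static int[] run() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral loopback port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(1000);                 // the pending connection makes the key ready
                int afterFirst = selector.selectedKeys().size();
                selector.selectNow();                  // key is still in the set: nobody removed it
                int afterSecond = selector.selectedKeys().size();

                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();                     // what the Acceptor loop does
                    if (key.isAcceptable()) {
                        SocketChannel accepted = server.accept();
                        if (accepted != null) accepted.close();
                    }
                }
                return new int[] {afterFirst, afterSecond, selector.selectedKeys().size()};
            }
        }
    }
}
```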
Now the implementation of accept(), with the logging statements removed:
```scala
def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      close(socketChannel)
  }
}
```
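It follows the usual socket-programming routine, with two additions. It checks whether the number of connections from the same IP address exceeds the limit, and it sets some options on the connection's socket. In the end it still delegates to Processor.accept():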
```scala
def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}
```
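This one is simple. It adds the configured SocketChannel to the Processor's internal concurrent queue newConnections, whose type (mentioned in the previous post) is ConcurrentLinkedQueue. It then calls wakeup(), presumably so that a Processor blocked in its selector's poll() returns early and picks up the new connection.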
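The hand-off can be sketched in Java without any sockets. Here a type parameter C stands in for SocketChannel, and a Runnable stands in for the selector's wakeup(); ConnectionHandoff is a hypothetical name. There is one small deviation from the Scala code: the drain loop checks whether poll() returned null instead of calling isEmpty first. This does the same job with one queue operation per element.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical acceptor -> processor hand-off through a lock-free queue.
final class ConnectionHandoff<C> {
    private final ConcurrentLinkedQueue<C> newConnections = new ConcurrentLinkedQueue<>();
    private final Runnable wakeup;

    ConnectionHandoff(Runnable wakeup) { this.wakeup = wakeup; }

    // Acceptor thread: non-blocking enqueue, then wake the processor's selector.
    void accept(C connection) {
        newConnections.add(connection);
        wakeup.run();
    }

    // Processor thread, top of each loop iteration: register everything queued so far.
    int configureNewConnections(Consumer<C> register) {
        int registered = 0;
        C connection;
        while ((connection = newConnections.poll()) != null) { // poll() returns null once empty
            register.accept(connection);
            registered++;
        }
        return registered;
    }
}
```

Because the acceptor never blocks on the queue, a slow processor cannot stall the accept loop. The processor simply finds more entries to register on its next iteration.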
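Acceptor.run() summary

Leaving aside some program-design details, the logic of the Acceptor thread is:

1. Loop, waiting on the Selector for I/O events to become ready;
2. Iterate over the ready events, pick out the keys that are isAcceptable, and call accept() on the server socket channel to obtain a new connection;
3. Check the connection limit and, if it is not exceeded, configure some socket options;
4. Put the configured socket into the Processor's internal queue.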
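So the Acceptor only acts as a middleman. It deals directly with clients' connection requests, prepares the successfully accepted connections, and passes them on to a Processor. This leaves the Processor free to concentrate on reading and writing network data.
We can also see that a Channel (here, the SocketChannel class) is essentially a wrapper around a socket handle.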
Processor.run()

```scala
override def run() {
  startupComplete()
  try {
    while (isRunning) {
      try {
        configureNewConnections()
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
      } catch {
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    shutdownComplete()
  }
}
```
We can simply go through the methods inside the try block one by one.
1. configureNewConnections

```scala
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      selector.register(connectionId(channel.socket), channel)
    } catch {
      case e: Throwable =>
        close(channel)
    }
  }
}
```
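So the Acceptor only hands the SocketChannel representing the connection to the Processor, and the Processor registers read interest for it. When the channel is handed to selector, it is wrapped in a KafkaChannel. The wrapping is done through the ChannelBuilder interface. Its concrete instance is created in Processor.createSelector() by ChannelBuilders.serverChannelBuilder(); for the PLAINTEXT protocol this is PlaintextChannelBuilder. Below are, in order, the call to its buildChannel(), the implementation of buildChannel(), and the PlaintextTransportLayer constructor it uses: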
```java
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
key.attach(channel);
```
```java
@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
    try {
        PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
        PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer);
        return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
                memoryPool != null ? memoryPool : MemoryPool.NONE);
    } catch (Exception e) {
        // exception handling omitted
    }
}
```
```java
public PlaintextTransportLayer(SelectionKey key) throws IOException {
    this.key = key;
    this.socketChannel = (SocketChannel) key.channel();
}
```
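As you can see, the key and the SocketChannel are stored in the KafkaChannel's transportLayer field. That is why, later in the source, the transportLayer's addInterestOps() and removeInterestOps() methods are used to register and remove read/write interest for a KafkaChannel on the Selector: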
```java
@Override
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}

@Override
public void removeInterestOps(int ops) {
    key.interestOps(key.interestOps() & ~ops);
}
```
Under the hood, these just call SelectionKey.interestOps(), wrapped in the bitwise operations | and & ~ to express adding and removing an operation.
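Because interestOps is a bit set, you can exercise the same arithmetic on plain ints. The sketch below uses the real SelectionKey constants; the InterestOps helper names are hypothetical. Both operations are idempotent, so adding an operation twice or removing one that is not set changes nothing.

```java
import java.nio.channels.SelectionKey;

// The bit arithmetic of addInterestOps()/removeInterestOps() on a plain int (hypothetical helper).
final class InterestOps {
    static int add(int current, int ops)    { return current | ops; }
    static int remove(int current, int ops) { return current & ~ops; }

    // Human-readable view of an interest set, for checking the arithmetic.
    static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0)    sb.append("READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0)   sb.append("WRITE ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT ");
        if ((ops & SelectionKey.OP_ACCEPT) != 0)  sb.append("ACCEPT ");
        return sb.toString().trim();
    }
}
```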
2. processNewResponses

```scala
private def processNewResponses() {
  var curr: RequestChannel.Response = null
  while ({curr = dequeueResponse(); curr != null}) {
    val channelId = curr.request.context.connectionId
    try {
      curr.responseAction match {
        case RequestChannel.NoOpAction =>
          updateRequestMetrics(curr)
          trace("Socket server received empty response to send, registering for read: " + curr)
          openOrClosingChannel(channelId).foreach(c => selector.unmute(c.id))
        case RequestChannel.SendAction =>
          val responseSend = curr.responseSend.getOrElse(
            throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
          sendResponse(curr, responseSend)
        case RequestChannel.CloseConnectionAction =>
          updateRequestMetrics(curr)
          trace("Closing socket connection actively according to the response code.")
          close(channelId)
      }
    } catch {
      // exception handling omitted
    }
  }
}
```
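So the Processor only handles the responses buffered in responseQueue; turning a request into a response is not its job. Searching for where responseQueue is used shows that responses actually arrive through RequestChannel.sendResponse(). One level up, that method is called by KafkaApis.sendResponse(). So it is actually KafkaApis (in the kafka.server package) that processes the requests.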
We won't look into updateRequestMetrics() or the exception handling.
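The three-way match on responseAction can be modelled in Java with an enum and a switch. Everything here is a hypothetical stand-in for the Scala types: ResponseAction, Connection, and ResponseDispatcher. The point is only to make the mapping from action to effect explicit:

```java
// Hypothetical stand-ins for RequestChannel's response actions (not Kafka types).
enum ResponseAction { NO_OP, SEND, CLOSE_CONNECTION }

interface Connection {
    void unmute();              // NoOpAction: resume reading from the socket
    void send(String payload);  // SendAction: queue the bytes and enable OP_WRITE
    void close();               // CloseConnectionAction
}

final class ResponseDispatcher {
    static void dispatch(ResponseAction action, String payload, Connection conn) {
        switch (action) {
            case NO_OP:
                conn.unmute();
                break;
            case SEND:
                if (payload == null) // mirrors the getOrElse(throw ...) in the Scala code
                    throw new IllegalStateException("payload must be defined for SEND");
                conn.send(payload);
                break;
            case CLOSE_CONNECTION:
                conn.close();
                break;
        }
    }
}
```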
3. poll

```scala
private def poll() {
  try selector.poll(300)
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      error(s"Processor $id poll failed due to illegal state or IO exception")
  }
}
```
The key is the selector.poll() method:
```java
@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    boolean madeReadProgressLastCall = madeReadProgressLastPoll;
    clear();

    boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
        timeout = 0;

    if (!memoryPool.isOutOfMemory() && outOfMemory) {
        log.trace("Broker no longer low on memory - unmuting incoming sockets");
        for (KafkaChannel channel : channels.values()) {
            if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                channel.unmute();
            }
        }
        outOfMemory = false;
    }

    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys);
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>();
            pollSelectionKeys(toPoll, false, endSelect);
        }

        pollSelectionKeys(readyKeys, false, endSelect);
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true;
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    maybeCloseOldestConnection(endSelect);

    addToCompletedReceives();
}
```
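Digging deeper here gets complicated. Kafka handles quite a few concerns in this method, and the analysis above only touched on some of the fields, so I'll stop here. The key thing to know is that poll() fills the collections the Selector maintains internally: completed receives, completed sends, and disconnected connections. These are then processed by the steps that follow.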
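The timeout adjustment at the top of poll() can be isolated into a pure function. The sketch below is a hypothetical extraction with made-up names, not Kafka code. It also adds a helper for one java.nio detail: Selector.select(0) blocks indefinitely rather than returning immediately, so a zero timeout has to be mapped to selectNow().

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Hypothetical extraction of the timeout decision at the top of Selector.poll().
final class PollTimeout {
    static long effectiveTimeout(long timeout, boolean hasStagedReceives,
                                 boolean hasImmediatelyConnectedKeys,
                                 boolean madeReadProgressLastCall, boolean dataInBuffers) {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        // Work is already waiting that does not depend on new socket readiness: don't block.
        if (hasStagedReceives || hasImmediatelyConnectedKeys
                || (madeReadProgressLastCall && dataInBuffers))
            return 0;
        return timeout;
    }

    // java.nio's select(0) blocks indefinitely, so "timeout 0" must mean selectNow().
    static int select(Selector selector, long timeoutMs) throws IOException {
        return timeoutMs == 0 ? selector.selectNow() : selector.select(timeoutMs);
    }
}
```

Buffered data alone does not force a zero timeout. It does so only when the previous call made read progress, as the last test case below checks.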
PS: On the sending side, send() writes data to the socket. Once the send has completed, it stops listening for the OP_WRITE event on the corresponding Channel:
```java
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}
```
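The following self-contained Java simulation shows why OP_WRITE is only removed once send.completed() is true. All class names are hypothetical. A channel that accepts at most a few bytes per write() forces the send to span several writable events. Write interest is dropped only when the buffer is empty.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

// Hypothetical buffer-backed send that may need several write() calls to finish.
final class BufferSend {
    private final ByteBuffer buffer;
    BufferSend(ByteBuffer buffer) { this.buffer = buffer; }
    long writeTo(WritableByteChannel channel) throws IOException { return channel.write(buffer); }
    boolean completed() { return !buffer.hasRemaining(); }
}

// Stand-in for a non-blocking socket whose send buffer takes at most `chunk` bytes per call.
final class ThrottledChannel implements WritableByteChannel {
    private final int chunk;
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private boolean open = true;

    ThrottledChannel(int chunk) { this.chunk = chunk; }

    @Override public int write(ByteBuffer src) {
        int n = Math.min(chunk, src.remaining());
        for (int i = 0; i < n; i++) sink.write(src.get());
        return n;
    }
    @Override public boolean isOpen() { return open; }
    @Override public void close() { open = false; }
    byte[] written() { return sink.toByteArray(); }
}

final class WriteLoop {
    // Counts how many OP_WRITE readiness events are needed before write interest is dropped.
    static int drive(BufferSend send, WritableByteChannel channel) throws IOException {
        int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        int writableEvents = 0;
        while ((interestOps & SelectionKey.OP_WRITE) != 0) { // each pass models one "writable" event
            writableEvents++;
            send.writeTo(channel);
            if (send.completed())
                interestOps &= ~SelectionKey.OP_WRITE;       // removeInterestOps(OP_WRITE)
        }
        return writableEvents;
    }
}
```

The usual reason for dropping write interest promptly is this: a connected socket is writable almost all the time, so leaving OP_WRITE registered with nothing to send would make select() return immediately on every call.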
4. processCompletedReceives

```scala
private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          val header = RequestHeader.parse(receive.payload)
          val context = new RequestContext(header, receive.source, channel.socketAddress,
            channel.principal, listenerName, securityProtocol)
          val req = new RequestChannel.Request(processor = id, context = context,
            startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
          requestChannel.sendRequest(req)
          selector.mute(receive.source)
        case None =>
      }
    } catch {
      // exception handling omitted
    }
  }
}
```
5. processCompletedSends

```scala
private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    try {
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      updateRequestMetrics(resp)
      selector.unmute(send.destination)
    } catch {
      // exception handling omitted
    }
  }
}
```
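Taken together, processCompletedReceives() mutes a connection after reading a request, and processCompletedSends() unmutes it once the response has been sent. The hypothetical Java model below shows the effect on a client that pipelines several requests. The next request is read only after the previous response has gone out. As far as this code shows, each connection therefore has at most one request being processed at a time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one connection whose client pipelines several requests.
final class MutedConnection {
    private final Deque<String> socketBuffer = new ArrayDeque<>(); // requests already sent by the client
    private boolean muted = false;

    MutedConnection(List<String> pipelinedRequests) { socketBuffer.addAll(pipelinedRequests); }

    // One poll iteration: a muted channel has no OP_READ interest, so nothing is read.
    String pollRead() {
        if (muted || socketBuffer.isEmpty()) return null;
        String request = socketBuffer.poll();
        muted = true;      // processCompletedReceives(): selector.mute(receive.source)
        return request;
    }

    // processCompletedSends(): selector.unmute(send.destination)
    void completeSend() { muted = false; }
}
```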
6. processDisconnected

```scala
private def processDisconnected() {
  selector.disconnected.keySet.asScala.foreach { connectionId =>
    try {
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    } catch {
      // exception handling omitted
    }
  }
}
```
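Processor.run() summary

The Processor uses Kafka's own Selector (aliased as KSelector), which has more functionality than the default NIO Selector used by the Acceptor (aliased as NSelector). The Processor needs it because it has to maintain the read/write interest state of the sockets it watches, i.e. OP_READ and OP_WRITE.
Some of the concrete implementation lives in the network and requests packages under org.apache.kafka.common (written in Java); I won't look at it closely for now.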
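The flow boils down to:

1. Register OP_READ for every new connection received from the Acceptor. The Kafka server never sends requests to clients on its own initiative; it only responds to client requests.
2. Handle the buffered responses according to their type:
   - NoOpAction => re-enable read interest on the Channel;
   - SendAction => register write interest on the Channel and keep the response in inflightResponses until the send completes;
   - CloseConnectionAction => close the Channel.
3. Poll the Selector for ready I/O events (readable/writable/disconnected).
4. Wrap every completed receive (a request) and pass it on to RequestChannel.
5. For every completed send (a response), remove it from the in-flight buffer and re-enable read interest on the corresponding Channel.
6. For every closed connection, update the network address => connection count mapping maintained by connectionQuotas.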
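The connectionQuotas bookkeeping can be sketched as a per-address counter: inc() in Acceptor.accept(), and dec() in close() and processDisconnected(). This Java version is an assumption-based sketch with hypothetical names. accept() reacts to TooManyConnectionsException by calling close(), which itself calls connectionQuotas.dec(). The sketch therefore records the connection before checking the limit, so that the later dec() balances it.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-IP connection counter in the spirit of connectionQuotas.
final class ConnectionQuotasSketch {
    static final class TooManyConnections extends RuntimeException {
        TooManyConnections(InetAddress ip, int max) {
            super("Too many connections from " + ip + " (max " + max + ")");
        }
    }

    private final int maxPerIp;
    private final Map<InetAddress, Integer> counts = new HashMap<>();

    ConnectionQuotasSketch(int maxPerIp) { this.maxPerIp = maxPerIp; }

    // Called on accept: the connection is recorded even when rejected; close() will dec() it.
    synchronized void inc(InetAddress ip) {
        int count = counts.getOrDefault(ip, 0);
        counts.put(ip, count + 1);
        if (count >= maxPerIp) throw new TooManyConnections(ip, maxPerIp);
    }

    // Called on close or disconnect.
    synchronized void dec(InetAddress ip) {
        Integer count = counts.get(ip);
        if (count == null) throw new IllegalArgumentException("No connections recorded for " + ip);
        if (count == 1) counts.remove(ip); else counts.put(ip, count - 1);
    }

    synchronized int get(InetAddress ip) { return counts.getOrDefault(ip, 0); }
}
```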
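The Processor itself only handles three kinds of events: completed reads, completed writes, and disconnections. Requests and responses are exchanged with the rest of the broker through RequestChannel. The Processor is created by SocketServer.newProcessor(), and its requestChannel field is the SocketServer field of the same name.
So the next step is to read RequestChannel.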