前言
今天正式开始阅读Kafka源码,作为阅读笔记的第一篇,先简单地介绍下背景。
阅读的Kafka版本是1.1.0,服务端源码在core.main.scala.kafka目录下,该目录下的源码文件仅有Kafka.scala,也就是服务端的启动入口,其他的若干个模块都阻止在各子目录下,这里首先阅读的是网络层,也就是network子目录下的代码。
阅读思路是直接看公用方法,然后再给一些逻辑以及用到的字段作注释,否则单看某些字段不看语境也不知道做什么。注释里会给英文两边加空格,逗号也使用英文逗号,方便vim快捷键按词前进/后退。
使用Intellij Idea阅读的,之前用得比较少,也很折腾了下配置过程,记录一些阅读源码的方法:
- 光标选中+单击鼠标左键:跳转至变量/函数定义处;
- Navigate菜单栏的Back和Forward,快捷键是
Ctrl+Alt+Left/Right:后退/前进到前/后一次阅读的地方,一般时配合跳转功能回退; - 光标选中+鼠标右键,选择Find Usages,快捷键是
Alt+Shift+F7:查看变量/函数所有使用的地方; - 快捷键
Alt+F7:查看类的所有字段和方法。
对应我阅读C/C++源码时vim的Ctrl+J(YCM)/Ctrl+](ctags)跳转,Ctrl+O回退,LeaderF查看类的字段和方法。之前vim一直没配置查找所有调用处的功能,一直是手动写个简单脚本用egrep在当前目录下递归搜关键词的……
不过IDE优点就是功能更大更全,上手新语言时直接使用,不必每接触一门语言旧学习怎么定制功能。
SocketServer
注释表明了它是一个NIO套接字服务器,其线程模型是:
- 1个Acceptor线程处理新连接;
- Acceptor有N个Processor线程,其中每个都有自己的Selector从套接字中读取请求;
- M个Handler线程,处理请求,并将回应发给Processor线程用于写入。
SocketServer的主构造器有以下参数:
1 | val config: KafkaConfig // 配置文件 |
混入了Logging和KafkaMetricsGroup特质,后者继承自前者,但并没重写info()等日志方法,而是将不同类型的metric(度量指标)给组织起来,提供了各种metric的工厂方法。
对于日志,设置了类相关的前缀表明broker id:
1 | private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ") |
涉及到的一些字段,我添上了注释并按相关度整合了下:
1 | // 每个ip的最大连接数, 配置: max.connections.per.ip |
服务器启动源码分析
1 | def startup() { |
再看看 Acceptor 和 Processor 的创建:
1 | private def createAcceptorAndProcessors(processorsPerListener: Int, |
对每个Acceptor,会创建多个Processor,类似地,也存入ConcurrentHashMap中:
1 | private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized { |
RequestChannel/Acceptor/Processor的字段
从上述代码可知,重点是RequestChannel/Acceptor/Processor这3个类型,于是现在看看它们创建时除去传入构造器的参数外初始化的其他字段(依然忽略metrics相关的)。
首先是RequestChannel的字段:
1 | // 创建了固定长度的请求队列,queueSize由 queued.max.requests 决定 |
此外,配置是使用kafka.server.KafkaConfig类实现的,默认配置在伴生对象kafka.server.Defaults中(比如默认的queueSize为500),并在KafkaConfig的伴生对象的configDef字段创建时加载。
再就是Acceptor的字段:
1 | // NIO Selector, 用于注册 connect, read, write 等事件,并将事件分发给 Acceptor, Processors |
其中 NSelector 就是 Selector 的别名:
1 | import java.nio.channels.{Selector => NSelector} |
最后是Processor的字段:
1 | // SocketChannel 的并发队列, 用于管理 Acceptor 分配的socket连接 |
不得不说这里为何要使用 ConcurrentLinkedQueue 和 LinkedBlockingDeque 还是不清楚,但还是先不要在意细节,注意这里保存了2份Response,一个只是临时缓存处理后的响应,另一个则是真正待发送的响应,因为用key记录了连接信息:
1 | // 作为 inflightResponses 的key, 记录了本地地址/远程地址, 以及连接对应的索引 |
服务器启动总结
总结下来,启动SocketServer时其实就是根据配置参数创建了1个RequestChannel,M个Acceptor,M*N个Processor。其中M是监听地址的数量,N是num.network.thread配置的Acceptor对应的Processors的数量。
每个监听地址除了ip和port外,还有协议类型和名称,这些共同组成了EndPoint类。
SocketServer保存EndPoint到Acceptor的映射和Processor.id到Processor的映射;requestChannel持有M*N个Processor的id到其自身的映射;- 每个
Acceptor持有1个Selector; - 每个
Acceptor持有1个监听EndPoint的ServerSocketChannel; - 每个
Acceptor持有N个Processor组成的数组; - 每个
Processor持有1个Selector(Kafka自己实现的Selectable接口); - 每个
Processor持有一组socket连接; acceptors和processors都启动了线程(供M*(N+1)个)构成了整个网络层的处理。
Kafka的网络层是使用Reactor模式的,使用了Java NIO,所有的socket读写都是非阻塞模式,具体框架可以参考《Apacha Kafka源码剖析》一书,我目前也是照着这本书的思路去看源码。
不过对Java NIO不熟悉,虽然看了眼核心还是分发事件的Selector(I/O多路复用),但是封装得比较好。抽空去看看。
网络层运转的核心还是Acceptor和Processor的线程函数,也就是这2个类的run()方法,也是接下来要读的部分。
为什么使用ConcurrentHashMap?
《Apacha Kafka源码剖析》书中使用的是Kafka 0.10.0.1版本,其中acceptors和processors的类型是:
1 | private val processors = new Array[Processor](totalProcessorThreads) |
而1.1.0版本就都用ConcurrentHashMap来保存了,看源码时我也在想为什么不用数组去存processors,因为key就是从0到N-1。搜了下这个结构在Java 8用了不同于7的实现,抽空去看看。
然后看到了addListeners/removeListeners方法,前者根据新的Seq[EndPoint]重新创建acceptors和processors,后者则将指定的Seq[EndPoint]对应的Acceptor从acceptors中删除。而这两个方法在0.10.0.1版本中没有,所以就能用固定长度的数组来保存processors,也能用不支持并发访问的mutable.Map来保存acceptors。
不过还有个不明白的地方,看到直接访问acceptors和processors的都是SocketServer内部,而除了boundPort()方法和stopProcessingRequests()外,所在访问它们的方法都直接用synchronized关键字保护了,而boundPort()方法仅在xxxTest.scala中被调用了,这样的话使用ConcurrentHashMap是否必要?