回顾
之前阅读了网络层和API层,在阅读API层支持的Kafka协议之前,首先得明确Kafka的消息概念。Kafka服务器被称为broker,与其交互的是客户端,分为生产者(Producer)和消费者(Consumer),客户端与服务端通过消息进行交互。
Kafka使用日志文件(下文称为消息日志)来保存消息,通过log.dirs
配置指定日志文件的存放目录。注意这里的日志文件不同于Kafka本身的日志(记录运行时的一些信息)。而对于每个分区,都会在log.dirs
下创建一个子目录来存放消息日志,其命名为<topic>-<partition>
,在该目录下会有像这样的文件:
1 | $ ls |
同一分区的不同消息是通过offset来唯一标识的,注意它并不是消息在消息日志中实际存储位置的偏移量,而是类似id一样的概念,从0开始递增,表示分区内第offset条消息。
消息日志的命名规则是[baseOffset].log
,比如这里的20就是该日志的第baseOffset,即消息日志中的第1条消息的offset。相应地,有同名的.index
文件,为消息建立了索引方便查询消息,但并没有对每条消息都建立了索引。
因此首先看看Kafka的消息实现,即message
包,本文主要讲Message
类。
消息格式
Message
类的注释给出了格式说明,如下图所示:
1 | 字节数 | 4 | 1 | 1 | 8 | 4 | K | 4 | V | |
补充说明:
magic
代表消息格式,其值为0代表v0,为1代表v1;- v0版本的消息使用绝对offset,且不包含
timestamp
字段,attribute
第3位不使用; - v1版本的消息使用相对offset,且包含
timestamp
字段,attribute
第3位为时间戳类型; K
是字段keylen
的值,V
是字段len
的值。
外部消息和内部消息
看看消息的主构造器
1 | class Message(val buffer: ByteBuffer, |
buffer
:消息的字节缓冲区;wrapperMessageTimestamp
:外部消息的时间戳;wrapperMessageTimestampType
:外部消息的时间戳类型;
这里的wrapperMessage
指的是外部消息,因为Kafka会对多个消息一起进行压缩提高压缩率,所以将N个消息压缩后的消息称为外部消息,而这N个消息则称为内部消息。
1 | 外部消息offset | 100 | 105 | 106 | 107 | ... |
这样做是因为生产者对一批消息压缩时,它是不知道消息的offset时(因为offset是由broker指定的),所以就简单地将offset字段从0开始依次递增来设置。
而broker在收到这批消息时,它知道前1个消息的offset(比如在这里就是100),也知道生产者发送过来的这批消息的数量(5),那么下一个外部消息的offset就被设置为100+5=105。
消费者取得的是外部消息,当消费者通过解压得到每个消息时,可以用外部offset和内部offset计算出内部消息的绝对offset(101~105)。
getter方法
对这种基于字节的消息协议的实现很简单,利用ByteBuffer
对象存储字节序列,然后用伴生对象的常量来指定某个字段的长度(length)和偏移量(offset),从而通过其字节区间[offset, offset + length)访问该字段。
再次注意:这里提到的偏移量指的是字节在缓冲区中的位置,不同于消息的offset。
在Message
伴生对象中定义一系列常量来记录各字段的offset和length:
1 | object Message { |
然后对于crc
和magic
这种整型字段的getter方法直接调用ByteBuffer.getInt(index)
方法即可(注意不能用getInt()
方法,因为它是从内部position开始读的)
1 | def checksum: Long = ByteUtils.readUnsignedInt(buffer, CrcOffset) |
对于多字节的字段crc
,使用的是Java类ByteUtils
的相关方法,将多个字节转换成目标整型,实际上还是首先调用ByteBuffer
的getXXX(index)
方法
1 | public static long readUnsignedInt(ByteBuffer buffer, int index) { |
对于key
和payload
这种运行期才确定长度的字段,其编码方式是用户自定义的,所以只需要返回一个ByteBuffer
即可,具体编解码应该在客户端进行:
1 | def payload: ByteBuffer = sliceDelimited(payloadSizeOffset) |
时间戳
Kafka的消息格式在0.10.0的一个重要变化是加入了时间戳字段,见upgrade to 0.10.0.0,为了保持旧消息的兼容,才有了magic
标识是否使用时间戳,并且支持对API版本的请求:Retrieving Supported API versions。值得一看的是timestamp
的getter:
1 | def timestamp: Long = { |
后2种Case都代表当前消息是内部消息,也就是说和其他内部消息一起被压缩了,只有时间戳类型为LogAppendTime
时才使用外部消息的时间戳。
辅助构造器
一般不会直接传入ByteBuffer
,而是传入消息协议的各个字段来构造,也就是辅助构造器
1 | def this(bytes: Array[Byte], |
其他辅助构造器都是基于这个辅助构造器构造的,代码就不一一贴出。
Record类
class Message
的asRecord()
方法和object Message
的fromRecord()
方法提供了Message
类和Java的LegacyRecord
类的互相转化:
1 | // object Message |
1 | // class Message |
可见两者的构造器完全一致,其实去看实现的话大多数方法也是一致的。只不过Record
提供了一系列write()
方法可以将内部存储的字节写入到DataOutputStream
类中,而Message
本身没有,因此要将Message
写入数据流时需要调用asRecord
转换成Record对象再调用write()
方法。
此外LegacyRecord
还提供了writeCompressedRecordHeader()
方法在创建消息集(Message Set)时会使用,到时候再去阅读。
总结
本文内容比较简单,主要是阅读Kafka的消息格式的实现,Java/Scala使用ByteBuffer
来实现基于字节的消息协议。Kafka在0.10.0中做出了较大改变,添加了时间戳字段,因此使用了magic
字段来区分不同版本的消息。最后,Scala类Message
也提供了与Java类LegacyRecord
的转换方法,从而实现向数据流的写入。