回顾
之前阅读了网络层和API层,在阅读API层支持的Kafka协议之前,首先得明确Kafka的消息概念。Kafka服务器被称为broker,与其交互的是客户端,分为生产者(Producer)和消费者(Consumer),客户端与服务端通过消息进行交互。
Kafka使用日志文件(下文称为消息日志)来保存消息,通过log.dirs配置指定日志文件的存放目录。注意这里的日志文件不同于Kafka本身的日志(记录运行时的一些信息)。而对于每个分区,都会在log.dirs下创建一个子目录来存放消息日志,其命名为<topic>-<partition>,在该目录下会有像这样的文件:
1 | $ ls |
同一分区的不同消息是通过offset来唯一标识的,注意它并不是消息在消息日志中实际存储位置的偏移量,而是类似id一样的概念,从0开始递增,表示分区内第offset条消息。
消息日志的命名规则是[baseOffset].log,比如这里的20就是该日志的第baseOffset,即消息日志中的第1条消息的offset。相应地,有同名的.index文件,为消息建立了索引方便查询消息,但并没有对每条消息都建立了索引。
因此首先看看Kafka的消息实现,即message包,本文主要讲Message类。
消息格式
Message类的注释给出了格式说明,如下图所示:
1 | 字节数 | 4 | 1 | 1 | 8 | 4 | K | 4 | V | |
补充说明:
magic代表消息格式,其值为0代表v0,为1代表v1;- v0版本的消息使用绝对offset,且不包含
timestamp字段,attribute第3位不使用; - v1版本的消息使用相对offset,且包含
timestamp字段,attribute第3位为时间戳类型; K是字段keylen的值,V是字段len的值。
外部消息和内部消息
看看消息的主构造器
1 | class Message(val buffer: ByteBuffer, |
buffer:消息的字节缓冲区;wrapperMessageTimestamp:外部消息的时间戳;wrapperMessageTimestampType:外部消息的时间戳类型;
这里的wrapperMessage指的是外部消息,因为Kafka会对多个消息一起进行压缩提高压缩率,所以将N个消息压缩后的消息称为外部消息,而这N个消息则称为内部消息。
1 | 外部消息offset | 100 | 105 | 106 | 107 | ... |
这样做是因为生产者对一批消息压缩时,它是不知道消息的offset时(因为offset是由broker指定的),所以就简单地将offset字段从0开始依次递增来设置。
而broker在收到这批消息时,它知道前1个消息的offset(比如在这里就是100),也知道生产者发送过来的这批消息的数量(5),那么下一个外部消息的offset就被设置为100+5=105。
消费者取得的是外部消息,当消费者通过解压得到每个消息时,可以用外部offset和内部offset计算出内部消息的绝对offset(101~105)。
getter方法
对这种基于字节的消息协议的实现很简单,利用ByteBuffer对象存储字节序列,然后用伴生对象的常量来指定某个字段的长度(length)和偏移量(offset),从而通过其字节区间[offset, offset + length)访问该字段。
再次注意:这里提到的偏移量指的是字节在缓冲区中的位置,不同于消息的offset。
在Message伴生对象中定义一系列常量来记录各字段的offset和length:
1 | object Message { |
然后对于crc和magic这种整型字段的getter方法直接调用ByteBuffer.getInt(index)方法即可(注意不能用getInt()方法,因为它是从内部position开始读的)
1 | def checksum: Long = ByteUtils.readUnsignedInt(buffer, CrcOffset) |
对于多字节的字段crc,使用的是Java类ByteUtils的相关方法,将多个字节转换成目标整型,实际上还是首先调用ByteBuffer的getXXX(index)方法
1 | public static long readUnsignedInt(ByteBuffer buffer, int index) { |
对于key和payload这种运行期才确定长度的字段,其编码方式是用户自定义的,所以只需要返回一个ByteBuffer即可,具体编解码应该在客户端进行:
1 | def payload: ByteBuffer = sliceDelimited(payloadSizeOffset) |
时间戳
Kafka的消息格式在0.10.0的一个重要变化是加入了时间戳字段,见upgrade to 0.10.0.0,为了保持旧消息的兼容,才有了magic标识是否使用时间戳,并且支持对API版本的请求:Retrieving Supported API versions。值得一看的是timestamp的getter:
1 | def timestamp: Long = { |
后2种Case都代表当前消息是内部消息,也就是说和其他内部消息一起被压缩了,只有时间戳类型为LogAppendTime时才使用外部消息的时间戳。
辅助构造器
一般不会直接传入ByteBuffer,而是传入消息协议的各个字段来构造,也就是辅助构造器
1 | def this(bytes: Array[Byte], |
其他辅助构造器都是基于这个辅助构造器构造的,代码就不一一贴出。
Record类
class Message的asRecord()方法和object Message的fromRecord()方法提供了Message类和Java的LegacyRecord类的互相转化:
1 | // object Message |
1 | // class Message |
可见两者的构造器完全一致,其实去看实现的话大多数方法也是一致的。只不过Record提供了一系列write()方法可以将内部存储的字节写入到DataOutputStream类中,而Message本身没有,因此要将Message写入数据流时需要调用asRecord转换成Record对象再调用write()方法。
此外LegacyRecord还提供了writeCompressedRecordHeader()方法在创建消息集(Message Set)时会使用,到时候再去阅读。
总结
本文内容比较简单,主要是阅读Kafka的消息格式的实现,Java/Scala使用ByteBuffer来实现基于字节的消息协议。Kafka在0.10.0中做出了较大改变,添加了时间戳字段,因此使用了magic字段来区分不同版本的消息。最后,Scala类Message也提供了与Java类LegacyRecord的转换方法,从而实现向数据流的写入。