Kafka源码阅读05-消息协议阅读之Message

回顾

之前阅读了网络层和API层,在阅读API层支持的Kafka协议之前,首先得明确Kafka的消息概念。Kafka服务器被称为broker,与其交互的是客户端,分为生产者(Producer)消费者(Consumer),客户端与服务端通过消息进行交互。

Kafka使用日志文件(下文称为消息日志)来保存消息,通过log.dirs配置指定日志文件的存放目录。注意这里的日志文件不同于Kafka本身的日志(记录运行时的一些信息)。而对于每个分区,都会在log.dirs下创建一个子目录来存放消息日志,其命名为<topic>-<partition>,在该目录下会有像这样的文件:

1
2
$ ls
00000000000000000020.index 00000000000000000020.log 00000000000000000020.timeindex leader-epoch-checkpoint

同一分区的不同消息是通过offset来唯一标识的,注意它并不是消息在消息日志中实际存储位置的偏移量,而是类似id一样的概念,从0开始递增,表示分区内第offset条消息。

消息日志的命名规则是[baseOffset].log,比如这里的20就是该日志的第baseOffset,即消息日志中的第1条消息的offset。相应地,有同名的.index文件,为消息建立了索引方便查询消息,但并没有对每条消息都建立了索引。

因此首先看看Kafka的消息实现,即message包,本文主要讲Message类。

消息格式

Message类的注释给出了格式说明,如下图所示:

1
2
3
4
5
字节数 |   4   |   1   |     1     |     8     |   4    |  K  |  4  |    V    |
字段名 | CRC32 | magic | attribute | timestamp | keylen | key | len | payload |
/ \
| 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
0~2: 压缩类型;3: 时间戳类型

补充说明:

  • magic代表消息格式,其值为0代表v0,为1代表v1;
  • v0版本的消息使用绝对offset,且不包含timestamp字段,attribute第3位不使用;
  • v1版本的消息使用相对offset,且包含timestamp字段,attribute第3位为时间戳类型;
  • K是字段keylen的值,V是字段len的值。

外部消息和内部消息

看看消息的主构造器

1
2
3
class Message(val buffer: ByteBuffer,
private val wrapperMessageTimestamp: Option[Long] = None,
private val wrapperMessageTimestampType: Option[TimestampType] = None)
  • buffer:消息的字节缓冲区;
  • wrapperMessageTimestamp:外部消息的时间戳;
  • wrapperMessageTimestampType:外部消息的时间戳类型;

这里的wrapperMessage指的是外部消息,因为Kafka会对多个消息一起进行压缩提高压缩率,所以将N个消息压缩后的消息称为外部消息,而这N个消息则称为内部消息

1
2
3
外部消息offset | 100 |     105    | 106 | 107 | ...
/ \
内部消息offset | 0 | 1 | 2 | 3 | 4 |

这样做是因为生产者对一批消息压缩时,它是不知道消息的offset时(因为offset是由broker指定的),所以就简单地将offset字段从0开始依次递增来设置。

而broker在收到这批消息时,它知道前1个消息的offset(比如在这里就是100),也知道生产者发送过来的这批消息的数量(5),那么下一个外部消息的offset就被设置为100+5=105。

消费者取得的是外部消息,当消费者通过解压得到每个消息时,可以用外部offset和内部offset计算出内部消息的绝对offset(101~105)。

getter方法

对这种基于字节的消息协议的实现很简单,利用ByteBuffer对象存储字节序列,然后用伴生对象的常量来指定某个字段的长度(length)和偏移量(offset),从而通过其字节区间[offset, offset + length)访问该字段。

再次注意:这里提到的偏移量指的是字节在缓冲区中的位置,不同于消息的offset。

Message伴生对象中定义一系列常量来记录各字段的offset和length:

1
2
3
4
5
6
7
8
object Message {
val CrcOffset = 0
val CrcLength = 4
val MagicOffset = CrcOffset + CrcLength
val MagicLength = 1
val AttributesOffset = MagicOffset + MagicLength
// ...
}

然后对于crcmagic这种整型字段的getter方法直接调用ByteBuffer.getInt(index)方法即可(注意不能用getInt()方法,因为它是从内部position开始读的)

1
2
def checksum: Long = ByteUtils.readUnsignedInt(buffer, CrcOffset)
def magic: Byte = buffer.get(MagicOffset)

对于多字节的字段crc,使用的是Java类ByteUtils的相关方法,将多个字节转换成目标整型,实际上还是首先调用ByteBuffergetXXX(index)方法

1
2
3
public static long readUnsignedInt(ByteBuffer buffer, int index) {
return buffer.getInt(index) & 0xffffffffL;
}

对于keypayload这种运行期才确定长度的字段,其编码方式是用户自定义的,所以只需要返回一个ByteBuffer即可,具体编解码应该在客户端进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
def key: ByteBuffer = sliceDelimited(keySizeOffset)

// 由于key字段长度是动态的,所以无法在object中定义payloadSizeOffset常量,而需要在类中取得key的长度后计算而出
private def payloadSizeOffset = {
if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
else KeyOffset_V1 + max(0, keySize)
}

private def sliceDelimited(start: Int): ByteBuffer = {
val size = buffer.getInt(start) // 取得前4个字节表示的长度
if(size < 0) {
null
} else {
// 拷贝一份,防止影响buffer的position,注意这里的拷贝并没拷贝内部的字节序列
var b = buffer.duplicate()
// 跳过表示长度的4个字节,回顾协议,key和payload之前都有4个字节表示长度
b.position(start + 4)
// 取得buffer从position开始的子序列,并重置长度和position
// 之后b就是指向key或payload字段并且长度合适的ByteBuffer了
b = b.slice()
b.limit(size)
b.rewind
b
}
}

时间戳

Kafka的消息格式在0.10.0的一个重要变化是加入了时间戳字段,见upgrade to 0.10.0.0,为了保持旧消息的兼容,才有了magic标识是否使用时间戳,并且支持对API版本的请求:Retrieving Supported API versions。值得一看的是timestamp的getter:

1
2
3
4
5
6
7
8
9
10
11
12
13
def timestamp: Long = {
if (magic == MagicValue_V0) // v0版本的消息不使用时间戳
Message.NoTimestamp
// 对v1版本的消息,有以下3种Case:
// 1. 外部消息的时间戳及其类型都为None;
// 2. 外部消息的时间戳类型为LogAppendTime且时间戳非None;
// 3. 外部消息的时间戳类型为CreateTime且时间戳非None。
// Case 2
else if (wrapperMessageTimestampType.exists(_ == TimestampType.LOG_APPEND_TIME) && wrapperMessageTimestamp.isDefined)
wrapperMessageTimestamp.get
else // Case 1, 3
buffer.getLong(Message.TimestampOffset)
}

后2种Case都代表当前消息是内部消息,也就是说和其他内部消息一起被压缩了,只有时间戳类型为LogAppendTime时才使用外部消息的时间戳。

辅助构造器

一般不会直接传入ByteBuffer,而是传入消息协议的各个字段来构造,也就是辅助构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def this(bytes: Array[Byte], 
key: Array[Byte],
timestamp: Long,
timestampType: TimestampType,
codec: CompressionCodec,
payloadOffset: Int,
payloadSize: Int,
magicValue: Byte) = {
// 计算总长度,分配对应大小的ByteBuffer
this(ByteBuffer.allocate(Message.CrcLength +
Message.MagicLength +
Message.AttributesLength +
(if (magicValue == Message.MagicValue_V0) 0
else Message.TimestampLength) +
Message.KeySizeLength +
(if(key == null) 0 else key.length) +
Message.ValueSizeLength +
(if(bytes == null) 0
else if(payloadSize >= 0) payloadSize
else bytes.length - payloadOffset)))
// 验证magic和timestamp是否对应:
// 1. magic只能为0或1;
// 2. 时间戳必须为非负数或-1(代表不使用时间戳);
// 3. magic为0时timestamp必须为-1,因为v0版本不支持时间戳;
validateTimestampAndMagicValue(timestamp, magicValue)
// skip crc, we will fill that in at the end
// 跳过CRC字段,先填充后面的部分
buffer.position(MagicOffset)
buffer.put(magicValue) // 填充 magic
// 根据压缩类型和时间戳类型计算 attribute 并填充
val attributes: Byte = LegacyRecord.computeAttributes(magicValue, CompressionType.forId(codec.codec), timestampType)
buffer.put(attributes)
// Only put timestamp when "magic" value is greater than 0
if (magic > MagicValue_V0) // 仅当magic大于0才可以填充时间戳
buffer.putLong(timestamp)
if(key == null) { // key为空则将keylen填充为-1,代表key不存在
buffer.putInt(-1)
} else { // 否则填充 keylen 和 key
buffer.putInt(key.length)
buffer.put(key, 0, key.length)
}
// 类似key,若bytes为空,填充len为-1,否则填充 bytes 的指定部分
// payloadOffset指定起始偏移量,payloadSize指定填充字节数(若<0则填充偏移量之后的所有字节)
val size = if(bytes == null) -1
else if(payloadSize >= 0) payloadSize
else bytes.length - payloadOffset
buffer.putInt(size)
if(bytes != null)
buffer.put(bytes, payloadOffset, size)
buffer.rewind() // 重置position为初始,以便之后getXXX()读取

// now compute the checksum and fill it in
// 后面字段填充完了,计算CRC校验值并填充到前4个字节,完成后position为4,
// 也就是对内部buffer调用slice()方法返回的是magic字段至末尾的部分而不包含CRC字段
ByteUtils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
}

其他辅助构造器都是基于这个辅助构造器构造的,代码就不一一贴出。

Record类

class MessageasRecord()方法和object MessagefromRecord()方法提供了Message类和Java的LegacyRecord类的互相转化:

1
2
3
4
5
6
// object Message
def fromRecord(record: LegacyRecord): Message = {
val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp == null) None else Some(record.wrapperRecordTimestamp)
val wrapperTimestampType = Option(record.wrapperRecordTimestampType)
new Message(record.buffer, wrapperTimestamp, wrapperTimestampType)
}
1
2
3
4
5
// class Message
private[message] def asRecord: LegacyRecord = wrapperMessageTimestamp match {
case None => new LegacyRecord(buffer)
case Some(timestamp) => new LegacyRecord(buffer, timestamp, wrapperMessageTimestampType.orNull)
}

可见两者的构造器完全一致,其实去看实现的话大多数方法也是一致的。只不过Record提供了一系列write()方法可以将内部存储的字节写入到DataOutputStream类中,而Message本身没有,因此要将Message写入数据流时需要调用asRecord转换成Record对象再调用write()方法。

此外LegacyRecord还提供了writeCompressedRecordHeader()方法在创建消息集(Message Set)时会使用,到时候再去阅读。

总结

本文内容比较简单,主要是阅读Kafka的消息格式的实现,Java/Scala使用ByteBuffer来实现基于字节的消息协议。Kafka在0.10.0中做出了较大改变,添加了时间戳字段,因此使用了magic字段来区分不同版本的消息。最后,Scala类Message也提供了与Java类LegacyRecord的转换方法,从而实现向数据流的写入。