[toc]
Pulsar AVRO schema 入门
一般而言,Avro 序列化需要提供序列化对象的格式规范,它使用 JSON 来表达。Pulsar 中则是可以直接传递 对象在内部自动生成。比如对于当前类型:
1 2 3 4 5 6 7
| @AllArgsConstructor @Data public class User {
private String name; private int age; }
|
发送一条消息:
1 2 3 4 5 6
| client.newProducer(Schema.AVRO(User.class)) .topic("my-topic") .create() .newMessage() .value(new User("xyz", 11)) .send();
|
然后查询该 topic 的 schema:
1 2
| $ curl -L http://localhost:8080/admin/v2/schemas/public/default/my-topic/schema {"version":0,"type":"AVRO","timestamp":0,"data":"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"pulsar.schema\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}","properties":{"__jsr310ConversionEnabled":"false","__alwaysAllowNull":"true"}}
|
查询的结果中 data
字段(去掉 \
转义符)就是 Avro 对 schema 的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| { "type": "record", "name": "User", "namespace": "pulsar.schema", "fields": [ { "name": "age", "type": "int" }, { "name": "name", "type": [ "null", "string" ], "default": null } ] }
|
Schema 在 Pulsar 中的使用
Pulsar 用到的各种 Schema 接口
首先 Apache Avro 库提供的 Schema
对象(位于 org.apache.avro
包)我们称之为 AVRO Schema
。
然后在 PulsarApi.proto
中我们可以看到 Schema
的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| message Schema { enum Type { Avro = 4; }
required string name = 1; required bytes schema_data = 3; required Type type = 4; repeated KeyValue properties = 5; }
message CommandProducer { optional Schema schema = 7; }
message CommandSubscribe { optional Schema schema = 12;
|
PB 的 Schema
暴露给用户的接口则是 SchemaInfo
(只列出部分方法):
1 2 3 4 5 6 7 8 9 10 11 12 13
| public interface SchemaInfo {
String getName();
byte[] getSchema();
SchemaType getType(); Map<String, String> getProperties();
|
而 Pulsar 在创建生产者和消费者时指定的则是自己的 Schema
接口(只列出部分方法):
1 2 3 4 5 6 7 8 9 10 11 12
| public interface Schema<T> extends Cloneable{
byte[] encode(T message);
default T decode(byte[] bytes) { return decode(bytes, null); } default T decode(byte[] bytes, byte[] schemaVersion) { return decode(bytes); }
|
大多数实现中会将 SchemaInfo
保存到内部字段,就像前文阅读的 AbstractStructSchema
类一样。Pulsar Schema
类提供了 encode
/decode
方法负责在用户传入的 T
类型和 byte[]
中进行互相转换。
最后总结下见到的几个 schema 相关的类:
包 |
类 |
作用 |
org.apache.avro |
Schema |
底层使用的 Avro schema 类 |
org.apache.pulsar.common.api.proto |
Schema |
ProtoBuf 协议定义的 schema 描述,会被持久化,并在 Broker 端进行注册和校验 |
org.apache.pulsar.common.schema |
SchemaInfo |
对 ProtoBuf Schema 的接口包装 |
org.apache.pulsar.client.api |
Schema<T> |
构造生产者或消费者对象时可以指定,在内部会对对象进行编解码,同时包含 SchemaInfo 用于注册和验证 schema 兼容性。 |
Schema 在 Broker 端的处理
Broker 主要负责对生产者/消费者上传的 schema 进行注册和验证。Pulsar 的 Schema
对象在构造生产者和消费者时都会被设置为内部字段,位于 ProducerBase
和 ConsumerBase
中,并且在创建生产者或者订阅时会取得 SchemaInfo
附在 Producer 或者 Subscribe 命令上传递给 broker。
在 Broker 中并不是直接使用 protobuf 的 Schema
,而是转换成了 SchemaData
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private SchemaData getSchema(Schema protocolSchema) { return SchemaData.builder() .data(protocolSchema.getSchemaData()) .isDeleted(false) .timestamp(System.currentTimeMillis()) .user(Strings.nullToEmpty(originalPrincipal)) .type(Commands.getSchemaType(protocolSchema.getType())) .props(protocolSchema.getPropertiesList().stream().collect( Collectors.toMap( KeyValue::getKey, KeyValue::getValue ) )).build(); }
|
在处理 Producer 请求时:
1 2 3 4
| private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) { if (schema != null) { return topic.addSchema(schema); } else {
|
调用了 Topic#addSchema
注册 schema。
而在处理 Subscribe 请求时则是:
1 2 3
| if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) .thenCompose(v -> topic.subscribe(option));
|
调用 Topic#addSchemaIfIdleOrCheckCompatible
对 schema 进行兼容性检查。至于实现细节就不深入了,实际上都是调用 Pulsar 实现的 Schema 注册服务(SchemaRegistryService
)的相关方法。
Schema 在 Client 端的处理
Client 除了上传 schema 外,最重要的就是利用 schema 对用户传入的对象进行编解码。在 ProducerBase#newMessage
方法:
1 2 3
| public TypedMessageBuilder<T> newMessage() { return new TypedMessageBuilderImpl<>(this, schema); }
|
生产端的 schema 被传入了 TypedMessageBuilderImpl
中,在用 value
方法指定消息的值时:
1 2 3 4 5
| public TypedMessageBuilder<T> value(T value) { this.content = ByteBuffer.wrap(schema.encode(value)); return this; }
|
会用 encode
方法得到字节数组以进行网络传输。
而在消费端,收到新消息时,会将消息的 payload 和 schema
一起构造成 MessageImpl
对象,比如在 ConsumerImpl#newSingleMessage
:
1 2 3
| final MessageImpl<V> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl, msgMetadata, singleMessageMetadata, payloadBuffer, createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);
|
在 MessageImpl#getValue
中,会利用 schema 将收到的字节数组解码成 T
对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public T getValue() { SchemaInfo schemaInfo = getSchemaInfo(); if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) { } else { return decode(schema.supportSchemaVersioning() ? getSchemaVersion() : null); } }
private T decode(byte[] schemaVersion) { T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null; if (value != null) { return value; } if (null == schemaVersion) { return schema.decode(getData()); } else { return schema.decode(getData(), schemaVersion); } }
|
Schema#AVRO 方法实现
AvroSchema
用户端一般传递 Class
对象即可,实际上底层会将其转换成 SchemaDefinition
对象:
1 2 3 4 5 6 7 8 9
| static <T> Schema<T> AVRO(Class<T> pojo) { return DefaultImplementation.getDefaultImplementation() .newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build()); }
static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) { return DefaultImplementation.getDefaultImplementation().newAvroSchema(schemaDefinition); }
|
在 SchemaDefinition
中会保存这个 Class
类型的字段:
回到 Schema#AVRO
方法,它实际上是基于 SchemaDefinition
对象创建了 AvroSchema
作为 schema 对象:
1 2 3 4 5 6 7 8 9
| public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) { ClassLoader pojoClassLoader = null; if (schemaDefinition.getPojo() != null) { pojoClassLoader = schemaDefinition.getPojo().getClassLoader(); } return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader); }
|
1 2 3 4 5
| private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) { super(schemaInfo); this.pojoClassLoader = pojoClassLoader; }
|
其继承体系为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| +-----------+ |[I] Schema | +-----^-----+ | +--------+---------+ |[A] AbstractSchema| +--------^---------+ | +----------+-------------+ |[A] AbstractStructSchema| +----------^-------------+ | +------------+---------------+ |[A] AbstractBaseStructSchema| +------------^---------------+ | +-----+------+ | AvroSchema | +------------+
|
再依次看基类的构造方法:
1 2 3 4 5
| protected final SchemaInfo schemaInfo;
public AbstractStructSchema(SchemaInfo schemaInfo) { this.schemaInfo = schemaInfo; }
|
1 2 3 4 5 6 7
| protected final Schema schema;
public AvroBaseStructSchema(SchemaInfo schemaInfo) { super(schemaInfo); this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)); }
|
可见 Schema#AVRO
方法最关键的是:
- 调用
SchemaUtils#parseSchemaInfo
得到 SchemaInfo
对象。
- 将
SchemaInfo
对象传入 AvroBaseStructSchema#parseAvroSchema
对象得到 Avro 库的 Schema
对象。
SchemaUtils#parseSchemaInfo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) { return SchemaInfoImpl.builder() .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8)) .properties(schemaDefinition.getProperties()) .name("") .type(schemaType).build(); }
public static Schema createAvroSchema(SchemaDefinition schemaDefinition) { Class pojo = schemaDefinition.getPojo();
if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) { return parseAvroSchema(schemaDefinition.getJsonDef()); } else if (pojo != null) { ThreadLocal<Boolean> validateDefaults = null; try { Field validateDefaultsField = Schema.class.getDeclaredField("VALIDATE_DEFAULTS"); validateDefaultsField.setAccessible(true); validateDefaults = (ThreadLocal<Boolean>) validateDefaultsField.get(null); } catch (NoSuchFieldException | IllegalAccessException e) { throw new RuntimeException("Cannot disable validation of default values", e); }
final boolean savedValidateDefaults = validateDefaults.get();
try { validateDefaults.set(false); return extractAvroSchema(schemaDefinition, pojo); } finally { validateDefaults.set(savedValidateDefaults); } } else { throw new RuntimeException("Schema definition must specify pojo class or schema json definition"); } }
|
针对 Avro schema,需要将 Schema#VALIDATE_DEFAULTS
设为 false(这似乎是为了和 Avro 1.8 生成的类兼容,参考 #5938)然后调用 extractAvroSchema
,再改回去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) { try { return parseAvroSchema(pojo.getDeclaredField("SCHEMA$").get(null).toString()); } catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException ignored) { ReflectData reflectData = schemaDefinition.getAlwaysAllowNull() ? new ReflectData.AllowNull() : new ReflectData(); AvroSchema.addLogicalTypeConversions(reflectData, schemaDefinition.isJsr310ConversionEnabled(), false); return reflectData.getSchema(pojo); } }
|
实际上是将 POJO 类通过 AVRO 库转换成 AVRO 的 Schema
类型。回顾 parseSchemaInfo
方法:
1
| .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
|
将 Schema
转换成字符串再按照 UTF-8 编码成字节作为 SchemaInfo
的 schema
字段。
SchemaUtils#parseAvroSchema
1 2 3 4 5
| public static Schema parseAvroSchema(String schemaJson) { final Schema.Parser parser = new Schema.Parser(); parser.setValidateDefaults(false); return parser.parse(schemaJson); }
|
这里实际上又将转换成字节数组的 schema 又转换回去了。
encode 和 decode 方法
AVRO schema 的编解码实现在 AbstractStructSchema
类:
1 2 3 4 5 6 7 8 9 10 11 12
| protected SchemaReader<T> reader; protected SchemaWriter<T> writer;
@Override public byte[] encode(T message) { return writer.write(message); }
@Override public T decode(byte[] bytes) { return reader.read(bytes); }
|
这里用到了前文我们忽略的 SchemaReader
和 SchemaWriter
,它们是在 AvroSchema
中设置的:
1 2 3 4 5 6 7 8 9
| private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) { boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo); setReader(new MultiVersionAvroReader<>(schema, pojoClassLoader, getJsr310ConversionEnabledFromSchemaInfo(schemaInfo))); setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled)); }
|
首先来看 AvroWriter
,也就是编码(将 T
转换成 byte[]
):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) { this.byteArrayOutputStream = new ByteArrayOutputStream(); this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null); ReflectData reflectData = new ReflectData(); AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); this.writer = new ReflectDatumWriter<>(schema, reflectData); }
@Override public synchronized byte[] write(T message) { byte[] outputBytes = null; try { writer.write(message, this.encoder); } catch (Exception e) { throw new SchemaSerializationException(e); } finally { try { this.encoder.flush(); outputBytes = this.byteArrayOutputStream.toByteArray(); } catch (Exception ex) { throw new SchemaSerializationException(ex); } this.byteArrayOutputStream.reset(); } return outputBytes; }
|
实际上是构造了 Avro 库的 ReflectDatumWriter
进行编码。
再来看 MultiVersionAvroReader
:
1 2
| public MultiVersionAvroReader(Schema readerSchema, ClassLoader pojoClassLoader, boolean jsr310ConversionEnabled) { super(new AvroReader<>(readerSchema, pojoClassLoader, jsr310ConversionEnabled), readerSchema);
|
它是构造了 AvroReader
负责实际的解码:
1 2 3 4 5 6 7 8
| public AbstractMultiVersionReader(SchemaReader<T> providerSchemaReader) { this.providerSchemaReader = providerSchemaReader; }
@Override public T read(byte[] bytes, int offset, int length) { return providerSchemaReader.read(bytes); }
|
注:multi version schema 的实现更为复杂,内部用了一个 cache 来缓存不同版本的 schema reader。
而 AvroReader
实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) { this.schema = schema; if (classLoader != null) { ReflectData reflectData = new ReflectData(classLoader); AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled); this.reader = new ReflectDatumReader<>(schema, schema, reflectData); } else { this.reader = new ReflectDatumReader<>(schema); } }
public T read(InputStream inputStream) { try { BinaryDecoder decoderFromCache = decoders.get(); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, decoderFromCache); if (decoderFromCache == null) { decoders.set(decoder); } return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, decoder));
|
实际上是构造了 Avro 库的 ReflectDatumReader
进行解码。
总结
至此,关于 Pulsar 处理 AVRO schema 的流程大致有了了解。其实就是对 Avro 库进行了包装,用来对用户传入的对象进行编解码。同时,将 AVRO schema 的 JSON 描述信息存放到了 schema info 的 data 字段中,Broker 会将其持久化,并对 Client 上传的 schema 描述信息进行注册和验证。更多的细节可以进一步阅读 Pulsar 的 schema 注册服务。