Pulsar AVRO schema 源码阅读:初识

[toc]

Pulsar AVRO schema 入门

一般而言,Avro 序列化需要提供序列化对象的格式规范,它使用 JSON 来表达。Pulsar 中则是可以直接传递 对象在内部自动生成。比如对于当前类型:

1
2
3
4
5
6
7
@AllArgsConstructor
@Data
public class User {

private String name;
private int age;
}

发送一条消息:

1
2
3
4
5
6
client.newProducer(Schema.AVRO(User.class))
.topic("my-topic")
.create()
.newMessage()
.value(new User("xyz", 11))
.send();

然后查询该 topic 的 schema:

1
2
$ curl -L http://localhost:8080/admin/v2/schemas/public/default/my-topic/schema
{"version":0,"type":"AVRO","timestamp":0,"data":"{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"pulsar.schema\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}","properties":{"__jsr310ConversionEnabled":"false","__alwaysAllowNull":"true"}}

查询的结果中 data 字段(去掉 \ 转义符)就是 Avro 对 schema 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"type": "record",
"name": "User",
"namespace": "pulsar.schema",
"fields": [
{
"name": "age",
"type": "int"
},
{
"name": "name",
"type": [
"null",
"string"
],
"default": null
}
]
}

Schema 在 Pulsar 中的使用

Pulsar 用到的各种 Schema 接口

首先 Apache Avro 库提供的 Schema 对象(位于 org.apache.avro 包)我们称之为 AVRO Schema

然后在 PulsarApi.proto 中我们可以看到 Schema 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
message Schema {
enum Type {
// ...
Avro = 4;
// ...
}

required string name = 1;
required bytes schema_data = 3;
required Type type = 4;
repeated KeyValue properties = 5;
}

message CommandProducer {
// ...
optional Schema schema = 7;
// ...
}

message CommandSubscribe {
// ...
optional Schema schema = 12;

PB 的 Schema 暴露给用户的接口则是 SchemaInfo(只列出部分方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface SchemaInfo {

// schema 名字
String getName();

// 对于 AVRO 类型,为 Avro schema 序列化后的字节数组
byte[] getSchema();

// schema 类型,比如 AVRO
SchemaType getType();

// 用户自定义的一组属性
Map<String, String> getProperties();

而 Pulsar 在创建生产者和消费者时指定的则是自己的 Schema 接口(只列出部分方法):

1
2
3
4
5
6
7
8
9
10
11
12
public interface Schema<T> extends Cloneable{

// 将对象按照 schema 定义编码成字节数组
byte[] encode(T message);

// 将字节数组按照 schema 定义解码成对象,默认忽略 schema version
default T decode(byte[] bytes) {
return decode(bytes, null);
}
default T decode(byte[] bytes, byte[] schemaVersion) {
return decode(bytes);
}

大多数实现中会将 SchemaInfo 保存到内部字段,就像前文阅读的 AbstractStructSchema 类一样。Pulsar Schema 类提供了 encode/decode 方法负责在用户传入的 T 类型和 byte[] 中进行互相转换。

最后总结下见到的几个 schema 相关的类:

作用
org.apache.avro Schema 底层使用的 Avro schema 类
org.apache.pulsar.common.api.proto Schema ProtoBuf 协议定义的 schema 描述,会被持久化,并在 Broker 端进行注册和校验
org.apache.pulsar.common.schema SchemaInfo 对 ProtoBuf Schema 的接口包装
org.apache.pulsar.client.api Schema<T> 构造生产者或消费者对象时可以指定,在内部会对对象进行编解码,同时包含 SchemaInfo 用于注册和验证 schema 兼容性。

Schema 在 Broker 端的处理

Broker 主要负责对生产者/消费者上传的 schema 进行注册和验证。Pulsar 的 Schema 对象在构造生产者和消费者时都会被设置为内部字段,位于 ProducerBaseConsumerBase 中,并且在创建生产者或者订阅时会取得 SchemaInfo 附在 Producer 或者 Subscribe 命令上传递给 broker。

在 Broker 中并不是直接使用 protobuf 的 Schema,而是转换成了 SchemaData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private SchemaData getSchema(Schema protocolSchema) {
return SchemaData.builder()
.data(protocolSchema.getSchemaData())
.isDeleted(false)
.timestamp(System.currentTimeMillis())
.user(Strings.nullToEmpty(originalPrincipal))
.type(Commands.getSchemaType(protocolSchema.getType()))
.props(protocolSchema.getPropertiesList().stream().collect(
Collectors.toMap(
KeyValue::getKey,
KeyValue::getValue
)
)).build();
}

在处理 Producer 请求时:

1
2
3
4
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
if (schema != null) {
return topic.addSchema(schema);
} else {

调用了 Topic#addSchema 注册 schema。

而在处理 Subscribe 请求时则是:

1
2
3
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(option));

调用 Topic#addSchemaIfIdleOrCheckCompatible 对 schema 进行兼容性检查。至于实现细节就不深入了,实际上都是调用 Pulsar 实现的 Schema 注册服务(SchemaRegistryService)的相关方法。

Schema 在 Client 端的处理

Client 除了上传 schema 外,最重要的就是利用 schema 对用户传入的对象进行编解码。在 ProducerBase#newMessage 方法:

1
2
3
public TypedMessageBuilder<T> newMessage() {
return new TypedMessageBuilderImpl<>(this, schema);
}

生产端的 schema 被传入了 TypedMessageBuilderImpl 中,在用 value 方法指定消息的值时:

1
2
3
4
5
public TypedMessageBuilder<T> value(T value) {
/* ... */
this.content = ByteBuffer.wrap(schema.encode(value));
return this;
}

会用 encode 方法得到字节数组以进行网络传输。

而在消费端,收到新消息时,会将消息的 payload 和 schema 一起构造成 MessageImpl 对象,比如在 ConsumerImpl#newSingleMessage

1
2
3
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadata, payloadBuffer,
createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);

MessageImpl#getValue 中,会利用 schema 将收到的字节数组解码成 T 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public T getValue() {
SchemaInfo schemaInfo = getSchemaInfo();
if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) {
/* ... */
} else {
/* ... */
return decode(schema.supportSchemaVersioning() ? getSchemaVersion() : null);
}
}

private T decode(byte[] schemaVersion) {
// 对于设置了 poolMessage 的消息,直接对 payload 内部的 NIO buffer(可能在堆外)进行解码
// 否则会对 getData() 进行解码,getData() 会拷贝一份 payload
T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null;
if (value != null) {
return value;
}
// 注:下面代码有些啰嗦,直接 schema.decode(getData(), schemaVersion) 就行
if (null == schemaVersion) {
return schema.decode(getData());
} else {
return schema.decode(getData(), schemaVersion);
}
}

Schema#AVRO 方法实现

AvroSchema

用户端一般传递 Class 对象即可,实际上底层会将其转换成 SchemaDefinition 对象:

1
2
3
4
5
6
7
8
9
static <T> Schema<T> AVRO(Class<T> pojo) {
// 实际上也是将 Class 对象转换成 SchemaDefinition 对象来构造
return DefaultImplementation.getDefaultImplementation()
.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
}

static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.getDefaultImplementation().newAvroSchema(schemaDefinition);
}

SchemaDefinition 中会保存这个 Class 类型的字段:

1
private Class<T> pojo;

回到 Schema#AVRO 方法,它实际上是基于 SchemaDefinition 对象创建了 AvroSchema 作为 schema 对象:

1
2
3
4
5
6
7
8
9
public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
/* 这里会首先处理 schema reader/writer 用于支持 multi-schema,这里我们暂不关心这个特性 */
// getPojo() 返回的用户提供的 T 对应的 Class 对象,这里取得它的 ClassLoader
ClassLoader pojoClassLoader = null;
if (schemaDefinition.getPojo() != null) {
pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
}
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}
1
2
3
4
5
private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
this.pojoClassLoader = pojoClassLoader;
/* 设置 schema reader/writer (略) */
}

其继承体系为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
       +-----------+
|[I] Schema |
+-----^-----+
|
+--------+---------+
|[A] AbstractSchema|
+--------^---------+
|
+----------+-------------+
|[A] AbstractStructSchema|
+----------^-------------+
|
+------------+---------------+
|[A] AbstractBaseStructSchema|
+------------^---------------+
|
+-----+------+
| AvroSchema |
+------------+

再依次看基类的构造方法:

1
2
3
4
5
protected final SchemaInfo schemaInfo;

public AbstractStructSchema(SchemaInfo schemaInfo) {
this.schemaInfo = schemaInfo;
}
1
2
3
4
5
6
7
// org.apache.avro.schema
protected final Schema schema;

public AvroBaseStructSchema(SchemaInfo schemaInfo) {
super(schemaInfo);
this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
}

可见 Schema#AVRO 方法最关键的是:

  1. 调用 SchemaUtils#parseSchemaInfo 得到 SchemaInfo 对象。
  2. SchemaInfo 对象传入 AvroBaseStructSchema#parseAvroSchema 对象得到 Avro 库的 Schema 对象。

SchemaUtils#parseSchemaInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
return SchemaInfoImpl.builder()
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
.properties(schemaDefinition.getProperties())
.name("")
.type(schemaType).build();
}

// 将 SchemaDefinition 转换成 Avro Schema 对象
public static Schema createAvroSchema(SchemaDefinition schemaDefinition) {
Class pojo = schemaDefinition.getPojo();

if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
return parseAvroSchema(schemaDefinition.getJsonDef());
} else if (pojo != null) { // 在之前通过 withPojo 已经设置过 pojo
ThreadLocal<Boolean> validateDefaults = null;
try {
Field validateDefaultsField = Schema.class.getDeclaredField("VALIDATE_DEFAULTS");
validateDefaultsField.setAccessible(true);
validateDefaults = (ThreadLocal<Boolean>) validateDefaultsField.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Cannot disable validation of default values", e);
}

final boolean savedValidateDefaults = validateDefaults.get();

try {
// Disable validation of default values for compatibility
validateDefaults.set(false);
return extractAvroSchema(schemaDefinition, pojo);
} finally {
validateDefaults.set(savedValidateDefaults);
}
} else {
throw new RuntimeException("Schema definition must specify pojo class or schema json definition");
}
}

针对 Avro schema,需要将 Schema#VALIDATE_DEFAULTS 设为 false(这似乎是为了和 Avro 1.8 生成的类兼容,参考 #5938)然后调用 extractAvroSchema,再改回去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) {
try {
// 一般 pojo 是不包含 SCHEMA$ 这个字段的,因此会进入 catch 分支
return parseAvroSchema(pojo.getDeclaredField("SCHEMA$").get(null).toString());
} catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException ignored) {
// SchemaDefinition 的 alwaysAllowNull 默认为 true,因此回创建 AllowNull 对象
ReflectData reflectData = schemaDefinition.getAlwaysAllowNull()
? new ReflectData.AllowNull()
: new ReflectData();
// 给 reflectdata 添加一些逻辑类型(主要是日期和时间)的转换
AvroSchema.addLogicalTypeConversions(reflectData, schemaDefinition.isJsr310ConversionEnabled(), false);
return reflectData.getSchema(pojo);
}
}

实际上是将 POJO 类通过 AVRO 库转换成 AVRO 的 Schema 类型。回顾 parseSchemaInfo 方法:

1
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))

Schema 转换成字符串再按照 UTF-8 编码成字节作为 SchemaInfoschema 字段。

SchemaUtils#parseAvroSchema

1
2
3
4
5
public static Schema parseAvroSchema(String schemaJson) {
final Schema.Parser parser = new Schema.Parser();
parser.setValidateDefaults(false);
return parser.parse(schemaJson);
}

这里实际上又将转换成字节数组的 schema 又转换回去了。

encode 和 decode 方法

AVRO schema 的编解码实现在 AbstractStructSchema 类:

1
2
3
4
5
6
7
8
9
10
11
12
protected SchemaReader<T> reader;
protected SchemaWriter<T> writer;

@Override
public byte[] encode(T message) {
return writer.write(message);
}

@Override
public T decode(byte[] bytes) {
return reader.read(bytes);
}

这里用到了前文我们忽略的 SchemaReaderSchemaWriter,它们是在 AvroSchema 中设置的:

1
2
3
4
5
6
7
8
9
private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
/* ... */
// 用于从 SchemaInfo 的属性中取得 __jsr310ConversionEnabled 来判断是否支持 JSR 310
// 从而兼容旧的 Pulsar 客户端(不支持 JSR 310 转换)
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
setReader(new MultiVersionAvroReader<>(schema, pojoClassLoader,
getJsr310ConversionEnabledFromSchemaInfo(schemaInfo)));
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
}

首先来看 AvroWriter,也就是编码(将 T 转换成 byte[]):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, null);
ReflectData reflectData = new ReflectData();
// 这里会决定是否进行 JSR 310 转换
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
}

@Override
public synchronized byte[] write(T message) {
byte[] outputBytes = null;
try {
writer.write(message, this.encoder);
} catch (Exception e) {
throw new SchemaSerializationException(e);
} finally {
try {
this.encoder.flush();
// 从 write 的输出流中取出编码后的字节
outputBytes = this.byteArrayOutputStream.toByteArray();
} catch (Exception ex) {
throw new SchemaSerializationException(ex);
}
this.byteArrayOutputStream.reset();
}
return outputBytes;
}

实际上是构造了 Avro 库的 ReflectDatumWriter 进行编码。

再来看 MultiVersionAvroReader

1
2
public MultiVersionAvroReader(Schema readerSchema, ClassLoader pojoClassLoader, boolean jsr310ConversionEnabled) {
super(new AvroReader<>(readerSchema, pojoClassLoader, jsr310ConversionEnabled), readerSchema);

它是构造了 AvroReader 负责实际的解码:

1
2
3
4
5
6
7
8
public AbstractMultiVersionReader(SchemaReader<T> providerSchemaReader) {
this.providerSchemaReader = providerSchemaReader;
}

@Override
public T read(byte[] bytes, int offset, int length) {
return providerSchemaReader.read(bytes);
}

注:multi version schema 的实现更为复杂,内部用了一个 cache 来缓存不同版本的 schema reader。

AvroReader 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) {
this.schema = schema;
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(schema);
}
}

public T read(InputStream inputStream) {
try {
BinaryDecoder decoderFromCache = decoders.get();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, decoderFromCache);
if (decoderFromCache == null) {
decoders.set(decoder);
}
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, decoder));

实际上是构造了 Avro 库的 ReflectDatumReader 进行解码。

总结

至此,关于 Pulsar 处理 AVRO schema 的流程大致有了了解。其实就是对 Avro 库进行了包装,用来对用户传入的对象进行编解码。同时,将 AVRO schema 的 JSON 描述信息存放到了 schema info 的 data 字段中,Broker 会将其持久化,并对 Client 上传的 schema 描述信息进行注册和验证。更多的细节可以进一步阅读 Pulsar 的 schema 注册服务。

Python lambda 实现回调函数

[toc]

背景

本文偏实用性,从一个熟悉 C++ 但是对 Python 不太熟悉的用户角度,讲讲我的历程。不讨论语言上的细节。最近遇到一个 Python 中传递回调函数的场景,由于 Python 写得不多,导致 Python 的行为并不像我预期的那样。首先以一个错误示例开始:

1
2
3
4
5
6
7
l = []
for i in range(3):
def inner_f():
print(i)
l.append(inner_f)
for f in l:
f()

看起来这里的 i 捕获了循环变量 i,将闭包先存入列表 l 中然后延迟调用。然而输出结果违反了直觉:

1
2
3
2
2
2

按引用捕获,而非按值捕获

我的第一反应是这里的 i 并不是按值捕获,而是按引用捕获。我修改代码如下:

1
2
3
4
5
6
7
8
l = []
for i in range(3):
s = "prefix-" + str(i)
def inner_f():
print(s)
l.append(inner_f)
for f in l:
f()

打印结果却是:

1
2
3
prefix-2
prefix-2
prefix-2

这就令人非常迷惑了,s 应该每次循环都创建的局部变量,难道 s 的创建也推迟到函数运行了?这里打印了 is 的地址:

1
2
3
4
5
6
7
8
9
l = []
for i in range(3):
s = "prefix-" + str(i)
print('i: {} ({}), s: {} ({})'.format(i, id(i), s, id(s)))
def inner_f():
print('i: {} ({}), s: {} ({})'.format(i, id(i), s, id(s)))
l.append(inner_f)
for f in l:
f()

输出:

1
2
3
4
5
6
i: 0 (4536600848), s: prefix-0 (4539234672)
i: 1 (4536600880), s: prefix-1 (4539234800)
i: 2 (4536600912), s: prefix-2 (4539234736)
i: 2 (4536600912), s: prefix-2 (4539234736)
i: 2 (4536600912), s: prefix-2 (4539234736)
i: 2 (4536600912), s: prefix-2 (4539234736)

看起来确实……所有循环里的 is 都是复用了同一个变量。

Python 局部变量生命周期

其实到这里,我突然想起最开始入门 Python 的时候,那本小册子上特地讲了,Python 与很多主流语言处理局部变量的不同之处。

比如:

1
2
3
for i in range(3):
s = "prefix-" + str(i)
print("{}, {}".format(i, s))

循环变量 i 和在循环体内定义的变量 s,在脱离循环后,仍然可以访问。可见 Python 在循环中使用的所有局部变量。Python 的局部变量不会随着循环脱离作用域,而是会随着函数退出才脱离作用域。用 C++ 作为比方,可能上述代码被解释成类似下面这样的 C++ 代码:

1
2
3
4
5
6
int i;
std::string s;
for (i = 0; i < 3; i++) {
s = "prefix-" + std::to_string(i);
}
std::cout << i << ", " << s << std::endl;

闭包按值捕获

这个时候也许需要借助类似偏函数的概念。

1
2
3
4
5
6
7
8
9
10
11
12
from functools import partial

l = []
for i in range(3):
s = "prefix-" + str(i)
def inner_f(i, s):
print('i: {} ({}), s: {} ({})'.format(i, id(i), s, id(s)))
# 使用 partial 将二元谓词 inner_f 的两个参数绑定为 i 和 s 的值
l.append(partial(inner_f, i, s))

for f in l:
f()

输出:

1
2
3
i: 0 (4373719312), s: prefix-0 (4376353072)
i: 1 (4373719344), s: prefix-1 (4376353200)
i: 2 (4373719376), s: prefix-2 (4376353264)

使用 lambda 进行简化

其实 Python 也提供了方便的 lambda 表达式,而不用使用 functools 下面的工具。

1
2
3
4
5
6
7
8
l = []
for i in range(3):
s = "prefix-" + str(i)
l.append(
lambda i=i, s=s: print("i: {} ({}), s: {} ({})".format(i, id(i), s, id(s)))
)
for f in l:
f()

这里 i=is=s 看起来比较诡异,实际上是进行初始化,用外部变量(实参)来初始化 lambda 的内部变量(形参)。对于不熟悉这种语法的人而言,上面代码改成这样可能更好理解(当然,也更冗长了,实际上就用上面这种代码更好):

1
2
3
4
5
6
7
8
9
10
l = []
for i in range(3):
s = "prefix-" + str(i)
l.append(
lambda inner_i=i, inner_s=s: print(
"i: {} ({}), s: {} ({})".format(inner_i, id(inner_i), inner_s, id(inner_s))
)
)
for f in l:
f()

总之,输出也是符合期望的:

1
2
3
i: 0 (4427942160), s: prefix-0 (4430575856)
i: 1 (4427942192), s: prefix-1 (4430576048)
i: 2 (4427942224), s: prefix-2 (4430576112)

总结

总之磕磕绊绊,发现 Python 在处理稍微复杂一点点的逻辑时,并不是像初入门那么简单,易如 Python 也有所谓的坑。本文很多细节都没深究,只是以一个 C++er 的角度踩踩坑。

附录:C++ 风格的等价代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <functional>
#include <iostream>
#include <string>
#include <vector>
using namespace std;

int main(int argc, char* argv[]) {
vector<function<void()>> l;
for (int i = 0; i < 3; i++) {
std::string s{"prefix-" + std::to_string(i)};
l.emplace_back([ i, s{std::move(s)} ] {
std::cout << "i: " << i << ", s: " << s << std::endl;
});
}
for (auto&& f : l) {
f();
}
return 0;
}

上述 lambda 表达式中:

  • i:直接按值来捕获循环变量 i,发生一次 int 的拷贝。
  • s:对于 std::string 类型,使用移动构造来避免拷贝的开销。当然,在这个例子里没必要,因为对于短字符串的移动构造实际上还是发生了拷贝,一般都是 SSO(短字符串优化)实现。

这里用到了 C++14 的特性,也就是支持 lambda 表达式的捕获列表初始化(s{std::move(s)}),因此编译器至少需要支持 C++14 然后加上 -std=c++14 这种选项来编译。

Java Stream 简单学习

[toc]

背景

最近对项目做了一个重构,其中有一部分是把代码风格变得很函数式了。Java 8 引入了 lambda 表达式和 Stream 后使得函数式编程就更容易了。这里不谈及比较理论的东西,只谈下我认为的优缺点。

优点:

  • 表达简单直观(虽然使用 Java 还是比较啰嗦)
  • 提倡使用 immutable 对象,实现线程安全更容易
  • 能够通过将 stream 改成 parallel stream 将现有代码改成并行处理

缺点:

  • 性能有一定损失。可以用 JMH 进行性能测试,大多数情况这点性能损失不会有明显影响,除非代码处在关键路径上。
  • 调试相对比较麻烦。其实很多时候可以通过测试驱动的编码方式避免写了一大串代码后再去排查是不是链式调用的某一块出问题了。

了解 Stream

举个例子,给出下面这段处理代码:

1
2
3
4
5
final List<Integer> list1 = new ArrayList<>(Arrays.asList(1, 2, 3));
final List<Integer> list2 = new ArrayList<>(list1.size());
list1.forEach(x -> list2.add(x + 100));
final List<String> list3 = new ArrayList<>(list2.size());
list2.forEach(x -> list3.add("i-" + x));

从函数式思维的角度,我们实际上是依次对 list1 的每个元素执行以下运算:

  1. 加上 100
  2. 转换成字符串,并加上 i- 前缀。

理想的函数映射表达是:

1
final List<String> list3 = list1.map(x -> x + 100).map(x -> "i-" + x);

但是在实际的 Java 代码中,map 这样的操作不能直接作用于容器,而必须作用于容器对应的 stream。

1
final List<String> list3 = list1.stream().map(x -> x + 100).map(x -> "i-" + x).collect(Collectors.toList());

容器必须先调用 stream() 方法转换成流(Stream 接口)。其实从设计的角度也可以理解,假如对 List 扩充 map 接口:

1
<R> List<R> map(Function<? super T, ? extends R> mapper)

那么每次 map 都会创建新的 List 对象。而 Stream 是可以理解为它只记录了对流中每个元素的函数,每次 map 调用返回的只有记录了新的函数(Function)的 Stream,不会将容器中地元素进行拷贝。只有在 collect 调用时才会将元素从流中取出,放入新的容器中。

流操作会被复合成一个流管道(stream pipeline),它包含:

  • 源(source):可以是集合,数组或者函数生成器,或者 I/O 管道。
  • 中间操作(intermediate operation):将流转换成另一个流,比如前文的 map
  • 终端操作(terminal operation):产生结果或副作用,比如前文的 collect

流是惰性的(lazy),只有在终端操作才会对源数据进行计算,并且仅在需要时消耗源元素。

Java 容器都提供了 stream() 方法取得对应的流。类似的,用 parallelStream() 方法得到并行流,并行流会使用线程池来计算,本文只讨论顺序流。对于数组 T[] 可以用 Arrays#stream 静态方法将其转换成流,对于可变参数列表 T... 可以用 Streams#of 静态方法将其转换成流,对于两个整型表示的左闭右开区间,可以用 IntStream#range 或者 LongStream#range 将其转换成流。比如:

1
2
3
4
Arrays.stream(new int[]{0, 1, 2});
Stream.of(1, 2, 3);
IntStream.range(0, 3);
LongStream.range(0L, 3L);

上述四行代码都是将序列 0,1,2 转换成流。

流也是一次性的,在遍历一遍后,流就会处于关闭状态。比如:

1
2
3
Stream<Integer> stream = Stream.of(1, 2, 3);
stream.map(x -> x + 10);
stream.map(x -> x + 10);

在对同一个流第二次调用 map 时会报出异常:

java.lang.IllegalStateException: stream has already been operated upon or closed

因为无法对同一个流进行两次中间操作(如果允许的话,就会流就会出现两个分支),因此在进行一个中间操作后,这个流就无法再使用,必须对返回的流添加新的中间操作。注意到错误提示里还有个 or closed 描述,因为流也可以调用 close() 主动关闭,一般是对于 I/O 管道的流才需要这些操作。

常用的流操作示例

在对流有了一个基础认识后,这一节偏实用性,以 List 容器(列表)作为源,介绍一些常见的操作。

reduce

比如最简单的求和:

1
2
3
4
5
final List<Integer> list = Arrays.asList(1, 2, 3);
int sum = 0;
for (Integer x : list) {
sum += x;
}

实际上可以看作有一个初始值 0(identity),然后依次对流的每个元素 value 进行求和运算(identity + value)。此时可以用 reduce 中间操作:

1
final int sum = Stream.of(1, 2, 3).reduce(0, Integer::sum);

如果初始值的类型和流的元素类型不一致,那么需要第三个参数 combiner,它是函数 (T, T) -> T,其中 T 为初始值的类型,该函数必须满足:

1
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

其中 accumulatorreduce 的第二个参数。如果流不是并行的,这个参数不起作用,该函数不会被调用,这个时候传入一个简单的 (x, y) -> x 函数即可。对 combiner 感兴趣的可以阅读 这个讨论

1
2
3
final List<Integer> list = Arrays.asList(1, 2, 3);
final String result = list.stream().reduce("prefix" , (s, i) -> s + "-" + i, (x, y) -> x);
System.out.println(result)

上述代码得到的 result 是字符串 prefix-1-2-3。如果要在并行场景下也能工作,那么需要特别设计 combiner,比如:

1
2
final String result = list.parallelStream().reduce("prefix" , (s, i) -> s + "-" + i,
(s1, s2) -> s1 + s2.substring("prefix".length()));

类似地,假如要合并多个列表,比如:

1
2
3
final List<Integer> list1 = Arrays.asList(1, 2, 3);
final List<Integer> list2 = Arrays.asList(4, 5);
final List<Integer> list3 = Arrays.asList(6, 7, 8, 9);

直接的方式:

1
2
3
final List<Integer> list = new ArrayList<>(list1);
list.addAll(list2);
list.addAll(list3);

基于 reduce 的方式:

1
2
3
4
final List<Integer> list = Stream.of(list1, list2, list3).reduce(new ArrayList<>(), (lhs, rhs) -> {
lhs.addAll(rhs);
return lhs;
});

flatMap

接着前一节的示例,合并多个列表,虽然用 reduce 是自然的,但并不是最适合的。这种场景实际上应该用 flatMap

map 是将 Stream<T> 映射到 Stream<R>。但列表合并的需求,实际上是将 Stream<List<T>> 转换成 Stream<T>,无法直接将 List<T> 通过函数得到 T,而且两者关系也不是一对一,比如第 1 个 List<T> 包含 3 个元素,那么我们想要在返回的流中添加 3 个 T 。在这种数量发生斌华的场景,则是 flatMap 使用的时机。

1
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

它接收的函数是将 T 映射到 Stream<R>。如果 TList<E>,那么很自然地就能得到 Stream<E>。因此基于 flatMap 可以更自然地合并多个列表:

1
final List<Integer> list = Stream.of(list1, list2, list3).flatMap(List::stream).collect(Collectors.toList());

可见,flatMap 特别适合于流的元素(比如 List<T>)本身就可以转换成流(Stream<T>)的场景。对于列表嵌套,只需要多调用几次 flatMap 即可,比如:

1
2
3
4
5
6
final List<List<Integer>> list1 = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
final List<List<Integer>> list2 = Collections.singletonList(Arrays.asList(6, 7, 8, 9));
final List<Integer> list = Stream.of(list1, list2)
.flatMap(Collection::stream)
.flatMap(Collection::stream)
.collect(Collectors.toList());

groupingBy

这个需求常见于将流进行分类。比如将整数流按照奇数和偶数分成两部分。这个操作本身是将列表分成多个列表,但是并没有对应的中间操作,原因在于,流不支持分叉成多个流。此时要使用终端操作 Collectors.groupingBy

1
2
3
4
public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier) {
return groupingBy(classifier, toList());
}

它将流的元素 T 映射到结果 K,最终得到的结果是 Map<K, List<T>>。假如要将整数流分成奇数和偶数,那么我们可以用 Boolean 表示元素 Integer 是否为奇数。

1
2
3
4
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
final Map<Boolean, List<Integer>> map = list.stream().collect(Collectors.groupingBy(i -> i % 2 != 0));
System.out.println(map.get(true)); // 奇数
System.out.println(map.get(false)); // 偶数

打印结果:

1
2
[1, 3, 5]
[2, 4]

这里需要重点注意的是,get(true)get(false) 可能得到的是 null,对应的分别是流中没有偶数和没有奇数。因此需要谨慎地进行 null check。

filter

将整数流分成奇数和偶数有一个更慢的方法,那就是实用 filter 遍历两次,第一次过滤出奇数,第二次过滤出偶数:

1
2
3
4
5
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
final List<Integer> odds = list.stream().filter(x -> x % 2 != 0).collect(Collectors.toList());
final List<Integer> evens = list.stream().filter(x -> x % 2 == 0).collect(Collectors.toList());
System.out.println(odds);
System.out.println(evens);

如果我们只想对奇数或者偶数处理,那么 filter 则是最适合的。

Map 容器的流处理

比如对 Map 容器本身无法得到流,因为 Map 并不是单个元素的流,只能对 entrySet() 方法返回的 Set 得到流 Stream<Map.Entry<K, V>>,然后对每个元素 Map.Entry<K, V> 进行处理。如果要在终端操作中重新得到新的 map,比如 Map<K, V2> 或者 Map<K2, V2>,那么则需要用 Collector.toMap 方法。

举个例子,下列 Java 代码将 Map<Integer, Integer> 分别对 value 和 entry 做映射得到 Map<Integer, String>Map<String, String>

1
2
3
4
5
6
7
8
9
10
11
final Map<Integer, Integer> map = new HashMap<>();
map.put(1, 100);
map.put(2, 200);
System.out.println(map.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
e -> "value-" + e.getValue())
));
System.out.println(map.entrySet().stream().collect(Collectors.toMap(
e -> "key-" + e.getKey(),
e -> "value-" + e.getValue())
));

打印结果:

1
2
{1=value-100, 2=value-200}
{key-1=value-100, key-2=value-200}

可见 Collectors.toMap 中分别接收对 key 和 value 的映射,因此支持 key 和 value 不同的中间操作次数。

相比而言,Scala 在函数式编程上简洁太多,比如实现上述功能,在 Scala 中只需要如下所示:

1
2
3
val map = Map(1 -> 100, 2 -> 200)
println(map.map(e => (e._1, "value-" + e._2)))
println(map.map(e => ("key-" + e._1, "value-" + e._2)))

Collectors

前面我们使用了三种 Collectors 的静态方法,这里我们来看看 collect 到底做了什么。

1
<R, A> R collect(Collector<? super T, A, R> collector)

首先 collect 方法接受的是 Collector<T, A, R> 类型,其中:

  • T 是流的元素类型
  • A 是 Collector 进行中间运算的类型
  • R 是结果类型

Collectors#toList 方法为例:

1
2
3
4
5
6
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}

其中 CollectorImpl 实现了 Collector 接口,接收四个参数,最主要的是:

字段名 toList 传入的参数 含义
supplier ArrayList::new 创建 ArrayList 存放收集的元素
accumulator List::add 以流的元素作为参数对 supplier 调用该方法,

其余的 combinercharacteristics 比较复杂,这里就不深入研究了。

如果有特殊要求,我们也可以模仿 CollectorImpl 类自行实现 Collector 接口。

总结

本文主要是最近一次重构中对 Java 基于 Stream 的函数式编程的一点学习笔记,从了解 Stream 开始到常用的几个中间操作,以及针对 Map 容器的特殊处理,最后看了下终端操作 collect。更多内容可以参考 Java SE 8 Documents

Java Executor 学习

[toc]

Executor 简介

在 Java 中,更偏向于使用 Executor 而非 Thread 来执行任务。Executor 接口定义如下:

1
2
3
public interface Executor {
void execute(Runnable command);
}

类似于 Thread,它可以用来在线程中执行一个 Runnable(任务),具体的执行策略取决于具体实现。比如以下实现就是每个任务都新建一个线程来执行:

1
Executor executor = command -> new Thread(command).start()

在标准库中,Executors 类提供了若干 static 方法可用于构造不同类型的 Executor,它们在线程池中取出线程来执行任务,对于已提交(调用 executesubmit 方法执行)的任务,一般有三种状态:

  • 已经完成:任务已经由池中的某个线程执行完毕,对应线程已返回给池中
  • 运行中:已经分配了线程执行任务
  • 等待执行:由于线程分配策略限制(比如限制了同时运行的线程数量上限),任务被缓存到内部队列,等待被执行。

ExecutorService

Executor 的生命周期

JVM 会在所有非守护线程全部终止后才会退出,比如以下代码:

1
2
final Executor executor = Executors.newSingleThreadExecutor(); // 该线程池仅包含一个可用线程
executor.execute(() -> System.out.println(Thread.currentThread().getName() + " done"))

打印 pool-1-thread-1 done 后会卡住,查看线程栈会看到:

1
2
3
4
5
6
7
8
9
10
11
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007fa04392a000 nid=0xa603 waiting on condition [0x0000700004bd6000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076b1778a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

进入 ThreadPoolExecutor#getTask 内部看到相应代码:

1
2
3
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 卡在这里

workQueue 也就是前文所说的保存任务的工作队列:

1
private final BlockingQueue<Runnable> workQueue;

可见线程池 ThreadPoolExecutor 都会尝试从 workQueue 中取出任务然后分配给线程来执行,见 runWorker 方法:

1
2
3
4
final void runWorker(Worker w) {
/* ... */
try {
while (task != null || (task = getTask()) != null) {

Executors 类创建的实际上是 ExecutorService 类型,它继承自 Executor 接口,提供了对 Executor 生命周期的管理。当然,此外还提供了 submit 接口来基于 Future 对任务进行管理。

shutdown

1
void shutdown();

该方法会使 executor 等待所有已提交的任务运行完成。包括在工作队列中的任务。注意该方法并不会等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 3; i++) {
final int index = i;
executor.execute(() -> {
println(index + " started...");
sleep(1000);
println(index + " stopped.");
});
}
executor.shutdown();
println("Executor shutdown: " + executor.isShutdown());
try {
executor.execute(() -> {});
} catch (RejectedExecutionException e) {
println("Failed to execute task after shutdown: " + e.getMessage());
}

注:上述代码使用了我自省添加的 println 方法来打印时间戳和线程名,以及 sleep 方法吞掉 InterruptedException

1
2
3
4
5
6
7
8
9
10
private static void println(String x) {
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName() + " | " + x);
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}

运行结果:

1
2
3
4
5
6
7
8
1644147267909 pool-1-thread-1 | 0 started...
1644147267909 main | Executor shutdown: true
1644147267911 main | Failed to execute task after shutdown: Task jcip.Main$$Lambda$2/598446861@619a5dff rejected from java.util.concurrent.ThreadPoolExecutor@1ed6993a[Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0]
1644147268913 pool-1-thread-1 | 0 stopped.
1644147268913 pool-1-thread-1 | 1 started...
1644147269915 pool-1-thread-1 | 1 stopped.
1644147269916 pool-1-thread-1 | 2 started...
1644147270916 pool-1-thread-1 | 2 stopped.

可以看到 shutdown() 立刻返回了,但是 JVM 进程是等待所有线程退出后才结束。但由于 shutdown 方法被调用,executor 会拒绝接受新的任务,因此在调用 execute 时会抛出 RejectedExecutionException 异常,包含了线程池 ThreadPoolExecutor 的具体信息:

1
Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0

线程池已经关闭,池的大小为 1,活跃线程数为 1,排队的任务为 2,已经完成的任务数量为 0。

awaitTermination

由于 shutdown 并不会等待 executor 关闭,因此 ExecutorService 还提供了 awaitTermination 方法进行等待。

1
2
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

若在 timeout 范围内 executor 已经停止,则返回 true。否则返回 false,即等待超时。因此可以轮询调用该方法来等待 executor 停止。这里修改前一节调用 shutdown 方法之后的代码如下所示:

1
2
3
4
5
println("Executor terminated: " + executor.isTerminated());
while (!executor.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
// No ops
}
println("Executor terminated: " + executor.isTerminated());

相关打印信息:

1
2
1644148189276 main | Executor terminated: false
1644148192288 main | Executor terminated: true

从时间戳之差(3008 毫秒)可见等待不到 10 秒(timeout)就完成了。注意这里还调用了 isTerminated 方法,当所有任务都结束时该方法会返回 true。

shutdownNow

shutdown 是优雅的关闭,如果担心有的任务是有 bug 的,会一直卡住,导致 JVM 进程无法终止,此时需要用一种粗暴的关闭方式,也就是 shutdownNow

1
List<Runnable> shutdownNow();

我初看这个方法时比较迷惑,以为 shutdownNow 是异步关闭,而 shutdown 是同步关闭。实际上,shutdown 会等待所有任务完成,只不过不再接受新的任务。而 shutdownNow 则是取消所有运行中的任务,并且不再启动等待中的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 3; i++) {
final int index = i;
executor.execute(() -> {
println(index + " started...");
try {
Thread.sleep(1000);
println(index + " stopped.");
} catch (InterruptedException e) {
println(index + " is cancelled");
}
});
}
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.MINUTES);

运行结果:

1
2
1644148971844 pool-1-thread-1 | 0 started...
1644148971844 pool-1-thread-1 | 0 is cancelled

中断状态的线程会被取消,因此抛出 InterruptedException,而排队的两个任务则干脆没执行。

线程池 ThreadPoolExecutor

前面一直使用了单线程的线程池,它创建的 executor 类型实际上是 ThreadPoolExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService // 仅仅是个 wrapper,finalize() 方法会调用 shutdown()
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

线程池的配置

包括前两节提到的 ThreadFactoryRejectedExecutionHandlerThreadPoolExecutor 构造参数及其作用如下所示:

参数名 类型 含义
corePoolSize int 核心线程数,即保持运行的线程数,即使处于空闲状态,除非 allowCoreThreadTimeout 被设置
maximumPoolSize int 线程池最大允许创建的线程数
keepAliveTime long 当线程数超过核心线程数数量时,闲置线程在终止前等待新任务的最大时间
unit TimeUnit keepAliveTime 对应的时间单位
workQueue BlockingQueue<Runnable> 持有待执行的任务的队列
threadFactory ThreadFactory 提供 newThread 接口,可在通过 Runnable创建线程时设置线程的一些信息(比如名字)
handler RejectedExecutionHandler 当线程池到达上限时,新任务到来的处理策略。默认是抛出 RejectedExecutionException

常见的几种快速创建线程池(用静态方法去掉 new 前缀)的参数:

corePoolSize maximumPoolSize keepAliveTime workQueue
FixedThreadPool nThreads nThreads 0 LinkedBlockingQueue
SingleThreadExecutor 1 1 0 LinkedBlockingQueue
CachedThreadPool 0 Integer.MAX_VALUE 60 s SynchronousQueue

上述几种典型的配置各有缺陷,比如 CachedThreadPool 无法限制线程数量,容易导致线程创建太多而 OOM。而另外两种配置则限制死了线程的最大数量。

最核心的参数是 corePoolSizemaximumPoolSize。当线程池的线程数不大于 corePoolSize 时,对于新的任务都会创建线程。但是如果正在运行的线程数量达到了 corePoolSize,新任务到来时则会根据 maximumPoolSizeworkQueue 来决定行为:

  • workQueue 未满:加入队列
  • workQueue 已满:
    • 若正在运行的线程数小于 maximumPoolSize,则创建新的线程执行任务
    • 否则抛出 RejectedExecutionException

这里给出一段示例配置的行为表现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final ExecutorService executor = new ThreadPoolExecutor(
1, // corePoolSize
3, // maximumPoolSize
1, // keepAliveTime
TimeUnit.SECONDS, // unit
new ArrayBlockingQueue<>(1) // workQueue
);
for (int i = 0; i < 5; i++) {
final int index = i;
try {
executor.execute(() -> {
println(index + " started...");
sleep(2000);
println(index + " stopped.");
});
} catch (RejectedExecutionException e) {
println(index + " rejected: " + e.getMessage());
}
}
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
// No ops
}

运行结果:

1
2
3
4
5
6
7
8
9
1644155259100 pool-1-thread-1 | 0 started...
1644155259101 main | 4 rejected: Task jcip.Main$$Lambda$1/758705033@4534b60d rejected from java.util.concurrent.ThreadPoolExecutor@3fa77460[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
1644155259100 pool-1-thread-3 | 3 started...
1644155259100 pool-1-thread-2 | 2 started...
1644155261104 pool-1-thread-1 | 0 stopped.
1644155261104 pool-1-thread-2 | 2 stopped.
1644155261104 pool-1-thread-3 | 3 stopped.
1644155261104 pool-1-thread-1 | 1 started...
1644155263107 pool-1-thread-1 | 1 stopped.

因为 println 不是线程安全的,因此打印法航了乱序,但从时间戳来看,顺序为:

  1. 提交任务 0,新建线程 pool-1-thread-1 来执行,此时活跃线程数到达了 corePoolSize
  2. 提交任务 1,进入大小为 1 的 ArrayBlockingQueue 中,此时队列已满。
  3. 提交任务 2,新建线程 pool-1-thread-2 来执行。
  4. 提交任务 3,新建线程 pool-1-thread-3 来执行,此时活跃线程数到达了 maximumPoolSize
  5. 提交任务 4,抛出 RejectedExecutionException
  6. 2 秒后,任务 0,2,3 执行完毕,队列中的任务 1 弹出,由线程 pool-1-thread-1 执行。

Keep Alive

keepAliveTimeunit 参数更多的是控制闲置的非核心线程数的等待时间。闲置线程即没有执行任务的线程,非核心线程,则是相对于核心线程而言的。比如若 corePoolSize 为 1,maximumPoolSize 为 3,那么如果因为提交任务数量比较多,导致创建了 3 个线程,这多出的 2 个线程都是非核心线程。

这里仅从代码角度分析见 ThreadPoolExecutor#getTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 必须对 ThreadPoolExecutor 调用 allowCoreThreadTimeOut 方法将同名参数设为 true 才会启用 keepAliveTime 检查
// wc 为工作线程数量,因此只有大于 corePoolSize 才会检查,因为只有超过核心运行线程数量的线程才会被当成多余的线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // [2] 回收多余线程,减少工作线程数量
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // [1] 在 keepAliveTime 内没有任务,则置为 true
} catch (InterruptedException retry) {
timedOut = false;
}

注意这里的 keepAliveTime 在构造时已经转换成了纳秒单位:

1
this.keepAliveTime = unit.toNanos(keepAliveTime);

上面代码有个比较诡异的地方,就是什么时候 wc 会超过最大线程数量。查看方法注释,可知如果 setMaximumPoolSize 被调用,那么 maximumPoolSize 会发生动态变化,此时会导致 worker 数超过了 maximumPoolSize

ThreadFactory

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}

ThreadPoolExecutor 中,默认的线程工厂是:

1
2
3
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 对于守护线程,也要将其改成用户线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY); // 默认线程优先级是 NORM
return t;
}

它首先通过系统的 SecurityManager 是否存在来设置线程组。线程名则是 pool-<id>-thread-<thread-id>,其中 id 为线程池 id,从 1 开始递增,thread-id 为线程 id,也从 1 开始递增。

运行以下代码:

1
2
3
4
5
6
7
ExecutorService executor1 = Executors.newSingleThreadExecutor();
ExecutorService executor2 = Executors.newSingleThreadExecutor();
executor1.execute(() -> println("executor 1 thread 1"));
executor2.execute(() -> println("executor 2 thread 1"));
executor2.execute(() -> println("executor 2 thread 2"));
executor1.shutdown();
executor2.shutdown();

运行结果:

1
2
3
1644149652981 pool-1-thread-1 | executor 1 thread 1
1644149652981 pool-2-thread-1 | executor 2 thread 1
1644149652981 pool-2-thread-1 | executor 2 thread 2

从打印出的线程名就可以看出对应关系。常见的自定义 ThreadFactory 的场景往往是要单独给某些 executor 的线程进行标识,从而在调试的时候区分线程。当然,由于 newThread 方法是可定制的,也可以定制更多线程策略。

Executors 的方法中,一般都会有重载形式来支持指定 ThreadFactory,比如:

1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

RejectedExecutionHandler

在前文提到 shutdown 方法时,我们知道对于一个已关闭的 executor,若强行执行新的任务会抛出 RejectedExecutionException 异常。这里 RejectedExecutionHandler 则是支持定制化此时的行为。默认策略是 AbortPolicy

1
2
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
1
2
3
4
5
6
7
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

不过 Executors的方法并没有支持自定义 RejectedExecutionHandler,因为抛出异常的处理方式基本上是很痛用了,一般没必要定制。

使用 submit 执行任务

ExecutorServiceExecutor 的基础上增加了 submit 方法,它会返回一个 Future,因此我们可以用 Future 来管理任务状态以及任务的返回值。对于有返回值的任务,submit 方法往往更实用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static final List<Integer> DATA = prepareData();

private static List<Integer> prepareData() {
final List<Integer> data = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
data.add(i);
}
return data;
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
final int numThreads = 5;
final int sizePerThread = DATA.size() / numThreads;
final List<Future<Long>> futures = new ArrayList<>();
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
final int startIndex = i * sizePerThread;
final int endIndex = (i < numThreads - 1) ? (startIndex + sizePerThread) : DATA.size();
futures.add(executor.submit(() -> {
long sum = 0;
for (int j = startIndex; j < endIndex; j++) {
sum += DATA.get(j);
}
return sum; // 该 lambda 返回 Long,对应 Callable<Long>,因此 submit 返回的是 Future<Long>
}));
}
long sum = 0;
for (Future<Long> future : futures) {
sum += future.get();
}
System.out.println(sum);
executor.shutdown();
}

上述代码则是将 100000 个数据的 DATA 分成 5 块,每块由一个任务来求和,每个任务对应一个 Future,最后将结果汇总求和。

总结和展望

本文主要从 Executor 谈到最基本的线程池(ThreadPoolExecutor)的使用。

除此之外,以下相关内容并不属于本文的讨论范畴,但简单提及下。

  • Java 7 引入了基于 work-stealing 的 executor(使用 newWorkStealingPool 创建,类型为 ForkJoinPool),比起普通的线程池并行化更高,它的原理是用双端队列(deque)保存任务,并且可以有多个队列保存任务,这样在一个队列任务执行完毕后,还可以从其他队列的尾端来窃取工作线程。
  • 对于延时任务或者定时任务,应该使用 ScheduledThreadPoolExecutorschedule() 方法。相比起用 Timerschedule() 而言,不受系统时钟变化的影响,而且能够正确处理抛出 unchecked exception 的任务。

重新学习 Golang

前言

第一次接触 Go 还是 2018 年在 360 实习的时候,当时调研一个 Kafka 监控工具 Burrow,还贡献了我在开源社区的 第一个 PR。到后来正式入职前,还特地过了遍 The Way of Go,再后来也基本搁置了,偶尔帮业务排查下问题的时候会简单写个例子。当时还在用 GOPATH,现在 Burrow 官方文档都表示最低支持 Golang 1.11 和 Go module 的管理方式了。总的来说感觉目前由于云原生的火热,Go 的使用确实比较广,国庆刚好休息下,就重新看看了。

Get Started

基本概念

  • module(模块):一组 package 的集合,可以直接从版本控制仓库或者模块代理服务器上下载。module 由 go.mod 文件中的 module path 以及 module 依赖决定。module 根目录是包含 go.mod 文件的目录,main module 是包含这个目录的模块,并且在该目录下可以执行 go 命令。
  • module path(模块路径):仅作为 package 的 import path 的前缀,表明了 go 命令应该在哪去下载。
  • package(包):同一个目录下的一组源文件的集合。
  • package path:也就是 package 的 import path。是 module path 加上包含该 package 的子目录的路径。例如 "golang.org/x/net" 在目录 "html" 下面包含一个 package,这个 package 的路径是 "golang.org/x/net/html"

第一个程序

首先指定一个 module path(这里是 example.com/user/hello)创建 go.mod 文件:

1
2
3
4
$ go mod init example.com/user/hello
go: creating new go.mod: module example.com/user/hello
go: to add module requirements and sums:
go mod tidy

可以发现生成了 go.mod 文件,内容为:

1
2
3
module example.com/user/hello

go 1.16

然后创建 main.go,包含以下代码:

1
2
3
4
5
6
7
package main

import "fmt"

func main() {
fmt.Println("hello")
}

使用 go 工具安装并运行程序:

1
2
$ go install example.com/user/hello
$

该命令会构建 hello 命令并生成二进制文件 hello,安装到 GOPATH 目录(可通过 go env GOPATH 查看,默认是 $HOME/go)的 bin/ 子目录下。如果 GOBIN 已经设置,则会安装到 GOBIN 下面。

也可以直接用以下方式将其导入环境变量:

1
export PATH=$PATH:$(dirname $(go list -f '{{.Target}}' .))

设置 Go 环境变量的方式:

1
$ go env -w GOBIN=$PWD

这样就会安装到当前目录(注意这只是示例,而且会永久生效,最好改到需要的路径)。如果要取消 Go 环境变量,使用 -u 选项即可:

1
$ go env -u GOBIN

从 module 中导入 package

创建子目录 *morestrings/*,然后在该目录下创建文件 reverse.go

1
2
3
4
5
6
7
8
9
10
11
12
// Package morestrings implements additional functions to manipulate UTF-8
// encoded strings, beyond what is provided in the standard "strings" package.
package morestrings

// ReverseRunes returns its argument string reversed rune-wise left to right.
func ReverseRunes(s string) string {
r := []rune(s)
for i, j := 0, len(r)-1; i < len(r)/2; i, j = i+1, j-1 {
r[i], r[j] = r[j], r[i]
}
return string(r)
}

进入该目录,运行 go build,可以发现不会生成文件,但实际上已经编译到本地 build cache 里了。

现在修改 hello.go 内容,导入刚才的 package:

1
2
3
4
5
6
7
8
9
10
11
package main

import (
"fmt"

"example.com/user/hello/morestrings"
)

func main() {
fmt.Println(morestrings.ReverseRunes("!oG ,olleH"))
}

然后重新 go install example.com/user/hello 编译。

调用外部 package 的代码

Go 可以导入版本控制系统比如 Git 的 package 源码,go 工具使用这个属性去从远程仓库自动下载 package,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import (
"fmt"

"example.com/user/hello/morestrings"
"github.com/google/go-cmp/cmp"
)

func main() {
fmt.Println(morestrings.ReverseRunes("!oG ,olleH"))
fmt.Println(cmp.Diff("Hello World", "Hello Go"))
}

然后运行:

1
2
3
$ go mod tidy
go: finding module for package github.com/google/go-cmp/cmp
go: found github.com/google/go-cmp/cmp in github.com/google/go-cmp v0.5.6

然后查看 go.mod 内容,可以发现它新加了一行:

1
require github.com/google/go-cmp v0.5.6

因为 go mod tidy 命令会添加缺失的 module requirements,同时移除不再使用的模块 requirements。在导入新 module 时,会从网上下载。

module 依赖会自动下载到 $GOPATH/pkg/mod 目录下面,下载内容会被所有其他 module 共享,要删除所有下载的 module,可以传递 -modcache 标志给 go clean

1
2
$ go clean -modcache
$

测试

一般在包目录下面会创建单元测试,比如 morestrings/reverse.go,一般会创建 morestrings/reverse_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package morestrings

import "testing"

func TestReverseRunes(t *testing.T) {
cases := []struct {
in, want string
}{
{"Hello, world", "dlrow ,olleH"},
{"Hello, 世界", "界世 ,olleH"},
{"", ""},
}
for _, c := range cases {
got := ReverseRunes(c.in)
if got != c.want {
t.Errorf("ReverseRunes(%q) == %q, want %q", c.in, got, c.want)
}
}
}

然后使用 go test 命令运行测试即可:

1
2
3
4
$ cd morestrings/
$ go test
PASS
ok example.com/user/hello/morestrings 0.401s

总结

基于 go mod 的方法,创建 go.mod 指定当前的 模块路径

1
go mod init <module-path>

模块路径一般是多级的,比如 github.com/google/go-cmp/cmp,可以发现项目是在 https://github.com/google/go-cmp/,那么模块路径就是 github.com/google/go-cmp,也就是项目下载地址,而包路径则是 cmp 目录。

然后代码中如果引入了外部依赖,需要在 go.mod 里导入依赖:

1
go mod tidy

如果依赖在本地不存在,它会去下载依赖。不仅在 go.mod 中添加依赖模块路径和版本,还会在 go.sum 文件中引入模块元数据(tag 或 commit 信息,Base64 编码的 Hash 码)。

在代码中导入 的方式是:

1
import "<module-path>/<package-path>"

包路径即包目录的相对路径。如果包路径是多级的,比如 dir1/dir2/dir3,那么包名是最后一级 dir3

构建即 go build,安装即 go install,可以使用 go env 相关命令设置环境变量,比如 GOBIN 配置安装目录。

一般对于模块下的 xxx.go 一般创建对应的测试文件 xxx_test.go,导入 testing 包然后使用相关对象比如 T 进行验证。在模块目录下运行 go test 即可。

Tour of Go 笔记

https://tour.golang.org/welcome/1

基础知识

Go 没有继承,也没有 public/private 这种关键字,导入一个包时,包内的函数/类/变量等,只有大写字母开头的才可以导入。

函数参数的定义基本是较新语言的 <value> <type> 的顺序,此外连续参数可以合并,比如 x int, y int 可以合并为 x, y int。Go 可以返回多值,这使得交换操作可以写成 x, y = y, x 这种形式。

函数的返回值也可以被命名,这样就不用显式 return 返回值了,但还是需要 return。

1
2
3
4
5
func split(sum int) (x, y int) {
x = sum * 4 / 9
y = sum - x
return
}

对于这个例子可能不太好,但是对于那种多返回值,且一个返回值在很多分支之前都是同样的情况,可能比较适用。

Go 的基本类型:

  • bool
  • 普通整型:int/uint,或者加上位数的后缀,比如 uint8,位数包括8,16,32,64。不带位数的话,能存储的位数取决于平台,最多为 64。
  • uintptr:指针类型,也是整型。Go 的指针不像 C 一样支持数学运算,从而保证安全。
  • byteuint8 的别名,代表一个字节
  • runeint32 的别名,它表示一个 Unicode 码点,因为对于 ASCII 字符串而言,基本单位是 int8(也就是 char),但是 Go 对字符串只支持 Unicode 编码,因此没有使用 char 关键字,而是使用 rune,代表一个 Unicode 字符。因此在 Go 里,字符和字节不会混淆。
  • 浮点数:float32float64
  • 复数:complex64complex128

Go 保证变量不赋予初值的话,默认是零值。数值类型都是 0,布尔类型是 false,字符串是空字符串 ""(注意不是 空值 nil)。

Go 的变量初始化可以推断类型,整型默认为 int,浮点数类型默认为 float64,如果没有初始值,则必须显式注明类型。此外初始化还支持 := 这种形式(但是必须在函数体内使用),比如:

1
2
3
4
var x = 10
var y int
y = x + 10
z := 30

常量(const 修饰)因为必须有初值,所以基本可以省略类型声明。

Go 的格式化打印不是很灵活,只支持 C 风格的格式化字符串(使用 fmt.Printf)或者直接打印任意类型(使用 fmt.Printlnfmt.Print)。但是在 C 风格打印时,Go 支持 %T 打印类型名,%v 打印任何类型,还能用 %#v 对类型本身进行修饰,比如:

1
fmt.Printf("%T x = %v, %T y = %#v\n", x, x, y, y)

打印的是 int x = 10, string y = "hello",注意如果不用 %#v 的话,不会给字符串加上引号。

Go 的类型转换有点像函数调用,比如 T(v)v 转换成类型 T。Go 没有隐式类型转换,因此即使将 int32 转换成 int 也要显式进行,比如:

1
2
var x int32 = 1
var y int = int(x)

Go 的循环只支持 for 循环,循环体必须有大括号,而初始化(可选),条件判断以及后置语句(可选)的部分则不用。if 也类似。比如:

1
2
3
4
5
 for i := 0; i < 100; i++ {
if i%3 == 0 {
fmt.Println(i)
}
}

无限循环可以缩写为

1
2
3
for {
// ...
}

if 也支持初始化,并且初始化语句的值可以在 else 分支使用,比如:

1
2
3
4
5
6
var x int = 100
if y := x / 2; y%2 == 0 {
fmt.Println(y)
} else {
fmt.Println(y)
}

注意在 forif 的初始化语句里,只能用 := 的形式,因此要声明类型必须对 := 右边的表达式进行类型转换。另外,Go 虽然也支持 else if,但这种情况下,最好使用 switch。Go 的 switch 默认不会 fallthrough,因此不需要对每个分支加 break

switch 比较灵活,其实更类似把 if 语句做个包装,比如:

1
2
3
4
5
6
7
8
switch x := 1; true { // switch <initialize>; <variable>
case x < 0:
fmt.Println("x < 0")
case x == 0:
fmt.Println("x = 0")
case x > 0:
fmt.Println("x > 0")
}

这里的 true 可以省略(因为默认的 <variable> 就是 true),比如上面代码可以改为 switch x := 1; {。这里我这么写实际上想表达,上面语句是将 true 与下面的 case 后接的表达式(比如 x < 0)求值依次进行比较。

这次重新学习的时候,差点以为 Go 的 switch 语句像 Scala,Rust 的模式匹配一样强大,实际上,虽然也比较强大就是了。

最后 Go 关键字里最关键的 defer,类似于 C++ 的析构,举个例子:

1
2
3
4
5
func main() {
x := 100
defer fmt.Printf("defer %v\n", x)
fmt.Println("main")
}

打印:

1
2
main
defer 100

因为 defer 语句会在脱离当前作用域时执行。

指针和结构体

Go 也有指针类型,但是它更多的只是作为对象的地址,不像 C 一样支持算术运算。因为 Go 没有所谓的引用类型,都是传值,对于复杂结构,传指针起到了传递引用的作用,并且避免了对象拷贝。示例:

1
2
3
4
var p *int
i := 10
p = &i
fmt.Printf("i = %v (%v)\n", *p, p)

Go 的函数可以像值一样直接传递,而不是像 C 一样需要传递函数指针。Go 的闭包也就是匿名函数,函数签名里少了函数名。闭包可以访问函数体之外的变量,类似 C++ 的 lambda 表达式捕获所有引用([&])。

Go 有结构体,语法和 C 类似。这里有个特殊的语法,Go 的结构体指针(假如为 p)可以直接访问字段,比如 p.X,而不用 (*p).X。 这是为了简单实现『方法』而用的。不同于 Java 这种面向对象语言,它没有方法,对于方法的定义,是在函数定义的 func 以及参数列表之间插入结构体的值或指针(也就是所谓的 接收者),在这个前提下,针对结构体指针访问字段的语法就能大量简化代码,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Vertex struct {
X, Y int
}

// 这里也可以写为 func (v Vertex) ToString() string,即将值而非指针作为接收者(但大多数情况下没必要)
func (v *Vertex) ToString() string {
return fmt.Sprintf("Vertex{X: %d, Y: %d}", v.X, v.Y)
}

func main() {
v1 := Vertex{1, 2} // 依次初始化
// 这里用到了结构体部分初始化的语法,即用 Name: Value 的语法列出部分字段
v2 := Vertex{X: 1} // 仅初始化 X,Y 为默认值 0
fmt.Printf("v1: %s, v2: %s\n", v1.ToString(), v2.ToString())
}

Go 也没有继承,但是可以通过匿名字段的方式实现继承,也就是结构体嵌入。(似乎官方 go tour 教程里并没有讲这个)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Base struct {
i int
}

func (b *Base) String() string {
return fmt.Sprintf("Base: %d", b.i)
}

type Derived struct {
Base // 匿名字段

s string
}

// 如果不实现 String() 方法,那么 Derived 对象也能调用父类的 String() 方法
func (d *Derived) String() string {
return fmt.Sprintf("Derived: %s, %s", d.Base.String(), d.s) // d.Base 直接访问父类对象
}

数组,切片,映射

数组,切片,映射都是引用语义,因此想要修改内部时,不必传递指针。默认值都是 nil,也就是底层为空,但是像 lencap 都能成功调用并返回 0。

Go 的数组也类似 C,即是定长数组比如 [n]T,表示数组元素类型为 T,数量为 n ,也可以采用大括号初始化的方式:

1
x := [5]int{1, 1, 2, 3, 5}

注意,Go 的数组不像 C 一样可以推断出数组长度,如果你写成了下面这样:

1
x := []int{1, 1, 2, 3, 5}

此时 x 的类型是切片([]int)而非数组([5]int)。相比数组而言,切片可以动态增长,类似 C++ 的 vector,因此对于切片 x,可以用 cap(x) 取得切片 x 的容量,len(x) 取得切片的长度,以及用 make 进行初始化(make(<type>, <len>) 或者 make(<type>, <len>, <cap>),用 append 添加元素。但这些都是 Go 的内置函数。

数组和切片都支持 x[low:high] 这种形式的部分引用,low 缺省为 0,high 缺省为 len(x)(数组或切片的长度)。而 x[low:high] 本身类型其实也是切片,也就是说数组可以方便地转换成切片。如果 high 超出切片长度上限但没有超出容量上限,会扩充切片长度到 high

1
2
3
4
5
6
7
8
9
10
11
func printSlice(s []int) {
fmt.Printf("%v len=%d cap=%d\n", s, len(s), cap(s))
}

func main() {
s := make([]int, 5, 10)
printSlice(s)
s = s[:10]
printSlice(s)
s = s[:20] // 超出容量上限,引发 panic
}

输出:

1
2
3
[0 0 0 0 0] len=5 cap=10
[0 0 0 0 0 0 0 0 0 0] len=10 cap=10
panic: runtime error: slice bounds out of range [:20] with capacity 10

panic 类似其他语言的 异常。Go 对错误处理的态度是,它认为异常(panic)默认是不可恢复的,对于可恢复的错误,一般使用错误码或字符串保存。当然,万不得已要恢复异常,可以用 recover 内置函数。不像 Java 这种用 unchecked exception 和 checked exception 来区分这两种错误,而且很多时候写代码的人都没注意好。

如果要扩充容量,那么就必须用内置的 append 函数,它可以添加一个或多个元素。可以用以下代码查看切片的扩容策略。

1
2
3
4
5
6
7
8
9
10
11
func printSliceMetadata(s []int) {
fmt.Printf("len=%d cap=%d\n", len(s), cap(s))
}

func main() {
s := make([]int, 1)
for i := 0; i < 100; i++ {
s = append(s, i)
printSliceMetadata(s)
}
}

对于切片(以及映射),可以用 range 来进行遍历:

1
2
3
4
s := []int{1, 1, 2, 3, 5}
for i, v := range s {
fmt.Printf("%d %v\n", i, v)
}

注意,range 遍历映射返回 (key, value) 比较直观,但是对于切片,实际上也是返回一对值 (index, value),这样方便操作下标。如果不想取得值或下标,可以用 _ 隐去,比如下列代码就是只打印切片 s 的值。

1
2
3
for _, v := range s {
fmt.Printf("%v\n", v)
}

至于映射,直接用个 demo 表示吧,顺便复习下之前的知识。当前项目的模块路径为 com.example/temp,新建 wrapper/map.go,内容如下(实现了类似 Java Map 的接口):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package wrapper

import (
"errors"
"fmt"
)

type Map struct {
internalMap map[int]string
}

func NewMap(m map[int]string) Map {
return Map{m}
}

func (m *Map) ContainsKey(key int) bool {
_, ok := m.internalMap[key]
return ok
}

func (m *Map) Put(key int, value string) {
m.internalMap[key] = value
}

func (m *Map) Get(key int) (value string, err error) {
value, ok := m.internalMap[key]
if !ok {
err = errors.New(fmt.Sprintf("Map doesn't contain key %v", key))
}
return
}

func (m *Map) Remove(key int) (value string, err error) {
value, ok := m.internalMap[key]
if ok {
delete(m.internalMap, key)
} else {
err = errors.New(fmt.Sprintf("Map doesn't contain key %v", key))
}
return
}

然后 main.go 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"

"com.example/temp/wrapper"
)

func checkKey(m *wrapper.Map, key int) {
value, err := m.Get(key)
if err == nil {
fmt.Printf("Get %d: %s\n", key, value)
} else {
fmt.Println(err.Error())
}
}

func main() {
m := wrapper.NewMap(map[int]string{1: "hello"})
fmt.Println(m.ContainsKey(1))
fmt.Println(m.ContainsKey(2))
checkKey(&m, 2)
m.Put(2, "world")
checkKey(&m, 2)
}

运行结果:

1
2
3
4
true
false
Map doesn't contain key 2
Get 2: world

Remove 方法的示例和 Get 类似,在 main.go 里就不写了,总之套路就是返回正确值和 error 接口。很多其他语言用户觉得这种处理很丑陋,但其实我觉得还好。入乡随俗。

接口

基于对象编程只需要 结构体方法 即可,但是面向对象编程则需要 结构体接口 结合。接口的语法其实就是在 type <interface-name> interface 代码块中定义一系列函数,但是不需要 func 前缀。还是用个 demo 比较明确。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 1. 二员操作符接口
type BinaryOperation interface {
Calculate(x, y int) int
}

// 2. 加操作和乘操作的结构体,并且都实现了 Calculate 方法(函数参数和返回类型都一样)
type AddOperation struct{}
type ProductOperation struct{}

func (op AddOperation) Calculate(x, y int) int {
return x + y
}

func (op ProductOperation) Calculate(x, y int) int {
return x * y
}

func runBinaryOperation(op BinaryOperation, x int, y int) int {
return op.Calculate(x, y)
}

func main() {
// 3. 因此加操作和乘操作都可以视为二员操作符
x := runBinaryOperation(AddOperation{}, 2, 3)
y := runBinaryOperation(ProductOperation{}, 2, 3)
// ...
}

Duck typing 使得 Golang 实现多态非常灵活,无需像 Java 一样显式 implements 某个接口,不然无法被转换成对应的接口类型。典型的接口在我们之前用 errors.Error 接口保存错误信息时已经看到了。此外,Golang 的 Stringer 接口用于将任意结构体转换成字符串,从而可以被 %v 格式化打印。这点类似于 Java 的 toString() 方法。

需要注意的是,如果是将结构体指针作为方法的接收者,那么必须要通过指针访问才会被视为该接口,比如:

1
2
3
4
5
6
7
type Vertex struct {
X, Y int
}

func (v *Vertex) String() string {
return fmt.Sprintf("Point(%d, %d)", v.X, v.Y)
}

这里实现了接口 Stringer 的是 *Vertex,而不是 Vertex。因此对于以下调用:

1
2
3
v := Vertex{Y: 100}
fmt.Println(v)
fmt.Println(&v)

输出:

1
2
{0 100}
Point(0, 100)

因此很多时候,直接像这样初始化:

1
v := &Vertex{Y: 100} // v 是结构体指针

另一方面,duck typing 也使得 Go 可以用 interface{} 类型表示任意类型(类似 Java 的 Object)。由于 Golang(目前为止,1.17)没有泛型,使得它要对某种类型进行抽象,只能传递 interface{},比如上述代码就限定了参数类型都是 int。比如在 C++ 中可以定义这样的接口:

1
2
3
4
template <typename T>
struct BinaryOperation {
T Calculate(T x, T y);
};

但是 Golang 里,只能这么干了:

1
2
3
type BinaryOperation interface {
Calculate(x, y interface{}) interface{}
}

但是这么实现很蛋疼,因为虽然任何对象都可以转换成 interace{},但是 interface{} 不能直接转换为其他类型,比如我基于新的 BinaryOperation 接口尝试写下面这段代码:

1
2
3
func (op AddOperation) Calculate(x, y interface{}) interface{} {
return int(x) + int(y)
}

编译会报错:

cannot convert x (type interface {}) to type int: need type assertion

在 Golang 里需要进行类型断言,t := i.(T) 使得 i 的实际类型为 T 时将其转换成 T 类型的变量 t,如果类型不匹配将会触发 panic。而 t, ok := i.(T) 则可以避免 panic,类型不匹配时 okfalse。因此上述的实现可能变成:

1
2
3
4
5
6
7
8
func (op AddOperation) Calculate(x, y interface{}) interface{} {
return x.(int) + y.(int)
}

func main() {
sum := AddOperation{}.Calculate(1, 2)
fmt.Println(sum.(int))
}

如果要支持多种类型,比如 float,就得利用t, ok := i.(T) 的形式了,但是可以用 switch 进行简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (op AddOperation) Calculate(x, y interface{}) interface{} {
switch vx := x.(type) {
case int:
if vy, ok := y.(int); ok {
return vx + vy
} else {
panic("x is int while y is not int")
}
case float64:
if vy, ok := y.(float64); ok {
return vx + vy
} else {
panic("x is int while y is not int")
}
default:
panic("x is not int or float64")
}
}

但是写起来还是很复杂……而且似乎不能写成:

1
switch vx, vy := x.(type), y.(type) {

这种形式。

我可以理解这么做的原因,因为在没有类型检查的情况下向下转型很危险。C++ 的模板相对一般语言的泛型比较特殊,可以简单地写成:

1
2
3
4
template <typename T>
struct AddOperation {
T add(T x, T y) { return x + y; }
};

但如果 T 的类型不支持 operator+,那么编译错误会非常庞大,这也是 C++20 引入 constrains 和 concepts 的原因。

Goroutine

首先可以把 goroutine 当成轻量级线程(Fiber)来使用,这里为求方便,不去纠结名词细节,下文中若不特别说明,『线程』直接代指『goroutine』。总之起一个线程语法很简单,就是 go 后接函数调用,甚至可以是匿名函数。

1
2
3
go func (s string) {
fmt.Println(s);
}("hello") // 定义匿名函数 func (s string),并传递参数 "hello"

因此类似其他语言,Golang 的线程同步也可以用锁和条件变量之类,具体见 sync。一般更多地是推荐用 channel(我更喜欢用『管道』这个翻译)进行同步。Channel 其实就是个 FIFO 队列,对于一个 channel(假如为 ch):

  • <- ch读操作。从 channel 中去取一个值,这个操作具有返回值,可以将其赋予一个变量,比如 v := <-ch
  • ch <- v写操作。将值 v 放入管道。

其实就类似 setter 和 getter 嘛。Channel 类型一般是 chan TT 为 channel 元素的类型,直接用 make(chan T)make(chan T, n) 的方式创建,指定 n 则代表 channel 带缓冲区(前提是 n 大于 0)。如果 channel 满了(或者没缓冲区),则写操作会阻塞直到读操作完成;如果 channel 为空,则读操作会阻塞直到 channel 有元素。这也是基于 channel 进行线程同步的基础。当然,对于单线程而言,这两个操作会直接触发 panic。就类似 Java 的 ArrayBlockingQueue 在队列为空时进行 remove 或者队列为满时进行 add 一样。但是 Golang channel 的特殊之处在于,多线程环境下会将 remove/add 的语义改成 take/put 的语义。

此外,channel 可以进行 range 操作,其实就是一个语法糖,代替在无限循环中进行读操作直到 channel 被关闭。对于一个 channel ch,关闭操作(close(ch))意味着 ch 之后不再可用(无法进行读写,否则会 panic)。

Golang 还提供了 select 语句,这个名字基本上就意味着它和 I/O 多路复用的场景。I/O 多路复用本质上是可以同时等待多个文件描述符的事件(包括读就绪,写就绪,错误就绪)。select 即可以等待多个 channel 的读写事件。

这里举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func eventLoop(c, quit chan int) {
cnt := 1
flag := true
for flag {
select {
case c <- cnt: // 写事件,c 未满即可触发
fmt.Printf("Push %d\n", cnt)
cnt++
case <-quit: // 读事件,quit 未空即可触发
fmt.Println("Quit")
flag = false
break // 跳出 select block
default: // 其他事件都未就绪时执行
time.Sleep(500 * time.Millisecond)
}
}
}

func main() {
ch := make(chan int, 2)
quit := make(chan int)

go func() {
time.Sleep(300 * time.Millisecond)
<-ch // 取出元素,使得 eventLoop 的 case c <- cnt 可以就绪
time.Sleep(100 * time.Millisecond)
quit <- 0 // 触发 eventLoop 的 case <-quit
}()
eventLoop(ch, quit)
}

多次运行,可以发现输出结果有两种,一种是

1
2
3
Push 1
Push 2
Quit

另一种是

1
2
3
4
Push 1
Push 2
Push 3
Quit

原因是在 push 1 和 2 之后,c <- cnt<- quit 都处于阻塞状态,因此进入 default 分支等待 500 ms。在此期间,子线程中的 <-chquit <- 0 使得两个事件都就绪了。此时 select 语句触发的顺序不是固定的。

在做等价二叉查找树练习的时候,发现一些 channel 被忽视的用法。Golang 不允许声明不被使用的变量,因此仅仅是想判断 channel 是否还有多余值时,可以用:

1
2
3
for range ch {
fmt.Println("There's a value in channel")
}

另外,默认读取空 channel 时会阻塞,但是有一种非阻塞的方式,和之前的类似:

1
elem, ok := <-ch // 如果 ch 暂时没有元素,则 ok 为 false,elem 为 nil

下一步

总的来说 Go 的语言设计最大的优点就是作为一门广泛使用的非脚本语言,上手很快,就这一两天把 Tour of Go 过了遍,感觉直接上手写代码是足够了。

不过感觉也可以抽时间看看:

当然,也有两个词典性质的网站:

Pulsar 源码阅读 - 消费者的订阅

[toc]

前言

之前对 Pulsar 消费端的逻辑不太熟悉,但是一直有印象就是刚接触 Pulsar 时,不记得在哪看到 Pulsar 的消费模型是 push 的,而这点和 Kafka 的 pull 消费模型是完全不同的。之前对于 Kafka 的消费模型已经比较熟悉了,客户端发送 FETCH 请求,其中对于需要拉取(pull)数据的每个 partition,请求中会有一个 partition_max_bytes 字段限制该分区获取的最大字节数。而从 FETCH v3 开始,还有个总的 max_bytes 字段限制总的最大字节数来针对分区太多的场合。具体协议参见 Kafka Message Fetch

那么,Pulsar 采用的 push 消费模型是怎样的呢?为什么要采用 push 消费模型呢?带着这些问题,开始阅读源码,本文采用 Pulsar 2.8.0 的源码(实际是 master 分支),因此和之前的 release 版本可能有些许出入。

本文在阅读源码时,会略去一些相对不核心的代码,必须合法性检查/异常处理/错误日志,此时会用 /* … */ 的风格来略去这一部分,而代码分析则统一使用 // 注释风格。

协议

参见 Pulsar 官网文档 Binary Protocol: Consumer

consumer

重点是客户端发送 Flow 请求,然后broker 回复消息。重点是流控的处理。这里先看看 PulsarApi.proto 中的定义(位于 org.apache.pulsar.common.api.proto 包):

1
2
3
4
5
6
7
message CommandFlow {
required uint64 consumer_id = 1;

// Max number of messages to prefetch, in addition
// of any number previously specified
required uint32 messagePermits = 2;
}

除了消费者 id 外,它只需要一个 permits 参数,表示 prefetch(提前获取)的最大消息数量。

再来看看文档的介绍。典型的消费者实现会在应用程序准备消费之前使用队列积累这些消息,在应用程序队列已经入队了半数以上消息时,消费者发送 permits 给 broker 来请求更多的消息(其数量等于队列大小的一半)。

Client

首先给出一份最简单的客户端消费的代码:

1
2
3
4
5
6
7
8
9
10
11
12
try (PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build()) {
Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
Message<byte[]> message = consumer.receive();
// TODO: handle the message
} catch (PulsarClientException e) {
e.printStackTrace();
}
  1. 设置目标主题和订阅名后,调用 subscribe() 方法订阅该主题,创建 Consumer;
  2. consumer 调用 receive() 方法接收消息。

这里先简单介绍一下,客户端的实现代码位于 pulsar-client 模块,而接口定义则位于 pulsar-client-api 模块,一些(broker 和客户端等)公用的类位于 pulsar-common 模块。而各模块均位于同名目录下。

另外 Pulsar 所有的同步调用 API 都只是简单等待异步调用 API(方法名后缀是 Async)返回的 CompletableFuture 对象完成,其返回值为 T。

消费者的创建

创建 PulsarClient 时实际上是创建了 PulsarClientImpl 对象,其中 newConsumer 方法是创建一个 builder 用于链式调用:

1
2
3
4
public ConsumerBuilder<byte[]> newConsumer() {
// 另外从这里也可以看到默认的 consumer 是使用 bytes schema
return new ConsumerBuilderImpl<>(this, Schema.BYTES);
}

其中 builder 的方法就不仔细阅读了,主要是对参数进行必要的验证后设置相应字段,比如必要的是主题名和订阅名,都保存在 conf 中:

1
2
// ConsumerBuilderImpl<T>
private ConsumerConfigurationData<T> conf;
1
2
3
// ConsumerConfigurationData<T>
private Set<String> topicNames = Sets.newTreeSet();
private String subscriptionName;

ConsumerBuilderImpl#subscribeAsync 本身只是对 conf 的一些参数进行合法性校验,对于只设置主题名和订阅名的情况只是验证这两项配置是否存在,最后实际上是调用 PulsarClientImpl#subscribeAsync

1
2
3
return interceptorList == null || interceptorList.size() == 0 ?
client.subscribeAsync(conf, schema, null) : // 默认 intercepto 为 null
client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// PulsarClientImpl<T>
public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
// 进行一些合法性检查(这里略去代码),包括:
// 1. client 状态为 Open
// 2. conf != null
// 3. conf.topicNames 的每个主题名的格式必须合法
// 4. 对于 compacted topic,主题必须为 persistent,订阅模式必须为 Exclusive 或者 Failover
// 5. 对于 ConsumerEventListener,订阅模式必须为 Failover
/* ... */

if (conf.getTopicsPattern() != null) {
// 正则订阅,此时禁止设置具体的主题名字(topicNames)
if (!conf.getTopicNames().isEmpty()){
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern"));
}
return patternTopicSubscribeAsync(conf, schema, interceptors);
} else if (conf.getTopicNames().size() == 1) {
// 单主题订阅
return singleTopicSubscribeAsync(conf, schema, interceptors);
} else {
// 多主题订阅
return multiTopicSubscribeAsync(conf, schema, interceptors);
}
}

为求简单,还是只看单主题订阅的情况:

1
2
3
4
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
.thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors));
}

这里的先后顺序是:

  1. 调用 preProcessSchemaBeforeSubscribe,此时会对 schema 进行预处理,必须注册 schema。
  2. 对前一步得到的 schemaClone 传入 doSingleTopicSubscribeAsync

这里关注第二步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();

String topic = conf.getSingleTopic();

// 1. 首先取得 topic 的 metadata(目前仅包含 partitions 字段表示分区数量)
getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
/* debug 日志 ... */
ConsumerBase<T> consumer;
// 从 executor provider(包含一组 executors)中分配一个 executor
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
if (metadata.partitions > 0) {
// 2.1 多分区订阅,创建的是 MultiTopicsConsumerImpl
consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
// 2.2 单分区订阅,创建的是 ConsumerImpl
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}

consumers.add(consumer); // 将创建的 consumer 加入到 client 内部的 consumers 中
}).exceptionally(/* 异常处理... */);

return consumerSubscribedFuture;
}

多分区订阅和多主题订阅本质上是一样的,都是使用 MultiTopicsConsumerImpl 管理多个主题(因为 Pulsar 中分区只不过是一个包含后缀 -partition-<n> 的主题)。

这里还是关注单分区订阅,ConsumerImpl.newConsumerImpl(...) 只是将参数原封不动传给其构造方法,构造方法包括了 consumer 内部一些字段的初始化,因此比较长,我们还是只关注重点,那就是最简洁的订阅会和 broker 有什么交互。实际上这部分逻辑在构造方法最后,调用 grabCnx 方法:

1
2
3
void grabCnx() {
this.connectionHandler.grabCnx();
}

连接的建立

connectionHandler(下文简称 connection)的构造:

1
2
3
4
5
6
7
8
9
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
// 设置 BackOff 所需要的参数,参数对应 client 的配置为:
// initialTime: 默认 100 ms,ClientBuilder#startingBackoffInternal
// max: 默认 60 s,ClientBuilder#maxBackoffInterval
// mandatoryStop:固定为 0 ms
/* ... */
.create(),
this); // 将 ConsumerImpl 对象自身传入 ConnectionHandler 的构造方法

这里简单说下 BackOff 对象。查看 backoff 在 ConnectionHandler 中的使用,可以看到主要是在重连活着关闭连接时用 next() 方法来得到建立连接时对应的 timeout,因为连接对应的是 client,所以这里用的都是 client 的配置。

回到正题,继续看 ConnectionHandler#grabCnx 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected void grabCnx() {
if (CLIENT_CNX_UPDATER.get(this) != null) {
// 已经连接成功了,无视这次调用
/* warn 日志... */
return;
}

if (!isValidStateForReconnection()) {
// 若 connection 状态不可用于重连(比如为 Closed),则无视这次调用
/* info 日志... */
return;
}

try {
// 1. 取得主题对应的连接
state.client.getConnection(state.topic) //
// 2.1 若连接成功,则调用 connectionOpened
.thenAccept(cnx -> connection.connectionOpened(cnx)) //
// 2.2 若连接失败,则调用 handleConnectionError,会进行重连操作
.exceptionally(this::handleConnectionError);
} catch (Throwable t) {
/* warn 日志... */
reconnectLater(t);
}
}

这里还是看看连接成功的处理。注意到 ConsumerImpl 是实现了 Connection 接口的:

1
public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {

然后注意到构造 connection 时用的 Connection 接口就是 ConsumerImpl 对象自己,因此调用的是 ConsumerImpl#connectionOpened

实际上这一小节的内容同样也适用于生产者以及多主题消费者,它们对应的类都实现了 Connection 接口,只需要实现各自的回调即可。

连接成功的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public void connectionOpened(final ClientCnx cnx) {
if (getState() == State.Closing || getState() == State.Closed) {
/* 执行一些清理工作... */
return;
}
// 绑定 client 到 connection,前面 grabCnx() 检查的 CLIENT_CNX_UPDATER 也会在这里设置
setClientCnx(cnx);

/* info 日志表示准备订阅对应主题... */

long requestId = client.newRequestId();

int currentSize;
synchronized (this) {
// incomingMessages 为客户端缓存收到消息的队列(接收队列),这里先取得其大小
currentSize = incomingMessages.size();
// 清空接收队列,取得第一条消息的 id(也就是消费者第一条没有传给应用程序的消息的 id)
startMessageId = clearReceiverQueue();
// 清空 DLQ
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
}

boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
MessageIdData startMessageIdData = null;
if (isDurable) {
// 对持久化订阅,那么将 startMessageIdData 置为 null,因为 broker 负责告诉客户端重新开始消费的消息 id
startMessageIdData = null;
} else if (startMessageId != null) {
// 对非持久化订阅(常用于 Reader API),则要用我们之前取得的第一条消息的 id
MessageIdData.Builder builder = MessageIdData.newBuilder();
builder.setLedgerId(startMessageId.getLedgerId());
builder.setEntryId(startMessageId.getEntryId());
if (startMessageId instanceof BatchMessageIdImpl) {
builder.setBatchIndex(startMessageId.getBatchIndex());
}

startMessageIdData = builder.build();
builder.recycle();
} // else: 非持久化订阅,但是缓存队列里没有消息

// 取得 schema
SchemaInfo si = schema.getSchemaInfo();
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
// don't set schema for Schema.BYTES
si = null;
}

// 构造 CommandSubscribe 并发送,具体字段就不贴出来了
ByteBuf request = Commands.newSubscribe(/* ... */);
cnx.sendRequestWithId(request, requestId).thenRun(() -> {
// case 1 发送成功
synchronized (ConsumerImpl.this) {
// 如果 State 是 Uninitialized,Connecting,RegisteringSchema,则将其改为 Ready
if (changeToReadyState()) {
// 将 availablePermits 置为 0
consumerIsReconnectedToBroker(cnx, currentSize);
} else {
// 其它 state 比如 Failed 则代表异常状态,此时将 state 置为 Closed
setState(State.Closed);
deregisterFromClientCnx(); // 将 consumer 和 connection 解绑
client.cleanupConsumer(this); // 将 consumer 从 client 中移除
cnx.channel().close(); // 关闭连接
return;
}
}

// 重制 BackOff,因为之前可能重连过导致 BackOff 内部状态发生变化,
resetBackoff();

// 因为 subscribeFuture 可能多次 complete,因此这里判断是否为第一次 complete
boolean firstTimeConnect = subscribeFuture.complete(this);
// 如果 receiverQueueSize(内部接收队列的大小)大于 0,则代表 consumer 有缓冲区可以接收消息。
// 此时,会将接收队列大小的一半作为 permits 发送 Flow 请求
// 但还有一种情况也不会发送,也就是满足以下三个条件:
// 1. 第一次连接完成
// 2. 当前 consumer 是一个子 consumer
// 3. 订阅类型是持久化(NOTE:这个条件可能是早期错误,见 https://github.com/apache/pulsar/pull/3960)
// 因为当 consumer 是子 consumer 的时候,它归属于 MultiTopicsConsumerImpl,也就是多分区/主题的 consumer
// MultiTopicsConsumerImpl 会在所有子 consumer 连接完成后再调用 startReceivingMessages
// 从而对每个子 consumer 调用 increaseAvailablePermits
if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {/* 异常处理... */});

可以看到连接成功后,主要是发送 CommandSubscribe(订阅命令),broker 处理成功后,consumer 就处于 Ready 状态,并且会发送 Flow 请求携带 permits 为接收队列大小的一半。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
// availablePermits 增加 delta,代表可用的接收队列的数量
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);

// 如果超过了阈值(默认一半接收队列大小)且没有调用 pause,则将其清零,并将清零前的值构造 Flow 命令发送
while (available >= receiverQueueRefillThreshold && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
sendFlowPermitsToBroker(currentCnx, available);
break;
} else {
available = AVAILABLE_PERMITS_UPDATER.get(this);
}
}
}

再看看 Flow 命令的定义和发送:

1
2
3
4
5
6
message CommandFlow {
required uint64 consumer_id = 1;

// 预先拉取的消息数量,也称为 permits
required uint32 messagePermits = 2;
}
1
2
3
4
5
6
7
8
9
10
private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
// 仅当连接未断开且消息数大于 0 才会发送 Flow 请求,因为 broker 收到 permits 为 0 的请求会直接抛出异常
if (cnx != null && numMessages > 0) {
if (log.isDebugEnabled()) {
/* ... */
} else {
cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise());
}
}
}

Broker 处理

CommandSubscribe 处理

broker 对 TCP 协议的处理位于 org.apache.pulsar.broker.service 包的 ServerCnx 类。对于 consumer 而言,比较独有的就是 CommandSubscribeCommandFlow

因为代码比较长,所以日志相关代码均略过,不特意用 /* xxx... */ 说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
protected void handleSubscribe(final CommandSubscribe subscribe) {
/* 略去相关字段的解析... */

// 验证是否具有 CONSUME 权限
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName,
subscriptionName,
TopicOperation.CONSUME
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
/* 验证 metadata 字段...*/

// 这里的 Consumer 是 broker consumer,对应每个 client consumer
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
consumerFuture);

if (existingConsumerFuture != null) { // 已经创建过 broker consumer
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
// 创建完成则直接发送成功的响应
commandSender.sendSuccessResponse(requestId);
return null;
} else {
// 之前已经有相同 consumerId 的 subscribe 请求,这是因为 client timeout 小于 broker timeout,
// 因此 client 发生重试,此时需要等待之前的 consumer future 完成
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
// 前一个 subscribe 请求还未完成,直接返回 ServiceNotReady
error = ServerError.ServiceNotReady;
} else {
// 前一个 subscribe 请求异常完成,则返回同样的错误码并将其移除 cache 避免下次重新进入此分支
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
return null;
}
}

// 判断是否自动创建 topic,forceTopicCreation 为 client 填充的字段(NOTE:实际上永远为 true)
// service 对象则是通过配置或者 system topic 的配置来判断是否允许自动创建 topic
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());

// 当前 broker 获取或创建 Topic 对象
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}

Topic topic = optTopic.get();

// 对于 durable cursor 而言,如果该订阅不存在且不允许订阅自动创建,subscribe 会失败
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}

// 若带有 schema 则先检查 schema 兼容性,最后都会调用 Topic#subscribe
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(/* ... */));
} else {
return topic.subscribe(/* ... */);
}
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
commandSender.sendSuccessResponse(requestId);
} else {
// 如果 consumerFuture 已经完成,则当前 consumer 是 client timeout 重新创建的 consumer
// 此时需要关闭 consumer 并移除这个 future
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn(
"[{}] Error closing consumer created"
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
}

})
.exceptionally(exception -> {
/* 根据异常严重性打印对应等级的日志... */

// 返回错误码移除订阅失败过程中添加的 consumer
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
return null;

});
} else { // 鉴权失败
String msg = "Client is not authorized to subscribe";
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> { // 鉴权抛出异常
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
}

总结下来,处理 subscribe 请求核心调用是:

  • BrokerService#getTopic:获取当前 broker 所拥有(own)的 Topic 对象
  • Topic#subscribe:在 Topic 对象中创建对应的订阅,并得到 Consumer 对象

其中 TopicConsumer 是 broker 端对 topic 和 consumer 的抽象,负责管理对应的资源。均位于 org.apache.pulsar.broker.service 包下。这里我们重点看 PersistentTopic#subscribe

PersistentTopic#subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public CompletableFuture<Consumer> subscribe(/* ... */) {
// 只有 Failover 和 Exclusive 模式才支持读取 compacted topic
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive subscriptions"));
}

// 通过 NamespaceService 检查 topic owner 是否为当前 broker,若不是则该 future 会以 ServiceUnitNotReady 异常完成
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
/* 进行一系列检查... (这里略去具体代码,仅用注释说明) */
// 1. 若 broker 未启用订阅复制,则仅仅是打印 warn 日志,防止 broker 禁止跨地域复制后 consumer 订阅失败
// 2. 检查 broker 是否支持 Key_Shared 订阅模式
// 3. 检查非 system topic(以 __change_events 结尾)的 topic 级别配置是否支持该订阅类型
// 4. 检查订阅名是否为空
// 5. 检查协议是否支持 batch 消息
// 6. 禁止对前缀是跨地域复制的前缀(默认 pulsar.repl)或者 pulsar.dedup 创建订阅

// 用连接地址作为 key,检查 consumer 对应的 RateLimiter 是否存在,限制重连次数,因为重连的连接地址是相同的
// 参考 https://github.com/apache/pulsar/pull/2977
if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer)
|| !subscribeRateLimiter.get().tryAcquire(consumer))) {
/* warn 日志... */
return FutureUtil.failedFuture(
new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
}
}

lock.readLock().lock();
try {
// 当 topic 被删除或关闭,或者 producer 写消息失败时都会标记为 fence 状态,此时禁止订阅
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable"));
}
// 实际上是增加 usageCount(连接的 producer 和 consumer 总数),传参数是为了打印 debug 日志
handleConsumerAdded(subscriptionName, consumerName);
} finally {
lock.readLock().unlock();
}

// 获取(或创建)订阅
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec);

// 对于持久化订阅,可以获取最大的未确认的消息
int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
: 0;

CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
// 创建 Consumer 对象并加入到对应的订阅中
Consumer consumer = new Consumer(/* ... */);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
if (!cnx.isActive()) {
// 连接已经断开,则需要关闭 consumer
try {
consumer.close();
} catch (BrokerServiceException e) {
/* ... */
// 减少 usageCount,对应 handleConsumerAdded
decrementUsageCount();
return FutureUtil.failedFuture(e);
}

decrementUsageCount();
return FutureUtil.failedFuture(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
// 连接仍存活,则继续检查复制订阅的状态,至此完成整个订阅
checkReplicatedSubscriptionControllerState();
return CompletableFuture.completedFuture(consumer);
}
});
});

future.exceptionally(ex -> {
decrementUsageCount();
/* 打印日志... */
return null;
});
return future;
});
}

核心流程:

  1. 获取 Subscription(broker 对订阅的抽象),若不存在则创建。
  2. 创建 Consumer(broker 对消费者的抽象)后加入订阅。

这里不进一步阅读 Subscription 代码。简单说,在创建订阅时,对于持久化(durable)订阅(PersistentSubscription),创建时会打开一个 cursor,对应于 PersistentTopic 内部的 ledger,若 cursor 不存在则创建;对于非持久化(non-durable)订阅(NonPersistentSubscription),则是直接内存中维护消息 id(MessageIdImpl)。两者最大的区别是,持久化订阅在 cursor 已存在时,直接打开 cursor,而无视掉 CommandSubscribe 中的消息 id。

至于将 consumer 加入订阅,实际上是根据订阅类型创建对应的 dispatcher(Dispatcher),这里以默认的 Exclusive 订阅为例:

1
2
3
4
5
6
7
8
9
10
11
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
previousDispatcher = dispatcher;
dispatcher = useStreamingDispatcher
? new PersistentStreamingDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this)
: new PersistentDispatcherSingleActiveConsumer(
cursor, SubType.Exclusive, 0, topic, this);
}
break;

注:https://github.com/apache/pulsar/pull/9056 引入了 streaming dispatcher。

然后将 consumer 加入 dispatcher:

1
2
3
4
5
6
try {
dispatcher.addConsumer(consumer);
return CompletableFuture.completedFuture(null);
} catch (BrokerServiceException brokerServiceException) {
return FutureUtil.failedFuture(brokerServiceException);
}

订阅里实际负责消息分发的就是 dispatcher。

CommandFlow

首先是 ServerCnx 第一步处理:

1
2
3
4
5
6
7
8
9
10
11
12
protected void handleFlow(CommandFlow flow) {
checkArgument(state == State.Connected); // 必须是 Connected 状态
CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());

// 如果 consumer 已经创建成功,则直接交给 Consumer 处理
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (consumer != null) {
consumer.flowPermits(flow.getMessagePermits());
}
}
}

然后递交给 Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0); // permits 必须大于 0

// 1. 对于只支持 individual ack 的订阅类型,检查没有确认的(unacked)消息数量
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
// 1.1 正常情况下,增加 messagePermits,然后交给 subscription 处理
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
subscription.consumerFlow(this, additionalNumberOfMessages);
} else {
// 1.2 如果 unacked 消息数量超过限制,则更新 permitsReceivedWhileConsumerBlocked 记录因此阻塞的 flow permits
oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
}
}

可见,更新完 Consumer 内部的 messagePermits 后,交由对应的 Subscription 对象处理。这里仅看 PersistentSubscription 的实现:

1
2
3
4
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
this.lastConsumedFlowTimestamp = System.currentTimeMillis();
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}

仅仅是传递给 dispatcher。因此实际的处理是由 dispatcher 完成的。至于 dispatcher 的实现就留给下一篇进行讨论。

总结

本文梳理了 client 和 broker 创建消费者订阅某个 topic 的流程。Client 端的实现包含了一些和生产者通用的部分,也就是建立连接的部分。连接成功后的回调,消费者会先发一个 subscribe 命令注册自己,注册成功后发送 flow 请求,携带 permits,其值为内部缓冲区大小的一半(对于无缓冲区的 zero queue consumer 例外)。

Broker 端处理相对复杂,但本质是对一些概念进行了抽象。消费者对应 Consumer,每个消费者则包含一个订阅 Subscription。而 topic 对应 Topic,消费者发送 subscribe 命令时会在 Topic 里创建 Subscription,因为每个 topic 可以对应多个订阅,创建完成后根据消费者的订阅类型创建对应的 Dispatcher。通常意义上一个订阅可以对应多个消费者,但实际上真正维护这组消费者的不是 Subscription 而是 Dispatcher,订阅本身更多的是维护 cursor(消费进度),比如持久化订阅(PersistentSubscription)就会创建一个 durable cursor。而 client 端的 Flow 请求,最终也是 Dispatcher 来处理的。

最后解答最初提出的问题,push 和 pull 的区别,以及为什么 Pulsar 是 push 消费模型。Kafka 的消费者是发送 FETCH 请求给 broker,然后 broker 对应的 FETCH 响应里包含读取的消息。而 Pulsar 的消费者发送的是 Flow 请求,它本身是没有对应的 FETCH 响应的。Client 会主动告知 broker 自己可以缓存多少条消息,broker 根据这个提示可以灵活定制 dispatcher,然后主动发消息给 client。因此 Pulsar 的这种 push 模型,实际上是由服务端(broker)来进行流量控制。两者本质区别就是流控到底是 client 还是 server 处理的。

Netty-and-NIO-buffer

[toc]

简述

目的是比较 NIO buffer(java.nio.ByteBuffer)和 Netty buffer(io.netty.buffer.ByteBuf)设计上的区别。因为刚好需要用 Netty 的 ByteBufAllocator 去重写 Kafka 的基于 NIO buffer 的 ByteBufferOutputStream,所以借此机会学习下两者的区别。

NIO buffer

参考文档:https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html

NIO buffer 的设计很简单,它是定长的,也就是必须在构造时指定容量,且不可扩容,比如:

1
ByteBuffer buffer = ByteBuffer.allocate(4); // 分配 4 字节容量堆内内存

每个 NIO buffer 有四个属性:

  • mark:记录的位置
  • position:当前读写位置
  • limit:可读写字节的上界,初始值是 capacity
  • capacity:容量

这里暂时忽略 mark 属性。那么上述代码中,buffer 构造完成后各下标为:

1
position: 0, limit: 4, capacity: 4

可以用以下方法打印出来:

1
2
3
4
private static void printMetadata(final ByteBuffer buffer) {
System.out.println("position: " + buffer.position() + ", limit: " + buffer.limit()
+ ", capacity: " + buffer.capacity());
}

由于 NIO buffer 是不可扩容的,所以理论上 capacity 不会改变。而 position()limit() 方法都有另一个重载形式,可以接受一个 int 参数指定新的值。设置 limit 的场景大多是为了截断一个 buffer,比如:

1
2
3
4
5
6
final ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.putInt(0x11223344); // 0x11, 0x22, 0x33, 0x44
printMetadata(buffer); // position: 4, limit: 4, capacity: 4
buffer.position(1).limit(3);
printMetadata(buffer); // position: 1, limit: 3, capacity: 4
System.out.println(Integer.toHexString(buffer.getShort())); // 2233

注:这里的设计没有采用 setPosition()setLimit() 这样的命名方式,但是返回的都是 ByteBuffer,因此支持上述代码的链式调用方式。

NIO buffer 提供 putget 方法,两者都针对不同类型提供了多种类似方法,但大体分为两类,一类是不带 index 的,也就是直接从末尾写入或读取,一类是带 index 的,从 index 对应位置写入和读取。

limit 属性则是 get 读写的上界,如果即将读取的字节位置已经超过了 limit,那么会抛出 BufferOverflowException(因为打破了 position <= limit 的约束),比如:

1
2
3
4
5
final ByteBuffer buffer = ByteBuffer.allocate(2);
buffer.put((byte) 0x11);
printMetadata(buffer); // position: 1, limit: 2, capacity: 2
// 写入的位置是 position + short size = 1 + 2 = 3 > limit,因此抛出异常
buffer.putShort((short) 0x2233);

无论是 get 还是 put,都会改变 position,因为它们共用一个下标如果往末尾写入了新数据则会改变 limit。这里需要注意的是,读写共用一个下 position,这会导致一些反直觉的行为,比如:

1
2
3
final ByteBuffer buffer = ByteBuffer.allocate(1);
buffer.put((byte) 0x11);
System.out.println(buffer.get()); // BufferUnderflowException

直觉上第一次 get() 应该从 position 0 开始,但这里其实是从 position 1 开始,因为 put 改变了 position。如果要从 position 0 开始,只能在 get() 之前手动调用 buffer.position(0) 或者 buffer.rewind()

这里提到 rewind,它和 position(0) 的唯一区别是它将 ByteBuffermark 字段设置为了 -1。

现在看看 mark,NIO buffer 永远满足约束: mark <= position <= limit <= capacity。读/写都会改变 position,写入可能改变 limit,而 capacity 无法改变。mark 属性仅在 mark() 方法中会设置为 -1 之外的值:

1
2
3
4
public final Buffer mark() {
mark = position;
return this;
}

也就是记录当前的 position,主要用途是在 put 写入之前记下位置。mark() 方法要结合 reset() 方法使用:

1
2
3
4
5
6
7
public final Buffer reset() {
int m = mark;
if (m < 0) // 必须 mark() 设置过合法的 position,否则会抛出异常
throw new InvalidMarkException();
position = m;
return this;
}

这样可以方便回滚到写之前的位置进行读取,比如:

1
2
3
4
5
6
7
8
9
final ByteBuffer buffer = ByteBuffer.allocate(2);
buffer.mark();
buffer.put((byte) 0x11);
buffer.reset();
System.out.println(Integer.toHexString(buffer.get())); // 11
buffer.mark();
buffer.put((byte) 0x22);
buffer.reset();
System.out.println(Integer.toHexString(buffer.get())); // 22

如果是单纯的先写后读的场景,可以在写结束后调用 flip()

1
2
3
4
5
6
public final Buffer flip() {
limit = position; // 禁止写入更多字节,除非重新设置 limit
position = 0; // 读写重新从 position 0 开始
mark = -1;
return this;
}

由于读写都会改变 position,因此如果想要读写的同时不改变 position,可以调用 duplicate() 方法创建一个 buffer 共享数据,但是拥有独立的 position 以及其他属性,这些属性的初始值都是调用时原 buffer 的相应值。也可以用 array() 直接获取底层数组(下标 0 到 capacity)。

Netty buffer

参考文档:https://netty.io/4.1/api/io/netty/buffer/ByteBuf.html

创建

Netty buffer 支持使用 Netty 的 allocator 来分配内存,具体定制可参考 Netty ByteBufAllocator。也可以直接通过现有的 NIO buffer 或者字节数组构造。

1
2
3
final ByteBuf buf1 = Unpooled.wrappedBuffer(new byte[2]);
final ByteBuf buf2 = Unpooled.wrappedBuffer(ByteBuffer.allocate(2));
final ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer(2);

其中,使用 wrappedBuffer 构造时,writer index 均为原来的 buffer/数组的大小。

读写位置分离

NIO buffer 的设计很简单,但是由于读写共用一个 position,导致使用起来不是很符合直觉,比如写完之后要重新调用 position() 设置位置,或者结合 mark()reset(),或者 flip() 等等。

而 Netty buffer 则将读写位置(这里称为 index 而非 position)分离了,见下图。

1
2
3
4
5
6
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
  • Discardable bytes:已经读取的区域
  • Readable bytes:数据实际存储的位置
  • Writable bytes:待填满的区域

Netty buffer 将读写分为了两套 API,read/write 系列方法类似于 NIO buffer 的 get/put,读写成功会更新对应的 index。同时还支持指定 index 的 get/set 系列方法用于随机读写,不会更新 index。另外,无论是 readerIndex 还是 writerIndex,Netty buffer 也提供了相应的 mark/reset 系列方法。

1
2
3
4
5
final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(2);
buf.writeByte(1); // readerIndex: 0, writerIndex: 1
buf.writeByte(2); // readerIndex: 0, writerIndex: 2
buf.readByte(); // readerIndex: 1, writerIndex: 2
buf.readByte(); // readerIndex: 1, writerIndex: 2

扩容

NIO buffer 本质是对定长数组的包装,因此 capacity 是无法增长的,但是 Netty buffer 实现了自动扩容。这里可以先写个代码看看它的扩容策略。

1
2
3
4
5
6
final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1);
for (int i = 0; i < 500; i++) {
buf.writeByte(1);
System.out.println(i + " address: " + buf.memoryAddress() + ", capacity: " + buf.capacity());
}
buf.release();

可以看到输出是:

1
2
3
4
5
6
7
8
9
10
11
0 address: 140495611232256, capacity: 1
1 address: 140495611232256, capacity: 16
...
16 address: 140495611240448, capacity: 64
...
64 address: 140495611248640, capacity: 128
...
128 address: 140495611256832, capacity: 256
...
256 address: 140495611265024, capacity: 512
...

这里不深究具体扩容策略,姑且看到 capacity 到达 64 之后就开始两倍扩容。这里主要看到,每次扩容之后内存地址发生了改变,对于连续存储的数据结构而言,基本都是这个实现套路。

但是,注意这里我们是用 ByteBufAllocator 进行分配的,如果是字节数组或者 NIO buffer 的 wrapper,由于底层是定长数组,因此 Netty buffer 无法获取其分配器,自然地,也不知道扩容时用哪个分配器才能跟原来的分配方式一致,因此这种情况是不允许扩容的。

1
2
3
4
5
6
7
8
9
10
11
final ByteBuf buf = Unpooled.wrappedBuffer(ByteBuffer.allocate(100));
buf.writerIndex(0);
for (int i = 0; i < 200; i++) {
try {
buf.writeByte(1);
} catch (IndexOutOfBoundsException e) {
System.out.println("i = " + i);
break;
}
}
buf.release();

输出 i = 100,也就是容量到达 100 时就无法继续写入了。

引用计数

需要注意的是每个 Netty buffer 维护了一个引用计数,在调用 retain() 的时候自增,在 release() 的时候自减,使用 allocator 分配的时候初始为 1,只有引用计数降为 0 才会回收内存,否则会出现内存泄漏。由于在 Java 中,经常是将 buffer 传递给其它对象后就不再使用了,因此惯用法是最后一个使用 buffer 对象负责 release。

Netty 可以设置内存泄漏的检测器,详细用法参考 ResourceLeakDetector,它可以设置检测等级,默认是 Disabled。这里给出一个示例检测全局资源泄漏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void leakDemo(final ByteBufAllocator allocator) {
final ByteBuf buf = allocator.buffer(1024 * 1024);
buf.writeByte(1);
buf.readByte();
// NOTE: not released
//buf.release();
}

public static void main(String[] args) throws InterruptedException {
// 等价于加上 JVM 选项 -Dio.netty.leakDetectionLevel=paranoid
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
final ByteBufAllocator allocator = new PooledByteBufAllocator(false /* preferDirect */);
for (int i = 0; ; i++) {
leakDemo(allocator);
Thread.sleep(100);
}
}

可以看到输出:

1
2
3
4
5
2021-08-20 23:15:55:264 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:400)
...

从而方便定位泄漏位置。

需要注意的是 paranoid 时最高的检测等级,如果改成 simple 或 advanced,上述错误不会报出来。另外如果 preferDirect 改为 true 或者使用默认的 allocator,这里也检测不出来。原因可以参考回答:https://stackoverflow.com/questions/28822632/netty-4-5-does-not-actually-detect-resource-leak-of-bytebuf

原因简单说就是 Netty 的资源泄漏检测机制依赖于 ReferenceQueuePhantomReference,如果 VM 太早终止或者 GC 不够快,那么检测器无法判断是否泄漏

总结

本文从 NIO buffer 入手,学习了 Netty buffer 的一些改进,包括读写位置分离,自定义分配器,自动扩容。同时还要注意 Netty buffer 相比 NIO buffer 而言需要谨慎地管理内存,同时还可以用 Netty 提供的检测器检测资源泄漏。

个人认为 Netty 的最大优点在于它提供的池化分配器,可以安全地复用堆外内存,减少了 GC 的同时避免了从系统的堆内存到 JVM 堆的拷贝。因此 Pulsar 无论是 broker 还是 client 在分配内存保存消息时,都是使用的 Netty 的分配器。

再探 CompletableFuture

[toc]

前言

上次在 Github 上面写博客都是大半年前了,这段时间一直相对较忙,没抽出时间来认真写博客,大多只是一些零散的笔记记录在个人的有道云笔记上,这次姑且尝试继续写博客。

在之前的博客 Java CompletableFuture 学习 中,我们了解了 CompletableFuture 用于异步编程的基本套路:

  • 使用 supplyAsyncExecutorService 中启动异步任务并返回 future
  • 使用 thenApply / thenCompose / exceptionally 方法进行 future 之间的链式映射
  • 使用 whenComplete 方法来指定异步任务完成时的回调

然而在这段时间阅读 PulsarKoP 的源码过程中,发现其实对 CompletableFuture 的使用还是存在一些误区的,本文将个人的一些经验整合一下。

本文的示例从一个简单的例子开始(其中 print 方法会在后文中复用):

1
2
3
4
5
6
7
8
9
10
11
private static void print(final String msg) {
System.out.println(Thread.currentThread().getName() + " " + msg);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
final CompletableFuture<Integer> intFuture = CompletableFuture.supplyAsync(() -> {
print("in supplyAsync()");
return 0;
});
print("intFuture returns: " + intFuture.get());
}

输出结果:

1
2
ForkJoinPool.commonPool-worker-9 in supplyAsync()
main intFuture returns: 0

可见 supplyAsync 默认用的是 ForkJoinPool 这个 Java 线程池。由于本人是半路出家的 Javaer,这里先记下这个知识点,之后再系统学习下。

thenApply/exceptionally vs. whenComplete

针对不会异常完成的 future

这个其实很多人都用错了使用场景,因为大多数时候,其实我们只是想给 future 加一个回调,并不想继续链式调用。其实这种简单场景下,whenComplete 才是应该用的。比如,我们想对奇数进行操作,偶数则抛出异常,可以是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> {
final int x = new Random().nextInt();
print("in firstFuture, x = " + x);
return x;
});
final CompletableFuture<Integer> secondFuture = new CompletableFuture<>();
firstFuture.whenComplete((integer, throwable) -> {
assert throwable != null;
print("in secondFuture, firstFuture returns " + integer);
if (integer % 2 != 0) {
secondFuture.complete(integer / 2);
} else {
secondFuture.completeExceptionally(new Exception("firstFuture returns an even number: " + integer));
}
});
try {
print("secondFuture returns: " + secondFuture.get());
} catch (ExecutionException e) {
print("secondFuture failed: " + e.getCause());
}

以下是两种典型输出:

1
2
3
ForkJoinPool.commonPool-worker-9 in firstFuture, x = -1634409183
main in secondFuture, firstFuture returns -1634409183
main secondFuture returns: -817204591
1
2
3
ForkJoinPool.commonPool-worker-9 in firstFuture, x = 1243845674
main in secondFuture, firstFuture returns 1243845674
main secondFuture failed: java.lang.Exception: firstFuture returns an even number: 1243845674

这里我们可以先留意一下,firstFuturewhenComplete 方法是在主线程内执行的。

但实际上由于 firstFuture 永远不会异常结束,因此这个时候其实可以简化 secondFuture 的构造为:

1
2
3
4
5
6
7
8
9
final CompletableFuture<Integer> secondFuture = firstFuture.thenApply(integer -> {
print("in secondFuture, firstFuture returns " + integer);
if (integer % 2 != 0) {
return integer / 2;
} else {
// 注意 thenApply 内部只能抛出 unchecked exception
throw new RuntimeException("firstFuture returns an even number: " + integer);
}
});

此时两者本质上是一样的,但是 thenApply 更简洁。而且其实可以将上述代码简化为:

1
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(/* ... */).thenApply(/* ... */);

针对可能异常完成的 future

TL; DR 使用 whenComplete 就是最好的,不要链式使用 thenApplyexceptionally

假如 firstFuture 就在偶数的时候抛出异常:

1
2
3
4
5
6
7
8
9
final CompletableFuture<Integer> firstFuture = CompletableFuture.supplyAsync(() -> {
final int x = new Random().nextInt();
print("in firstFuture, x = " + x);
if (x % 2 != 0) {
return x;
} else {
throw new RuntimeException("firstFuture returns an even number: " + x);
}
});

那么基于 whenComplete 实现的 secondFuture 变成了:

1
2
3
4
5
6
7
8
9
final CompletableFuture<Integer> secondFuture = new CompletableFuture<>();
print("in secondFuture, firstFuture returns " + integer);
firstFuture.whenComplete((integer, throwable) -> {
if (throwable != null) {
secondFuture.completeExceptionally(throwable);
} else {
secondFuture.complete(integer / 2);
}
});

这个实现和上一节的是等价的。但此时,如果基于 thenApply,很多人会进一步用 exceptionally 进行两层链式调用:

1
2
3
4
5
6
7
final CompletableFuture<Integer> secondFuture = firstFuture.thenApply(integer -> {
print("in secondFuture thenApply, x = " + integer);
return integer;
}).exceptionally(e -> {
print("in secondFuture exception: " + e);
return null;
});

注意,这其实是错误的实现,并且和基于 whenComplete 的实现不是等价的。原因在于 exceptionally,注意我们返回的是 null,因此 secondFuturefirstFuture 异常完成时,是返回 null 而非异常完成。

比如下面这组 firstFuture 异常完成时的输出:

1
2
3
ForkJoinPool.commonPool-worker-9 in firstFuture, x = 784152994
main in secondFuture exception: java.util.concurrent.CompletionException: java.lang.RuntimeException: firstFuture returns an even number: 784152994
main secondFuture returns: null

这里有两个重点:

  1. secondFuture 是以 null 正常完成(而不是继续以 firstFuture 的异常正常完成);
  2. exceptionallye 的类型不是 RuntimeException,而是 CompletionException,它的 cause 才是 RuntimeException

第一点我们刚才提到了,是 return null 导致的,第二点则尤为致命。之前 Pulsar 这边有个问题是,大量的 HTTP 处理的错误码都是 500 internal error,而不是正确的错误码,原因就在于大量处理都是这样的:

1
2
3
4
5
6
7
final CompletableFuture<Object> future = new CompletableFuture<>();
asyncDoSomething()
.thenApply(result -> future.complete(result))
.exceptionally(e -> {
future.completeExceptionally(e);
return null;
};

而在之后处理 future 的异常时,直接把 e 当成 e.getCause() 的类型来使用了。

thenApply 的实现

造成上一节里 exceptionallye 不再是最初 future 的异常,而是包裹后的 CompletionException 的原因其实很简单,那就是某个 CompletableFuture 调用 thenApply 返回后的 CompletableFuture,已经不是它自己了。这里我们简单看下源码。

1
2
3
4
5
6
7
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
// 这里 fn 即我们传入的 lambda 表达式,接收 T 类型的参数,返回 U 类型
return uniApplyStage(null, fn);
}

注意 thenApply 其实是将 CompletableFuture<T> 映射到 CompletableFuture<U> 的,因此很显然前后两个 future 类型不一样。那为什么不继续传播前一个 future 的异常呢?继续看下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
// f 即用户传入的 T -> U 的 lambda 表达式,这里泛型参数是 V 而不是 U
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// 注意这里的 e 是 Executor 对象,而不是异常,因为 f 并没有执行
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC); // SYNC 是常量 0
}
return d;
}

构造了 UniApply 对象,然后调用了 tryFire 方法:

1
2
3
4
5
6
7
8
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}

对返回的 future d 调用了 uniApply 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
/* ... */
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
// 注意,在这里执行我们传入的 lambda 表达式 f
completeValue(f.apply(s));
} catch (Throwable ex) {
// 如果抛出异常,则调用 `completeThrowable` 方法
completeThrowable(ex);
}
}
return true;
}

终于看到捕获异常的位置了,最后我们只需要看看 completeThrowable 做了什么。

1
2
3
4
5
6
7
8
9
10
11
static AltResult encodeThrowable(Throwable x) {
// 用 x 作为 cause 构造 CompletionException,这里还进行了类型判断避免 CompletionException 嵌套
return new AltResult((x instanceof CompletionException) ? x :
new CompletionException(x));
}

final boolean completeThrowable(Throwable x) {
// 使用 native CAS 方法将偏移量 RESULT 对应的字段(其实就是 result)设置为 encodeThrowable(x),如果 result 之前是 null
return UNSAFE.compareAndSwapObject(this, RESULT, null,
encodeThrowable(x));
}

至此,我们知道为什么 thenApply 返回的 future 若异常完成,其异常是 CompletionException 而非原来的 future 的异常了。这里多说一句,设置的 result 字段为:

1
volatile Object result;       // Either the result or boxed AltResult

也就是 get() / getNow() 等方法会尝试返回的字段。进一步的代码阅读就不继续了,总之,我们已经知道了,并且知道为什么,将 thenApplyexceptionally 结合起来并不是一种合适的取代 whenComplete 的方法。

虽然我们一般用 thenApply 的返回值比较多,但实际上 exceptionallywhenComplete 也有返回值。

1
2
3
4
5
6
7
8
9
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}

public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

初学者一个常见的使用错误或困惑是,在 exceptionally 末尾忘记去 return null,因为在 whenComplete 里不用 return。从方法签名就能看出来,原因是 exceptionally 接收的是 (Throwable) -> T 的函数,而 whenComplete 接收的是 (T, Throwable) -> void 的函数,没有返回值。而由于 exceptionally 接收的参数里没有 T 类型,因此只能返回一个常量,绝大多时候就是 null 了。

当然,它们返回的虽然和当前 future 对象是一个类型: CompletableFuture<T>,但实际上已经不是一个对象了。依然是举个例子:

1
2
3
4
5
6
7
8
9
10
11
CompletableFuture.supplyAsync(() -> {
final int x = new Random().nextInt();
print("supplyAsync, x = " + x);
if (x % 2 != 0) {
return x;
} else {
throw new RuntimeException("firstFuture returns an even number: " + x);
}
}).whenComplete((integer, throwable) -> {
print("whenComplete integer=" + integer + ", throwable=" + throwable);
}).get();

两个示例输出:

1
2
ForkJoinPool.commonPool-worker-9 supplyAsync, x = 1489321475
main whenComplete integer=1489321475, throwable=null
1
2
ForkJoinPool.commonPool-worker-9 supplyAsync, x = -1124011744
main whenComplete integer=null, throwable=java.util.concurrent.CompletionException: java.lang.RuntimeException: firstFuture returns an even number: -1124011744

我们可以注意到由于 get() 方法是 whenComplete 返回的 future 而不是原来的 future 调用的,抛出异常时,也是 CompletionException,其中的 cause 才是真正的异常。这里就不去看源码了,其实可以猜到,处理和 thenApply 是类似的。

getNow vs. thenApply/whenComplete

有一些场景是,判断当前 future 是否已经完成,若完成则直接处理结果,否则加入队列等待。亦或者,是用 CompletableFuture.all 来等待多个 future 结束时再做处理,比如:

1
2
3
4
5
6
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
print("future1 returns");
return 1;
});
final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1200);
print("future2 returns");
return 2;
});
// allOf 返回的是 CompletableFuture<Void>,而非 Scala 里 Future.sequence 那样返回一个 List 的 future
CompletableFuture.allOf(future1, future2).get();
print(future1.getNow(null) + ", " + future2.getNow(null));

示例输出:

1
2
3
ForkJoinPool.commonPool-worker-9 future1 returns
ForkJoinPool.commonPool-worker-2 future2 returns
main 1, 2

这里我们用的 getNow,实际上用 get 方法也一样,因为此时 future 已经完成了,get 会立刻返回。但由于 get 的方法签名中包含 checked exception,因此需要你捕获。

实际上 getNow 从表达上也不太好,因为 future 已经完成,这个默认值 null 其实是多余的。从语义表达的角度,在这里不对 getNow(null) 的返回值进行 null 检查的话,就得在注释中写明此时保证 future 已经完成。但实际上有可能前面 allOf 写错了,比如写漏了一个 future,导致其实有的 future 在这里是并没有完成的,那么排查问题就变得麻烦。

有时候在这里也会选择 join 方法,它和 get 的不同在于,future 异常完成时,它抛出的是 unchecked exception,这样就不需要捕获。

但实际上其实也可以用 thenApply

1
2
3
4
5
6
7
future1.thenApply(result1 -> {
future2.thenApply(result2 -> {
print(result1 + ", " + result2);
return null;
});
return null;
});

虽然在这里略显麻烦,并且出现了 future 嵌套。但其实反而这种嵌套从表达上更好,因为这里我们的处理,result2 是依赖于 result1 的,这种嵌套表达了依赖关系。

另外,对于已经完成的 future,thenApply 的处理全部是在主线程进行的。同理,whenComplete 也是。因此假如我们在对一个已经完成的 future 进行处理时,可以直接用 whenComplete

现在来看看另一个场景,那就是在 future 完成之前这么做会怎样?

1
2
3
4
5
6
7
8
9
10
11
final CompletableFuture<Void> future = new CompletableFuture<>();
future1.thenApply(result1 -> {
print("result1: " + result1);
future2.thenApply(result2 -> {
print(result1 + ", " + result2);
future.complete(null);
return null;
});
return null;
});
future.get();

示例输出:

1
2
3
4
ForkJoinPool.commonPool-worker-9 future1 returns
ForkJoinPool.commonPool-worker-9 result1: 1
ForkJoinPool.commonPool-worker-2 future2 returns
ForkJoinPool.commonPool-worker-2 1, 2

可见每个 future 的回调都是接着当前 future 所在线程的。然而,假如我们调整一下,让 future2 只 sleep 200 ms(也就是说 future2 在 future1 之前完成),输出变成了:

1
2
3
4
ForkJoinPool.commonPool-worker-2 future2 returns
ForkJoinPool.commonPool-worker-9 future1 returns
ForkJoinPool.commonPool-worker-9 result1: 1
ForkJoinPool.commonPool-worker-9 1, 2

一个显著变化是,future2 的回调不再是在 future2 所在线程执行,而是在 future1 所在线程执行。原因其实和刚才的一样,在 future1 回调中去给 future2 再注册新的回调时,future2 已经完成了,因此 future2 的回调直接在 当前线程 执行。其实这个原理在前面贴的代码中 uniApply 中实现的:

1
2
3
4
5
6
7
8
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
// 如果当前 future 未完成,a.result 则为 null,此时 uniApply 会返回 false,f 被加入队列
if (a == null || (r = a.result) == null || f == null)
return false;
// 如果 a.result != null,则后面会调用 completeValue 执行 f.apply,这里就不重复贴代码了

至此可见使用 thenApply 注册回调,比起 getNow 等方法更灵活,而且在 future 已经完成的场景下,两者其实是等价的,前者并不会带来更多的开销。当然,从代码简洁性的角度,有时候用 getNow(null) 其实还是可以的,取决于个人。

总结

本文主要探讨了两个方面,一个是设置回调时结合 thenApplyexceptionally 和直接使用 whenComplete 的对比,另一个是对于 future 已经完成的场合,thenApply 的使用。此外,还附带着看了下 thenApply 的实现。总的来说,CompletableFuture 这个基础设施还是简单易用,但还是有一些细节需要注意才能写出更好的代码。

Pulsar Protocol Handler

[toc]

概述

本文主要目的是阅读 Pulsar protocol handler(比如 KoP,AoP,MoP)在 broker 中如何运作的,protocol handler(下文简称 handler)对应的是包 org.apache.pulsar.broker.protocol(下文将略去包括 broker 之前的包前缀)的接口 ProtocolHandler,只要实现了该接口,并打包成 *.nar 后缀以供 broker 加载,即相当于实现了一个 handler。

handler 初始化

ProtocolHandler 本身位于 protocol 包下,注意到初始化方法 initialize,找到它的使用,在 ProtocolHandlerWithClassLoader 类的同名方法被调用,简单看下代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Data
@RequiredArgsConstructor
class ProtocolHandlerWithClassLoader implements ProtocolHandler {

private final ProtocolHandler handler;
private final NarClassLoader classLoader;

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
handler.initialize(conf);
}
/* 其他方法都是 override handler 的同名方法,比如 xxx(),都是直接调用 handler.xxx() */

@Override
public void close() {
handler.close();
try {
classLoader.close();
} catch (IOException e) {
log.warn("Failed to close the protocol handler class loader", e);
}
}
}

仅仅是在构造时多传入了一个 NarClassLoader 对象,并且在 override close() 方法时关闭这个对象,其他方法都是直接调用 handler 的同名方法。

继续查找 initialize() 方法的调用,又被 ProtocolHandlers 类的同名方法调用:

1
2
3
4
5
6
7
8
9
10
11
public void initialize(ServiceConfiguration conf) throws Exception {
for (ProtocolHandler handler : handlers.values()) {
handler.initialize(conf);
}
}

private final Map<String, ProtocolHandlerWithClassLoader> handlers;

ProtocolHandlers(Map<String, ProtocolHandlerWithClassLoader> handlers) {
this.handlers = handlers;
}

可见 ProtocolHandlers 维护了一系列 ProtocolHandlerWithClassLoader,并在构造时传入。而它的构造则在静态方法 load 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException {
// 1. 在配置 protocolHandlerDirectory 对应的目录下查找所有的 handler definitions,并暂时解压到配置 narExtractionDirectory 目录下
ProtocolHandlerDefinitions definitions =
ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory());

ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder();

// 2. 遍历配置 messagingProtocols 列表的每个 protocol
conf.getMessagingProtocols().forEach(protocol -> {

// 2.1 取得 protocol 名字对应的 definition
ProtocolHandlerMetadata definition = definitions.handlers().get(protocol);
if (null == definition) {
/* 抛出异常表示 protocol handler is found */
}

ProtocolHandlerWithClassLoader handler;
try {
// 2.2 加载 definition 对应的 handler
handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory());
} catch (IOException e) {
/* 记录错误日志并抛出异常表示 Failed to load the protocol handler */
}

// 2.3 通过 handler 的 accept 方法判断 protocol 是否可接受
if (!handler.accept(protocol)) {
/* 关闭 handler,记录错误日志并抛出异常表示 Malformed protocol handler found */
}

// 2.4 将 protocol 和 handler 作为 key-value 加入 map
handlersBuilder.put(protocol, handler);
log.info("Successfully loaded protocol handler for protocol `{}`", protocol);
});

return new ProtocolHandlers(handlersBuilder.build());
}

上面代码注释给出了初始化流程,比如修改配置(conf/broker.confconf/standalone.conf):

1
2
3
# 默认目录就是 ./protocols
protocolHandlerDirectory=./protocols
messagingProtocols=kafka

就会在 ./protocols 目录下面查找所有 handler definition,然后找到协议名 kafka 对应的 definition,期间会调用 handler 的 initialize 方法进行初始化。初始化完成后还要调用 handler 的 accept 方法判断是否被接受。最后和 kafka 组成键值对交由 Protocols 类管理。

这里回顾下 handler 的这两个接口(略去 javadoc 注释):

1
2
3
boolean accept(String protocol);

void initialize(ServiceConfiguration conf) throws Exception;

definition

在上一节,引入了 handler definition 的概念,这里看看其实现。首先是取得所有 definition 的 searchForHandlers 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory,
String narExtractionDirectory) throws IOException {
Path path = Paths.get(handlersDirectory).toAbsolutePath();
log.info("Searching for protocol handlers in {}", path);

ProtocolHandlerDefinitions handlers = new ProtocolHandlerDefinitions();
if (!path.toFile().exists()) {
/* warn 日志提示目录不存在 */
return handlers;
}

try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {
// 1. 从 nar 包中取得 definition
ProtocolHandlerDefinition phDef =
ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory);
log.info("Found protocol handler from {} : {}", archive, phDef);

checkArgument(StringUtils.isNotBlank(phDef.getName()));
checkArgument(StringUtils.isNotBlank(phDef.getHandlerClass()));

// 2. 将 definition 和 nar 包路径组成 metadata
ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata();
metadata.setDefinition(phDef);
metadata.setArchivePath(archive);

// 3. 将 definition name 作为 key 加入返回的 map
handlers.handlers().put(phDef.getName(), metadata);
} catch (Throwable t) {
/* warn 日志提示加载失败 */
}
}
}

return handlers;
}

可以看到会从 protocolHandlerDirectory 所在目录下面找到所有 *.nar 后缀的文件,然后取得 definition:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) throws IOException {
// 1. 解压 *.nar 包到 narExtractionDirectory,并从解压目录构造 NarClassLoader
try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) {
// 2. 从 NarClassLoader 中取得 definition
return getProtocolHandlerDefinition(ncl);
}
}

private static ProtocolHandlerDefinition getProtocolHandlerDefinition(NarClassLoader ncl) throws IOException {
// 取得 META-INF/services/pulsar-protocol-handler.yml 的配置内容,以 KoP 为例:
// name: kafka
// description: Kafka Protocol Handler
// handlerClass: io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler
// 也就是 ProtocolHandlerDefinition 的 3 个字段
String configStr = ncl.getServiceDefinition(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);

// 解析 YAML 配置,从中得到 definition 的 class,也就是将配置文件的三个字段填充进来
return ObjectMapperFactory.getThreadLocalYaml().readValue(
configStr, ProtocolHandlerDefinition.class
);
}

至此,我们知道了 definition 其实就是从 nar 包中解析 pulsar-protocol-handler.yml 得到对应的三个字段(都是 String 类型):

  • name:协议名,比如 KoP 的协议名为 kafka
  • description:handler 的描述信息
  • handlerClass:handler 的主类

回顾前一节的 load() 方法注释 2.3,definition 的协议名是返回的 definition map 的 key,因此可以通过用户配置的 messagingProtocols 来找到对应的 definition,从而找到对应的 handler:

1
2
// 这里的 definition 变量其实是 definition + handler 解压目录的路径
ProtocolHandlerMetadata definition = definitions.handlers().get(protocol);

handler 的启动

前文提到了 ProtocolHandlers#load 从配置文件中找到 nar 包解压后并加载得到 handlers,而 load 方法在 PulsarService#start 中被调用,并且对加载的 handlers 进行其他处理:

1
2
3
4
5
6
7
8
9
10
protocolHandlers = ProtocolHandlers.load(config);
protocolHandlers.initialize(config);
/* 其他初始化及启动流程(略) */
this.protocolHandlers.start(brokerService);
// 第一个 key 为协议名,第二个 key 为 handler 绑定的地址
Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers =
this.protocolHandlers.newChannelInitializers();
this.brokerService.startProtocolHandlers(protocolHandlerChannelInitializers);

state = State.Started; // 加载完 handlers 后 broker 的状态才改为 Started

最后一步涉及到了 BrokerService#startProtocolHandlers 会启动 handler 创建的 ChannelInitializer<SocketChannel>,其实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void startProtocolHandlers(
Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) {

protocolHandlers.forEach((protocol, initializers) -> {
initializers.forEach((address, initializer) -> {
try {
startProtocolHandler(protocol, address, initializer);
} catch (IOException e) {
/* 异常处理... */
}
});
});
}

private void startProtocolHandler(String protocol,
SocketAddress address,
ChannelInitializer<SocketChannel> initializer) throws IOException {
ServerBootstrap bootstrap = defaultServerBootstrap.clone();
bootstrap.childHandler(initializer);
try {
bootstrap.bind(address).sync(); // handler 对应的服务绑定了相应的端口
} catch (Exception e) {
/* 异常处理 */
}
log.info("Successfully bind protocol `{}` on {}", protocol, address);
}

总结

至此,加载并启动 handlers 的流程就出来了:

  1. 通过配置 protocolHandlerDirectory 找到目录下所有 nar 包并解压,通过解析 YAML 文件得到 handler 的名字和主类,通过 NarClassLoader 加载主类并转型为 ProtocolHandler 接口;

  2. 调用 handler 的 accept 方法判断协议是否被接受,加载所有接受的 handlers;

    1
    boolean accept(String protocol);
  3. 传入 broker 的配置到 initialize 中初始化加载的 handlers;

    1
    void initialize(ServiceConfiguration conf) throws Exception;
  4. BrokerService 对象传入各 handler 的 start 方法启动

    1
    void start(BrokerService service);
  5. 创建 handler 对应的 channel initializers 交由 BrokerService 启动,也就是在这一步,handler 被单独作为一个服务启动,比如 KoP 在这里就会默认绑定 9092 端口提供服务

    1
    Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();

这几步完成后,broker 就不会干预 handler 了,除非 broker 本身关闭。而 handler 作为一个相对独立的服务,和 broker 的交互全部借由 start() 方法中得到的 BrokerService 对象来进行。

Java CompletableFuture 学习

[toc]

前言

看了一段时间 Pulsar 代码,令人印象最深的就是整个异步处理都是基于 Java 8 引入的 CompletableFuture。作为 C++er 相对而言更为熟悉普通的 Future 和 Promise。Future 表示一个异步任务,即在 未来(future) 某个时间点完成的任务,而 Promise 则表示这个 Future 的完成状态(是否完成,以及完成结果正常的还是异常的)。因此 Promise 是绑定于某个 Future 的,一般而言,即可以等待 Future 自然完成,也可以提前操作 Promise 使得 Future 提前完成。

Java 1.5 引入了 Future 接口,但过于简单。Java 8 提供CompletableFuture(下文简称 future)用起来则更像 Future 和 Promise 的结合,本文将通过简单的例子来稍微系统地学习下 CompletableFuture

状态的访问和设置

首先给出一个主线程使用 CompletableFuture 的代码:

1
2
3
4
5
6
// 1. 创建一个 future,其返回结果是 Integer
CompletableFuture<Integer> future = new CompletableFuture<>();
System.out.println(future.getNow(-1)); // -1
// 2. 使这个 future 完成
future.complete(10);
System.out.println(future.getNow(-1)); // 10

涉及到的方法:

1
2
public T getNow(T valueIfAbsent);
public boolean complete(T value);

其中,getNow() 方法表示取得 Future 完成结果(如果未完成则返回 valueIfAbsent),默认创建的 future 是未完成的,因此第一次打印结果是 -1,第二次打印时由于调用了 complete() 方法使 future 完成了,因此 getNow() 取得了 value 并返回。

对比一般的 FutureCompletableFuture 实现了完成状态的访问和设置,而 Future 只能通过 get 方法被动等待 future 完成(可以无限等待也可以设置超时),无法设置状态,要取得瞬时状态只能调用 isDone() 来判断是否完成,若完成,则可以安心调用 get() 避免阻塞。

另外,除了设置返回结果(正常状态)外,还可以抛出异常(异常状态):

1
2
3
4
5
6
7
8
CompletableFuture<Integer> future = new CompletableFuture<>();
// 主动设置 future 为异常完成状态
future.completeExceptionally(new RuntimeException("failed"));
try {
System.out.println(future.getNow(-1));
} catch (Exception e) {
System.out.println("Caused by " + e.getCause());
}

涉及到的方法:

1
public boolean completeExceptionally(Throwable ex);

在多线程的使用

一般的异步任务都会借助多线程来实现,避免主线程阻塞。类似于 ExecutorServiceexecute()submit() 方法,CompletableFuture 也提供两种方法来分别执行无返回值和有返回值的任务:

1
2
3
4
5
6
7
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}

其中 SupplierCallable 类似,都是实现了 T get() 的接口,区别在于前者没有异常声明。

注意到这里都用到了 asyncPool

1
2
3
4
5
6
7
8
9
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);

/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

在合适的时候会使用 ForkJoinPool 框架去调度 future 对应的异步任务,也就是 future 和 thread 不一定是一对一的关系,从而支持了大量异步任务的调度。

PS: ForkJoinPool 并不是本文的重点,之后再去学习。

runAsyncsupplyAsync 也都支持自定义 executor 的形式,即根据合适的场景提供自定义的线程池:

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor);

给出示例:

1
2
3
4
5
6
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("future1 done"));
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2 done");
return null;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> 100);

这里特地给出 supplyAsync 创建 CompletableFuture<Void> 的情况,由于 Supplier 接口的方法具有返回类型 T,因此必须显式返回 null。这也是我从 Scala 转向 Java 时一个比较不适应的地方,因为 Scala 不需要 return 语句就能返回结果(即最后一个表达式的返回值),而 Java 由于语法限制,这里必须显式返回。当然,直接创建 Completable<Void> 的时候用 runAsync 就好了,但是对于多个 future 的链式调用,有时候必须传入 Supplier,因此了解这点也很重要。

另一个常见用法是兼容旧接口,比如原来的 API 是通过回调来表示任务完成的,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class AsyncTask {
public void run(Consumer<String> callback) {
final String result = compute();
if (callback != null) {
callback.accept(result);
}
}

private String compute() {
/* 耗时操作... */
return "hello";
}
}

那么,我们只要在 Runnable 中设置 future 状态即可:

1
2
3
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = new CompletableFuture<>();
executor.execute(() -> new AsyncTask().run(future::complete));

多任务链式调用

thenApply

相比直接创建线程并结合 join()/detach() 这种方法来执行异步任务,future 最为灵活的一点就是支持多个任务的组合,比如现在有个任务:

  1. 进行服务发现;
  2. 连接刚才发现的服务。

第二步依赖于第一步的成功返回,执行同步任务时直接顺序写下来就行,但执行异步任务时,比如:

1
2
3
4
CompletableFuture<String> serviceDiscoveryFuture = CompletableFuture.supplyAsync(() -> {
/* 进行服务发现... */
return "http://localhost:8080"; // 返回发现的 URL
});

我们不能在这里直接等待 serviceDiscoveryFuture 完成,因为这样相当于异步操作就变成同步操作了,可能阻塞当前线程。

CompletableFuture 提供了一系列方法来进行链式调用,比如上面的任务可以写成:

1
2
3
4
5
6
7
8
CompletableFuture<String> connectServiceFuture = CompletableFuture.supplyAsync(() -> {
/* 1. 服务发现... */
return "http://localhost:8080";
}).thenApply((serverUrl) -> {
/* 2. 连接服务... */
System.out.println("Connected to " + serverUrl);
return null;
});

这样,只有第一个异步任务成功返回时才会将返回结果作为输入参数应用于第二个任务。

看看 thenApply 的方法签名:

1
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);

传入的是 Function 对象,也就是输入参数类型可以是 T(当前 future 的返回类型)的派生类,而返回类型则是 U 的派生类,通过 lambda 表达式中 return 的结果进行推导。

thenCompose

在一些异步任务已经被封装成返回 future 的方法时,thenApply 无法复用这些方法。

在前一节的连接服务为例的基础上,用一个整型状态码表示连接的服务(返回 0 代表成功):

1
2
3
4
5
6
7
8
9
10
public static CompletableFuture<String> discoverServiceAsync() {
/* 服务发现... */
return CompletableFuture.completedFuture("http://localhost:8080");
}

public static CompletableFuture<Integer> connectServiceAsync(String serviceUrl) {
/* 连接服务... */
System.out.println("Connected to " + serviceUrl);
return CompletableFuture.completedFuture(1);
}

PS:这里用了静态方法 completedFuture 来得到已完成的 future(默认构造的 future 是未完成的)。

如果我们要添加第三步:对状态码进行处理,由于现有的方法都是返回 CompletableFuture<T> 而非 T,因此 thenApply 此时无能为力:

1
2
3
4
CompletableFuture<Void> future = discoverServiceAsync().thenAccept(serviceUrl -> {
CompletableFuture<Integer> statusFuture = connectServiceAsync(serviceUrl);
// NOTE: 无法处理 statusFuture
});

此时 thenCompose 就有用武之地了:

1
2
3
4
CompletableFuture<Integer> future = discoverServiceAsync().thenCompose((serviceUrl) -> {
/* ... */
return connectServiceAsync(serviceUrl);
});

再看看其方法签名:

1
2
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn)

CompletionStage 接口表示异步处理的一个 阶段(stage),而 CompletableFuture 实现了该接口(另一个实现的接口是 Future),因此该 Function 可以传入返回 CompletableFuture 的 lambda 表达式。

也就是说 thenCompose 可以将返回的 stage 作为当前 future 的新 stage。上面得到的 future 会等到 connectServiceAsync 返回的 CompletableFuture 完成时才算完成。

异常处理

前两节都是讨论 future 正常完成时的链式调用,虽然异常完成时在 future 调用 get() 时会抛出异常,但是有时候会对异常进行处理,比如:

  1. 进行服务发现;
  2. 若发现成功,连接服务;
  3. 否则使用本地 HTTP 服务 http://localhost:8080

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws ExecutionException, InterruptedException {
final String name = (args.length > 0) ? args[0] : "unknown";
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (name.equals("pulsar")) {
return "pulsar://localhost:6650";
} else {
throw new RuntimeException("Unknown server name: " + name);
}
}).exceptionally(e -> {
System.out.println(e.getCause() + ", use HTTP service");
return "http://localhost:8080";
});
System.out.println(future.get());
}

还是看看方法签名:

1
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 

输入参数是抛出的异常,而返回类型则仍然是 future 泛型 T 的派生类。

如果并不想对异常进行一些挽救的处理,只想打印下日志,那么可以返回 null,因为任何类型都可以被赋值为 null,此时推断的类型仍为 T。但对于这种情况,除了结合 thenApplyexceptionally,也可以使用 whenComplete

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (name.equals("pulsar")) {
return "pulsar://localhost:6650";
} else {
throw new RuntimeException("Unknown server name: " + name);
}
}).whenComplete((serviceUrl, e) -> {
if (e == null) {
System.out.println("Connected to " + serviceUrl);
} else {
System.out.println("Failed to connect to " + name + ": " + e.getCause());
}
});

看看方法签名:

1
2
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)

传入的是接受返回结果和异常的 BiConsumer,缺点是无法针对异常进行挽救,future 调用 get() 方法仍然会抛出异常,而在 exceptionally 中对异常进行处理并返回 null 的话,之后 future 调用 get() 不会抛出异常,而只是返回 null

合并并行异步任务的结果

链式调用解决的是串行异步任务,也就是一个异步任务的启动要依赖另一个异步任务的结果,但有时候会遇到并行的异步任务,比如对数组分块求和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int[] array = {1, 2, 3, 4, 5, 6};
int[] indexes = {0, array.length / 2, array.length};
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < indexes.length - 1; i++) {
final int start = indexes[i];
final int end = indexes[i + 1];
futures.add(CompletableFuture.supplyAsync(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
sum += array[j];
}
return sum;
}));
}

现在要等待多个 future 完成,如果我们设置了超时时间比如 1 秒,那么要得到的结果实际上是 List<Integer>,此时可以用 allOf 方法将 future 列表转换成结果列表:

1
2
3
4
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
List<Integer> results =
futures.stream().map(future -> future.getNow(-1)).collect(Collectors.toList());
// results: [6, 15]

由于 allOf 返回的是 CompletableFuture<Void>,因此只能用来等待所有 future 完成或者其中一个失败。将结果重新组合成 List 还是得另外执行,看起来 Java 没有 Scala 的 Future#sequence 这样方便的方法。

类似地,anyOf 则是等待任意一个 future 完成,这里就不给例子了。

总结

本文简单学习了 Java 中 CompletableFuture 常用的使用场景,由于语法本身的限制,比起 Scala 的 Future 还是略显麻烦,但至少 CompletableFuture 的出现使得异步编程变得更加容易。