[toc]
概述
本文主要目的是阅读 Pulsar protocol handler(比如 KoP,AoP,MoP)在 broker 中如何运作的,protocol handler(下文简称 handler)对应的是包 org.apache.pulsar.broker.protocol
(下文将略去包括 broker
之前的包前缀)的接口 ProtocolHandler
,只要实现了该接口,并打包成 *.nar
后缀以供 broker 加载,即相当于实现了一个 handler。
handler 初始化
ProtocolHandler
本身位于 protocol
包下,注意到初始化方法 initialize
,找到它的使用,在 ProtocolHandlerWithClassLoader
类的同名方法被调用,简单看下代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j @Data @RequiredArgsConstructor class ProtocolHandlerWithClassLoader implements ProtocolHandler {
private final ProtocolHandler handler; private final NarClassLoader classLoader;
@Override public void initialize(ServiceConfiguration conf) throws Exception { handler.initialize(conf); }
@Override public void close() { handler.close(); try { classLoader.close(); } catch (IOException e) { log.warn("Failed to close the protocol handler class loader", e); } } }
|
仅仅是在构造时多传入了一个 NarClassLoader
对象,并且在 override close()
方法时关闭这个对象,其他方法都是直接调用 handler 的同名方法。
继续查找 initialize()
方法的调用,又被 ProtocolHandlers
类的同名方法调用:
1 2 3 4 5 6 7 8 9 10 11
| public void initialize(ServiceConfiguration conf) throws Exception { for (ProtocolHandler handler : handlers.values()) { handler.initialize(conf); } }
private final Map<String, ProtocolHandlerWithClassLoader> handlers;
ProtocolHandlers(Map<String, ProtocolHandlerWithClassLoader> handlers) { this.handlers = handlers; }
|
可见 ProtocolHandlers
维护了一系列 ProtocolHandlerWithClassLoader
,并在构造时传入。而它的构造则在静态方法 load
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException { ProtocolHandlerDefinitions definitions = ProtocolHandlerUtils.searchForHandlers(conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory());
ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder();
conf.getMessagingProtocols().forEach(protocol -> {
ProtocolHandlerMetadata definition = definitions.handlers().get(protocol); if (null == definition) { }
ProtocolHandlerWithClassLoader handler; try { handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory()); } catch (IOException e) { }
if (!handler.accept(protocol)) { }
handlersBuilder.put(protocol, handler); log.info("Successfully loaded protocol handler for protocol `{}`", protocol); });
return new ProtocolHandlers(handlersBuilder.build()); }
|
上面代码注释给出了初始化流程,比如修改配置(conf/broker.conf
或 conf/standalone.conf
):
1 2 3
| protocolHandlerDirectory=./protocols messagingProtocols=kafka
|
就会在 ./protocols
目录下面查找所有 handler definition,然后找到协议名 kafka
对应的 definition,期间会调用 handler 的 initialize
方法进行初始化。初始化完成后还要调用 handler 的 accept
方法判断是否被接受。最后和 kafka
组成键值对交由 Protocols
类管理。
这里回顾下 handler 的这两个接口(略去 javadoc 注释):
1 2 3
| boolean accept(String protocol);
void initialize(ServiceConfiguration conf) throws Exception;
|
definition
在上一节,引入了 handler definition 的概念,这里看看其实现。首先是取得所有 definition 的 searchForHandlers
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public static ProtocolHandlerDefinitions searchForHandlers(String handlersDirectory, String narExtractionDirectory) throws IOException { Path path = Paths.get(handlersDirectory).toAbsolutePath(); log.info("Searching for protocol handlers in {}", path);
ProtocolHandlerDefinitions handlers = new ProtocolHandlerDefinitions(); if (!path.toFile().exists()) { return handlers; }
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) { for (Path archive : stream) { try { ProtocolHandlerDefinition phDef = ProtocolHandlerUtils.getProtocolHandlerDefinition(archive.toString(), narExtractionDirectory); log.info("Found protocol handler from {} : {}", archive, phDef);
checkArgument(StringUtils.isNotBlank(phDef.getName())); checkArgument(StringUtils.isNotBlank(phDef.getHandlerClass()));
ProtocolHandlerMetadata metadata = new ProtocolHandlerMetadata(); metadata.setDefinition(phDef); metadata.setArchivePath(archive);
handlers.handlers().put(phDef.getName(), metadata); } catch (Throwable t) { } } }
return handlers; }
|
可以看到会从 protocolHandlerDirectory
所在目录下面找到所有 *.nar
后缀的文件,然后取得 definition:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static ProtocolHandlerDefinition getProtocolHandlerDefinition(String narPath, String narExtractionDirectory) throws IOException { try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), narExtractionDirectory)) { return getProtocolHandlerDefinition(ncl); } }
private static ProtocolHandlerDefinition getProtocolHandlerDefinition(NarClassLoader ncl) throws IOException { String configStr = ncl.getServiceDefinition(PULSAR_PROTOCOL_HANDLER_DEFINITION_FILE);
return ObjectMapperFactory.getThreadLocalYaml().readValue( configStr, ProtocolHandlerDefinition.class ); }
|
至此,我们知道了 definition 其实就是从 nar 包中解析 pulsar-protocol-handler.yml
得到对应的三个字段(都是 String
类型):
name
:协议名,比如 KoP 的协议名为 kafka
description
:handler 的描述信息
handlerClass
:handler 的主类
回顾前一节的 load()
方法注释 2.3,definition 的协议名是返回的 definition map 的 key,因此可以通过用户配置的 messagingProtocols
来找到对应的 definition,从而找到对应的 handler:
1 2
| ProtocolHandlerMetadata definition = definitions.handlers().get(protocol);
|
handler 的启动
前文提到了 ProtocolHandlers#load
从配置文件中找到 nar 包解压后并加载得到 handlers,而 load
方法在 PulsarService#start
中被调用,并且对加载的 handlers 进行其他处理:
1 2 3 4 5 6 7 8 9 10
| protocolHandlers = ProtocolHandlers.load(config); protocolHandlers.initialize(config);
this.protocolHandlers.start(brokerService);
Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers = this.protocolHandlers.newChannelInitializers(); this.brokerService.startProtocolHandlers(protocolHandlerChannelInitializers);
state = State.Started;
|
最后一步涉及到了 BrokerService#startProtocolHandlers
会启动 handler 创建的 ChannelInitializer<SocketChannel>
,其实现为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void startProtocolHandlers( Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) {
protocolHandlers.forEach((protocol, initializers) -> { initializers.forEach((address, initializer) -> { try { startProtocolHandler(protocol, address, initializer); } catch (IOException e) { } }); }); }
private void startProtocolHandler(String protocol, SocketAddress address, ChannelInitializer<SocketChannel> initializer) throws IOException { ServerBootstrap bootstrap = defaultServerBootstrap.clone(); bootstrap.childHandler(initializer); try { bootstrap.bind(address).sync(); } catch (Exception e) { } log.info("Successfully bind protocol `{}` on {}", protocol, address); }
|
总结
至此,加载并启动 handlers 的流程就出来了:
通过配置 protocolHandlerDirectory
找到目录下所有 nar 包并解压,通过解析 YAML 文件得到 handler 的名字和主类,通过 NarClassLoader
加载主类并转型为 ProtocolHandler
接口;
调用 handler 的 accept
方法判断协议是否被接受,加载所有接受的 handlers;
1
| boolean accept(String protocol);
|
传入 broker 的配置到 initialize
中初始化加载的 handlers;
1
| void initialize(ServiceConfiguration conf) throws Exception;
|
将 BrokerService
对象传入各 handler 的 start
方法启动
1
| void start(BrokerService service);
|
创建 handler 对应的 channel initializers 交由 BrokerService
启动,也就是在这一步,handler 被单独作为一个服务启动,比如 KoP 在这里就会默认绑定 9092 端口提供服务
1
| Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();
|
这几步完成后,broker 就不会干预 handler 了,除非 broker 本身关闭。而 handler 作为一个相对独立的服务,和 broker 的交互全部借由 start()
方法中得到的 BrokerService
对象来进行。