[toc]
前言
之前对 Pulsar 消费端的逻辑不太熟悉,但是一直有印象就是刚接触 Pulsar 时,不记得在哪看到 Pulsar 的消费模型是 push 的,而这点和 Kafka 的 pull 消费模型是完全不同的。之前对于 Kafka 的消费模型已经比较熟悉了,客户端发送 FETCH 请求,其中对于需要拉取(pull)数据的每个 partition,请求中会有一个 partition_max_bytes
字段限制该分区获取的最大字节数。而从 FETCH v3 开始,还有个总的 max_bytes
字段限制总的最大字节数来针对分区太多的场合。具体协议参见 Kafka Message Fetch。
那么,Pulsar 采用的 push 消费模型是怎样的呢?为什么要采用 push 消费模型呢?带着这些问题,开始阅读源码,本文采用 Pulsar 2.8.0 的源码(实际是 master 分支),因此和之前的 release 版本可能有些许出入。
本文在阅读源码时,会略去一些相对不核心的代码,必须合法性检查/异常处理/错误日志,此时会用 /* … */ 的风格来略去这一部分,而代码分析则统一使用 // 注释风格。
协议
参见 Pulsar 官网文档 Binary Protocol: Consumer:
重点是客户端发送 Flow 请求,然后broker 回复消息。重点是流控的处理。这里先看看 PulsarApi.proto
中的定义(位于 org.apache.pulsar.common.api.proto
包):
1 2 3 4 5 6 7
| message CommandFlow { required uint64 consumer_id = 1;
required uint32 messagePermits = 2; }
|
除了消费者 id 外,它只需要一个 permits 参数,表示 prefetch(提前获取)的最大消息数量。
再来看看文档的介绍。典型的消费者实现会在应用程序准备消费之前使用队列积累这些消息,在应用程序队列已经入队了半数以上消息时,消费者发送 permits 给 broker 来请求更多的消息(其数量等于队列大小的一半)。
Client
首先给出一份最简单的客户端消费的代码:
1 2 3 4 5 6 7 8 9 10 11 12
| try (PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build()) { Consumer<byte[]> consumer = client.newConsumer() .topic("my-topic") .subscriptionName("my-sub") .subscribe(); Message<byte[]> message = consumer.receive(); } catch (PulsarClientException e) { e.printStackTrace(); }
|
- 设置目标主题和订阅名后,调用
subscribe()
方法订阅该主题,创建 Consumer
;
- consumer 调用
receive()
方法接收消息。
这里先简单介绍一下,客户端的实现代码位于 pulsar-client
模块,而接口定义则位于 pulsar-client-api
模块,一些(broker 和客户端等)公用的类位于 pulsar-common
模块。而各模块均位于同名目录下。
另外 Pulsar 所有的同步调用 API 都只是简单等待异步调用 API(方法名后缀是 Async)返回的 CompletableFuture 对象完成,其返回值为 T。
消费者的创建
创建 PulsarClient
时实际上是创建了 PulsarClientImpl
对象,其中 newConsumer
方法是创建一个 builder 用于链式调用:
1 2 3 4
| public ConsumerBuilder<byte[]> newConsumer() { return new ConsumerBuilderImpl<>(this, Schema.BYTES); }
|
其中 builder 的方法就不仔细阅读了,主要是对参数进行必要的验证后设置相应字段,比如必要的是主题名和订阅名,都保存在 conf
中:
1 2
| private ConsumerConfigurationData<T> conf;
|
1 2 3
| private Set<String> topicNames = Sets.newTreeSet(); private String subscriptionName;
|
ConsumerBuilderImpl#subscribeAsync
本身只是对 conf
的一些参数进行合法性校验,对于只设置主题名和订阅名的情况只是验证这两项配置是否存在,最后实际上是调用 PulsarClientImpl#subscribeAsync
:
1 2 3
| return interceptorList == null || interceptorList.size() == 0 ? client.subscribeAsync(conf, schema, null) : client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
if (conf.getTopicsPattern() != null) { if (!conf.getTopicNames().isEmpty()){ return FutureUtil .failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern")); } return patternTopicSubscribeAsync(conf, schema, interceptors); } else if (conf.getTopicNames().size() == 1) { return singleTopicSubscribeAsync(conf, schema, interceptors); } else { return multiTopicSubscribeAsync(conf, schema, interceptors); } }
|
为求简单,还是只看单主题订阅的情况:
1 2 3 4
| private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic()) .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors)); }
|
这里的先后顺序是:
- 调用
preProcessSchemaBeforeSubscribe
,此时会对 schema 进行预处理,必须注册 schema。
- 对前一步得到的 schemaClone 传入
doSingleTopicSubscribeAsync
。
这里关注第二步:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
String topic = conf.getSingleTopic();
getPartitionedTopicMetadata(topic).thenAccept(metadata -> { ConsumerBase<T> consumer; ExecutorService listenerThread = externalExecutorProvider.getExecutor(); if (metadata.partitions > 0) { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); } else { int partitionIndex = TopicName.getPartitionIndex(topic); consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false, consumerSubscribedFuture,null, schema, interceptors, true ); }
consumers.add(consumer); }).exceptionally();
return consumerSubscribedFuture; }
|
多分区订阅和多主题订阅本质上是一样的,都是使用 MultiTopicsConsumerImpl
管理多个主题(因为 Pulsar 中分区只不过是一个包含后缀 -partition-<n>
的主题)。
这里还是关注单分区订阅,ConsumerImpl.newConsumerImpl(...)
只是将参数原封不动传给其构造方法,构造方法包括了 consumer 内部一些字段的初始化,因此比较长,我们还是只关注重点,那就是最简洁的订阅会和 broker 有什么交互。实际上这部分逻辑在构造方法最后,调用 grabCnx
方法:
1 2 3
| void grabCnx() { this.connectionHandler.grabCnx(); }
|
连接的建立
connectionHandler
(下文简称 connection)的构造:
1 2 3 4 5 6 7 8 9
| this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder() .create(), this);
|
这里简单说下 BackOff
对象。查看 backoff 在 ConnectionHandler
中的使用,可以看到主要是在重连活着关闭连接时用 next()
方法来得到建立连接时对应的 timeout,因为连接对应的是 client,所以这里用的都是 client 的配置。
回到正题,继续看 ConnectionHandler#grabCnx
的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| protected void grabCnx() { if (CLIENT_CNX_UPDATER.get(this) != null) { return; }
if (!isValidStateForReconnection()) { return; }
try { state.client.getConnection(state.topic) .thenAccept(cnx -> connection.connectionOpened(cnx)) .exceptionally(this::handleConnectionError); } catch (Throwable t) { reconnectLater(t); } }
|
这里还是看看连接成功的处理。注意到 ConsumerImpl
是实现了 Connection
接口的:
1
| public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
|
然后注意到构造 connection 时用的 Connection
接口就是 ConsumerImpl
对象自己,因此调用的是 ConsumerImpl#connectionOpened
。
实际上这一小节的内容同样也适用于生产者以及多主题消费者,它们对应的类都实现了 Connection
接口,只需要实现各自的回调即可。
连接成功的回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| public void connectionOpened(final ClientCnx cnx) { if (getState() == State.Closing || getState() == State.Closed) { return; } setClientCnx(cnx);
long requestId = client.newRequestId();
int currentSize; synchronized (this) { currentSize = incomingMessages.size(); startMessageId = clearReceiverQueue(); if (possibleSendToDeadLetterTopicMessages != null) { possibleSendToDeadLetterTopicMessages.clear(); } }
boolean isDurable = subscriptionMode == SubscriptionMode.Durable; MessageIdData startMessageIdData = null; if (isDurable) { startMessageIdData = null; } else if (startMessageId != null) { MessageIdData.Builder builder = MessageIdData.newBuilder(); builder.setLedgerId(startMessageId.getLedgerId()); builder.setEntryId(startMessageId.getEntryId()); if (startMessageId instanceof BatchMessageIdImpl) { builder.setBatchIndex(startMessageId.getBatchIndex()); }
startMessageIdData = builder.build(); builder.recycle(); }
SchemaInfo si = schema.getSchemaInfo(); if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) { si = null; }
ByteBuf request = Commands.newSubscribe(); cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { if (changeToReadyState()) { consumerIsReconnectedToBroker(cnx, currentSize); } else { setState(State.Closed); deregisterFromClientCnx(); client.cleanupConsumer(this); cnx.channel().close(); return; } }
resetBackoff();
boolean firstTimeConnect = subscribeFuture.complete(this); if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) { increaseAvailablePermits(cnx, conf.getReceiverQueueSize()); } }).exceptionally((e) -> {});
|
可以看到连接成功后,主要是发送 CommandSubscribe(订阅命令),broker 处理成功后,consumer 就处于 Ready 状态,并且会发送 Flow 请求携带 permits 为接收队列大小的一半。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) { int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
while (available >= receiverQueueRefillThreshold && !paused) { if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) { sendFlowPermitsToBroker(currentCnx, available); break; } else { available = AVAILABLE_PERMITS_UPDATER.get(this); } } }
|
再看看 Flow 命令的定义和发送:
1 2 3 4 5 6
| message CommandFlow { required uint64 consumer_id = 1;
required uint32 messagePermits = 2; }
|
1 2 3 4 5 6 7 8 9 10
| private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) { if (cnx != null && numMessages > 0) { if (log.isDebugEnabled()) { } else { cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise()); } } }
|
Broker 处理
CommandSubscribe 处理
broker 对 TCP 协议的处理位于 org.apache.pulsar.broker.service
包的 ServerCnx
类。对于 consumer 而言,比较独有的就是 CommandSubscribe
和 CommandFlow
。
因为代码比较长,所以日志相关代码均略过,不特意用 /* xxx... */
说明。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| protected void handleSubscribe(final CommandSubscribe subscribe) {
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed( topicName, subscriptionName, TopicOperation.CONSUME ); isAuthorizedFuture.thenApply(isAuthorized -> { if (isAuthorized) { CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>(); CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);
if (existingConsumerFuture != null) { if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { commandSender.sendSuccessResponse(requestId); return null; } else { ServerError error = null; if (!existingConsumerFuture.isDone()) { error = ServerError.ServiceNotReady; } else { error = getErrorCode(existingConsumerFuture); consumers.remove(consumerId, existingConsumerFuture); } commandSender.sendErrorResponse(requestId, error, "Consumer is already present on the connection"); return null; } }
boolean createTopicIfDoesNotExist = forceTopicCreation && service.isAllowAutoTopicCreation(topicName.toString());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { return FutureUtil .failedFuture(new TopicNotFoundException( "Topic " + topicName + " does not exist")); }
Topic topic = optTopic.get();
boolean rejectSubscriptionIfDoesNotExist = isDurable && !service.isAllowAutoSubscriptionCreation(topicName.toString()) && !topic.getSubscriptions().containsKey(subscriptionName);
if (rejectSubscriptionIfDoesNotExist) { return FutureUtil .failedFuture( new SubscriptionNotFoundException( "Subscription does not exist")); }
if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) .thenCompose(v -> topic.subscribe()); } else { return topic.subscribe(); } }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { commandSender.sendSuccessResponse(requestId); } else { try { consumer.close(); log.info("[{}] Cleared consumer created after timeout on client side {}", remoteAddress, consumer); } catch (BrokerServiceException e) { log.warn( "[{}] Error closing consumer created" + " after timeout on client side {}: {}", remoteAddress, consumer, e.getMessage()); } consumers.remove(consumerId, consumerFuture); }
}) .exceptionally(exception -> {
if (consumerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage()); } consumers.remove(consumerId, consumerFuture); return null;
}); } else { String msg = "Client is not authorized to subscribe"; ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; }).exceptionally(ex -> { commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage()); return null; }); }
|
总结下来,处理 subscribe 请求核心调用是:
BrokerService#getTopic
:获取当前 broker 所拥有(own)的 Topic
对象
Topic#subscribe
:在 Topic
对象中创建对应的订阅,并得到 Consumer
对象
其中 Topic
和 Consumer
是 broker 端对 topic 和 consumer 的抽象,负责管理对应的资源。均位于 org.apache.pulsar.broker.service
包下。这里我们重点看 PersistentTopic#subscribe
。
PersistentTopic#subscribe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| public CompletableFuture<Consumer> subscribe() { if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { return FutureUtil.failedFuture(new NotAllowedException( "readCompacted only allowed on failover or exclusive subscriptions")); }
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) { SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier( cnx.clientAddress().toString().split(":")[0], consumerName, consumerId); if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer))) { return FutureUtil.failedFuture( new NotAllowedException("Subscribe limited by subscribe rate limit per consumer.")); } }
lock.readLock().lock(); try { if (isFenced) { log.warn("[{}] Attempting to subscribe to a fenced topic", topic); return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable")); } handleConsumerAdded(subscriptionName, consumerName); } finally { lock.readLock().unlock(); }
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec);
int maxUnackedMessages = isDurable ? getMaxUnackedMessagesOnConsumer() : 0;
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> { Consumer consumer = new Consumer(); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { checkBackloggedCursors(); if (!cnx.isActive()) { try { consumer.close(); } catch (BrokerServiceException e) { decrementUsageCount(); return FutureUtil.failedFuture(e); }
decrementUsageCount(); return FutureUtil.failedFuture( new BrokerServiceException("Connection was closed while the opening the cursor ")); } else { checkReplicatedSubscriptionControllerState(); return CompletableFuture.completedFuture(consumer); } }); });
future.exceptionally(ex -> { decrementUsageCount(); return null; }); return future; }); }
|
核心流程:
- 获取
Subscription
(broker 对订阅的抽象),若不存在则创建。
- 创建
Consumer
(broker 对消费者的抽象)后加入订阅。
这里不进一步阅读 Subscription
代码。简单说,在创建订阅时,对于持久化(durable)订阅(PersistentSubscription
),创建时会打开一个 cursor,对应于 PersistentTopic
内部的 ledger,若 cursor 不存在则创建;对于非持久化(non-durable)订阅(NonPersistentSubscription
),则是直接内存中维护消息 id(MessageIdImpl
)。两者最大的区别是,持久化订阅在 cursor 已存在时,直接打开 cursor,而无视掉 CommandSubscribe 中的消息 id。
至于将 consumer 加入订阅,实际上是根据订阅类型创建对应的 dispatcher(Dispatcher
),这里以默认的 Exclusive 订阅为例:
1 2 3 4 5 6 7 8 9 10 11
| switch (consumer.subType()) { case Exclusive: if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) { previousDispatcher = dispatcher; dispatcher = useStreamingDispatcher ? new PersistentStreamingDispatcherSingleActiveConsumer( cursor, SubType.Exclusive, 0, topic, this) : new PersistentDispatcherSingleActiveConsumer( cursor, SubType.Exclusive, 0, topic, this); } break;
|
注:https://github.com/apache/pulsar/pull/9056 引入了 streaming dispatcher。
然后将 consumer 加入 dispatcher:
1 2 3 4 5 6
| try { dispatcher.addConsumer(consumer); return CompletableFuture.completedFuture(null); } catch (BrokerServiceException brokerServiceException) { return FutureUtil.failedFuture(brokerServiceException); }
|
订阅里实际负责消息分发的就是 dispatcher。
CommandFlow
首先是 ServerCnx
第一步处理:
1 2 3 4 5 6 7 8 9 10 11 12
| protected void handleFlow(CommandFlow flow) { checkArgument(state == State.Connected); CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); if (consumer != null) { consumer.flowPermits(flow.getMessagePermits()); } } }
|
然后递交给 Consumer
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void flowPermits(int additionalNumberOfMessages) { checkArgument(additionalNumberOfMessages > 0);
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) { blockedConsumerOnUnackedMsgs = true; } int oldPermits; if (!blockedConsumerOnUnackedMsgs) { oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages); subscription.consumerFlow(this, additionalNumberOfMessages); } else { oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages); } }
|
可见,更新完 Consumer
内部的 messagePermits
后,交由对应的 Subscription
对象处理。这里仅看 PersistentSubscription
的实现:
1 2 3 4
| public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { this.lastConsumedFlowTimestamp = System.currentTimeMillis(); dispatcher.consumerFlow(consumer, additionalNumberOfMessages); }
|
仅仅是传递给 dispatcher。因此实际的处理是由 dispatcher 完成的。至于 dispatcher 的实现就留给下一篇进行讨论。
总结
本文梳理了 client 和 broker 创建消费者订阅某个 topic 的流程。Client 端的实现包含了一些和生产者通用的部分,也就是建立连接的部分。连接成功后的回调,消费者会先发一个 subscribe 命令注册自己,注册成功后发送 flow 请求,携带 permits,其值为内部缓冲区大小的一半(对于无缓冲区的 zero queue consumer 例外)。
Broker 端处理相对复杂,但本质是对一些概念进行了抽象。消费者对应 Consumer
,每个消费者则包含一个订阅 Subscription
。而 topic 对应 Topic
,消费者发送 subscribe 命令时会在 Topic
里创建 Subscription
,因为每个 topic 可以对应多个订阅,创建完成后根据消费者的订阅类型创建对应的 Dispatcher
。通常意义上一个订阅可以对应多个消费者,但实际上真正维护这组消费者的不是 Subscription
而是 Dispatcher
,订阅本身更多的是维护 cursor(消费进度),比如持久化订阅(PersistentSubscription
)就会创建一个 durable cursor。而 client 端的 Flow 请求,最终也是 Dispatcher
来处理的。
最后解答最初提出的问题,push 和 pull 的区别,以及为什么 Pulsar 是 push 消费模型。Kafka 的消费者是发送 FETCH 请求给 broker,然后 broker 对应的 FETCH 响应里包含读取的消息。而 Pulsar 的消费者发送的是 Flow 请求,它本身是没有对应的 FETCH 响应的。Client 会主动告知 broker 自己可以缓存多少条消息,broker 根据这个提示可以灵活定制 dispatcher,然后主动发消息给 client。因此 Pulsar 的这种 push 模型,实际上是由服务端(broker)来进行流量控制。两者本质区别就是流控到底是 client 还是 server 处理的。