(1). Overview
This article walks through how a PulsarClient is constructed.
(2). PulsarClient construction process
Client construction is a textbook use of the Builder pattern.
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650").build(); // placeholder broker address (assumed)
(2). PulsarClient.builder
Unlike the builders we usually write, Pulsar gives the client's construction process its own dedicated interface (ClientBuilder).
static ClientBuilder builder() {
    return DefaultImplementation.getDefaultImplementation().newClientBuilder();
}
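To make the idea concrete, here is a minimal, dependency-free sketch of a static factory method on an interface that hands back a builder interface. Every type name in it (SketchClient, SketchClientBuilder, SketchClientBuilderImpl) is a hypothetical stand-in invented for illustration, not one of Pulsar's classes, and the broker address is a placeholder.

```java
// Hypothetical stand-ins for illustration; none of these are Pulsar's own types.
interface SketchClient {
    String serviceUrl();

    // Static factory on the interface: callers never name the implementation class.
    static SketchClientBuilder builder() {
        return new SketchClientBuilderImpl();
    }
}

// The builder has its own interface, mirroring the ClientBuilder idea.
interface SketchClientBuilder {
    SketchClientBuilder serviceUrl(String url);

    SketchClient build();
}

final class SketchClientBuilderImpl implements SketchClientBuilder {
    private String serviceUrl;

    @Override
    public SketchClientBuilder serviceUrl(String url) {
        this.serviceUrl = url;
        return this; // returning this is what enables method chaining
    }

    @Override
    public SketchClient build() {
        // Snapshot the value so later changes to the builder do not affect the client.
        final String url = serviceUrl;
        return () -> url;
    }
}

final class BuilderInterfaceDemo {
    public static void main(String[] args) {
        SketchClient client = SketchClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder address
                .build();
        System.out.println(client.serviceUrl());
    }
}
```

Because callers only ever see the two interfaces, the implementation class can live in a separate module, which seems to be the motivation for the extra interface here.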
(3). DefaultImplementation.getDefaultImplementation
A typical singleton: the single instance is created once, in a static initializer.
public class DefaultImplementation {
    private static final PulsarClientImplementationBinding IMPLEMENTATION;
    // ************************************************************************************
    // Singleton: initialized once when the class is loaded
    // ************************************************************************************
    static {
        PulsarClientImplementationBinding impl = null;
        try {
            // *****************************************************************************
            // Create the PulsarClientImplementationBinding via reflection
            // *****************************************************************************
            impl = (PulsarClientImplementationBinding) ReflectionUtils
                    .newClassInstance("org.apache.pulsar.client.impl.PulsarClientImplementationBindingImpl")
                    .getConstructor()
                    .newInstance();
        } catch (Throwable error) {
            throw new RuntimeException("Cannot load Pulsar Client Implementation: " + error, error);
        }
        IMPLEMENTATION = impl;
    }
    public static PulsarClientImplementationBinding getDefaultImplementation() {
        return IMPLEMENTATION;
    }
}
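The same shape can be reproduced with nothing but the JDK. The sketch below is an assumption-laden illustration, not Pulsar's code: SketchBinding, SketchBindingImpl and SketchDefaultImplementation are hypothetical names, and the standard Class.forName call stands in for Pulsar's ReflectionUtils helper.

```java
// Hypothetical stand-ins for illustration; not Pulsar's own classes.
interface SketchBinding {
    String name();
}

final class SketchBindingImpl implements SketchBinding {
    // getConstructor() only finds public constructors, so this one is public.
    public SketchBindingImpl() { }

    @Override
    public String name() {
        return "sketch-binding";
    }
}

final class SketchDefaultImplementation {
    private static final SketchBinding IMPLEMENTATION;

    // Runs exactly once, when the JVM initializes this class.
    static {
        SketchBinding impl;
        try {
            // In a real API module the class name would be a string literal, so that
            // the API has no compile-time dependency on the implementation. The name
            // is derived from the class here only so the sketch runs in any package.
            String implClassName = SketchBindingImpl.class.getName();
            impl = (SketchBinding) Class.forName(implClassName)
                    .getConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException error) {
            throw new ExceptionInInitializerError(error);
        }
        IMPLEMENTATION = impl;
    }

    private SketchDefaultImplementation() { } // no instances; static access only

    static SketchBinding getDefaultImplementation() {
        return IMPLEMENTATION;
    }
}

final class SingletonDemo {
    public static void main(String[] args) {
        SketchBinding first = SketchDefaultImplementation.getDefaultImplementation();
        SketchBinding second = SketchDefaultImplementation.getDefaultImplementation();
        System.out.println(first == second); // both calls return the same instance
    }
}
```

Class initialization is performed once and is thread-safe under the JVM's rules, so this style of singleton needs no explicit locking.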
(4). PulsarClientImplementationBinding.newClientBuilder
public ClientBuilder newClientBuilder() {
    return new ClientBuilderImpl();
}
(5). ClientBuilder.build
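In effect, every configuration method on ClientBuilder simply reads or writes ClientConfigurationData; build() then validates that configuration and creates the client.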
public class ClientBuilderImpl implements ClientBuilder {
    // *****************************************************************
    // Client configuration
    // *****************************************************************
    ClientConfigurationData conf;
    public PulsarClient build() throws PulsarClientException {
        if (StringUtils.isBlank(this.conf.getServiceUrl()) && this.conf.getServiceUrlProvider() == null) {
            throw new IllegalArgumentException("service URL or service URL provider needs to be specified on the ClientBuilder object.");
        } else if (StringUtils.isNotBlank(this.conf.getServiceUrl()) && this.conf.getServiceUrlProvider() != null) {
            throw new IllegalArgumentException("Can only chose one way service URL or service URL provider.");
        } else {
            if (this.conf.getServiceUrlProvider() != null) {
                if (StringUtils.isBlank(this.conf.getServiceUrlProvider().getServiceUrl())) {
                    throw new IllegalArgumentException("Cannot get service url from service url provider.");
                }
                this.conf.setServiceUrl(this.conf.getServiceUrlProvider().getServiceUrl());
            }
            // ***************************************************************************
            // Create the PulsarClient
            // ***************************************************************************
            PulsarClient client = new PulsarClientImpl(this.conf);
            if (this.conf.getServiceUrlProvider() != null) {
                this.conf.getServiceUrlProvider().initialize(client);
            }
            return client;
        }
    }
}
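The validation in build() boils down to "exactly one of service URL or provider, and the result must not be blank". A small standalone sketch of that rule follows. It is a simplification under stated assumptions: ServiceUrlResolver is a hypothetical helper, a Supplier<String> stands in for the service URL provider, and isBlank is a rough replacement for StringUtils.isBlank.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration; it is not part of Pulsar.
final class ServiceUrlResolver {
    private ServiceUrlResolver() { }

    // provider stands in for a service URL provider; null means "not configured".
    static String resolve(String serviceUrl, Supplier<String> provider) {
        boolean hasUrl = !isBlank(serviceUrl);
        boolean hasProvider = provider != null;

        if (!hasUrl && !hasProvider) {
            throw new IllegalArgumentException("neither a service URL nor a provider was given");
        }
        if (hasUrl && hasProvider) {
            throw new IllegalArgumentException("give a service URL or a provider, not both");
        }
        if (hasProvider) {
            String fromProvider = provider.get();
            if (isBlank(fromProvider)) {
                throw new IllegalArgumentException("provider returned a blank service URL");
            }
            return fromProvider;
        }
        return serviceUrl;
    }

    // Rough, dependency-free approximation of StringUtils.isBlank.
    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}

final class ServiceUrlResolverDemo {
    public static void main(String[] args) {
        // Placeholder addresses, chosen only for the example.
        System.out.println(ServiceUrlResolver.resolve("pulsar://a:6650", null));
        System.out.println(ServiceUrlResolver.resolve(null, () -> "pulsar://b:6650"));
        try {
            ServiceUrlResolver.resolve(null, null);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

Failing fast here keeps a misconfigured client from being created at all, rather than surfacing the problem later at connection time.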
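(6). PulsarClient UML diagram
(7). A look at the PulsarClient interface
In the end, the focus belongs on the PulsarClient interface, because ClientBuilder only carries the configuration parameters needed during construction.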
public interface PulsarClient extends Closeable {
    // *******************************************************************
    // Producer builder
    // *******************************************************************
    <T> ProducerBuilder<T> newProducer(Schema<T> schema);
    // *******************************************************************
    // Consumer builder
    // *******************************************************************
    <T> ConsumerBuilder<T> newConsumer(Schema<T> schema);
    // *******************************************************************
    // Reader builder (for reading messages)
    // *******************************************************************
    <T> ReaderBuilder<T> newReader(Schema<T> schema);
    // *******************************************************************
    // Update the service URL
    // *******************************************************************
    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
    // *******************************************************************
    // Get the partitions of a topic
    // *******************************************************************
    CompletableFuture<List<String>> getPartitionsForTopic(String topic);
    // *******************************************************************
    // Transaction builder
    // *******************************************************************
    TransactionBuilder newTransaction() throws PulsarClientException;
}
(8). Design patterns
Which design patterns show up in these few simple lines of code?
Builder pattern
Singleton pattern
(9). Summary