(1). 引入依赖(pom.xml)
<properties>
    <!-- Pin the source encoding so the build does not depend on the platform default. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <pulsar.version>2.8.1</pulsar.version>
</properties>
<dependencies>
    <!-- Pulsar Java client (producer / consumer API). -->
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>${pulsar.version}</version>
    </dependency>
    <!-- JUnit 4; 4.13.1+ contains the fix for CVE-2020-15250 (TemporaryFolder). -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>
(2). 生产者
package help.lixin.pulsar.example;
import org.apache.pulsar.client.api.*;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
 * Example producer: publishes a handful of messages to a Pulsar topic using
 * both the synchronous and the asynchronous send APIs.
 */
public class ProductTest {
    // Add the host entries below to your local hosts file. On startup the client
    // fetches the broker list from the remote server, and in my cluster that list
    // appears to contain machine names rather than IP addresses.
    // ./bin/pulsar-admin brokers list pulsar-cluster
    //
    // 10.211.55.100 pulsar.lixin.help
    // 10.211.55.101 pulsar.lixin.help
    // 10.211.55.102 pulsar.lixin.help
    // 10.211.55.100 erp-100
    // 10.211.55.101 erp-101
    // 10.211.55.102 erp-102
    private static final String SERVER_URL = "pulsar://pulsar.lixin.help:6650";

    /**
     * Sends four messages (plain sync, plain async, keyed sync, keyed async) and
     * waits for every one of them to be confirmed before releasing the resources.
     *
     * @throws Exception if the client or producer cannot be created, or a send fails
     */
    @Test
    public void testProduct() throws Exception {
        // try-with-resources closes the producer first and then the client, even
        // when a send throws. Both close() calls are synchronous, so the test does
        // not return while a shutdown is still in flight.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                .producerName("my-producer")
                .topic("persistent://public/default/test")
                .enableBatching(true)
                .batchingMaxMessages(1024)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .maxPendingMessages(512)
                .blockIfQueueFull(true)
                .sendTimeout(10, TimeUnit.SECONDS)
                .create()) {

            // Synchronous send.
            MessageId messageId = producer.send("Hello World");
            System.out.println(String.format("message id is %s", messageId));

            // Asynchronous send; get() blocks until the broker has confirmed it.
            CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");
            System.out.println(String.format("async message id is %s", asyncMessageId.get()));

            // Message with a key and custom properties, sent synchronously.
            producer.newMessage()
                    .key("my-message-key")
                    .value("my-message")
                    .property("my-key", "my-value")
                    .property("my-other-key", "my-other-value")
                    .send();

            // Same, but sent asynchronously.
            CompletableFuture<MessageId> pendingSend = producer.newMessage()
                    .key("my-async-message-key")
                    .value("my-async-message")
                    .property("my-async-key", "my-async-value")
                    .property("my-async-other-key", "my-async-other-value")
                    .sendAsync();
            // Wait for the confirmation before the producer is closed; otherwise a
            // failed or still-pending send would go unnoticed.
            pendingSend.get();
        }
    }
}
(3). 消费者
package help.lixin.pulsar.example;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
/**
 * Example consumer: subscribes to a Pulsar topic and prints every message it receives.
 */
public class ConsumerTest {
    // Add the host entries below to your local hosts file. On startup the client
    // fetches the broker list from the remote server, and in my cluster that list
    // appears to contain machine names rather than IP addresses.
    // ./bin/pulsar-admin brokers list pulsar-cluster
    //
    // 10.211.55.100 pulsar.lixin.help
    // 10.211.55.101 pulsar.lixin.help
    // 10.211.55.102 pulsar.lixin.help
    // 10.211.55.100 erp-100
    // 10.211.55.101 erp-101
    // 10.211.55.102 erp-102
    private static final String SERVER_URL = "pulsar://pulsar.lixin.help:6650";

    /**
     * Receives messages in an endless loop, prints each payload to stderr and
     * acknowledges it. The loop only ends when receive/acknowledge throws, so this
     * test is meant to be run and stopped by hand.
     *
     * @throws Exception if the client or consumer cannot be created, or receiving fails
     */
    @Test
    public void testConsumer1() throws Exception {
        // try-with-resources releases the consumer and the client when the loop is
        // left through an exception.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
             // No schema is passed to newConsumer(), so payloads arrive as raw bytes.
             Consumer<byte[]> consumer = client.newConsumer()
                .consumerName("my-consumer")
                .topic("persistent://public/default/test")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .maxTotalReceiverQueueSizeAcrossPartitions(10)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe()) {

            while (true) {
                // Messages can be received synchronously or asynchronously:
                // CompletableFuture<Message<byte[]>> message = consumer.receiveAsync();
                Message<byte[]> message = consumer.receive();

                // Decode with an explicit charset instead of the platform default.
                System.err.println(new String(message.getData(), StandardCharsets.UTF_8));

                // Acknowledge the message so that it is not redelivered.
                consumer.acknowledge(message);
            }
        }
    }
}
(4). 总结
后续会学习Pulsar的基本命令,并对相关源码进行深入剖析.