(1). 概述
在前面的源码中,稍有剖析NettyMessagingService,它是底层的通信基石,所以,这一小篇,通过一个小小的案例,来了解这个类的功能点.
(2). NettyMessagingServiceCompressionTest
package io.atomix.cluster.messaging.impl;
import static org.assertj.core.api.Assertions.assertThat;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingConfig.CompressionAlgorithm;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
class NettyMessagingServiceCompressionTest {
void shouldSendAndReceiveMessagesWhenCompressionEnabled() {
// 1.配置压缩算法
final var config =
new MessagingConfig()
.setShutdownQuietPeriod(Duration.ofMillis(50))
.setCompressionAlgorithm(CompressionAlgorithm.NONE);
// 2. 监听1025端口,并启动(主要是用于发送消息)
final var senderAddress = Address.from(SocketUtil.getNextAddress().getPort());
final var senderNetty = (ManagedMessagingService)new NettyMessagingService("test", senderAddress, config).start().join();
// 3. 监听1026端口,并启动(主要用于接收消息).
final var receiverAddress = Address.from(SocketUtil.getNextAddress().getPort());
final var receiverNetty = (ManagedMessagingService)new NettyMessagingService("test", receiverAddress, config).start().join();
// 定义主题
final String subject = "subject";
// 定义发送消息的消息体
final String requestString = "message";
// 定义回复消息的消息体
final String responseString = "success";
// 4. 配置处理器,监听某个主题(subject),并,回复消息
receiverNetty.registerHandler(
subject,
(m, payload) -> {
final String message = new String(payload);
assertThat(message).isEqualTo(requestString);
return CompletableFuture.completedFuture(responseString.getBytes());
});
// 5. 向某个IP:PORT发送消息.
final CompletableFuture<byte[]> response = senderNetty.sendAndReceive(receiverAddress, subject, requestString.getBytes());
// 6. 等待结果返回
final var result = response.join();
assertThat(new String(result)).isEqualTo(responseString);
// 7. 关闭
senderNetty.stop();
receiverNetty.stop();
}
}
(3). 总结
之所以,拿出这样一个案例是因为,看了下Zeebe针对Netty通信方面的的源码写得还是挺不错的,后面想把代码抠出来,复用下.