(1). 概述
在前面分析NettyMessagingService的构建器时,会看到有构建一个类:NettyUnicastService,那这个类主要是干嘛呢?
(2). AtomixCluster构建器
public AtomixCluster(final ClusterConfig config, final Version version) {
// *****************************************************
// buildUnicastService
// *****************************************************
this(config, version, buildMessagingService(config), buildUnicastService(config));
}
(3). AtomixCluster.buildUnicastService
protected static ManagedUnicastService buildUnicastService(final ClusterConfig config) {
// 26502
return new NettyUnicastService(config.getClusterId(), config.getNodeConfig().getAddress(), config.getMessagingConfig());
}
(4). ManagedUnicastService
先看下接口
public interface ManagedUnicastService extends UnicastService, Managed<UnicastService> {}
// 广播消息
public interface UnicastService {
// 广播
void unicast(Address address, String subject, byte[] message);
// 对接收的消息,添加监听器进行处理
void addListener(String subject, BiConsumer<Address, byte[]> listener, Executor executor);
// 移除监听器
void removeListener(String subject, BiConsumer<Address, byte[]> listener);
}
(5). NettyUnicastService.start
public CompletableFuture<UnicastService> start() {
group = new NioEventLoopGroup(0, namedThreads("netty-unicast-event-nio-client-%d", log));
// *****************************************************************
// 启动
// *****************************************************************
return bootstrap().thenRun(() -> started.set(true)).thenApply(v -> this);
}
(6). NettyUnicastService.bootstrap
private CompletableFuture<Void> bootstrap() {
final Bootstrap serverBootstrap =
new Bootstrap()
.group(group)
// ************************************************************
// NioDatagramChannel
// UDP通信
// ************************************************************
.channel(NioDatagramChannel.class)
.handler(
new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(
final ChannelHandlerContext context, final DatagramPacket packet)
throws Exception {
// ************************************************************
// 接收并处理消息
// ************************************************************
handleReceivedPacket(packet);
}
})
.option(ChannelOption.RCVBUF_ALLOCATOR, new DefaultMaxBytesRecvByteBufAllocator())
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_REUSEADDR, true);
return bind(serverBootstrap);
}
(7). NettyUnicastService.handleReceivedPacket
private void handleReceivedPacket(final DatagramPacket packet) {
// 集群名称的hashcode值
final int preambleReceived = packet.content().readInt();
if (preambleReceived != preamble) { // 集群名称不同时,忽悠这个消息
log.warn(
"Received unicast message from {} which is outside of the cluster. Ignoring the message.",
packet.sender());
return;
}
// 读取body的字节数
final byte[] payload = new byte[packet.content().readInt()];
packet.content().readBytes(payload);
// 通过:Kryo解码消息
final Message message = SERIALIZER.decode(payload);
// 主题监听器
final Map<BiConsumer<Address, byte[]>, Executor> subjectListeners = listeners.get(message.subject());
if (subjectListeners != null) {
// 遍历所有的listener,进行消息的处理.
subjectListeners.forEach(
(consumer, executor) ->
executor.execute(() -> consumer.accept(message.source(), message.payload())));
}
} // end
(8). NettyUnicastService.unicast
public void unicast(final Address address, final String subject, final byte[] payload) {
if (!started.get()) {
LOGGER.debug("Failed sending unicast message, unicast service was not started.");
return;
}
final InetAddress resolvedAddress = address.address();
if (resolvedAddress == null) {
LOGGER.debug(
"Failed sending unicast message (destination address {} cannot be resolved)", address);
return;
}
// 通过Kryo进行编码.
final Message message = new Message(this.address, subject, payload);
final byte[] bytes = SERIALIZER.encode(message);
// ************************************************************
// 4 + 4 + N
// 4 : preamble(集群的hashcode)
// 4 : playbody bytes length
// ************************************************************
final ByteBuf buf = channel.alloc().buffer(Integer.BYTES + Integer.BYTES + bytes.length);
buf.writeInt(preamble);
buf.writeInt(bytes.length).writeBytes(bytes);
// 直接往指定的address进行消息发送
channel.writeAndFlush(
new DatagramPacket(buf, new InetSocketAddress(resolvedAddress, address.port())));
} // end
(9). NettyUnicastServiceTest
package io.atomix.cluster.messaging.impl;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.cluster.messaging.ManagedUnicastService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import net.jodah.concurrentunit.ConcurrentTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
public class NettyUnicastServiceTest extends ConcurrentTestCase {
private static final Logger LOGGER = getLogger(NettyUnicastServiceTest.class);
ManagedUnicastService service1;
ManagedUnicastService service2;
Address address1; // 127.0.0.1:1025
Address address2; // 127.0.0.1:1025
@Before
public void setUp() throws Exception {
address1 = Address.from("127.0.0.1", SocketUtil.getNextAddress().getPort());
address2 = Address.from("127.0.0.1", SocketUtil.getNextAddress().getPort());
final String clusterId = "testClusterId";
service1 = new NettyUnicastService(clusterId, address1, new MessagingConfig());
service1.start().join();
service2 = new NettyUnicastService(clusterId, address2, new MessagingConfig());
service2.start().join();
} // end setUp
@Test
public void testUnicast() throws Exception {
// 监听的:1025端口,添加主题(test)处理器.
service1.addListener(
"test",
(address, payload) -> {
assertEquals(address2, address);
assertArrayEquals("Hello world!".getBytes(), payload);
resume();
});
// 1026 --> 1025 发送广播消息
service2.unicast(address1, "test", "Hello world!".getBytes());
await(5000);
} // end testUnicast
@After
public void tearDown() throws Exception {
if (service1 != null) {
try {
service1.stop().join();
} catch (final Exception e) {
LOGGER.warn("Failed stopping netty1", e);
}
}
if (service2 != null) {
try {
service2.stop().join();
} catch (final Exception e) {
LOGGER.warn("Failed stopping netty2", e);
}
}
} // end tearDown
}
(10). 总结
NettyMessagingService的底层是,在启动时监听UDP端口(26502),主要用于广播和监听消息来着的.