(1). 概述
在前面对NettyMessagingService进行了简单的了解,它主要的职责是进行通信的管理,对于Server(服务端)来说,底层原理应该是要接受请求,并回调给业务代码来着,所以,我们要找到NettyMessagingService与Netty的交接处:ChannelInboundHandlerAdapter.
(2). NettyMessagingService$HandshakeHandlerAdapter
public class NettyMessagingService {
// ************************************************************************************
// 1. Zeebe定义,握手处理抽象类.
// ************************************************************************************
private abstract class HandshakeHandlerAdapter<M extends ProtocolMessage> extends ChannelInboundHandlerAdapter {
void writeProtocolVersion(final ChannelHandlerContext context, final ProtocolVersion version) {
// ... ...
} // end
OptionalInt readProtocolVersion(final ChannelHandlerContext context, final ByteBuf buffer) {
// ... ...
} // end
void activateProtocolVersion(
final ChannelHandlerContext context,
final Connection<M> connection,
final ProtocolVersion protocolVersion) {
final MessagingProtocol protocol = protocolVersion.createProtocol(advertisedAddress);
context.pipeline().remove(this);
context.pipeline().addLast("encoder", protocol.newEncoder());
context.pipeline().addLast("decoder", protocol.newDecoder());
context.pipeline().addLast("handler", new MessageDispatcher<>(connection));
} // end activateProtocolVersion
} // end HandshakeHandlerAdapter
} // end NettyMessagingService
(3). NettyMessagingService$ServerHandshakeHandlerAdapter
public class NettyMessagingService {
private class ServerHandshakeHandlerAdapter extends HandshakeHandlerAdapter<ProtocolRequest> {
@Override
public void channelRead(final ChannelHandlerContext context, final Object message)
throws Exception {
readProtocolVersion(context, (ByteBuf) message)
.ifPresent(
version -> {
ProtocolVersion protocolVersion = ProtocolVersion.valueOf(version);
if (protocolVersion == null) {
protocolVersion = ProtocolVersion.latest();
}
writeProtocolVersion(context, protocolVersion);
activateProtocolVersion(
context,
// ****************************************************************
// 创建了一个RemoteServerConnection
// ****************************************************************
new RemoteServerConnection(handlers, context.channel()),
protocolVersion);
});
} // end channelRead
@Override
void activateProtocolVersion(
final ChannelHandlerContext context,
final Connection<ProtocolRequest> connection,
final ProtocolVersion protocolVersion) {
log.debug(
"Activating server protocol version {} for connection to {}",
protocolVersion,
context.channel().remoteAddress());
super.activateProtocolVersion(context, connection, protocolVersion);
} // end activateProtocolVersion
} // end NettyMessagingService$ServerHandshakeHandlerAdapter
}
(4). RemoteServerConnection
RemoteServerConnection是我们的核心类了,它相当于一个桥梁类,一侧与Netty打交道,另一侧与我们的业务Handler打交道.
interface Connection<M extends ProtocolMessage> {
// 分发消息
void dispatch(M message);
// ... ...
} // end
interface ServerConnection extends Connection<ProtocolRequest> {
// 回复消息
void reply(ProtocolRequest message, ProtocolReply.Status status, Optional<byte[]> payload);
// ... ...
} // end
abstract class AbstractServerConnection implements ServerConnection {
// *****************************************************************
// HandlerRegistry是一个典型的中介者模式,下一篇剖析跟它相关的内容.
// *****************************************************************
private final HandlerRegistry handlers;
AbstractServerConnection(final HandlerRegistry handlers) {
this.handlers = handlers;
} // end AbstractServerConnection
public void dispatch(final ProtocolRequest message) {
final String subject = message.subject();
// *****************************************************************
// 2. 根据subject,从HandlerRegistry中获取一个:BiConsumer<ProtocolRequest, ServerConnection>处理消息
// *****************************************************************
final BiConsumer<ProtocolRequest, ServerConnection> handler = handlers.get(subject);
if (handler != null) {
log.trace("Received message type {} from {}", subject, message.sender());
handler.accept(message, this);
} else {
// *****************************************************************
// 3. 未找到Handler处理方式.
// *****************************************************************
log.debug("No handler for message type {} from {}", subject, message.sender());
byte[] subjectBytes = null;
if (subject != null) {
subjectBytes = StringUtil.getBytes(subject);
}
// *****************************************************************
// 4. reply交给子类(RemoteServerConnection)处理.
// 因为,自己都是一个抽象类
// *****************************************************************
reply(message, ProtocolReply.Status.ERROR_NO_HANDLER, Optional.ofNullable(subjectBytes));
}
} // end dispatch
} // end
final class RemoteServerConnection extends AbstractServerConnection {
RemoteServerConnection(final HandlerRegistry handlers, final Channel channel) {
super(handlers);
this.channel = channel;
} // end RemoteServerConnection
public void reply(
final ProtocolRequest message,
final ProtocolReply.Status status,
final Optional<byte[]> payload) {
// 仅仅是通过Netty Channel写出了一个协议消息.
final ProtocolReply response = new ProtocolReply(message.id(), payload.orElse(EMPTY_PAYLOAD), status);
channel.writeAndFlush(response, channel.voidPromise());
} // end reply
} // end
(5). 总结
NettyMessagingService底层,最终是委派给:RemoteServerConnection进行通信,而,RemoteServerConnection负责分发请求给:HandlerRegistry.