(1). 概述
前面剖析到:GossipManager内部会创建UdpTransportManager进行消息管理,在这一小篇,主要剖析接收消息后,如何进行处理的.
(2). UdpTransportManager.startEndpoint
public class UdpTransportManager
extends AbstractTransportManager
// ***********************************************************************
// 1. 它实现Runnable
// ***********************************************************************
implements Runnable {
private final Thread me;
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost() , gossipManager.getMyself().getUri().getPort());
// udp://localhost:10000
server = new DatagramSocket(socketAddress);
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
// ***********************************************************************
// 2. 创建了一个线程,指定Runnable为this,所以,这个类,要是Runnable的实现类,并且,要求要实现:run方法.
// ***********************************************************************
me = new Thread(this);
} // end
// ***********************************************************************
// 3. 直接Thread.start方法
// ***********************************************************************
public void startEndpoint() {
me.start();
} // end
}
(3). UdpTransportManager.run
public void run() {
while (keepRunning.get()) {
try {
// **********************************************************
// 1. 通过UDP读取消息.
// **********************************************************
byte[] buf = read();
try {
// 通过协议管理器,进行解码
Base message = gossipManager.getProtocolManager().read(buf);
// ********************************************************
// 2. 委派给GossipCore.receive方法,进行消息的处理.
// ********************************************************
// 委派给GossipCore进行处理
gossipCore.receive(message);
//TODO this is suspect
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
if (!(e.getCause() instanceof InterruptedException)) {
LOGGER.error(e);
}
keepRunning.set(false);
}
}
} // end
(4). GossipCore.receive
public void receive(Base base) {
// ************************************************************
// 前面有剖析过,会遍历MessageHandler(TypedMessageHandler),判断数据类型(Base)是否支持,如果支持,则invoke,并返回:true
// ************************************************************
if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
// 当没有找到相应的:MessageHandler时,提示处理不存在
LOGGER.warn("received message can not be handled");
}
} // end
(5). ActiveGossipMessageHandler
在这里以:ActiveGossipMessageHandler为例
public class ActiveGossipMessageHandler implements MessageHandler {
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
List<Member> remoteGossipMembers = new ArrayList<>();
RemoteMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
// *************************************************************************
// 遍历所有的成员列表
// *************************************************************************
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u;
try {
u = new URI(activeGossipMessage.getMembers().get(i).getUri());
} catch (URISyntaxException e) {
GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
RemoteMember member = new RemoteMember(
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat(),
activeGossipMessage.getMembers().get(i).getProperties());
// 标记第一个为发送者成员
if (i == 0) {
senderMember = member;
}
// 处理集群名称相同的成员
if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {
UdpNotAMemberFault f = new UdpNotAMemberFault();
f.setException("Not a member of this cluster " + i);
f.setUriFrom(activeGossipMessage.getUriFrom());
f.setUuid(activeGossipMessage.getUuid());
GossipCore.LOGGER.warn(f);
gossipCore.sendOneWay(f, member.getUri());
continue;
}
remoteGossipMembers.add(member);
}
// apache gossip好像对Google进行了增强,最终会发送一个请求,告之对方.
UdpActiveGossipOk o = new UdpActiveGossipOk();
o.setUriFrom(activeGossipMessage.getUriFrom());
o.setUuid(activeGossipMessage.getUuid());
gossipCore.sendOneWay(o, senderMember.getUri());
// *************************************************
// 最终会处理member信息.
// *************************************************
gossipCore.mergeLists(senderMember, remoteGossipMembers);
return true;
}
}
(6). GossipCore.mergeLists
public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
// senderMember 代表着远程发送消息过来的程序
// 1. 本进程记录的:下线的成员列表中,如果,包含有:发送消息过来的成员,则先标记这个成员的心跳时间.
for (LocalMember i : gossipManager.getDeadMembers()) {
if (i.getId().equals(senderMember.getId())) {
LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
i.recordHeartbeat(senderMember.getHeartbeat());
i.setHeartbeat(senderMember.getHeartbeat());
//TODO consider forcing an UP here
}
}
// 2. 遍历所有的成员列表
for (Member remoteMember : remoteList) {
// 如果,遍历的某一个成员信息与本地(me)成员相同,则跳过.
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
// 3. 把RemoteMember转换成:LocalMember
LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
remoteMember.getProperties(),
gossipManager.getSettings().getWindowSize(),
gossipManager.getSettings().getMinimumSamples(),
gossipManager.getSettings().getDistribution());
aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
// ***************************************************************************
// 4. 添加新的成员到:ConcurrentSkipListMap<LocalMember, GossipState>集合里,并标记为:UP
// ***************************************************************************
Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
// 5. 当ConcurrentSkipListMap<LocalMember, GossipState>集合里,存在这个成员的情况下,重新标记这个成员的相关信息(Heartbeat)
if (result != null){
for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
if (localMember.getKey().getId().equals(remoteMember.getId())){
localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setProperties(remoteMember.getProperties());
}
}
} // end result
}// end for
} // end
(7). 总结
UdpTransportManager的职责之一就是接受网络请求,并,转发请求给相应的:MessageHandler(比如:ActiveGossipMessageHandler),反正,最终MessageHandler会操纵GossipManager更新成员列表(ConcurrentSkipListMap<LocalMember, GossipState>)信息,注意一点就是:本机成员(me)是不会在这个集合里的.