(1). 概述
前面分析了UdpTransportManager的功能之一就是接受消息并处理,在这里,剖析它的另一大功能,就是发送成员信息.
(2). UdpTransportManager构造器
在前面对UdpTransportManager的构造器只是一带而过,实际:UdpTransportManager继承于:AbstractTransportManager
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
// ... ...
}
(3). AbstractTransportManager构造器
public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
gossipThreadExecutor = Executors.newCachedThreadPool();
// ********************************************************
// 通过反射,创建:AbstractActiveGossiper的实现类
// org.apache.gossip.manager.SimpleActiveGossiper
// ********************************************************
activeGossipThread = ReflectionUtils.constructWithReflection(
gossipManager.getSettings().getActiveGossipClass(),
new Class<?>[]{
GossipManager.class, GossipCore.class, MetricRegistry.class
},
new Object[]{
gossipManager, gossipCore, gossipManager.getRegistry()
});
} // end
(4). AbstractTransportManager.startActiveGossiper
public void startActiveGossiper() {
// ****************************************************************
// activeGossipThread = org.apache.gossip.manager.SimpleActiveGossiper
// ****************************************************************
activeGossipThread.init();
}
(5). SimpleActiveGossiper.init
public void init() {
super.init();
// *************************************************************************************
// sendToALiveMember 发送在线成员列表
// *************************************************************************************
scheduledExecutorService.scheduleAtFixedRate(() -> {
threadService.execute(() -> {
sendToALiveMember();
});
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
// *************************************************************************************
// sendToDeadMember发送离线成员列表
// 在前面有剖析过,其余的种子成员,默认状态是下线状态的(DOWN)
// *************************************************************************************
scheduledExecutorService.scheduleAtFixedRate(() -> {
sendToDeadMember();
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
// ... ...
}
(6). SimpleActiveGossiper.sendToDeadMember
protected void sendToDeadMember(){
// 通过随机函数,过滤出来一个:状态为下线(DOWN)成员
LocalMember member = selectPartner(gossipManager.getDeadMembers());
// 委托给sendMembershipList方法
// 注意:mySelf是本机(me)成员,而,member是随机其它种子成员.
sendMembershipList(gossipManager.getMyself(), member);
}
(7). SimpleActiveGossiper.sendMembershipList
protected void sendMembershipList(LocalMember me, LocalMember member) {
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
me.setHeartbeat(System.nanoTime());
// ****************************************************************
// 1. 创建报文(UdpActiveGossipMessage),内剖持有一个集合列表.
// ****************************************************************
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
// ****************************************************************
// 2. 集合列表中的第一个成员是本机成员(me)
// ****************************************************************
message.getMembers().add(convert(me));
// ****************************************************************
// 3. 其余的成员,都往集合的第一个成员后添加,这也是能解释,为什么上一篇(接受消息)时,要标记出第一个成员,因为第一个成员发过来的消息,肯定是活着的.
// ****************************************************************
for (LocalMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
// ****************************************************************
// 4. 委派给GossipCore.send方法进行消息发送,这个没有什么好去剖析的了,肯定是委托给:TransportManager做处理.
// ****************************************************************
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
} //end
(8). 总结
UdpTransportManager的startActiveGossiper方法,会以固定的时间,向其它成员发送心跳信息,至此,Gossip大体的流程剖析完毕了.