(1). 概述

在这一小篇,主要是把Atomix的源码,拉取下来,然后,运行一个测试案例,测试案例的需求是这样的,多个节点组成一个集群启动,能实时(上线/下线)感知其它节点.

(2). 节点一

package help.lixin.atomix.cluster;

import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.MulticastDiscoveryBuilder;
import io.atomix.cluster.discovery.MulticastDiscoveryProvider;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyBroadcastService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.utils.net.Address;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class NodeDiscoveryOneTest {

    @Test
    public void testNodeDiscoveryOne() throws Exception {
        List<Node> nodes = new ArrayList<Node>();

        Node node1 = Node.builder().withId("1").withAddress(Address.from("127.0.0.1", 50001)).build();
        Node node2 = Node.builder().withId("2").withAddress(Address.from("127.0.0.1", 50002)).build();
        Node node3 = Node.builder().withId("3").withAddress(Address.from("127.0.0.1", 50003)).build();
//        nodes.add(node1);
        nodes.add(node2);
        nodes.add(node3);

        BootstrapService bootstrapService = bootstrapService(node1);

        // 基于固定的节点服务发现
        NodeDiscoveryProvider nodeDiscoveryProvider = BootstrapDiscoveryProvider.builder().withNodes(nodes).build();
        nodeDiscoveryProvider.join(bootstrapService, node1).get();

        // 基于UDP服务发现
//        NodeDiscoveryProvider nodeDiscoveryProvider = MulticastDiscoveryProvider.builder().build();
        // 在此处会为:TCP/UDP进行服务发现
        // ManagedBroadcastService
        // ManagedMessagingService
//        nodeDiscoveryProvider.join(bootstrapService, node1).get();

        new Thread() {
            @Override
            public void run() {
                while (true) {
                    Set<Node> tmpNodes = nodeDiscoveryProvider.getNodes();
                    for (Node node : tmpNodes) {
                        System.out.print("node: " + node);
                    }
                    System.out.println();
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (Exception e) {
                    }
                }
            }
        }.start();

        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }

    public BootstrapService bootstrapService(Node node) {

        // 1.ManagedMessagingService(NettyMessagingService) 维扩的是一条TCP连接,主要用于两个Node(节点)之间的通信.
        // 2.通过MessagingService接口,可以大概猜出有如下功能: 异步发送消息/同步发送消息/注册事件处理/取消注册事件处理
        String cluster = "default";
        MessagingConfig messagingConfig = new MessagingConfig();
        ManagedMessagingService messagingService = new NettyMessagingService( //
                cluster,  // 集群名称
                node.address(), // 节点地址
                messagingConfig);
        messagingService.start();

        // 3. NettyBroadcastService用于广播.
        // 4. 会在本地创建"两个"(127.0.0.1:8888)UDP端口,并加入到广播地址(231.0.0.1:8888)中
        NettyBroadcastService broadcastService = new NettyBroadcastService( //
                node.address(), //
                Address.from("231.0.0.1:8888"), //
                true);
        broadcastService.start();

        // 4. BootstrapService包含着(ManagedMessagingService/BroadcastService),也就是广播和单播都支持
        return new BootstrapService() {
            @Override
            public MessagingService getMessagingService() {
                return messagingService;
            }

            @Override
            public BroadcastService getBroadcastService() {
                return broadcastService;
            }
        };
    }

}

(3). 节点二

package help.lixin.atomix.cluster;

import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.MulticastDiscoveryProvider;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyBroadcastService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.utils.net.Address;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class NodeDiscoveryTwoTest {

    @Test
    public void testNodeDiscoveryTwo() throws Exception {
        List<Node> nodes = new ArrayList<Node>();
        Node node1 = Node.builder().withId("1").withAddress(Address.from("127.0.0.1", 50001)).build();
        Node node2 = Node.builder().withId("2").withAddress(Address.from("127.0.0.1", 50002)).build();
        Node node3 = Node.builder().withId("3").withAddress(Address.from("127.0.0.1", 50003)).build();
        nodes.add(node1);
//        nodes.add(node2);
        nodes.add(node3);

        BootstrapService bootstrapService = bootstrapService(node2);
        // 基于固定的节点服务发现
        NodeDiscoveryProvider nodeDiscoveryProvider = BootstrapDiscoveryProvider.builder().withNodes(nodes).build();
        nodeDiscoveryProvider.join(bootstrapService, node2).get();

        // 基于UDP服务发现
//        NodeDiscoveryProvider nodeDiscoveryProvider = MulticastDiscoveryProvider.builder().build();
//        nodeDiscoveryProvider.join(bootstrapService, node2).get();

        new Thread() {
            @Override
            public void run() {
                while (true) {
                    Set<Node> tmpNodes = nodeDiscoveryProvider.getNodes();
                    for (Node node : tmpNodes) {
                        System.out.print("node: " + node);
                    }
                    System.out.println();
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (Exception e) {
                    }
                }
            }
        }.start();

        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }

    public BootstrapService bootstrapService(Node node) {
        // 集群名称
        String cluster = "default";
        // 消息配置
        MessagingConfig messagingConfig = new MessagingConfig();
        // 当前节点(5001端口)信息
        ManagedMessagingService messagingService = new NettyMessagingService(cluster, node.address(), messagingConfig);

        messagingService.start();

        NettyBroadcastService broadcastService = new NettyBroadcastService(node.address(), Address.from("231.0.0.1:8888"), true);
        broadcastService.start();

        return new BootstrapService() {
            @Override
            public MessagingService getMessagingService() {
                return messagingService;
            }

            @Override
            public BroadcastService getBroadcastService() {
                return broadcastService;
            }
        };
    }

}

(4). 节点三

package help.lixin.atomix.cluster;

import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.*;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyBroadcastService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.utils.net.Address;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class NodeDiscoveryThreeTest {

    @Test
    public void testNodeDiscoveryThree() throws Exception {
        List<Node> nodes = new ArrayList<Node>();
        Node node1 = Node.builder().withId("1").withAddress(Address.from("127.0.0.1", 50001)).build();
        Node node2 = Node.builder().withId("2").withAddress(Address.from("127.0.0.1", 50002)).build();
        Node node3 = Node.builder().withId("3").withAddress(Address.from("127.0.0.1", 50003)).build();
        nodes.add(node1);
        nodes.add(node2);
//        nodes.add(node3);

        BootstrapService bootstrapService = bootstrapService(node3);
        // 基于固定的节点服务发现
        NodeDiscoveryProvider nodeDiscoveryProvider = BootstrapDiscoveryProvider.builder().withNodes(nodes).build();
        nodeDiscoveryProvider.join(bootstrapService, node3).get();

        // 基于UDP服务发现
//        NodeDiscoveryProvider nodeDiscoveryProvider = MulticastDiscoveryProvider.builder().build();
//        nodeDiscoveryProvider.join(bootstrapService, node3).get();

        new Thread() {
            @Override
            public void run() {
                while (true) {
                    Set<Node> tmpNodes = nodeDiscoveryProvider.getNodes();
                    for (Node node : tmpNodes) {
                        System.out.print("node: " + node);
                    }
                    System.out.println();
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (Exception e) {
                    }
                }
            }
        }.start();

        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }

    public BootstrapService bootstrapService(Node node) {
        String cluster = "default";
        MessagingConfig messagingConfig = new MessagingConfig();
        ManagedMessagingService messagingService = new NettyMessagingService(cluster, node.address(), messagingConfig);
        messagingService.start();

        NettyBroadcastService broadcastService = new NettyBroadcastService(node.address(), Address.from("231.0.0.1:8888"), true);
        broadcastService.start();

        return new BootstrapService() {
            @Override
            public MessagingService getMessagingService() {
                return messagingService;
            }

            @Override
            public BroadcastService getBroadcastService() {
                return broadcastService;
            }
        };
    }

}

(5). 输出结果

node: Node{id=1, address=127.0.0.1:50001}
node: Node{id=2, address=127.0.0.1:50002}
node: Node{id=3, address=127.0.0.1:50003}

(6). 总结

这个案例的目的在于,能及时知道其它节点的上线和离线(而且支持TCP/UDP的方式,可以自由切换).