(1). CanalStarter
CanalLauncher读取配置文件,并调用:CanalStarter.start方法
public class CanalStarter {
private static final Logger logger = LoggerFactory.getLogger(CanalStarter.class);
// *******************************************************
// Canal控制器入口
private CanalController controller = null;
// 抽象MQ生产者
private CanalMQProducer canalMQProducer = null;
// 关闭线程
private Thread shutdownThread = null;
private CanalMQStarter canalMQStarter = null;
// 全局:canal.properties
private volatile Properties properties;
// 运行标记
private volatile boolean running = false;
//
private CanalAdminWithNetty canalAdmin;
public synchronized void start() throws Throwable {
// canal.serverMode = tcp
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (serverMode.equalsIgnoreCase("kafka")) { // false
canalMQProducer = new CanalKafkaProducer();
} else if (serverMode.equalsIgnoreCase("rocketmq")) { //false
canalMQProducer = new CanalRocketMQProducer();
}
if (canalMQProducer != null) {// flase
// disable netty
System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
// 设置为raw避免ByteString->Entry的二次解析
System.setProperty("canal.instance.memory.rawEntry", "false");
}
logger.info("## start the canal server.");
// **********************************************
// 创建:CanalController
controller = new CanalController(properties);
// 开始启动
controller.start();
logger.info("## the canal server is running now ......");
// 配置关闭线程
shutdownThread = new Thread() {
public void run() {
try {
logger.info("## stop the canal server");
controller.stop();
CanalLauncher.runningLatch.countDown();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
}
};
// 添加关闭Hook
Runtime.getRuntime().addShutdownHook(shutdownThread);
if (canalMQProducer != null) { // false
canalMQStarter = new CanalMQStarter(canalMQProducer);
MQProperties mqProperties = buildMQProperties(properties);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(mqProperties, destinations);
controller.setCanalMQStarter(canalMQStarter);
}
// start canalAdmin
// canal.admin.port = 11110
String port = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT);
if (canalAdmin == null && StringUtils.isNotEmpty(port)) { // true
//canal.admin.user = admin
String user = properties.getProperty(CanalConstants.CANAL_ADMIN_USER);
// canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
String passwd = properties.getProperty(CanalConstants.CANAL_ADMIN_PASSWD);
CanalAdminController canalAdmin = new CanalAdminController(this);
canalAdmin.setUser(user);
canalAdmin.setPasswd(passwd);
String ip = properties.getProperty(CanalConstants.CANAL_IP);
// ****************************************************
// 创建:CanalAdminWithNetty
CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
canalAdminWithNetty.setCanalAdmin(canalAdmin);
canalAdminWithNetty.setPort(Integer.valueOf(port));
canalAdminWithNetty.setIp(ip);
canalAdminWithNetty.start();
this.canalAdmin = canalAdminWithNetty;
}
running = true;
}// end start
}
(2). 总结
- 创建CanalController.start()方法
- 创建CanalAdminWithNetty,端口为:11110
lixin-macbook:canal-server lixin$ lsof -nP -p 17747|grep LISTEN
java 17747 lixin 95u IPv4 0x73965c583e0d9249 0t0 TCP *:11112 (LISTEN)
java 17747 lixin 113u IPv4 0x73965c5844d62609 0t0 TCP *:11111 (LISTEN)
java 17747 lixin 117u IPv4 0x73965c584cc4d249 0t0 TCP *:11110 (LISTEN)