(1). Canal Server机器信息
我这里是对Canal Server进行伪集群配置
可参考(https://github.com/alibaba/canal/wiki/AdminGuide)
Canal Server | Canal Server Port |
---|---|
127.0.0.1 | 11111 |
127.0.0.1 | 22222 |
(2). ZK集群信息
Zookeeper Server | Zookeeper Port |
---|---|
127.0.0.1 | 2181 |
127.0.0.1 | 2182 |
127.0.0.1 | 2183 |
(3). MySQL服务器信息
MySQL Server | MySQL Port |
---|---|
127.0.0.1 | 3306 |
(4). 当前工作目录
# 当前工作目录
lixin-macbook:canal-server-cluster lixin$ pwd
/Users/lixin/Developer/canal-server-cluster
# 两台Canal Server
lixin-macbook:canal-server-cluster lixin$ ll
drwxr-xr-x 7 lixin staff 224 11 19 14:55 canal-server-1/
drwxr-xr-x 7 lixin staff 224 11 19 14:55 canal-server-2/
(5). 修改全局canal.properties配置文件
canal-server-1/conf/canal.properties
# 修改如下配置
# 指定端口
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
# 配置ZK地址
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# 注释掉:canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# HA模式:必须都选择default-instance.xml配置.
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal-server-2/conf/canal.properties
# 指定端口
canal.port = 22222
canal.metrics.pull.port = 22223
canal.admin.port = 22220
# 配置ZK地址
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# 注释掉:canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# HA模式:必须都选择default-instance.xml配置.
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
(6). 修改实例(example)instance.properties配置文件
canal-server-1/conf/example/instance.properties
# slaveId要保证不可重复
canal.instance.mysql.slaveId=12345
# 指定MySQL服务器/账号/密码
# 参考Canal Server的单机配置(创建的复制账号)
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal-server-2/conf/example/instance.properties
# slaveId要保证不可重复
canal.instance.mysql.slaveId=54321
# 指定MySQL服务器/账号/密码
# 参考Canal Server的单机配置(创建的复制账号)
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
(7). 启动ZK(略)
(8). 启动Canal Server
# 启动第一台
/Users/lixin/Developer/canal-server-cluster/canal-server-1/bin/startup.sh
# 启动第二台
/Users/lixin/Developer/canal-server-cluster/canal-server-2/bin/startup.sh
(9). 查看(canal-server-1)日志
lixin-macbook:canal lixin$ ll /Users/lixin/Developer/canal-server-cluster/canal-server-1/logs/
# canal启动日志
drwxr-xr-x 4 lixin staff 128 11 19 15:31 canal/
# 实例example日志
drwxr-xr-x 3 lixin staff 96 11 19 15:31 example/
# 查看实例日志
lixin-macbook:canal lixin$ cat /Users/lixin/Developer/canal-server-cluster/canal-server-1/logs/example/example.log
2020-11-19 15:31:54.053 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-11-19 15:31:54.094 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-11-19 15:31:54.412 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]
2020-11-19 15:31:54.517 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-11-19 15:31:54.518 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-11-19 15:31:55.130 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-11-19 15:31:55.140 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-11-19 15:31:55.140 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
#############################提示启动成功#############################
2020-11-19 15:31:55.154 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-11-19 15:31:55.277 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-11-19 15:31:55.278 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-11-19 15:31:55.679 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000018,position=4,serverId=1,gtid=<null>,timestamp=1605768072000] cost : 385ms , the next step is binlog dump
(10). 查看(canal-server-2)日志
# canal-server-2只有canal日志信息,并无实例信息
lixin-macbook:canal-server-cluster lixin$ ll ./canal-server-2/logs/
drwxr-xr-x 4 lixin staff 128 11 19 15:31 canal/
# 查看canal日志信息
lixin-macbook:canal-server-cluster lixin$ cat ./canal-server-2/logs/canal/canal.log
2020-11-19 16:35:23.042 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-11-19 16:35:23.222 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-11-19 16:35:23.269 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-11-19 16:35:23.650 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.125(172.17.0.125):22222]
2020-11-19 16:35:24.220 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
(11). ZK客户端查看实例信息
# example的实现,在运行状态的是:172.17.0.125:11111
[zk: localhost:2181(CONNECTED) 9] get /otter/canal/destinations/example/running
{"active":true,"address":"172.17.0.125:11111"}
(12). Canal Client连接Canal Server进行消费
请参考:com.alibaba.otter.canal.example.ClusterCanalClientTest
String destination = "example";
CanalConnector connector = CanalConnectors
.newClusterConnector(
// ZK集群列表
"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
// 实例
destination,
null,
null);
(14). 查看ZK信息
# 在实例(example)节点下,增加了:1001节点,并包含有两个子节点:cursor/running
# /otter/canal/destinations/example/1001/cursor
# /otter/canal/destinations/example/1001/running
[zk: localhost:2181(CONNECTED) 62] ls /otter/canal/destinations/example/1001
[cursor, running]
# 查看活动的canal client,意味着:canal client也可以集群.
# 那么1001是怎么来的?是固定的吗?
[zk: localhost:2181(CONNECTED) 64] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"172.17.0.125:63912","clientId":1001}
# 查看position信息(ZK持久化记录:上次消费的position)
[zk: localhost:2181(CONNECTED) 63] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000018","position":384,"serverId":1,"timestamp":1605773474000}}
(15). Canal Client日志
2020-11-19 16:38:13.102 [ZkClient-EventThread-10-127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
2020-11-19 16:38:13.104 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
2020-11-19 16:38:13.106 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=localhost
2020-11-19 16:38:13.106 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.8.0_251
2020-11-19 16:38:13.106 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home/jre
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=/Users/lixin/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/l2/v7kxnww15mjb9sps4yb25sqh0000gn/T/
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=10.15.7
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=lixin
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/Users/lixin
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/Users/lixin/GitRepository/canal-1.1.4/example
###########################初始化连接到ZK##########################################
2020-11-19 16:38:13.107 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 sessionTimeout=90000 watcher=com.alibaba.otter.canal.common.zookeeper.ZkClientx@6a4f787b
2020-11-19 16:38:13.131 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
2020-11-19 16:38:13.133 [main-SendThread(localhost:2183)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2183. Will not attempt to authenticate using SASL (unknown error)
2020-11-19 16:38:13.225 [main-SendThread(localhost:2183)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2183, initiating session
2020-11-19 16:38:13.235 [main-SendThread(localhost:2183)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2183, sessionid = 0x3000034a4be0004, negotiated timeout = 40000
2020-11-19 16:38:13.236 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
###########################MySQL插入一条数据##########################################
****************************************************
* Batch Id: [3] ,count : [3] , memsize : [150] , Time : 2020-11-19 16:38:22
* Start : [mysql-bin.000018:1263:1605775102000(2020-11-19 16:38:22)]
* End : [mysql-bin.000018:1428:1605775102000(2020-11-19 16:38:22)]
****************************************************
================> binlog[mysql-bin.000018:1263] , executeTime : 1605775102000(2020-11-19 16:38:22) , gtid : () , delay : 995ms
BEGIN ----> Thread id: 14
----------------> binlog[mysql-bin.000018:1379] , name[db,t1] , eventType : INSERT , executeTime : 1605775102000(2020-11-19 16:38:22) , gtid : () , delay : 998 ms
id : 10004 type=int(11) update=true
name : tom10004 type=varchar(20) update=true
----------------
END ----> transaction id: 170
================> binlog[mysql-bin.000018:1428] , executeTime : 1605775102000(2020-11-19 16:38:22) , gtid : () , delay : 1007ms
###########################停止canal-server-1(11111)##########################################
2020-11-19 16:38:57.135 [Thread-2] WARN c.alibaba.otter.canal.client.impl.ClusterCanalConnector - something goes wrong when getWithoutAck data from server:null
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Connection reset by peer
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:325) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:295) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:183) ~[classes/:na]
at com.alibaba.otter.canal.example.AbstractCanalClientTest.process(AbstractCanalClientTest.java:64) [classes/:na]
at com.alibaba.otter.canal.example.AbstractCanalClientTest$1.run(AbstractCanalClientTest.java:31) [classes/:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_251]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_251]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_251]
at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_251]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_251]
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:206) ~[na:1.8.0_251]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.8.0_251]
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.8.0_251]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.read(SimpleCanalConnector.java:411) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:401) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:385) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.receiveMessages(SimpleCanalConnector.java:330) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:323) ~[classes/:na]
... 5 common frames omitted
# 连接到:canal-server2(22222)
2020-11-19 16:39:02.266 [Thread-2] ERROR c.a.otter.canal.client.impl.running.ClientRunningMonitor - There is an error when execute initRunning method, with destination [example].
com.alibaba.otter.canal.protocol.exception.CanalClientException: failed to subscribe with reason: something goes wrong with channel:[id: 0x291b1bbf, /172.17.0.125:52599 => /172.17.0.125:22222], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.subscribe(SimpleCanalConnector.java:249) [classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector$1.processActiveEnter(SimpleCanalConnector.java:434) ~[classes/:na]
at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.processActiveEnter(ClientRunningMonitor.java:221) [classes/:na]
at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:123) [classes/:na]
at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.start(ClientRunningMonitor.java:93) [classes/:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:108) [classes/:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.connect(ClusterCanalConnector.java:64) [classes/:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.restart(ClusterCanalConnector.java:273) [classes/:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:189) [classes/:na]
at com.alibaba.otter.canal.example.AbstractCanalClientTest.process(AbstractCanalClientTest.java:64) [classes/:na]
at com.alibaba.otter.canal.example.AbstractCanalClientTest$1.run(AbstractCanalClientTest.java:31) [classes/:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
# 连接失败,进行重试
2020-11-19 16:39:02.283 [Thread-2] WARN c.alibaba.otter.canal.client.impl.ClusterCanalConnector - failed to connect to:/172.17.0.125:22222 after retry 0 times
2020-11-19 16:39:02.288 [Thread-2] WARN c.a.otter.canal.client.impl.running.ClientRunningMonitor - canal is not run any in node
2020-11-19 16:39:07.307 [Thread-2] INFO c.alibaba.otter.canal.client.impl.ClusterCanalConnector - restart the connector for next round retry.
###########################MySQL中重新插入一条数据##########################################
****************************************************
* Batch Id: [1] ,count : [1] , memsize : [31] , Time : 2020-11-19 16:39:07
* Start : [mysql-bin.000018:1428:1605775102000(2020-11-19 16:38:22)]
* End : [mysql-bin.000018:1428:1605775102000(2020-11-19 16:38:22)]
****************************************************
----------------
END ----> transaction id: 170
================> binlog[mysql-bin.000018:1428] , executeTime : 1605775102000(2020-11-19 16:38:22) , gtid : () , delay : 45313ms
****************************************************
* Batch Id: [2] ,count : [3] , memsize : [150] , Time : 2020-11-19 16:39:28
* Start : [mysql-bin.000018:1524:1605775168000(2020-11-19 16:39:28)]
* End : [mysql-bin.000018:1689:1605775168000(2020-11-19 16:39:28)]
****************************************************
================> binlog[mysql-bin.000018:1524] , executeTime : 1605775168000(2020-11-19 16:39:28) , gtid : () , delay : 228ms
BEGIN ----> Thread id: 14
----------------> binlog[mysql-bin.000018:1640] , name[db,t1] , eventType : INSERT , executeTime : 1605775168000(2020-11-19 16:39:28) , gtid : () , delay : 228 ms
id : 10005 type=int(11) update=true
name : tom10005 type=varchar(20) update=true
----------------
END ----> transaction id: 191
================> binlog[mysql-bin.000018:1689] , executeTime : 1605775168000(2020-11-19 16:39:28) , gtid : () , delay : 229ms
(16). ZK节点信息介绍
/otter/canal/destinations/example/running : (EPHEMERAL节点)记录集群中活动的CanalServer
[zk: localhost:2181(CONNECTED) 129] get /otter/canal/destinations/example/running
{"active":true,"address":"172.17.0.125:22222"}
/otter/canal/destinations/example/1001/running : (EPHEMERAL节点)记录集群中活动的CanalClient
[zk: localhost:2181(CONNECTED) 130] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"172.17.0.125:52603","clientId":1001}
/otter/canal/destinations/example/1001/cursor : (Persistent节点)记录MySQL position信息
[zk: localhost:2181(CONNECTED) 131] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000018","position":1689,"serverId":1,"timestamp":1605775168000}}
/otter/canal/destinations/example/cluster : (EPHEMERAL节点)记录CanalServer集群的机器列表(ip:port)
[zk: localhost:2181(CONNECTED) 134] ls /otter/canal/destinations/example/cluster
[172.17.0.125:11111, 172.17.0.125:22222]
(17). 总结
Canal Server利用了ZK进行了选举.启动时,多台机器抢占创建ZK(EPHEMERAL/瞬时)节点(/otter/canal/destinations/example/running),同一时间只有一台能抢占成功,抢占失败的监听该节点变化,一旦发现该节点不存在,继续进行抢占.