(1). 概述
我只关心WAL的复制功能,所以,只挑部份源码查看.
(2). HRegionServer.run
public void run() {
// ... ...
// 1. 判断是否集群模式
while (keepLooping()) {
// 2. 向HMaster发送请求,返回MAP内容如下:
// name: "hbase.rootdir" value: "hdfs://lixin-macbook.local:9000/hbase"
// name: "fs.defaultFS" value: "hdfs://lixin-macbook.local:9000"
// name: "hbase.master.info.port" value: "16010"
// name: "hbase.regionserver.hostname.seen.by.master" value: "localhost"
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
LOG.warn("reportForDuty failed; sleeping and then retrying.");
this.sleeper.sleep();
} else {
// 3. 处理结果
handleReportForDutyResponse(w);
break;
}
}
// ... ...
}
(3). HRegionServer.handleReportForDutyResponse
protected void handleReportForDutyResponse(final RegionServerStartupResponse c) throws IOException {
// ... ...
// 设置WAL和复制功能
this.walFactory = setupWALAndReplication();
// ... ...
}
(4). HRegionServer.setupWALAndReplication
private WALFactory setupWALAndReplication() throws IOException {
// 创建复制实例
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
}
(5). HRegionServer.createNewReplicationInstance
static private void createNewReplicationInstance(
// Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml
Configuration conf,
//
HRegionServer server,
// DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1886280369_1, ugi=lixin (auth:SIMPLE)]]
FileSystem walFs,
// hdfs://lixin-macbook.local:9000/hbase/WALs/localhost,16201,1617955056347
Path walDir,
// hdfs://lixin-macbook.local:9000/hbase/oldWALs
Path oldWALDir) throws IOException{
// ...
// 1. 读取配置:hbase.replication.source.service,如果没有配置,则返回默认值:org.apache.hadoop.hbase.replication.regionserver.Replication
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// 2. 读取配置:hbase.replication.sink.service,如果没有配置,则返回默认值:org.apache.hadoop.hbase.replication.regionserver.Replication
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
// 如果:sourceClassname和sinkClassname相同,replicationSinkHandler=replicationSourceHandler
if (sourceClassname.equals(sinkClassname)) { //true
// 给HRegionServer设置复制SourceHandler/SinkHandler
// HRegionServer.replicationSourceHandler
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, walFs, walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSourceHandler;
} else {
// 通过反射构造各自的Hnadler
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, walFs, walDir, oldWALDir);
server.replicationSinkHandler = (ReplicationSinkService)
newReplicationInstance(sinkClassname,
conf, server, walFs, walDir, oldWALDir);
} // end else
}
(6). HRegionServer.newReplicationInstance
static private ReplicationService newReplicationInstance(
// org.apache.hadoop.hbase.replication.regionserver.Replication
String classname,
Configuration conf,
HRegionServer server,
FileSystem walFs,
Path walDir,
Path oldLogDir) throws IOException{
// 1. 通过反射加载:org.apache.hadoop.hbase.replication.regionserver.Replication
Class<?> clazz = null;
try {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
clazz = Class.forName(classname, true, classLoader);
} catch (java.lang.ClassNotFoundException nfe) {
throw new IOException("Could not find class for " + classname);
}
// 2. 调用:Replication类的无参构造器,返回:ReplicationService实例
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
// 3. ReplicationService.initialize.
service.initialize(server, walFs, walDir, oldLogDir);
return service;
}
(7). 总结
- HRegionServer启动时,会与HMaster进行交互,获得:WAL所在的目录.
- 构建:Replication实例,并初始化.
- Replication的内容,另开一篇来分析.