(1). Overview
As before, we follow the architecture diagram from the official site and examine one component at a time. This short piece focuses on LogManager, a wrapper around LogStorage. Why wrap it? Because on top of LogStorage this class adds an in-memory cache and batched writes. Let's work through it step by step.
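The relationship can be pictured as a thin layer over the storage interface. Below is a minimal sketch of that idea, buffering entries in memory and writing them in one batch; the names SimpleLogManager and the trimmed-down LogStorage interface are illustrative only, not the real jraft API:

import java.util.ArrayList;
import java.util.List;

// Trimmed-down storage interface, for illustration only.
interface LogStorage {
    int appendEntries(List<String> entries); // returns how many entries were written
}

// Hypothetical wrapper that adds an in-memory cache and batched writes,
// mirroring (in spirit) what LogManagerImpl layers on top of LogStorage.
class SimpleLogManager {
    private final LogStorage storage;
    private final List<String> inMemory = new ArrayList<>(); // read cache
    private final List<String> toFlush  = new ArrayList<>(); // write batch
    private final int batchSize;

    SimpleLogManager(LogStorage storage, int batchSize) {
        this.storage = storage;
        this.batchSize = batchSize;
    }

    void append(String entry) {
        inMemory.add(entry);   // readers can hit the cache before the disk write finishes
        toFlush.add(entry);
        if (toFlush.size() >= batchSize) {
            flush();           // write the whole batch with a single storage call
        }
    }

    void flush() {
        if (!toFlush.isEmpty()) {
            storage.appendEntries(new ArrayList<>(toFlush));
            toFlush.clear();
        }
    }
}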
(2). LogManager appends log entries (LogManager.appendEntries)
public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
    assert (done != null);
    Requires.requireNonNull(done, "done");
    if (this.hasError) { // Bail out early if the log storage is already in an error state
        entries.clear();
        ThreadPoolsFactory.runClosureInThread(this.groupId, done, new Status(RaftError.EIO, "Corrupted LogStorage"));
        return;
    }
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // Validate the entries to append and resolve conflicts.
        // If the first entry's index is 0, the batch is re-numbered so that
        // its indexes continue from lastLogIndex + 1.
        if (!entries.isEmpty() && !checkAndResolveConflict(entries, done, this.writeLock)) {
            // If checkAndResolveConflict returns false, the done will be called in it.
            entries.clear();
            return;
        }
        // Walk the entries; CONFIGURATION entries get special handling.
        for (int i = 0; i < entries.size(); i++) {
            final LogEntry entry = entries.get(i);
            // Set checksum after checkAndResolveConflict
            if (this.raftOptions.isEnableLogEntryChecksum()) {
                entry.setChecksum(entry.checksum());
            }
            if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
                Configuration oldConf = new Configuration();
                if (entry.getOldPeers() != null) {
                    oldConf = new Configuration(entry.getOldPeers(), entry.getOldLearners());
                }
                final ConfigurationEntry conf = new ConfigurationEntry(entry.getId(),
                    new Configuration(entry.getPeers(), entry.getLearners()), oldConf);
                this.configManager.add(conf);
            }
        }
        // When the batch is not empty
        if (!entries.isEmpty()) {
            // Record the index of the first entry on the closure
            done.setFirstLogIndex(entries.get(0).getId().getIndex());
            // *****************************************************************************
            // Add the entries to the in-memory cache; they are served from there
            // until they have been persisted and can be read back.
            // *****************************************************************************
            this.logsInMemory.addAll(entries);
        }
        // *****************************************************************************
        // Hand the whole batch to the closure
        // *****************************************************************************
        done.setEntries(entries);
        doUnlock = false;
        // Wake up any waiters; if there are none, walk all LastLogIndexListeners
        // and call onLastLogIndexChanged on each of them.
        if (!wakeupAllWaiter(this.writeLock)) {
            notifyLastLogIndexListeners();
        }
        // *****************************************************************************
        // We can publish the event to the Disruptor directly without claiming a
        // sequence first: publishEvent claims and publishes the sequence internally.
        // *****************************************************************************
        // publish event out of lock
        this.diskQueue.publishEvent((event, sequence) -> {
            event.reset();
            event.type = EventType.OTHER;
            event.done = done;
        });
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
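The remark about the Disruptor above deserves a small standalone example. publishEvent accepts an EventTranslator (written as a lambda here) and takes care of claiming the next sequence, running the translator on the pre-allocated event, and publishing it, so the producer never touches the sequencer directly. A minimal sketch, assuming the LMAX Disruptor 3.x library on the classpath; DemoEvent and the printing handler are made up for the example:

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;

public class PublishEventDemo {
    // Minimal event type, analogous to StableClosureEvent.
    static class DemoEvent {
        String payload;
        void reset() { this.payload = null; }
    }

    public static void main(String[] args) {
        Disruptor<DemoEvent> disruptor = new Disruptor<>(DemoEvent::new, 1024,
            Executors.defaultThreadFactory());
        // A single consumer, analogous to StableClosureEventHandler.
        disruptor.handleEventsWith((EventHandler<DemoEvent>) (event, sequence, endOfBatch) ->
            System.out.println("consumed " + event.payload + " at sequence " + sequence));
        RingBuffer<DemoEvent> ringBuffer = disruptor.start();

        // publishEvent claims the next sequence, runs the translator lambda on the
        // pre-allocated event, then publishes it -- no manual next()/publish() calls.
        ringBuffer.publishEvent((event, sequence) -> {
            event.reset();
            event.payload = "hello";
        });

        disruptor.shutdown();
    }
}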
(3). StableClosureEventHandler.onEvent
private class StableClosureEventHandler implements EventHandler<StableClosureEvent> {
    LogId lastId = LogManagerImpl.this.diskId;
    List<StableClosure> storage = new ArrayList<>(256);
    // ********************************************************************************************
    // A single StableClosureEventHandler instance consumes the queue.
    // AppendBatcher, as the name suggests, appends entries in batches.
    // ********************************************************************************************
    AppendBatcher ab = new AppendBatcher(this.storage, 256, new ArrayList<>(), LogManagerImpl.this.diskId);

    @Override
    public void onEvent(final StableClosureEvent event, final long sequence, final boolean endOfBatch)
            throws Exception {
        if (event.type == EventType.SHUTDOWN) { // Shutdown handling
            this.lastId = this.ab.flush();
            setDiskId(this.lastId);
            LogManagerImpl.this.shutDownLatch.countDown();
            event.reset();
            return;
        }
        final StableClosure done = event.done;
        final EventType eventType = event.type;
        event.reset();
        if (done.getEntries() != null && !done.getEntries().isEmpty()) {
            // **************************************************************
            // 1. Delegate to AppendBatcher.append; the entries travel inside done
            // **************************************************************
            this.ab.append(done);
        } else {
            this.lastId = this.ab.flush();
            boolean ret = true;
            switch (eventType) {
                case LAST_LOG_ID:
                    ((LastLogIdClosure) done).setLastLogId(this.lastId.copy());
                    break;
                case TRUNCATE_PREFIX:
                    long startMs = Utils.monotonicMs();
                    try {
                        final TruncatePrefixClosure tpc = (TruncatePrefixClosure) done;
                        LOG.debug("Truncating storage to firstIndexKept={}.", tpc.firstIndexKept);
                        ret = LogManagerImpl.this.logStorage.truncatePrefix(tpc.firstIndexKept);
                    } finally {
                        LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-prefix", Utils.monotonicMs()
                                                                                             - startMs);
                    }
                    break;
                case TRUNCATE_SUFFIX:
                    startMs = Utils.monotonicMs();
                    try {
                        final TruncateSuffixClosure tsc = (TruncateSuffixClosure) done;
                        LOG.warn("Truncating storage to lastIndexKept={}.", tsc.lastIndexKept);
                        ret = LogManagerImpl.this.logStorage.truncateSuffix(tsc.lastIndexKept);
                        if (ret) {
                            this.lastId.setIndex(tsc.lastIndexKept);
                            this.lastId.setTerm(tsc.lastTermKept);
                            Requires.requireTrue(this.lastId.getIndex() == 0 || this.lastId.getTerm() != 0);
                        }
                    } finally {
                        LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-suffix", Utils.monotonicMs()
                                                                                             - startMs);
                    }
                    break;
                case RESET:
                    final ResetClosure rc = (ResetClosure) done;
                    LOG.info("Resetting storage to nextLogIndex={}.", rc.nextLogIndex);
                    ret = LogManagerImpl.this.logStorage.reset(rc.nextLogIndex);
                    break;
                default:
                    break;
            }
            if (!ret) {
                reportError(RaftError.EIO.getNumber(), "Failed operation in LogStorage");
            } else {
                done.run(Status.OK());
            }
        }
        if (endOfBatch) {
            // *********************************************************************
            // 2. At the end of a Disruptor batch, flush everything buffered so far
            // *********************************************************************
            this.lastId = this.ab.flush();
            setDiskId(this.lastId);
        }
    }
}
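The endOfBatch flag is what makes this batching work: the Disruptor sets it to true on the last event visible to the consumer in the current batch, so the handler can do cheap per-event buffering and pay for the expensive disk write only once per batch. A minimal sketch of that pattern, assuming the LMAX Disruptor's EventHandler interface; BatchingHandler and its flush method are invented for illustration:

import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;

// Illustrative handler showing the buffer-then-flush-on-endOfBatch pattern
// used by StableClosureEventHandler together with AppendBatcher.
class BatchingHandler implements EventHandler<String> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void onEvent(String event, long sequence, boolean endOfBatch) {
        buffer.add(event); // cheap per-event work: just buffer it
        if (endOfBatch) {
            flush();       // expensive work (e.g. one disk write) once per batch
        }
    }

    private void flush() {
        System.out.println("writing " + buffer.size() + " events in one batch");
        buffer.clear();
    }
}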
(4). AppendBatcher.append
void append(final StableClosure done) {
    // Flush first if the batch is already full, either by entry count or by buffered bytes
    if (this.size == this.cap || this.bufferSize >= LogManagerImpl.this.raftOptions.getMaxAppendBufferSize()) {
        flush();
    }
    this.storage.add(done);
    this.size++;
    // *****************************************************************
    // Only buffers the entries into the pending list; nothing is written yet
    // *****************************************************************
    this.toAppend.addAll(done.getEntries());
    for (final LogEntry entry : done.getEntries()) {
        this.bufferSize += entry.getData() != null ? entry.getData().remaining() : 0;
    }
}
(5). AppendBatcher.flush
LogId flush() {
    if (this.size > 0) {
        // *******************************************************************
        // 1. Delegate to LogManagerImpl.appendToStorage to persist the batch
        // *******************************************************************
        this.lastId = appendToStorage(this.toAppend);
        for (int i = 0; i < this.size; i++) {
            this.storage.get(i).getEntries().clear();
            Status st = null;
            try {
                if (LogManagerImpl.this.hasError) {
                    st = new Status(RaftError.EIO, "Corrupted LogStorage");
                } else {
                    st = Status.OK();
                }
                // *****************************************************************
                // 2. Run the callback of each buffered closure
                // *****************************************************************
                this.storage.get(i).run(st);
            } catch (Throwable t) {
                LOG.error("Fail to run closure with status: {}.", st, t);
            }
        }
        this.toAppend.clear();
        this.storage.clear();
    }
    this.size = 0;
    this.bufferSize = 0;
    return this.lastId;
}
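Note the two parallel collections: toAppend is the flat list of every entry buffered across the batch, while storage holds one closure per appendEntries call. With made-up counts, if two calls buffered 3 and 2 entries respectively, flush issues a single write of 5 entries and then runs exactly 2 callbacks. A tiny sketch of that bookkeeping, with all names invented for illustration:

import java.util.ArrayList;
import java.util.List;

// Hypothetical mini-batcher illustrating the flush bookkeeping:
// one storage write for all entries, one callback per producer.
class MiniBatcher {
    private final List<Runnable> callbacks = new ArrayList<>(); // ~ "storage"
    private final List<String> toAppend = new ArrayList<>();    // ~ "toAppend"

    void append(List<String> entries, Runnable callback) {
        toAppend.addAll(entries);
        callbacks.add(callback);
    }

    void flush() {
        System.out.println("one storage write of " + toAppend.size() + " entries");
        callbacks.forEach(Runnable::run); // one callback per appendEntries call
        toAppend.clear();
        callbacks.clear();
    }

    public static void main(String[] args) {
        MiniBatcher b = new MiniBatcher();
        b.append(List.of("e1", "e2", "e3"), () -> System.out.println("caller 1 done"));
        b.append(List.of("e4", "e5"), () -> System.out.println("caller 2 done"));
        b.flush(); // prints one write of 5 entries, then the two "done" lines
    }
}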
(6). LogManager.appendToStorage
private LogId appendToStorage(final List<LogEntry> toAppend) {
    LogId lastId = null;
    if (!this.hasError) {
        final long startMs = Utils.monotonicMs();
        final int entriesCount = toAppend.size();
        this.nodeMetrics.recordSize("append-logs-count", entriesCount);
        try {
            int writtenSize = 0;
            for (int i = 0; i < entriesCount; i++) {
                final LogEntry entry = toAppend.get(i);
                writtenSize += entry.getData() != null ? entry.getData().remaining() : 0;
            }
            this.nodeMetrics.recordSize("append-logs-bytes", writtenSize);
            // **************************************************************************
            // Delegate the actual write to LogStorage
            // **************************************************************************
            final int nAppent = this.logStorage.appendEntries(toAppend);
            if (nAppent != entriesCount) {
                LOG.error("**Critical error**, fail to appendEntries, nAppent={}, toAppend={}", nAppent,
                    toAppend.size());
                reportError(RaftError.EIO.getNumber(), "Fail to append log entries");
            }
            if (nAppent > 0) {
                lastId = toAppend.get(nAppent - 1).getId();
            }
            toAppend.clear();
        } finally {
            this.nodeMetrics.recordLatency("append-logs", Utils.monotonicMs() - startMs);
        }
    }
    return lastId;
}
(7). Summary
In short, LogManager appends log entries asynchronously on a Disruptor consumer thread and writes them to LogStorage in batches: entries are buffered (in the in-memory cache and in the AppendBatcher) before the write, and once the batch has been persisted the buffers are cleared and the waiting callbacks are run.