(1). 概述
终于轮到剖析StateMachine了,其实,剖析完:FSMCaller之后,StateMachine要不要剖析,也没多大的意义了,只是,还是要让整个源码剖析过程比较完整一些,此处,以:CounterStateMachine为例.
(2). StateMachine UML图
(3). 快照保存(CounterStateMachine.onSnapshotSave)
public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
// currVal为业务数据来着的
final long currVal = this.value.get();
executor.submit(() -> {
final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
// *************************************************************************
// 自己实现数据的写出(CounterSnapshotFile)
// *************************************************************************
if (snapshot.save(currVal)) {
// *************************************************************************
// 添加元数据信息,在前面剖析过源码来着,这里不详解了.
// 我现在感觉元数据仅仅只是一个标记而已,先不管了,后再剖析KV的时候再看它的实现是咋回事
// *************************************************************************
if (writer.addFile("data")) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
}
} else {
done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
}
});
}
(4). CounterSnapshotFile.save
public boolean save(final long value) {
try {
// 将数据转换成字符串,写出到指定的目录
FileUtils.writeStringToFile(new File(path), String.valueOf(value));
return true;
} catch (IOException e) {
LOG.error("Fail to save snapshot", e);
return false;
}
}
(5). 快照加载(CounterStateMachine.onSnapshotLoad)
public boolean onSnapshotLoad(final SnapshotReader reader) {
if (isLeader()) { // leader 不允许加载快照
LOG.warn("Leader is not supposed to load snapshot");
return false;
}
// 验证下元数据是否存在
if (reader.getFileMeta("data") == null) {
LOG.error("Fail to find data file in {}", reader.getPath());
return false;
}
final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
try {
// ********************************************************************************************
// 委托给:CounterSnapshotFile加载数据
// ********************************************************************************************
this.value.set(snapshot.load());
return true;
} catch (final IOException e) {
LOG.error("Fail to load snapshot from {}", snapshot.getPath());
return false;
}
}
(6). CounterSnapshotFile.load
public long load() throws IOException {
// 从磁盘加载数据,转换成long类型
final String s = FileUtils.readFileToString(new File(path));
if (!StringUtils.isBlank(s)) {
return Long.parseLong(s);
}
throw new IOException("Fail to load snapshot from " + path + ",content: " + s);
}
(7). 应用日志(CounterStateMachine.onApply)
// ***************************************************************************
// onApply是由FSMCaller.onCommitted触发回调的
// ***************************************************************************
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
long current = 0;
CounterOperation counterOperation = null;
CounterClosure closure = null;
// 从Iterator里获得数据.
if (iter.done() != null) {
// 从Closure中获取数据
// This task is applied by this node, get value from closure to avoid additional parsing.
closure = (CounterClosure) iter.done();
counterOperation = closure.getCounterOperation();
} else {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
// 反序列化操作
counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), CounterOperation.class.getName());
} catch (final CodecException e) {
LOG.error("Fail to decode IncrementAndGetRequest", e);
}
// follower ignore read operation
if (counterOperation != null && counterOperation.isReadOp()) {
iter.next();
continue;
}
}
if (counterOperation != null) {
switch (counterOperation.getOp()) {
case GET: // 获取操作
current = this.value.get();
LOG.info("Get value={} at logIndex={}", current, iter.getIndex());
break;
case INCREMENT: // 自增操作
final long delta = counterOperation.getDelta();
final long prev = this.value.get();
current = this.value.addAndGet(delta);
LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
break;
}
if (closure != null) {
closure.success(current);
closure.run(Status.OK());
}
}
iter.next();
}
}
(8). 总结
StateMachine的所有方法,都是由FSMCaller进行触发的,相当于StateMachine只是一个回调模板,留给我们开发人员而已.