(1). 概述
本来应该要剖析StateMachine的,但是,在剖析FSMCaller(onCommitted方法)时,有涉及到一个接口:ClosureQueue,所以,在这一篇主要剖析它了.
(2). 先看下ClosureQueue接口签名
有点懒了,就不画UML图了,因为,这个接口比较简单.
public interface ClosureQueue {
// 清除所有的闭包
void clear();
// 重置第一个索引
void resetFirstIndex(final long firstIndex);
// 添加闭包到队列中
void appendPendingClosure(final Closure closure);
// 遍历队列,从列首开始弹出endIndex个数据
// 如果数据类型是:TaskClosure,则,把数据添加到集合taskClosures里.
// 其余所有的Closure,则,把数据添加到集合closures里.
long popClosureUntil(final long endIndex, final List<Closure> closures, final List<TaskClosure> taskClosures);
}
(3). 看下ClosureQueue的默认实现类
能从ClosureQueue的实现类里看出来,它应该是在LinkedList的基础上做了一层业务逻辑的包装.
public class ClosureQueueImpl implements ClosureQueue {
private static final Logger LOG = LoggerFactory.getLogger(ClosureQueueImpl.class);
private String groupId;
private final Lock lock;
private long firstIndex;
// *******************************************************************
// 线性链表
// *******************************************************************
private LinkedList<Closure> queue;
@OnlyForTest
public long getFirstIndex() {
return firstIndex;
}
@OnlyForTest
public LinkedList<Closure> getQueue() {
return queue;
}
public ClosureQueueImpl() {
super();
this.lock = new ReentrantLock();
this.firstIndex = 0;
this.queue = new LinkedList<>();
}
// ... ...
}
(4). ClosureQueueTest
public class ClosureQueueTest {
private static final String GROUP_ID = "group001";
private ClosureQueueImpl queue;
@Before
public void setup() {
this.queue = new ClosureQueueImpl(GROUP_ID);
} // end setup
@SuppressWarnings("SameParameterValue")
private Closure mockClosure(final CountDownLatch latch) {
return status -> {
if (latch != null) {
latch.countDown();
}
};
} // end mockClosure
@Test
public void testAppendPop() {
for (int i = 0; i < 10; i++) {
// *******************************************************************
// 添加闭包到队列中
// *******************************************************************
this.queue.appendPendingClosure(mockClosure(null));
}
List<Closure> closures = new ArrayList<>();
assertEquals(0, this.queue.popClosureUntil(4, closures));
assertEquals(5, closures.size());
} // end testAppendPop
}
(5). ClosureQueue.appendPendingClosure
appendPendingClosure很简单,把数据,添加到队列的列尾.
// private LinkedList<Closure> queue;
public void appendPendingClosure(final Closure closure) {
this.lock.lock();
try {
// 添加闭包到队列的队尾
this.queue.add(closure);
} finally {
this.lock.unlock();
}
}
(6). ClosureQueue.popClosureUntil
// closures 存储 Closure的结果集
// taskClosures 存储 TaskClosure的结果集
public long popClosureUntil(final long endIndex, final List<Closure> closures, final List<TaskClosure> taskClosures) {
// 清空结果集.
closures.clear();
if (taskClosures != null) {
taskClosures.clear();
}
this.lock.lock();
try {
// 队列里的数据数量.
final int queueSize = this.queue.size();
// 如果队列里没有数据,又或者,endIndex 又小于 firstIndex,则返回:endIndex+1,不太理解这逻辑,先不管了.
// 我的理解是:无论如何,你要取的数据得存在吧,这个返回有什么意义?
if (queueSize == 0 || endIndex < this.firstIndex) {
return endIndex + 1;
}
// 这一步是验证:
// endIndex : 要取的数据数量
// this.firstIndex + queueSize - 1 : 为队列里的实际数量
// 如果,要取的数据量大于队列里的数量,肯定是不能拿出那么多数据的.
if (endIndex > this.firstIndex + queueSize - 1) {
LOG.error("Invalid endIndex={}, firstIndex={}, closureQueueSize={}", endIndex, this.firstIndex, queueSize);
return -1;
}
// ****************************************************************************
// firstIndex我咋感觉是一个计数器呢?
// 从队列里弹出endIndex + 1个元数
// ****************************************************************************
final long outFirstIndex = this.firstIndex;
for (long i = outFirstIndex; i <= endIndex; i++) {
// ****************************************************************************
// 从队列的队首弹出一个元素
// ****************************************************************************
final Closure closure = this.queue.pollFirst();
// 如果是:TaskClosure添加,把它添加到集合:taskClosures里
if (taskClosures != null && closure instanceof TaskClosure) {
taskClosures.add((TaskClosure) closure);
}
// 这里不是排除关系哦,这个closures可以包含:Closure和TaskClosure
closures.add(closure);
}
this.firstIndex = endIndex + 1;
return outFirstIndex;
} finally {
this.lock.unlock();
}
}
(7). 总结
ClosureQueue在LinkedList的基础上,提供了添加到队列和弹出队列的功能.