(1). Disruptor开发模型
参考网站(https://www.cnblogs.com/crazymakercircle/p/13909235.html)
- 定义Event,代表Disruptor所能处理的数据单元.
- 定义Event工厂,实现EventFactory<?>接口,用来填充RingBuffer容器.
- 定义Event处理器(消费者),实现:EventHandler<?>接口,用来从RingBuffer中取出数据并处理.
- 组合(1~3步)
(2). 定义Event/Event工厂/Event消费者
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
// 1. 定义Event
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
public long get() {
return value;
}
// 2.定义Event工厂
static EventFactory<LongEvent> EVENT_FACTORY = () -> {
return new LongEvent();
};
}
// 3.定义Event处理器(消费者)
class LongEventHandler implements EventHandler<LongEvent> {
/**
* event:发布到RingBuffer中的事件. <br/>
* sequence:当前正在处理的事件序列号.<br/>
* endOfBatch:是否为RingBuffer的最后一个.<br/>
*/
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("[" + Thread.currentThread().getName() + "] LongEventHandler consumer->" + event);
}
}
(3). 组合测试
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
public class DisruptorTest1 {
public static void main(String[] args) {
// 1. 初始化线程池-用户执行Consumer
Executor executor = Executors.newCachedThreadPool();
// 2. 初始化RingBuffer的大小,必须是2的指数
int bufferSize = 1024;
// 3.Event处理器(消费者)
LongEventHandler consumerHandler = new LongEventHandler();
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, bufferSize, executor);
// 指定消费者
disruptor.handleEventsWith(consumerHandler);
// 该方法只能调用一次,并且所有的EventHandler必须在start之前添加,包括:ExeceptionHandler
disruptor.start();
// 获取RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 获取RingBuffer的下一个序号,序列相关于指针
long sequence = ringBuffer.next();
try {
// 取出下标(指针)对应的数据
LongEvent longEvent = ringBuffer.get(sequence);
// 填充数据
longEvent.set(new Random(1000).nextLong());
} finally {
// 发送数据对应的序号(指针)
ringBuffer.publish(sequence);
}
}
}
(3). 总结
看到了没有?生产者生产数据时:
- 是拿出sequence下标.
- 取出sequence对应的模板数据.
- 对sequence下标进行发布(发布的是sequence,而不是发布所谓的业务模型(LongEvent))
.
所以,Disruptor,在启动时,会根据bufferSize创建N个LongEvent,放在环形队列里,而环形队列的下标(long)放在CPU 的缓存行中,每次生产消息时,根据下标(CPU缓存行)取出业务模型(LongEvent),对业务模型进行数据填充后,通知Disruptor对下标进行发布.