(1). 自定义生产者,方案(1)
自己抽象出发布数据模板.
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publish(Long data) {
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.set(data);
System.out.println("[" + Thread.currentThread().getName() + "] LongEventProducer product ->" + event);
} finally {
ringBuffer.publish(sequence);
}
}
}
(2). 自定义生产者,方案(2)
自定义:EventTranslatorOneArg/EventTranslatorXXXX
import java.nio.ByteBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
public class LongEventProducerWithTranslator {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
// ByteBuffer 为入参
// LongEvent 为返回类型
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) {
event.set(buffer.getLong(0));
}
};
public void publish(ByteBuffer buffer) {
ringBuffer.publishEvent(TRANSLATOR, buffer);
}
}
(3). 组合测试
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
public class DisruptorTest2 {
public static void main(String[] args) throws Exception {
// 1. 初始化线程池-用户执行Consumer
Executor executor = Executors.newCachedThreadPool();
// 2. 初始化RingBuffer的大小,必须是2的指数
int bufferSize = 1024;
// 3.Event处理器(消费者)
LongEventHandler consumerHandler = new LongEventHandler();
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, bufferSize, executor);
// 指定消费者
disruptor.handleEventsWith(consumerHandler);
// 该方法只能调用一次,并且所有的EventHandler必须在start之前添加,包括:ExeceptionHandler
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.publish(bb);
Thread.sleep(1000);
}
}
}
(3). 总结
因为生产者代码都是模板样式,Disruptor允许自定义生产者.