(1). Disruptor ProducerType配置
Disruptor可以根据业务场景(生产者)来指定:ProducerType.SINGLE / ProducerType.MULTI参数,用来控制序列器(sequence)的生成模式,默认为:ProducerType.MULTI
(2). 组合测试
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class DisruptorTest3 {
public static void main(String[] args) throws Exception {
AtomicInteger inc = new AtomicInteger(1);
// 1. 初始化线程池工厂s
ThreadFactory factory = (r) -> {
int index = inc.getAndIncrement();
Thread t = new Thread(r);
t.setName("disruptor " + index);
return t;
};
// 2. 初始化RingBuffer的大小,必须是2的指数
int bufferSize = 1024;
// 3.Event处理器(消费者)
LongEventHandler consumerHandler = new LongEventHandler();
// 默认生产者为:多线程模式
// Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, bufferSize, factory);
// 多线程模式(多线程模式情况下,会存在吞吐量有所下降,但能保证不丢失数据)
// Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, bufferSize, factory,ProducerType.MULTI,new BlockingWaitStrategy());
// 单线程模式(注意:单线程模式情况下,却开启了多线程,会存在丢数据的可能性)
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, bufferSize, factory,
ProducerType.SINGLE, new BlockingWaitStrategy());
// 指定消费者
disruptor.handleEventsWith(consumerHandler);
// 该方法只能调用一次,并且所有的EventHandler必须在start之前添加,包括:ExeceptionHandler
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 自定义生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
int count = 10;
final CyclicBarrier barrier = new CyclicBarrier(count);
ExecutorService executor = Executors.newCachedThreadPool();
for (long i = 0; i < count; i++) {
final long data = i;
executor.submit(() -> {
try {
System.out.println(data + " is ready...");
barrier.await();
producer.publish(data);
} catch (Exception e) {
e.printStackTrace();
}
});
}
TimeUnit.SECONDS.sleep(1000000);
disruptor.shutdown();
}
}
(4). 总结
如果业务场景(生产者)是单线程模式下,生产者模式可以调整为:ProducerType.SINGLE,可以显著提高性能,因为不用处理并发模式下sequence的产生.
如果业务场景(生产者)是多线程模式下,而生产者模式为:ProducerType.MULTI,则会出现sequence丢失的情况(数据丢失或者覆盖掉了).
总结:除非确保生产者是单线程模式,否则,尽量用多线程模式