(1). Disruptor为消费者配置异常处理
(2). 定义业务模型(User)
package help.lixin.disruptor.pojo;
public class User {
private Integer id;
private String name;
private Integer age;
private String address;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "User [id=" + id + ", name=" + name + ", age=" + age + ", address=" + address + "]";
}
}
(3). 定义Event(UserEvent)
package help.lixin.disruptor.event;
public class UserEvent {
private Integer id;
private String name;
private Integer age;
private String address;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "UserEvent [id=" + id + ", name=" + name + ", age=" + age + ", address=" + address + "]";
}
}
(4). 定久Event工厂(UserEventFactory)
package help.lixin.disruptor.factory;
import com.lmax.disruptor.EventFactory;
import help.lixin.disruptor.event.UserEvent;
public class UserEventFactory implements EventFactory<UserEvent> {
public UserEvent newInstance() {
return new UserEvent();
}
}
(5). 定义EventHandler
UserEventHandler1
package help.lixin.disruptor.consumer;
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.EventHandler;
import help.lixin.disruptor.event.UserEvent;
/**
* 消费者一
* @author lixin
*
*/
public class UserEventHandler1 implements EventHandler<UserEvent> {
@Override
public void onEvent(UserEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("1开始" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
if(event.getId() == 3) {
throw new Exception("UserEventHandler1抛出异常.");
}
event.setAddress(event.getAddress() + " Handler1");
TimeUnit.SECONDS.sleep(1);
System.out.println("1结束" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
}
}
UserEventHandler2
package help.lixin.disruptor.consumer;
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.EventHandler;
import help.lixin.disruptor.event.UserEvent;
/**
* 消费者一
* @author lixin
*
*/
public class UserEventHandler2 implements EventHandler<UserEvent> {
@Override
public void onEvent(UserEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("2开始" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
if(event.getId() == 4) {
throw new Exception("UserEventHandler2抛出异常.");
}
event.setAddress(event.getAddress() + " Handler2");
TimeUnit.SECONDS.sleep(2);
System.out.println("2结束" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
}
}
UserEventHandler3
package help.lixin.disruptor.consumer;
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.EventHandler;
import help.lixin.disruptor.event.UserEvent;
/**
* 消费者一
* @author lixin
*
*/
public class UserEventHandler3 implements EventHandler<UserEvent> {
@Override
public void onEvent(UserEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("3开始" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
if(event.getId() == 4) {
throw new Exception("UserEventHandler3抛出异常.");
}
event.setAddress(event.getAddress() + " Handler3");
TimeUnit.SECONDS.sleep(3);
System.out.println("3结束" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
}
}
UserEventHandler4
package help.lixin.disruptor.consumer;
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.EventHandler;
import help.lixin.disruptor.event.UserEvent;
/**
* 消费者一
* @author lixin
*
*/
public class UserEventHandler4 implements EventHandler<UserEvent> {
@Override
public void onEvent(UserEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("4开始" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
if(event.getId() == 4) {
throw new Exception("UserEventHandler4抛出异常.");
}
event.setAddress(event.getAddress() + " Handler4");
TimeUnit.SECONDS.sleep(4);
System.out.println("4结束" + System.currentTimeMillis()+" ["+ Thread.currentThread().getName() +" ] UserEvent->" + event );
}
}
(6). 定义异常处理类
全局异常处理类(DefaultUserEventExceptionHandler)
package help.lixin.disruptor.exception;
import com.lmax.disruptor.ExceptionHandler;
import help.lixin.disruptor.event.UserEvent;
public class DefaultUserEventExceptionHandler implements ExceptionHandler<UserEvent> {
@Override
public void handleEventException(Throwable ex, long sequence, UserEvent event) {
System.out.println("DefaultUserEventExceptionHandler.handleEventException ->" + event);
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("DefaultUserEventExceptionHandler.handleOnStartException->" + ex.getMessage());
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("DefaultUserEventExceptionHandler.handleOnShutdownException->" + ex.getMessage());
}
}
为某个EventHandler定义异常处理类(UserEventExceptionHandler)
package help.lixin.disruptor.exception;
import com.lmax.disruptor.ExceptionHandler;
import help.lixin.disruptor.event.UserEvent;
public class UserEventExceptionHandler implements ExceptionHandler<UserEvent>{
@Override
public void handleEventException(Throwable ex, long sequence, UserEvent event) {
System.out.println("UserEventExceptionHandler.handleEventException ->" + event);
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("UserEventExceptionHandler.handleOnStartException->" + ex.getMessage());
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("UserEventExceptionHandler.handleOnShutdownException->" + ex.getMessage());
}
}
(7). 组合测试
package help.lixin.disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import help.lixin.disruptor.consumer.UserEventHandler1;
import help.lixin.disruptor.consumer.UserEventHandler2;
import help.lixin.disruptor.consumer.UserEventHandler3;
import help.lixin.disruptor.consumer.UserEventHandler4;
import help.lixin.disruptor.event.UserEvent;
import help.lixin.disruptor.exception.DefaultUserEventExceptionHandler;
import help.lixin.disruptor.exception.UserEventExceptionHandler;
import help.lixin.disruptor.factory.UserEventFactory;
import help.lixin.disruptor.pojo.User;
import help.lixin.disruptor.producer.UserEventProducer;
public class UserEventTest1 {
public static void main(String[] args) throws Exception {
AtomicInteger inc = new AtomicInteger(1);
// 1. 初始化线程池工厂
ThreadFactory factory = (r) -> {
int index = inc.getAndIncrement();
Thread t = new Thread(r);
t.setName("disruptor thread(" + index + ")");
return t;
};
// 2. 初始化RingBuffer的大小,必须是2的指数
int bufferSize = 1024;
// 3.Event处理器(消费者)
UserEventHandler1 handler1 = new UserEventHandler1();
UserEventHandler2 handler2 = new UserEventHandler2();
UserEventHandler3 handler3 = new UserEventHandler3();
UserEventHandler4 handler4 = new UserEventHandler4();
// 默认生产者为:多线程模式
Disruptor<UserEvent> disruptor = new Disruptor<UserEvent>(new UserEventFactory(), bufferSize, factory);
// 指定两个消费者(并行)
disruptor.handleEventsWith(handler1);
disruptor.handleEventsWith(handler2);
// 设置全局的异常处理器类
disruptor.setDefaultExceptionHandler(new DefaultUserEventExceptionHandler());
// 为消费者(handler1)指定异常处理类为:UserEventExceptionHandler
// 如果有为某个消费者指定异常处理类,则会覆盖全局的异常处理类.
disruptor.handleExceptionsFor(handler1).with(new UserEventExceptionHandler());
// 该方法只能调用一次,并且所有的EventHandler必须在start之前添加,包括:ExeceptionHandler
disruptor.start();
RingBuffer<UserEvent> ringBuffer = disruptor.getRingBuffer();
UserEventProducer producer = new UserEventProducer(ringBuffer);
ExecutorService executorService = Executors.newCachedThreadPool();
int count = 5;
for (int i = 0; i < count; i++) {
final int id = i;
executorService.submit(() -> {
User user = new User();
user.setId(id);
user.setName("lixin " + id);
user.setAge(id + 10);
user.setAddress("深圳 " + id);
producer.publish(user);
});
}
TimeUnit.SECONDS.sleep(1000000);
disruptor.shutdown();
}
}
(8). 总结
Disruptor可以为(消费者)EventHandler指定异常处理类.可以全局(disruptor.setDefaultExceptionHandler)指定,也可以单独为EventHandler(disruptor.handleExceptionsFor(handler1).with(new UserEventExceptionHandler()))进行指定(单独指定的会覆盖全局的).