(1). 前言
在前面,把本地环境搭建起来了,在这里,开始对:Servicecomb Pack的源码进行剖析,在剖析之前,建议查看下各模块依赖图,能更加有利于你对模块的了解.
很多的框架,在设计阶段,会对项目进行分层,层与层之间通过接口提供服务,然后,再通过门面或聚合所有的模块,形成服务.
所以,一个好的框架,它的分层模型决定了设计者对事物高层的次抽象(以及职能划分).
(2). Servicecomb Pack 内部模块架构图
这张图虽然画的有点粗了点,但是,并不妨碍我们对Servicecomb Pack的了解.
事务注解模块(Transaction Annotation),事务拦截器模块(Transaction Interceptor),事务上下文模块(Transaction Context),事务回调模块(Transaction Callback),事务执行器模块(Transaction Executor),事务传输模块(Transaction Transport)…
(3). 如何入手?
- “先阅读参考入门手册”.
- 既然:Servicecomb Pack深度拥抱了Spring,自然,是要从它与Spring(EnableAutoConfiguration)的切入点跟踪.
- 所以,切入点模块为:omega-spring-starter.
# omega-spring-starter-0.6.0.jar/META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.apache.servicecomb.pack.omega.spring.OmegaSpringAutoConfiguration
(4). OmegaSpringAutoConfiguration
OmegaSpringAutoConfiguration很简单,直接导入了两个@Configuration配置,重点关注:TransactionAspectConfig,因为一看名字就知道是AOP的聚合配置.
@Configuration
// 很简单,就引用了两个@Configuration配置
// OmegaSpringConfig / TransactionAspectConfig
@Import({OmegaSpringConfig.class,TransactionAspectConfig.class})
@ConditionalOnProperty(value = {"omega.enabled"}, matchIfMissing = true)
public class OmegaSpringAutoConfiguration {
}
(6). TransactionAspectConfig
通过翻看源码,定位到两个类:SagaStartAspect/TransactionAspect.
SagaStartAspect: 针对配置有@SagaStart的类进行代理,这个注解是配置全局事务注解 TransactionAspect: 针对配置有@Compensable的类进行代理,这个注解是配置分支事务的注解
@Configuration
@EnableAspectJAutoProxy
public class TransactionAspectConfig {
// 1. 对@Compensable注解内容进行检测
@Bean
CompensableAnnotationProcessor compensableAnnotationProcessor(
OmegaContext omegaContext,
@Qualifier("compensationContext") CallbackContext compensationContext) {
return new CompensableAnnotationProcessor(omegaContext, compensationContext);
}
// 2. 对@Participate注解内容进行检测
@Bean
ParticipateAnnotationProcessor participateAnnotationProcessor(
OmegaContext omegaContext,
@Qualifier("coordinateContext") CallbackContext coordinateContext) {
return new ParticipateAnnotationProcessor(omegaContext, coordinateContext);
}
// 3. CompensationMessageHandler消息处理器
@Bean
MessageHandler messageHandler(
SagaMessageSender sender,
@Qualifier("compensationContext") CallbackContext context,
OmegaContext omegaContext) {
return new CompensationMessageHandler(sender, context);
}
// ***************************************************************
// 4. 针对:@SagaStart进行AOP拦截
// ***************************************************************
@Bean
SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext context) {
return new SagaStartAspect(sender, context);
}
// ***************************************************************
// 5. 针对@Compensable进行AOP拦截
// ***************************************************************
@Bean
TransactionAspect transactionAspect(SagaMessageSender sender, OmegaContext context) {
return new TransactionAspect(sender, context);
}
// 6. CoordinateMessageHandler
@Bean
TccMessageHandler coordinateMessageHandler(
TccMessageSender tccMessageSender,
@Qualifier("coordinateContext") CallbackContext coordinateContext,
OmegaContext omegaContext,
ParametersContext parametersContext) {
return new CoordinateMessageHandler(tccMessageSender, coordinateContext, omegaContext, parametersContext);
}
// ***************************************************************
// 7. 针对@TccStart进行AOP拦截
// ***************************************************************
@Bean
TccStartAspect tccStartAspect(
TccMessageSender tccMessageSender,
OmegaContext context) {
return new TccStartAspect(tccMessageSender, context);
}
// ***************************************************************
// 8. 针对@Participate进行拦截处理.
// ***************************************************************
@Bean
TccParticipatorAspect tccParticipatorAspect(
TccMessageSender tccMessageSender,
OmegaContext context, ParametersContext parametersContext) {
return new TccParticipatorAspect(tccMessageSender, context, parametersContext);
}
}
(6). 全局事务处理(SagaStartAspect)
@Aspect
@Order(value = 100)
public class SagaStartAspect {
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
private final OmegaContext context;
public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
this.context = context;
this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
}
// 1. 拦截@SagaStart注解
@Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
// 创建全局事务
initializeOmegaContext();
// 判断是否开启了:akka,并且,配置的超时时间大于零
if(context.getAlphaMetas().isAkkaEnabled() && sagaStart.timeout()>0){ // false
SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
return wrapper.apply(joinPoint,sagaStart,context);
}else{ // true
// 2. 创建:SagaStartAnnotationProcessorWrapper
SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
// 3. apply
return wrapper.apply(joinPoint,sagaStart,context);
}
}
// 创建全局事务ID
private void initializeOmegaContext() {
context.setLocalTxId(context.newGlobalTxId());
}
}
(7). SagaStartAnnotationProcessorWrapper
public class SagaStartAnnotationProcessorWrapper {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
public SagaStartAnnotationProcessorWrapper(
SagaStartAnnotationProcessor sagaStartAnnotationProcessor) {
this.sagaStartAnnotationProcessor = sagaStartAnnotationProcessor;
}
public Object apply(ProceedingJoinPoint joinPoint, SagaStart sagaStart, OmegaContext context)
throws Throwable {
// 获得目标类(BookingController)上的注解:@SagaStart
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
// 1. 调用目标类的方法之前,先向Alpha发送SagaStartedEvent事件.
sagaStartAnnotationProcessor.preIntercept(sagaStart.timeout());
LOG.debug("Initialized context {} before execution of method {}", context, method.toString());
try {
// *******************************************************
// 2. 调用目标类的方法
// *******************************************************
Object result = joinPoint.proceed();
// *******************************************************
// 3. 判断是否自动向:Alpha发送:SagaEndedEvent事件.
// *******************************************************
if (sagaStart.autoClose()) {
sagaStartAnnotationProcessor.postIntercept(context.globalTxId());
LOG.debug("Transaction with context {} has finished.", context);
} else {
LOG.debug("Transaction with context {} is not finished in the SagaStarted annotated method.", context);
}
return result;
} catch (Throwable throwable) {
// *******************************************************
// 4. 调用目标方法后,如果抛出的异常,不属于:OmegaException,则,向:Alpha发送:SagaAbortedEvent/TxAbortedEvent
// *******************************************************
// We don't need to handle the OmegaException here
if (!(throwable instanceof OmegaException)) {
sagaStartAnnotationProcessor.onError(method.toString(), throwable);
LOG.error("Transaction {} failed.", context.globalTxId());
}
throw throwable;
} finally {
// *******************************************************
// 清除线程上绑定的全局事务信息.
// *******************************************************
context.clear();
}
}
}
(8). @SagaStart全局事务流程图
(9). 总结
- Spring对@SagaStart注解进行拦截(代表这个方法是一个全局事务的开始).
- 通过:OmegaContext创建一个全局事务ID.
- 调用目标类的方法之前,通过SagaStartAnnotationProcessor.preIntercept方法,向Alpha发送:SagaStartedEvent事件.
- 调用目标类和方法.
- 调用目标类的方法之后(没有异常的情况下),通过SagaStartAnnotationProcessor.postIntercept方法,向Alpha发送:SagaEndedEvent事件.
- 调用目标类的方法之后(有异常的情况下),通过SagaStartAnnotationProcessor.postIntercept方法,向Alpha发送:SagaAbortedEvent/TxAbortedEvent事件.