(1). 前言

前面对一些分布式事务框架(ServiceComb Pack和Seata)有一些剖析,其实,大家思路都差不多来着的,还是先找到注解@HmilyTCC(全局事务)的拦截器.

(2). applicationContext.xml

<!-- 这个xml配置是在官网的案例中找到的 -->
<!-- 开启Aop --> 
<aop:aspectj-autoproxy expose-proxy="true"/>

<!-- 配置Aspect(对配置有注解@HmilyTCC和@HmilyTAC的类产生动代工理对象) -->
<bean id = "hmilyTransactionAspect" class="org.dromara.hmily.spring.aop.SpringHmilyTransactionAspect"/>

(3). SpringHmilyTransactionAspect

package org.dromara.hmily.spring.aop;
public class SpringHmilyTransactionAspect 
       // ********************************************************
	   // 4. 继承
	   // ********************************************************
       extends AbstractHmilyTransactionAspect 
	   implements Ordered {
		   
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
	
} // end SpringHmilyTransactionAspect

(4). AbstractHmilyTransactionAspect

package org.dromara.hmily.core.aspect;

// 1. 定义Aspect
@Aspect
public abstract class AbstractHmilyTransactionAspect {

	// 事务拦截器
    private final HmilyTransactionInterceptor interceptor = new HmilyGlobalInterceptor();

	// 2. 定义对@HmilyTCC和@HmilyTAC进行代码织入
    @Pointcut("@annotation(org.dromara.hmily.annotation.HmilyTCC) || @annotation(org.dromara.hmily.annotation.HmilyTAC)")
    public void hmilyInterceptor() {
    }

    
	// 3. 以环绕的形式
    @Around("hmilyInterceptor()")
    public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
		// 4. 委托给:HmilyTransactionInterceptor
        return interceptor.invoke(proceedingJoinPoint);
    }
	
} // end AbstractHmilyTransactionAspect

(5). HmilyTransactionInterceptor

public class HmilyGlobalInterceptor implements HmilyTransactionInterceptor {
    
    private static RpcParameterLoader parameterLoader;
    
	// 1. TransTypeEnum可枚举的值有:TCC/TAC/XA/SXA/CC
    private static final EnumMap<TransTypeEnum, HmilyTransactionHandlerRegistry> REGISTRY = new EnumMap<>(TransTypeEnum.class);
    
	// 2. 通过:RpcParameterLoader产生HmilyTransactionContext
    static {
        parameterLoader = Optional.ofNullable(ExtensionLoaderFactory.load(RpcParameterLoader.class))
		                          .orElse(new LocalParameterLoader());
    }
    
	// 3. 注册tcc和tac
    static {
        REGISTRY.put(TransTypeEnum.TCC, ExtensionLoaderFactory.load(HmilyTransactionHandlerRegistry.class, "tcc"));
        REGISTRY.put(TransTypeEnum.TAC, ExtensionLoaderFactory.load(HmilyTransactionHandlerRegistry.class, "tac"));
    }
    
    @Override
    public Object invoke(final ProceedingJoinPoint pjp) throws Throwable {
		// 4. 从线程中加载上下文
        HmilyTransactionContext context = parameterLoader.load();
		// 调用invokeWithinTransaction
        return invokeWithinTransaction(context, pjp);
    }
    
    private Object invokeWithinTransaction(final HmilyTransactionContext context, final ProceedingJoinPoint point) throws Throwable {
		// 5. 拦截的目标方法
        MethodSignature signature = (MethodSignature) point.getSignature();
		
		// 5.1 根据方法的注解,获得:HmilyTransactionHandlerRegistry的实现.
        return getRegistry(signature.getMethod())
		       // ************************************************************
		       // 5.2 根据上下文,获得角色对应的HmilyTransactionHandler
			   //     每一个角色都对应一个:Handler
			   // ************************************************************
			   // START(1, "发起者")               -->  StarterHmilyTccTransactionHandler
			   // CONSUMER(2, "消费者")            --> ConsumeHmilyTccTransactionHandler
			   //  PARTICIPANT(3, "参与者")        --> ParticipantHmilyTccTransactionHandler
			   //  LOCAL(4, "本地调用")            --> LocalHmilyTccTransactionHandler
		       .select(context)
			   
			   // 5.3 调用HmilyTransactionHandler.handleTransaction进行处理.
			   // 6.我这里以:StarterHmilyTccTransactionHandler为例
			   .handleTransaction(point, context);
    }
    
    private HmilyTransactionHandlerRegistry getRegistry(final Method method) {
		// 根据方法上的注解,返回:HmilyTransactionHandlerRegistry的实现:
		// HmilyTccTransactionHandlerRegistry
		// HmilyTacTransactionHandlerRegistry
        return null != method.getAnnotation(HmilyTCC.class) 
		            ? REGISTRY.get(TransTypeEnum.TCC) 
					: REGISTRY.get(TransTypeEnum.TAC);
    }

(6). StarterHmilyTccTransactionHandler

public class StarterHmilyTccTransactionHandler implements HmilyTransactionHandler, AutoCloseable {
	// 1. 事务执行器(单例模式)
	private final HmilyTccTransactionExecutor executor = HmilyTccTransactionExecutor.getInstance();
	    
	private DisruptorProviderManage<HmilyTransactionHandlerAlbum> disruptorProviderManage;
	
	public StarterHmilyTccTransactionHandler() {
		disruptorProviderManage = new DisruptorProviderManage<>(new HmilyTransactionExecutorHandler(),
				Runtime.getRuntime().availableProcessors() << 1, DisruptorProviderManage.DEFAULT_SIZE);
		disruptorProviderManage.startup();
	}
	
	public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context)
	            throws Throwable {
		Object returnValue;
		Supplier<Boolean> histogramSupplier = null;
		
		// metrics
		Optional<MetricsHandlerFacade> handlerFacade = MetricsHandlerFacadeEngine.load();
		try {
			if (handlerFacade.isPresent()) {
				handlerFacade.get().counterIncrement(MetricsLabelEnum.TRANSACTION_TOTAL.getName(), TransTypeEnum.TCC.name());
				histogramSupplier = handlerFacade.get().histogramStartTimer(MetricsLabelEnum.TRANSACTION_LATENCY.getName(), TransTypeEnum.TCC.name());
			} 
			
			// *******************************************************************
			// 1. 准备执行
			// 1.1 创建全局事务(HmilyTransaction)
			// 1.2 获取@HmilyTCC注解上的信息,方法上的信息,入参类型,入参参数,对其,进行持久化(MySQL/MongoDB/Redis...).
			// 1.3 创建HmilyTransactionContext,绑定到Context
			// *******************************************************************
			HmilyTransaction hmilyTransaction = executor.preTry(point);
			try {
				
				// *******************************************************************
				// 2. 真正的执行目标方法
				// *******************************************************************
				//execute try
				returnValue = point.proceed();
				
				//  设置:HmilyTransactio的状成为:TRYING
				hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
				
				// 3. 调用持久化(MySQL/MongoDB/Redis...)对象,更新状态
				executor.updateStartStatus(hmilyTransaction);
			} catch (Throwable throwable) {
				// *******************************************************************
				//if exception ,execute cancel
				// *******************************************************************
				final HmilyTransaction currentTransaction = HmilyTransactionHolder.getInstance().getCurrentTransaction();
				disruptorProviderManage.getProvider().onData(() -> {
					handlerFacade.ifPresent(metricsHandlerFacade -> metricsHandlerFacade.counterIncrement(MetricsLabelEnum.TRANSACTION_STATUS.getName(),
							TransTypeEnum.TCC.name(), HmilyRoleEnum.START.name(), HmilyActionEnum.CANCELING.name()));
					
					// *******************************************************************
					// 4. 异常时,回调:cancelMethod指定的方法,然后,发布事件:EventTypeEnum.REMOVE_HMILY_PARTICIPANT
					// @HmilyTCC(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
					// *******************************************************************
					executor.globalCancel(currentTransaction);
					
				});
				throw throwable;
			}
			
			// *******************************************************************
			// 调用目标类的方法没有抛错的情况下,会调用:confirm
			// *******************************************************************
			//execute confirm
			final HmilyTransaction currentTransaction = HmilyTransactionHolder.getInstance().getCurrentTransaction();
			disruptorProviderManage.getProvider().onData(() -> {
				handlerFacade.ifPresent(metricsHandlerFacade -> metricsHandlerFacade.counterIncrement(MetricsLabelEnum.TRANSACTION_STATUS.getName(),
						TransTypeEnum.TCC.name(), HmilyRoleEnum.START.name(), HmilyActionEnum.CONFIRMING.name()));
				// *******************************************************************
				// 5. 正常时,回调:confirmOrderStatus指定的方法,然后,发布事件:EventTypeEnum.REMOVE_HMILY_PARTICIPANT
				// @HmilyTCC(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
				// *******************************************************************
				executor.globalConfirm(currentTransaction);
			});
		} finally {
			HmilyContextHolder.remove();
			executor.remove();
			if (null != histogramSupplier) {
				histogramSupplier.get();
			}
		}
		return returnValue;
	} // end handleTransaction
	
	@Override
	public void close() {
		disruptorProviderManage.getProvider().shutdown();
	}
}

(7). 总结

通过AOP,对目标方法(trye)进行代理,当成功的情况下,调用:confirmMethod,失败的情况下调用:cancelMethod