(1). 概述
前面了解到GlobalTransactionalInterceptor,它主要负责拦截@GlobalTransactional(@GlobalLock).并把请求委派给:TransactionalTemplate.
(2). TransactionalTemplate.execute
public Object execute(TransactionalExecutor business) throws Throwable {
// 把@GlobalTransactional转换成:业务对象(TransactionInfo)
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1. 从ThreadLocal中获得TX_XID,赋给临时变量:xid
// 2. 如果xid为空,则直接返回:GlobalTransaction tx = null.
// 3. 如果xid不为空,则创建:new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
// 有xid的情况下,代表这是一个:参与者.
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 获得事务的传播行为(默认是:Propagation.REQUIRED)
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
// 挂起资源(越看越像Spring里的事务代码了)
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) { // 判断传播行为
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
// 这种情况,代表(GlobalTransaction)事务已经存在了,因为判断了不允许为空.
if (existingTransaction(tx)) { // tx != null
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// 这种情况,代表(GlobalTransaction)事务已经存在了,因为判断了不允许为空.
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) { // tx != null
// 先挂起,后再创建一新的tx
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// 这种情况,代表(GlobalTransaction)事务不允许存在
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) { // tx == null
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// 这种情况,(GlobalTransaction)事务若存在,则抛出异常(TransactionException)
// 若不存在,则执行业务逻辑.
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// 这种情况,代表(GlobalTransaction)事务不存在,则抛出异常:TransactionException
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// ***********************************************************************
// 如果tx为空,则创建一个新的tx,它的角色为:GlobalTransactionRole.Launcher
// ***********************************************************************
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
// 此时全局事务xid还不存在的.
// 创建:DefaultGlobalTransaction 内部持有一个对象:DefaultTransactionManager
// 通过SPI获取到的DefaultTransactionManager
// DefaultGlobalTransaction() {
// this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
// }
tx = GlobalTransactionContext.createNew();
}
// 把@GlobalTransactional转换成:业务对象(TransactionInfo)
// 1. 把业务对象(TransactionInfo)的信息设置到ThreadLocal(GlobalLockConfigHolder)对象里.
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 这代码的注解很全面.
// 如果角色是:GlobalTransactionRole.Launcher,发送信息到TC(事务协调器)
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
// 3. 查看(TransactionalTemplate.beginTransaction)
beginTransaction(txInfo, tx);
Object rs;
try {
// ***************************************************************
// 执行业务逻辑
// ***************************************************************
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// ***************************************************************
// 抛出异常时的处理
// ***************************************************************
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// ***************************************************************
// 正确时的提交
// 4. everything is fine, commit.
// ***************************************************************
commitTransaction(tx);
return rs;
} finally {
// ***************************************************************
// 清理现场
// ***************************************************************
//5. clear
// 清除ThreaLocal上的GlobalLockConfig
resumeGlobalLockConfig(previousConfig);
// 执行钩子函数:TransactionHook.afterCompletion方法
triggerAfterCompletion();
// 清除ThreadLocal上的TransactionHook
cleanUp();
}
} finally {
// 把挂起的资源拿出来,重新设置到ThreadLocal.
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
(3). TransactionalTemplate.beginTransaction
开始全局事务.
在执行全局事务之前,先回调相应的钩子函数.
在执行全局事务之后,先回调相应的钩子函数.
// 2. triggerBeforeBegin
private void triggerBeforeBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) { // 人家考虑还是挺周到的,遍历钩子函数并执行若是出错,也不能影响全局,仅仅打印error出日志
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
}
}
} // end triggerBeforeBegin
// 1. beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 注册全局事务前,执行:TransactionHook.beforeBegin函数
triggerBeforeBegin();
// *****************************************************
// 执行全局事务的注册
// 在这里我不详解,后面我留一节,专门讲解这部份
// GlobalTransaction.begin
// *****************************************************
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// 注册全局事务后,执行:TransactionHook.afterBegin函数
triggerAfterBegin();
} catch (TransactionException txe) {
// 注册失败,则抛出异常(TransactionalExecutor.ExecutionException)
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
} // end beginTransaction
private void triggerAfterBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e);
}
}
} // end triggerAfterBegin
(4). TransactionalTemplate.completeTransactionAfterThrowing
进入方法的前提是,调用了业务方法时出现了异常.
如果抛出的异常(@GlobalTransactional),用户有自定义异常处理,则,把异常交给用户自己处理.
如果抛出的异常,用户没有自定义处理,Seta默认还是会再commit一次.
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
//roll back
// 1. 获得注解@GlobalTransactional(rollbackFor=xxx,rollbackForClassName=xxx,noRollbackFor=xxx,noRollbackForClassName=xxxx)
// 2. 判断异常是否指定的异常
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
// 回调到自定义的异常处理类上.
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// 回调失败的情况下,抛出:TransactionalExecutor.ExecutionException
// 异常信息为:TransactionalExecutor.Code.RollbackFailure
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// 继续commit
// not roll back on this exception, so commit
commitTransaction(tx);
}
} // end completeTransactionAfterThrowing
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
// TransactionHook.beforeRollback
triggerBeforeRollback();
// *******************************************************************
// 在这里,我先不讨论,会留一节,专门讨论这个类:GlobalTransaction
// *******************************************************************
// GlobalTransaction.rollback()
tx.rollback();
// TransactionHook.afterRollback
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
} // end rollbackTransaction
(5). TransactionalTemplate.commitTransaction
// 执行业务逻辑正常的情况下.执行:commitTransaction
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 执行:TransactionHook.beforeCommit
triggerBeforeCommit();
// *********************************************************************
// 这一部份的内容,我留到后面,再详细
// 执行GlobalTransaction.commit
// *********************************************************************
tx.commit();
// 执行:TransactionHook.afterCommit
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
} // end commitTransaction
(6). 总结
TransactionalTemplate的职责如下:
- 向TC事务协调器注册全局事务(执行begin).
- 调用业务逻辑.
- 失败时,rollback全局事务.
- 成功时,commit全局事务.
- 用户可以在ThreadLocal中注册钩子函数(TransactionHook),Seta会在回调相应的函数.
- begin/commit/rollback会委托给:GlobalTransaction去执行.
- 下一节,主要分析:GlobalTransaction.