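(1). When a branch transaction commits or rolls back
As in AT mode, a branch transaction does not decide its own outcome. It waits for the TC to notify it. The general flow is:
- RmBranchCommitProcessor receives the event sent by the TC (a rollback arrives through RmBranchRollbackProcessor).
- DefaultRMHandler.onRequest forwards the request.
- DefaultRMHandler.handle processes the request.
- Finally, TCCResourceManager.branchCommit/branchRollback does the actual work.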
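The chain above boils down to "pick the resource manager for the branch type, then delegate". Below is a minimal sketch of that idea. Every type in it (DispatchSketch, Handler, ResourceManager and the two enums) is a hypothetical stand-in invented for illustration, not one of Seata's real classes, and the real handlers carry far more state than this.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT Seata's real classes.
class DispatchSketch {
    enum BranchType { AT, TCC }

    enum Status { PhaseTwo_Committed, PhaseTwo_Rollbacked }

    interface ResourceManager {
        Status branchCommit(String xid, long branchId, String resourceId);
        Status branchRollback(String xid, long branchId, String resourceId);
    }

    static class Handler {
        private final Map<BranchType, ResourceManager> managers = new EnumMap<>(BranchType.class);

        void register(BranchType type, ResourceManager rm) {
            managers.put(type, rm);
        }

        // Receive, forward, handle: look up the manager for the branch type and delegate to it.
        Status onRequest(boolean commit, BranchType type, String xid, long branchId, String resourceId) {
            ResourceManager rm = managers.get(type);
            if (rm == null) {
                throw new IllegalStateException("no resource manager for " + type);
            }
            return commit
                ? rm.branchCommit(xid, branchId, resourceId)
                : rm.branchRollback(xid, branchId, resourceId);
        }
    }

    // Builds a handler that only knows about the TCC branch type.
    static Handler newHandler() {
        Handler handler = new Handler();
        handler.register(BranchType.TCC, new ResourceManager() {
            @Override
            public Status branchCommit(String xid, long branchId, String resourceId) {
                return Status.PhaseTwo_Committed;
            }

            @Override
            public Status branchRollback(String xid, long branchId, String resourceId) {
                return Status.PhaseTwo_Rollbacked;
            }
        });
        return handler;
    }

    public static void main(String[] args) {
        Handler handler = newHandler();
        System.out.println(handler.onRequest(true, BranchType.TCC, "xid-1", 1L, "firstTccAction"));
        System.out.println(handler.onRequest(false, BranchType.TCC, "xid-1", 1L, "firstTccAction"));
    }
}
```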
(2). TCCResourceManager.branchCommit
public BranchStatus branchCommit(
        BranchType branchType,
        String xid,
        long branchId,
        String resourceId,
        String applicationData) throws TransactionException {
    // @TwoPhaseBusinessAction(name = "firstTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    // The resource ID (firstTccAction) was already recorded when the branch transaction was registered;
    // see ActionInterceptorHandler.doTccActionLogStore.
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    // The resource comes from a local in-memory cache. Does that mean the RM is stateful?
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    // The bean itself is created by Spring.
    Object targetTCCBean = tccResource.getTargetBean();
    // The commit method
    Method commitMethod = tccResource.getCommitMethod();
    // Check that both the bean and the method exist.
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        // ActionInterceptorHandler.doTccActionLogStore created a BusinessActionContext in phase one,
        // serialized it to JSON and submitted it to the TC.
        // In this phase the TC sends that data back, and the JSON has to be deserialized into a BusinessActionContext.
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // ****************************************************************************
        // Invoke the business method
        // ****************************************************************************
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
        // *****************************************************************
        // TCC is fairly strict about the return type of the commit method.
        // *****************************************************************
        // 1. If the result (ret) is null, report BranchStatus.PhaseTwo_Committed to the TC.
        // 2. If ret is a TwoPhaseResult, read TwoPhaseResult.isSuccess() and report
        //    PhaseTwo_Committed or PhaseTwo_CommitFailed_Retryable accordingly.
        // 3. Otherwise ret is cast to boolean, which again selects
        //    PhaseTwo_Committed or PhaseTwo_CommitFailed_Retryable.
        //    A return value of any other type fails the cast and lands in the catch block below.
        // 4. In short: the recommended return type for a TCC commit method is Boolean.
        boolean result;
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        } else {
            result = true;
        }
        // result == true ==> BranchStatus.PhaseTwo_Committed
        // result == false ==> BranchStatus.PhaseTwo_CommitFailed_Retryable
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        // **********************************************************
        // When an exception is thrown, tell the TC to retry.
        // **********************************************************
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
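To make the return-value rules concrete, here is a small self-contained sketch that reproduces the same mapping with plain reflection. It assumes nothing from Seata: Status is a hypothetical two-value stand-in for BranchStatus, the Actions class and its methods are invented for the demonstration, and the TwoPhaseResult case is left out for brevity.

```java
import java.lang.reflect.Method;

class ReturnMappingSketch {
    // Hypothetical stand-in for BranchStatus, reduced to the two values used here.
    enum Status { PhaseTwo_Committed, PhaseTwo_CommitFailed_Retryable }

    // Hypothetical commit methods with different return types.
    public static class Actions {
        public void commitVoid() { }
        public boolean commitTrue() { return true; }
        public boolean commitFalse() { return false; }
        public String commitString() { return "ok"; }
        public boolean commitThrows() { throw new IllegalStateException("boom"); }
    }

    // Same decision logic as the listing above: null means success, a Boolean decides,
    // and anything that throws (including a failed cast) is reported as retryable.
    static Status invokeCommit(Object bean, String methodName) {
        try {
            Method method = bean.getClass().getMethod(methodName);
            Object ret = method.invoke(bean);
            boolean result = (ret == null) || (Boolean) ret;
            return result ? Status.PhaseTwo_Committed : Status.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            return Status.PhaseTwo_CommitFailed_Retryable;
        }
    }

    public static void main(String[] args) {
        Actions actions = new Actions();
        for (String name : new String[] {"commitVoid", "commitTrue", "commitFalse", "commitString", "commitThrows"}) {
            System.out.println(name + " -> " + invokeCommit(actions, name));
        }
    }
}
```

Only commitVoid and commitTrue end up as PhaseTwo_Committed. A String return value is not rejected up front; it simply fails the cast and is treated like any other exception.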
(3). TCCResourceManager.branchRollback
public BranchStatus branchRollback(
        BranchType branchType,
        String xid,
        long branchId,
        String resourceId,
        String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method rollbackMethod = tccResource.getRollbackMethod();
    if (targetTCCBean == null || rollbackMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // *******************************************************************
        // Invoke the business method
        // *******************************************************************
        Object ret = rollbackMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
        boolean result;
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        } else {
            result = true;
        }
        // Returns BranchStatus.PhaseTwo_Rollbacked or PhaseTwo_RollbackFailed_Retryable
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        // As with commit, an exception is reported to the TC as a retryable failure.
        String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}
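Both methods answer with a *_Retryable status on failure, so the same phase-two call can reach the business bean more than once. A rollback method therefore has to tolerate repeated invocations. The sketch below shows one way to do that, assuming the pair xid plus branchId is enough to identify a branch. The class name, the in-memory set and the counter are all hypothetical; a real service would keep this record in durable storage.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical business bean; only the idempotency check is the point here.
class IdempotentRollbackSketch {
    // Branches that were already rolled back (in memory only, for illustration).
    private final Set<String> rolledBack = ConcurrentHashMap.newKeySet();
    // Stands in for the real side effect, such as releasing a reserved amount.
    private final AtomicInteger releaseCount = new AtomicInteger();

    boolean rollback(String xid, long branchId) {
        String key = xid + ":" + branchId;
        if (!rolledBack.add(key)) {
            // Already handled: report success again without repeating the side effect.
            return true;
        }
        releaseCount.incrementAndGet();
        return true;
    }

    int releaseCount() {
        return releaseCount.get();
    }

    public static void main(String[] args) {
        IdempotentRollbackSketch action = new IdempotentRollbackSketch();
        action.rollback("xid-1", 1L);
        action.rollback("xid-1", 1L); // simulated retry of the same branch
        System.out.println("side effect executed " + action.releaseCount() + " time(s)");
    }
}
```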
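(4). Summary
- @GlobalTransactional registers the global transaction with the TC and propagates the xid over the network.
- The RM binds the remote xid to the local thread. When it reaches a method marked with @TwoPhaseBusinessAction/@LocalTCC, it registers a branch transaction with the TC and calls the business code's try method.
- @GlobalTransactional decides whether the whole global transaction commits or rolls back.
- On commit, the TC notifies every participant (RM) to call commit.
- On rollback, the TC notifies every participant (RM) to call rollback.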
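The whole flow in the summary can be played through in a few lines. The sketch below is a toy model, not Seata code: Participant, RecordingParticipant and runGlobalTransaction are hypothetical names, there is no network or TC process, and a single method plays both the @GlobalTransactional caller and the TC. It assumes, as described above, that a branch is registered before its try method runs, so a branch whose try fails still receives the rollback call.

```java
import java.util.ArrayList;
import java.util.List;

class TccFlowSketch {
    // Hypothetical participant contract: the try method plus the two phase-two callbacks.
    interface Participant {
        boolean prepare(String xid);   // the "try" step ("try" is a Java keyword)
        boolean commit(String xid);
        boolean rollback(String xid);
    }

    // Participant that records every call so the order of events can be inspected.
    static class RecordingParticipant implements Participant {
        private final String name;
        private final boolean tryOk;
        private final List<String> log;

        RecordingParticipant(String name, boolean tryOk, List<String> log) {
            this.name = name;
            this.tryOk = tryOk;
            this.log = log;
        }

        @Override public boolean prepare(String xid) { log.add(name + ".try"); return tryOk; }
        @Override public boolean commit(String xid) { log.add(name + ".commit"); return true; }
        @Override public boolean rollback(String xid) { log.add(name + ".rollback"); return true; }
    }

    // Phase one: register each branch and run its try method.
    // Phase two: notify every registered branch to commit or to roll back.
    static String runGlobalTransaction(String xid, List<Participant> participants) {
        List<Participant> registered = new ArrayList<>();
        boolean allTried = true;
        for (Participant p : participants) {
            registered.add(p);
            if (!p.prepare(xid)) {
                allTried = false;
                break;
            }
        }
        for (Participant p : registered) {
            if (allTried) {
                p.commit(xid);
            } else {
                p.rollback(xid);
            }
        }
        return allTried ? "Committed" : "Rollbacked";
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Participant> participants = new ArrayList<>();
        participants.add(new RecordingParticipant("a", true, log));
        participants.add(new RecordingParticipant("b", false, log));
        System.out.println(runGlobalTransaction("xid-1", participants) + " " + log);
    }
}
```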