(1). 概述
前一节,分析到在UpdateExecutor方法里,会调用:ConnectionProxy.commit()方法,没有进入到里面进行详解,这一小节,主要分析commit/rollback方法.
(2). ConnectionProxy.commit
public void commit() throws SQLException {
try {
LOCK_RETRY_POLICY.execute(() -> {
// 2.1 委托给:doCommit()
doCommit();
return null;
});
} catch (SQLException e) {
// 失败的情况下,进行:rollback,并向TC汇报失败.
if (targetConnection != null && !getAutoCommit()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}// end commit
// 2.1 doCommit()
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) { // xid != null
// 3. processGlobalTransactionCommit
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) { // isGlobalLockRequire == true
processLocalCommitWithGlobalLocks();
} else { //直接commit
targetConnection.commit();
}
}// end doCommit
(3). ConnectionProxy.processGlobalTransactionCommit
3. processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// *************************************************************
// 4. 注册分支事务
// 在这里名称是叫做注册,实际还韵含着注册和获取全局锁的操作来着的.
// *************************************************************
register();
} catch (TransactionException e) {
// *************************************************************
// 5. 注册分支事务失败,会封装异常,然后继续throw,压根就不会走下面的逻辑了.
// *************************************************************
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// 把undo_log刷盘
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// *************************************************************
// 这里才是真正的提交
// *************************************************************
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
// 向TC汇报异常
// 6. ConnectionProxy.report
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) { // 默认值是:false
// 当开启了汇报情况下,才会向TC汇报正常.
// 这样设计的意思是:Seata(TC)默认认为是成功的,只有失败才会向TC汇报.
// 6. ConnectionProxy.report
report(true);
}
// 重置上下文.
context.reset();
}
(4). ConnectionProxy.register
private void register() throws TransactionException {
if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) {
return;
}
// RM向TC注册分支事务.
// 注册成功后,会产生分支事务ID
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
// 设置分支事务ID
context.setBranchId(branchId);
}
(5). ConnectionProxy.recognizeLockKeyConflictException
注册分支事务失败,封装异常信息,继续往外抛出异常.
private void recognizeLockKeyConflictException(TransactionException te, String lockKeys) throws SQLException {
if (te.getCode() == TransactionExceptionCode.LockKeyConflict) { // 针对:TransactionExceptionCode.LockKeyConflict处理
StringBuilder reasonBuilder = new StringBuilder("get global lock fail, xid:");
reasonBuilder.append(context.getXid());
if (StringUtils.isNotBlank(lockKeys)) {
reasonBuilder.append(", lockKeys:").append(lockKeys);
}
throw new LockConflictException(reasonBuilder.toString());
} else {
throw new SQLException(te);
}
}
(6). ConnectionProxy.report
private void report(boolean commitDone) throws SQLException {
// 不存在分支ID的情况下,跳过
if (context.getBranchId() == null) {
return;
}
// 最大重试次数为:5次
int retry = REPORT_RETRY_COUNT; // 5
while (retry > 0) {
try {
// RM向TC汇报情况
DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
return;
} catch (Throwable ex) {
// 有异常的情况下,重试,直到retry < 0
LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
+ commitDone + "] Retry Countdown: " + retry);
retry--;
if (retry == 0) {
throw new SQLException("Failed to report branch status " + commitDone, ex);
}
}
}
}
(7). 总结
ConnectionProxy.commit方法的主要职责:
- 向TC获取全局锁(注册分支事事)
- 提交本地事务.
- 失败的情况下,进行rollback,并向TC汇报失败.
- 那什么时候?进行第二阶段的全局:commit/rollback呢?答案就在:RmNettyRemotingClient.registerProcessor方法里.