(1). TransactionManager
This section focuses on DefaultTransactionManager, since it is the default implementation of the TransactionManager interface.
(2). DefaultTransactionManager
public class DefaultTransactionManager implements TransactionManager {

    // begin
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // Build a GlobalBeginRequest, which is a subclass of AbstractMessage.
        // As analyzed earlier, AbstractMessage belongs to Seata's custom protocol stack, built on Netty and based on ProtoBuf.
        GlobalBeginRequest request = new GlobalBeginRequest();
        // The name given on the annotation if one was specified; otherwise the method signature.
        request.setTransactionName(name);
        request.setTimeout(timeout);
        // syncCall == this.syncCall
        // which delegates to TmNettyRemotingClient.getInstance().sendSyncRequest(request)
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

    // commit
    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // Build a GlobalCommitRequest, which is a subclass of AbstractMessage.
        // As analyzed earlier, AbstractMessage belongs to Seata's custom protocol stack, built on Netty and based on ProtoBuf.
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        // syncCall == this.syncCall
        // which delegates to TmNettyRemotingClient.getInstance().sendSyncRequest(request)
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    // rollback
    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        // Build a GlobalRollbackRequest, which is a subclass of AbstractMessage.
        // As analyzed earlier, AbstractMessage belongs to Seata's custom protocol stack, built on Netty and based on ProtoBuf.
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        // syncCall == this.syncCall
        // which delegates to TmNettyRemotingClient.getInstance().sendSyncRequest(request)
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    // Query the TC for the status of the given XID
    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        // Build a GlobalStatusRequest, which is a subclass of AbstractMessage.
        // As analyzed earlier, AbstractMessage belongs to Seata's custom protocol stack, built on Netty and based on ProtoBuf.
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setXid(xid);
        // syncCall == this.syncCall
        // which delegates to TmNettyRemotingClient.getInstance().sendSyncRequest(request)
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    // Report a status to the TC
    @Override
    public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
        // Build a GlobalReportRequest, which is a subclass of AbstractMessage.
        // As analyzed earlier, AbstractMessage belongs to Seata's custom protocol stack, built on Netty and based on ProtoBuf.
        GlobalReportRequest globalReport = new GlobalReportRequest();
        globalReport.setXid(xid);
        globalReport.setGlobalStatus(globalStatus);
        // syncCall == this.syncCall
        // which delegates to TmNettyRemotingClient.getInstance().sendSyncRequest(request)
        GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
        return response.getGlobalStatus();
    }

    // ****************************************************************
    // Make a synchronous call through TmNettyRemotingClient
    // ****************************************************************
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            // **********************************************************************
            // TmNettyRemotingClient.getInstance().sendSyncRequest was analyzed in an earlier section
            // **********************************************************************
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            // A transport timeout is surfaced to the caller as a transaction exception with code IO
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
}
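
Every method in the class above follows the same shape: build a request, hand it to one synchronous call, and turn a transport timeout into a transaction exception. The sketch below isolates that shape without depending on Seata. All of the names in it (SketchRequest, SketchResponse, SketchTransactionException, SyncTransport, SketchTransactionManager) are hypothetical stand-ins introduced for illustration; they are not Seata classes, and the real request and response types carry more fields.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a request such as GlobalBeginRequest (not a Seata class).
class SketchRequest {
    final String kind;
    final String xid;
    SketchRequest(String kind, String xid) { this.kind = kind; this.xid = xid; }
}

// Hypothetical stand-in for a response such as GlobalBeginResponse (not a Seata class).
class SketchResponse {
    final boolean failed;
    final String xid;
    final String msg;
    SketchResponse(boolean failed, String xid, String msg) {
        this.failed = failed;
        this.xid = xid;
        this.msg = msg;
    }
}

// Hypothetical checked exception playing the role of TransactionException.
class SketchTransactionException extends Exception {
    SketchTransactionException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical transport playing the role of TmNettyRemotingClient.sendSyncRequest.
interface SyncTransport {
    SketchResponse sendSyncRequest(SketchRequest request) throws TimeoutException;
}

class SketchTransactionManager {
    private final SyncTransport transport;

    SketchTransactionManager(SyncTransport transport) { this.transport = transport; }

    // Mirrors begin(): send the request, reject a failed result, return the xid.
    String begin(String name) throws SketchTransactionException {
        SketchResponse response = syncCall(new SketchRequest("begin:" + name, null));
        if (response.failed) {
            throw new SketchTransactionException("begin failed: " + response.msg, null);
        }
        return response.xid;
    }

    // Mirrors syncCall(): the single place where a timeout is translated.
    private SketchResponse syncCall(SketchRequest request) throws SketchTransactionException {
        try {
            return transport.sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new SketchTransactionException("RPC timeout", toe);
        }
    }
}

class SyncCallSketchDemo {
    public static void main(String[] args) throws Exception {
        // A fake transport that always answers successfully with a fixed xid.
        SketchTransactionManager tm = new SketchTransactionManager(
            request -> new SketchResponse(false, "xid-1", null));
        System.out.println(tm.begin("demo"));
    }
}
```

Keeping the timeout translation in one private method means callers of begin, commit, and rollback only ever see the transaction exception type, which is the design the original class appears to follow.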
(3). Diagram of the full call chain
(4). Summary
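Responsibilities of GlobalTransactionalInterceptor:
- Intercept annotated methods (AOP).
- Send the begin instruction to the TC, which produces the xid.
- Execute the business logic.
- Send the commit or rollback instruction to the TC.
- One open question remains: everything analyzed so far covers only global transaction handling, so how are branch transactions handled? Keep following along, the next sections pick that up.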
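
The responsibilities listed in this summary can be sketched as a small template: begin, run the business code, then commit or roll back. This is a simplified illustration under stated assumptions, not Seata's actual interceptor. SketchTm, RecordingTm, and SketchInterceptor are hypothetical names, the fixed xid is made up, and the real code applies rollback rules and status handling that are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical, reduced view of a transaction manager (not Seata's TransactionManager interface).
interface SketchTm {
    String begin(String name);
    void commit(String xid);
    void rollback(String xid);
}

// Records each instruction in order; stands in for the round trips to the TC.
class RecordingTm implements SketchTm {
    final List<String> calls = new ArrayList<>();

    public String begin(String name) {
        calls.add("begin:" + name);
        return "xid-1"; // made-up xid for the sketch
    }

    public void commit(String xid) { calls.add("commit:" + xid); }

    public void rollback(String xid) { calls.add("rollback:" + xid); }
}

class SketchInterceptor {
    private final SketchTm tm;

    SketchInterceptor(SketchTm tm) { this.tm = tm; }

    // Wraps the business call in begin / commit, rolling back if it throws.
    <T> T invoke(String name, Callable<T> business) throws Exception {
        String xid = tm.begin(name);
        T result;
        try {
            result = business.call();
        } catch (Exception e) {
            // Simplification: every exception triggers a rollback.
            tm.rollback(xid);
            throw e;
        }
        tm.commit(xid);
        return result;
    }
}

class InterceptorSketchDemo {
    public static void main(String[] args) throws Exception {
        RecordingTm tm = new RecordingTm();
        String result = new SketchInterceptor(tm).invoke("demo", () -> "done");
        System.out.println(result + " " + tm.calls);
    }
}
```

Only the global transaction is visible in this flow; nothing here registers or reports a branch, which is the gap the closing question points at.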