(1). AT模式下切入口在呢?
AT模式的切入口在:
- 以JDBC编程为例(Connection/PreparedStatement/ResultSet).
- 业务端(JdbcTemplate/MyBatis/Hibernate…),会调用DataSourceProxy.getConnection()方法获取一个Connection,不过这个Connection是经过代理之后的(ConnectionProxy).
- 通过ConnectionProxy创建:PreparedStatementProxy(Statement).
- PreparedStatementProxy.execute返回:ResultSet.
(2). 测试发送请求
- curl http://localhost:8084/purchase/commit
- 断点在:AbstractConnectionProxy.prepareStatement
(3). ConnectionProxy.getConnection
首先业务端(JdbcTemplate/MyBatis/Hibernate)会获得一个Connection.
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
// **********************************************************************
// 获取一个Connection时都是new出一个:ConnectionProxy
// 所以ConnectionProxy内的属性(ConnectionContext)都是实例级的,不存在冲突
// **********************************************************************
return new ConnectionProxy(this, targetConnection);
}
(4). AbstractConnectionProxy.prepareStatement
再次,业务端(JdbcTemplate/MyBatis/Hibernate)会调用:Connection.prepareStatement方法.
public PreparedStatement prepareStatement(String sql) throws SQLException {
// mysql
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
// 如果是AT模式下
if (BranchType.AT == RootContext.getBranchType()) { // true
// sql = update storage_tbl set count = count - ? where commodity_code = ?
// 对SQL语句进行识别(io.seata.sqlparser.druid.mysql.MySQLUpdateRecognizer)
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
// 只有INSERT语句才会进入该逻辑
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}// end if
}// end if
}// end if
// update/delete语句都会进该逻辑
if (targetPreparedStatement == null) { // true
// 调用真实的:Connection.prepareStatement(sql)
// getTargetConnection() = com.mysql.cj.jdbc.ConnectionImpl
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
// ****************************************************
// 创建了:PreparedStatementProxy
// ****************************************************
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
} //end prepareStatement
(5). PreparedStatementProxy.executeUpdate
public int executeUpdate() throws SQLException {
// 委派给了:ExecuteTemplate
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}
(6). ExecuteTemplate.execute
public class ExecuteTemplate {
// 1. execute
public static <T, S extends Statement> T execute(
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
//2. execute
return execute(null, statementProxy, statementCallback, args);
}// end execute
// 3. execute
public static <T, S extends Statement> T execute(
// sqlRecognizers = null
List<SQLRecognizer> sqlRecognizers,
// statementProxy = io.seata.rm.datasource.PreparedStatementProxy
StatementProxy<S> statementProxy,
// 在第5步的lambad函数
// io.seata.rm.datasource.PreparedStatementProxy$$Lambda$586/
StatementCallback<T, S> statementCallback,
// args=[]
Object... args) throws SQLException {
// 执行原始的SQL语句
// false
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
// mysql
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) { //true
// 对SQL语句又重新进行了一次识别?
// 前面不是识别了吗?
// 因为没办法通过参数传递过来
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
// 执行器
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
// **************************************************
// 如果对SQL语句没有识别到类型,那么,就使用默认的:PlainExecutor
// **************************************************
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT: // INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE: // UpdateExecutor
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE: // DeleteExecutor
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
// 默认:PlainExecutor
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
// 如果识别出来的是多条SQL语句类型,那么执行器则是:MultiExecutor
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
} // end else
T rs;
try {
// ************************************************************
// 调用Executor去执行,并返回:ResultSet
// 在这一篇我只能分析到这里.
// Executor如何执行的,我会另开几篇进行分析.
// ************************************************************
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}// end execute
}
(7). Seata AT模型执行流程图解
(8). 总结
AT模型是以DataSource为切入点,对Connection/PreparedStatement进行了代理.