(1). 概述
先描述下问题,当使用Shardin-jdbc之后,从开发的视角,只有虚拟表了,那如何让Liquibase根据虚拟表信息,生成物理表呢?
在这里我只聊下大概思路:
- 获得Sharding-jdbc代理的DataSource(ShardingDataSource).
- 在ShardingDataSource里有所有真实数据源的集合(dataSource.getDataSourceMap()).
- 在ShardingDataSource里有ShardingRuntimeContext,里面存在表规则信息(虚拟表与物理表的关系),提取这些表规则信息,转换成你的业务模型.
- 根据业务模型,调用Liquibase创建表.
(2). 提取ShardingDataSource信息,转换成业务模型(DataBaseInfo)
@Bean
@Conditional(ShardingJdbcDataSourceCondition.class)
@ConditionalOnClass(DataSource.class)
public DataBaseInfo dataBaseInfo(DataSource shardingDataSource) {
if (null != shardingDataSource && shardingDataSource instanceof ShardingDataSource) {
ShardingDataSource dataSource = (ShardingDataSource) shardingDataSource;
// 所有的数据源集合.
Map<String, DataSource> dataSourceMap = dataSource.getDataSourceMap();
// 业务模型对象
DataBaseInfo.Builder builder = DataBaseInfo.newBuilder();
builder.dataSources(dataSourceMap);
// 获得Sharding-jdbc配置信息.
ShardingRuntimeContext shardingRuntimeContext = dataSource.getRuntimeContext();
// 默认的数据源
ShardingRuleConfiguration shardingRuleConfiguration = dataSource.getRuntimeContext().getRule().getRuleConfiguration();
builder.defaultDataSourceName(shardingRuleConfiguration.getDefaultDataSourceName());
// 数据源的类型(MySQL/Oracle...)
String databaseType = shardingRuntimeContext.getDatabaseType().getName();
builder.databaseType(databaseType);
// 获得规则信息集
ShardingRule shardingRule = shardingRuntimeContext.getRule();
Collection<TableRule> tableRules = shardingRule.getTableRules();
for (TableRule tableRule : tableRules) {
String logicTable = tableRule.getLogicTable();
List<DataNode> dataNodes = tableRule.getActualDataNodes();
dataNodes.forEach(dataNode -> {
String tableName = dataNode.getTableName();
String dataSourceName = dataNode.getDataSourceName();
TableInfo tableInfo = TableInfo.newBuilder().logicTable(logicTable)
.dataSourceName(dataSourceName)
.tableName(tableName).build();
builder.addTableInfo(tableInfo);
if (logger.isDebugEnabled()) {
logger.debug("logicTable:[{}],tableNmae:[{}],dataSource:[{}]", logicTable,
tableName, dataSourceName);
}
});
return builder.build();
}
}
return DataBaseInfo.newBuilder().build();
}
(3). 将业务模型(DataBaseInfo),转换成Liquibase
@Bean
@ConditionalOnBean(DataBaseInfo.class)
public Map<Liquibase, Contexts> liquibases(
DataBaseInfo dataBaseInfo,
ObjectProvider<List<LiquibaseCustomizer>> customizersList,
ObjectProvider<Map<String, Class<? extends Database>>> databasesMap,
LiquibaseIntegrationProperties liquibaseIntegrationProperties,
LiquibaseResourceLoader liquibaseResourceLoader) throws Exception {
Map<String, Class<? extends Database>> databases = databasesMap.getIfAvailable();
List<LiquibaseCustomizer> customizers = customizersList.getIfAvailable();
Map<Liquibase, Contexts> liquibases = new HashMap<>();
// changeLog位置
String changeLogFile = liquibaseIntegrationProperties.getChangeLog();
if (null == changeLogFile) {
logger.error("liquibase.changeLogFile properties is require");
throw new IllegalArgumentException("liquibase.changeLogFile properties is require");
}
// 数据库类型
String databaseType = dataBaseInfo.getDatabaseType();
// 数据源集合
Map<String, DataSource> dataSourceMap = dataBaseInfo.getDataSourceMap();
// 上下文信息
String context = liquibaseIntegrationProperties.getContexts();
// 默认的数据源名称
String defaultDataSourceName = dataBaseInfo.getDefaultDataSourceName();
// 创建资源访问授权器
ResourceAccessor resourceAccessor = new SpringResourceAccessor(liquibaseResourceLoader.getResourceLoader());
// 构建默认表的信息
if (null != defaultDataSourceName) {
StringBuilder contextBuilder = new StringBuilder(context);
contextBuilder.append(",").append(TableType.PhysicalTable);
// 获得数据源
DataSource dataSource = dataSourceMap.get(defaultDataSourceName);
// 根据数据源创建:Database
Database database = buildDatabase(databases, databaseType, dataSource);
// 构建:Liquibase
Liquibase liquibase = new Liquibase(changeLogFile, resourceAccessor, database);
// *******************************************************************
// 构建上下文,这一步很重要:
// 在Sharding-jdbc中默认的数据源,对上下文很重要,当上下文为:TableType.PhysicalTable时
// changeLog中context对应得上的才会执行.
// *******************************************************************
Contexts contexts = new Contexts(contextBuilder.toString());
// 允许业务对:liquibase进行自定义(实现:LiquibaseCustomizer即可)
if (!customizers.isEmpty()) {
customizers.forEach(customizer -> customizer.customize(liquibase));
}
liquibases.put(liquibase, contexts);
}
// 所有的逻辑表信息
Iterator<TableInfo> iterator = dataBaseInfo.getTableInfos().iterator();
while (iterator.hasNext()) {
TableInfo tableInfo = iterator.next();
String logicTable = tableInfo.getLogicTable();
String tableName = tableInfo.getTableName();
String dataSourceName = tableInfo.getDataSourceName();
// 1. 首先增加用户自定义的上下文信息
StringBuilder contextBuilder = new StringBuilder(context);
// *********************************************************************
// 2. TableInfo信息存在的情况下,代表着这是一张逻辑表(虚表)
// *********************************************************************
contextBuilder.append(",").append(TableType.LogicalTable);
// *********************************************************************
// 3. 如果有配置默认数据源,代表没有分片的表都路由到这个数据源上(所以,属于物理表)
// *********************************************************************
if (null != defaultDataSourceName && defaultDataSourceName.equalsIgnoreCase(dataSourceName)) {
contextBuilder.append(",").append(TableType.PhysicalTable);
}
// 检查规则对应的数据源是否存在
if (!dataSourceMap.containsKey(dataSourceName)) {
String formatLine = String.format("Rule dataSourceName:[%s],logicTable:[%s],physicsTable:[%s],Not Found DataSource", defaultDataSourceName, logicTable, tableName);
logger.error(formatLine);
throw new IllegalArgumentException(formatLine);
}
// 获得数据源
DataSource dataSource = dataSourceMap.get(dataSourceName);
// 根据数据源创建:Database
Database database = buildDatabase(databases, databaseType, dataSource);
// 构建:Liquibase
Liquibase liquibase = new Liquibase(changeLogFile, resourceAccessor, database);
// *********************************************************************
// 逻辑表与物理表的关系(这个是必须要存在的,开发在chagneLog时,表名称可以变量[逻辑表名称])
// *********************************************************************
liquibase.setChangeLogParameter(logicTable, tableName);
// *********************************************************************
// 物理表与数据源关系(这个是附助数据.)
// *********************************************************************
liquibase.setChangeLogParameter(tableName, dataSourceName);
// 允许业务对:liquibase进行自定义(实现:LiquibaseCustomizer即可)
if (!customizers.isEmpty()) {
customizers.forEach(customizer -> customizer.customize(liquibase));
}
if (logger.isDebugEnabled()) {
logger.debug("build Liquibase SUCCESS.[{}]", liquibase);
}
// 构建上下文
Contexts contexts = new Contexts(contextBuilder.toString());
liquibases.put(liquibase, contexts);
}
return liquibases;
}
private Database buildDatabase(Map<String, Class<? extends Database>> databases,
String databaseType,
DataSource dataSource) throws Exception {
Database database = null;
if (!databases.containsKey(databaseType)) {
String formatLine = String.format("databases:[%s],No Match databaseType:[%s]", databases, databaseType);
throw new IllegalArgumentException(formatLine);
}
// 不能共用对象,每一次都要构建出新的来
database = databases.get(databaseType).newInstance();
DatabaseConnection databaseConnection = buildDatabaseConnection(dataSource);
database.setConnection(databaseConnection);
return database;
}
private DatabaseConnection buildDatabaseConnection(DataSource dataSource) throws Exception {
Connection connection = dataSource.getConnection();
DatabaseConnection databaseConnection = new JdbcConnection(connection);
return databaseConnection;
}
(4). 调用Liquibase.update方法,进行建表
经过测试,发现:Liquibase不支持并发.
public class LiquibaseLifecycle implements ApplicationContextAware, SmartLifecycle {
private final Logger log = LoggerFactory.getLogger(LiquibaseLifecycle.class);
private AtomicBoolean running = new AtomicBoolean(false);
private Map<Liquibase, Contexts> liquibases;
private ApplicationContext applicationContext;
public LiquibaseLifecycle(Map<Liquibase, Contexts> liquibases) {
this.liquibases = liquibases;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public int getPhase() {
return 0;
}
@Override
public void start() {
if (running.compareAndSet(false, true)) {
List<LiquibaseExecute> executes = new ArrayList<>(liquibases.size());
CountDownLatch latch = new CountDownLatch(liquibases.size());
// 把Liquibase集合转换成:Execute集合.
Iterator<Map.Entry<Liquibase, Contexts>> iterator = liquibases.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Liquibase, Contexts> entry = iterator.next();
Liquibase liquibase = entry.getKey();
Contexts contexts = entry.getValue();
// 转换成:LiquibaseExecute对象.
LiquibaseExecute execute = new LiquibaseExecute(latch, liquibase, contexts);
executes.add(execute);
}
for (int i = 0; i < executes.size(); i++) {
// 这种任务只执行一次,不需要建线程池
// 为每一个Liquibase构建线程
// TODO lixin 测试发现:Liquibase不支持并发创建表.
Thread thread = new Thread(executes.get(i));
thread.setDaemon(true);
thread.setName("liquibase-generate-table-" + i);
thread.run();
}
try {
// 等待所有的count
latch.await();
// 是否所有的Executer的结果都是成功的
boolean isAllSuccess = false;
// 获取成功的数据
long successCount = executes.stream().filter(execute -> execute.getIsSuccess().get()).count();
if (successCount == executes.size()) {
isAllSuccess = true;
}
if (isAllSuccess) {
// 发布事件,触发向注册中心进行注册.
applicationContext.publishEvent(new RegisterServiceStartEvent("TriggerRegister"));
running.set(Boolean.TRUE);
} else {
// 获取所有失败的:Execute
List<LiquibaseExecute> failExecutes = executes.stream().filter(execute -> execute.getIsSuccess().get() == false).collect(Collectors.toList());
for (LiquibaseExecute execute : failExecutes) {
log.error("execute liquibase FAIL,description:[{}]", execute);
}
}
} catch (InterruptedException ignore) {
}
}
}
@Override
public void stop() {
}
@Override
public boolean isRunning() {
return running.get();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
class LiquibaseExecute implements Runnable {
private final Logger log = LoggerFactory.getLogger(LiquibaseExecute.class);
private Liquibase liquibase;
private Contexts context;
private AtomicBoolean isSuccess = new AtomicBoolean(false);
private Throwable exception = null;
private CountDownLatch latch;
public LiquibaseExecute(CountDownLatch latch, Liquibase liquibase, Contexts context) {
this.latch = latch;
this.liquibase = liquibase;
this.context = context;
}
public Contexts getContext() {
return context;
}
public Liquibase getLiquibase() {
return liquibase;
}
public AtomicBoolean getIsSuccess() {
return isSuccess;
}
public Throwable getException() {
return exception;
}
@Override
public void run() {
try {
log.info("START execute generate table for liquibase:[{}]", liquibase);
liquibase.update(context);
isSuccess.compareAndSet(false, true);
log.info("END execute generate table for liquibase:[{}]", liquibase);
} catch (Throwable e) {
exception = e;
log.warn("FAIL execute generate table for liquibase:[{}],exception:[{}]", liquibase, e);
}
// 放在代码,最后面:latch只是用来记录线程是否执行完毕.不做成功或异常处理.
latch.countDown();
}
@Override
public String toString() {
return "Execute{" +
"liquibase=" + liquibase +
", context=" + context +
'}';
}
}
(4). root.changelog.xml
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-2.0.xsd">
<!-- 逻辑表context="LogicalTable" -->
<!-- 在编写逻辑表的XML时,(必须)要注意三点: -->
<!-- 1. tableName="${t_order}",这里的变量名称是你在sharding-jdbc中你声明的逻辑表名称 -->
<!-- 2. 如果是逻辑表,要指定这个值:context="LogicalTable" -->
<!-- 3. liquibase 是根据:id+author计算(check sum),所以,要在auth上增加多一个标识(_${t_order}),否则,同一个库不同的表,会创建失败 -->
<!-- 比如: md5(1+lixin) = 123 ==> t_order_1 -->
<!-- 比如: md5(1+lixin) = 123 ==> t_order_2 -->
<!-- 上面的例子就会造成,在db1库,只会创建一张表成功,另一张表不成功,提示:已经存在. -->
<changeSet id="1" author="lixin_${t_order}" context="LogicalTable">
<createTable tableName="${t_order}">
<column name="order_id" type="bigint(20)">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="price" type="decimal(10,2)"/>
<column name="user_id" type="bigint(20)"/>
<column name="status" type="varchar(50)"/>
</createTable>
</changeSet>
<!-- 物理表(context="PhysicalTable") -->
<!-- 物理表注意事项: -->
<!-- 1. context="PhysicalTable" 必须指定为物理表 -->
<!-- 2. 表名称不再需要变量了. -->
<changeSet id="2" author="lixin" context="PhysicalTable">
<createTable tableName="t_user">
<column name="user_id" type="bigint(20)">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="fullname" type="varchar(255)"/>
<column name="user_type" type="char(1)"/>
</createTable>
</changeSet>
</databaseChangeLog>
(5). 总结
需要做好规划,在编写changelog时,开发要清楚使用物理表和逻辑表.