DataSource增加createTable方法

This commit is contained in:
Redkale
2023-01-03 10:03:23 +08:00
parent 0032c06498
commit ba82ca3051
4 changed files with 263 additions and 13 deletions

View File

@@ -705,11 +705,146 @@ public class DataJdbcSource extends DataSqlSource {
}
}
@Override
protected <T> CompletableFuture<Integer> createTableDBAsync(EntityInfo<T> info, String copyTableSql, final Serializable pk, String... sqls) {
return supplyAsync(() -> createTableDB(info, copyTableSql, pk, sqls));
}
@Override
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> dropTableDB(info, tables, node, sqls));
}
@Override
protected <T> int createTableDB(EntityInfo<T> info, String copyTableSql, Serializable pk, String... sqls) {
Connection conn = null;
Statement stmt;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (copyTableSql == null) {
if (sqls.length == 1) {
stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
} else { //建分表
try {
stmt = conn.createStatement();
c = stmt.executeUpdate(copyTableSql);
} catch (SQLException se) {
if (isTableNotExist(info, se.getSQLState())) { //分表的原始表不存在
final String newTable = info.getTable(pk);
if (newTable.indexOf('.') <= 0) { //分表的原始表不存在
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls));
}
//创建原始表
stmt = conn.createStatement();
if (sqls.length == 1) {
stmt.execute(sqls[0]);
} else {
for (String tableSql : sqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
stmt.close();
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createStatement();
c = stmt.executeUpdate(copyTableSql);
stmt.close();
} else { //需要先建库
String newCatalog = newTable.substring(0, newTable.indexOf('.'));
String catalogSql = ("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog;
try {
if (info.isLoggable(logger, Level.FINEST, catalogSql)) {
logger.finest(info.getType().getSimpleName() + " createCatalog sql=" + catalogSql);
}
stmt = conn.createStatement();
stmt.executeUpdate(catalogSql);
stmt.close();
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database " + copyTableSql + " error", sqle1);
}
try {
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createStatement();
c = stmt.executeUpdate(copyTableSql);
stmt.close();
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls));
}
//创建原始表
stmt = conn.createStatement();
if (sqls.length == 1) {
stmt.execute(sqls[0]);
} else {
for (String tableSql : sqls) {
stmt.addBatch(tableSql);
}
stmt.executeBatch();
}
stmt.close();
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createStatement();
c = stmt.executeUpdate(copyTableSql);
stmt.close();
} else {
throw new SourceException(sqle2);
}
}
}
}
throw new SourceException(se);
}
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;

View File

@@ -201,6 +201,11 @@ public class DataMemorySource extends DataSqlSource implements SearchSource {
return CompletableFuture.completedFuture(0);
}
@Override
protected <T> CompletableFuture<Integer> createTableDBAsync(EntityInfo<T> info, String copyTableSql, Serializable pk, String... sqls) {
return CompletableFuture.completedFuture(0);
}
@Override
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
return CompletableFuture.completedFuture(0);

View File

@@ -97,7 +97,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数
*/
default <T> int insert(final Collection<T> entitys) {
if (entitys == null || entitys.isEmpty()) return 0;
if (entitys == null || entitys.isEmpty()) {
return 0;
}
return insert(entitys.toArray());
}
@@ -110,7 +112,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数
*/
default <T> int insert(final Stream<T> entitys) {
if (entitys == null) return 0;
if (entitys == null) {
return 0;
}
return insert(entitys.toArray());
}
@@ -133,7 +137,9 @@ public interface DataSource extends Resourcable {
* @return CompletableFuture
*/
default <T> CompletableFuture<Integer> insertAsync(final Collection<T> entitys) {
if (entitys == null || entitys.isEmpty()) return CompletableFuture.completedFuture(0);
if (entitys == null || entitys.isEmpty()) {
return CompletableFuture.completedFuture(0);
}
return insertAsync(entitys.toArray());
}
@@ -146,7 +152,9 @@ public interface DataSource extends Resourcable {
* @return CompletableFuture
*/
default <T> CompletableFuture<Integer> insertAsync(final Stream<T> entitys) {
if (entitys == null) return CompletableFuture.completedFuture(0);
if (entitys == null) {
return CompletableFuture.completedFuture(0);
}
return insertAsync(entitys.toArray());
}
@@ -172,7 +180,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数
*/
default <T> int delete(final Collection<T> entitys) {
if (entitys == null || entitys.isEmpty()) return 0;
if (entitys == null || entitys.isEmpty()) {
return 0;
}
return delete(entitys.toArray());
}
@@ -186,7 +196,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数
*/
default <T> int delete(final Stream<T> entitys) {
if (entitys == null) return 0;
if (entitys == null) {
return 0;
}
return delete(entitys.toArray());
}
@@ -211,7 +223,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数CompletableFuture
*/
default <T> CompletableFuture<Integer> deleteAsync(final Collection<T> entitys) {
if (entitys == null || entitys.isEmpty()) return CompletableFuture.completedFuture(0);
if (entitys == null || entitys.isEmpty()) {
return CompletableFuture.completedFuture(0);
}
return deleteAsync(entitys.toArray());
}
@@ -225,7 +239,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数CompletableFuture
*/
default <T> CompletableFuture<Integer> deleteAsync(final Stream<T> entitys) {
if (entitys == null) return CompletableFuture.completedFuture(0);
if (entitys == null) {
return CompletableFuture.completedFuture(0);
}
return deleteAsync(entitys.toArray());
}
@@ -360,6 +376,31 @@ public interface DataSource extends Resourcable {
*/
public <T> CompletableFuture<Integer> clearTableAsync(final Class<T> clazz, final FilterNode node);
//------------------------createTable---------------------------
/**
* 删除表 <br>
* 等价SQL: DROP TABLE {table}<br>
*
* @param <T> Entity泛型
* @param clazz Entity类
* @param pk 主键
*
* @return 建表结果
*/
public <T> int createTable(final Class<T> clazz, final Serializable pk);
/**
* 删除表 <br>
* 等价SQL: DROP TABLE {table}<br>
*
* @param <T> Entity泛型
* @param clazz Entity类
* @param pk 主键
*
* @return 建表结果
*/
public <T> CompletableFuture<Integer> createTableAsync(final Class<T> clazz, final Serializable pk);
//------------------------dropAsync---------------------------
/**
* 删除表 <br>
@@ -439,7 +480,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数
*/
default <T> int update(final Collection<T> entitys) {
if (entitys == null || entitys.isEmpty()) return 0;
if (entitys == null || entitys.isEmpty()) {
return 0;
}
return update(entitys.toArray());
}
@@ -456,7 +499,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数
*/
default <T> int update(final Stream<T> entitys) {
if (entitys == null) return 0;
if (entitys == null) {
return 0;
}
return update(entitys.toArray());
}
@@ -487,7 +532,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数CompletableFuture
*/
default <T> CompletableFuture<Integer> updateAsync(final Collection<T> entitys) {
if (entitys == null || entitys.isEmpty()) return CompletableFuture.completedFuture(0);
if (entitys == null || entitys.isEmpty()) {
return CompletableFuture.completedFuture(0);
}
return updateAsync(entitys.toArray());
}
@@ -504,7 +551,9 @@ public interface DataSource extends Resourcable {
* @return 影响的记录条数CompletableFuture
*/
default <T> CompletableFuture<Integer> updateAsync(final Stream<T> entitys) {
if (entitys == null) return CompletableFuture.completedFuture(0);
if (entitys == null) {
return CompletableFuture.completedFuture(0);
}
return updateAsync(entitys.toArray());
}

View File

@@ -673,6 +673,9 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
//清空表
protected abstract <T> CompletableFuture<Integer> clearTableDBAsync(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls);
//建表
protected abstract <T> CompletableFuture<Integer> createTableDBAsync(final EntityInfo<T> info, String copyTableSql, Serializable pk, final String... sqls);
//删除表
protected abstract <T> CompletableFuture<Integer> dropTableDBAsync(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls);
@@ -721,6 +724,11 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
return clearTableDBAsync(info, tables, node, sqls).join();
}
//建表
protected <T> int createTableDB(final EntityInfo<T> info, String copyTableSql, Serializable pk, final String... sqls) {
return createTableDBAsync(info, copyTableSql, pk, sqls).join();
}
//删除表
protected <T> int dropTableDB(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls) {
return dropTableDBAsync(info, tables, node, sqls).join();
@@ -1215,7 +1223,60 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
return sqls.toArray(new String[sqls.size()]);
}
//----------------------------- dropTableCompose -----------------------------
//----------------------------- dropTable -----------------------------
@Override
public <T> int createTable(final Class<T> clazz, final Serializable pk) {
final EntityInfo<T> info = loadEntityInfo(clazz);
final String[] sqls = createTableSqls(info);
if (sqls == null) {
return -1;
}
String copyTableSql = info.getTableStrategy() == null ? null : getTableCopySQL(info, info.getTable(pk));
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
int rs = createTableDBAsync(info, copyTableSql, pk, sqls).join();
return rs;
} else {
int rs = createTableDB(info, copyTableSql, pk, sqls);
return rs;
}
}
@Override
public <T> CompletableFuture<Integer> createTableAsync(final Class<T> clazz, final Serializable pk) {
final EntityInfo<T> info = loadEntityInfo(clazz);
final String[] sqls = createTableSqls(info);
if (sqls == null) {
return CompletableFuture.completedFuture(-1);
}
String copyTableSql = info.getTableStrategy() == null ? null : getTableCopySQL(info, info.getTable(pk));
if (copyTableSql == null) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls));
}
} else {
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
}
if (isAsync()) {
return createTableDBAsync(info, copyTableSql, pk, sqls).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
}
});
} else {
return supplyAsync(() -> createTableDB(info, copyTableSql, pk, sqls)).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
}
});
}
}
//----------------------------- dropTable -----------------------------
@Override
public <T> int dropTable(Class<T> clazz, FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);