From ba82ca30515cb16a6c5b51c6a13de5c7d31a164c Mon Sep 17 00:00:00 2001 From: Redkale Date: Tue, 3 Jan 2023 10:03:23 +0800 Subject: [PATCH] =?UTF-8?q?DataSource=E5=A2=9E=E5=8A=A0createTable?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/source/DataJdbcSource.java | 135 ++++++++++++++++++ .../org/redkale/source/DataMemorySource.java | 5 + .../java/org/redkale/source/DataSource.java | 73 ++++++++-- .../org/redkale/source/DataSqlSource.java | 63 +++++++- 4 files changed, 263 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index a4b37befe..df022848f 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -705,11 +705,146 @@ public class DataJdbcSource extends DataSqlSource { } } + @Override + protected CompletableFuture createTableDBAsync(EntityInfo info, String copyTableSql, final Serializable pk, String... sqls) { + return supplyAsync(() -> createTableDB(info, copyTableSql, pk, sqls)); + } + @Override protected CompletableFuture dropTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) { return supplyAsync(() -> dropTableDB(info, tables, node, sqls)); } + @Override + protected int createTableDB(EntityInfo info, String copyTableSql, Serializable pk, String... sqls) { + Connection conn = null; + Statement stmt; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + int c; + if (copyTableSql == null) { + if (sqls.length == 1) { + stmt = conn.createStatement(); + int c1 = stmt.executeUpdate(sqls[0]); + stmt.close(); + c = c1; + } else { + stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c1 = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c1 += cc; + } + c = c1; + } + } else { //建分表 + try { + stmt = conn.createStatement(); + c = stmt.executeUpdate(copyTableSql); + } catch (SQLException se) { + if (isTableNotExist(info, se.getSQLState())) { //分表的原始表不存在 + final String newTable = info.getTable(pk); + if (newTable.indexOf('.') <= 0) { //分表的原始表不存在 + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls)); + } + //创建原始表 + stmt = conn.createStatement(); + if (sqls.length == 1) { + stmt.execute(sqls[0]); + } else { + for (String tableSql : sqls) { + stmt.addBatch(tableSql); + } + stmt.executeBatch(); + } + stmt.close(); + //再执行一遍创建分表操作 + if (info.isLoggable(logger, Level.FINEST, copyTableSql)) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql); + } + stmt = conn.createStatement(); + c = stmt.executeUpdate(copyTableSql); + stmt.close(); + + } else { //需要先建库 + String newCatalog = newTable.substring(0, newTable.indexOf('.')); + String catalogSql = ("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog; + try { + if (info.isLoggable(logger, Level.FINEST, catalogSql)) { + logger.finest(info.getType().getSimpleName() + " createCatalog sql=" + catalogSql); + } + stmt = conn.createStatement(); + stmt.executeUpdate(catalogSql); + stmt.close(); + } catch (SQLException sqle1) { + logger.log(Level.SEVERE, "create database " + copyTableSql + " error", sqle1); + } + try { + //再执行一遍创建分表操作 + if (info.isLoggable(logger, Level.FINEST, copyTableSql)) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql); + } + stmt = conn.createStatement(); + c = stmt.executeUpdate(copyTableSql); + stmt.close(); + } catch (SQLException sqle2) { + if (isTableNotExist(info, sqle2.getSQLState())) { + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls)); + } + //创建原始表 + stmt = conn.createStatement(); + if (sqls.length == 1) { + stmt.execute(sqls[0]); + } else { + for (String tableSql : sqls) { + stmt.addBatch(tableSql); + } + stmt.executeBatch(); + } + stmt.close(); + //再执行一遍创建分表操作 + if (info.isLoggable(logger, Level.FINEST, copyTableSql)) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql); + } + stmt = conn.createStatement(); + c = stmt.executeUpdate(copyTableSql); + stmt.close(); + } else { + throw new SourceException(sqle2); + } + } + } + } + throw new SourceException(se); + } + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException e) { + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException se) { + } + } + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + @Override protected int dropTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { Connection conn = null; diff --git a/src/main/java/org/redkale/source/DataMemorySource.java b/src/main/java/org/redkale/source/DataMemorySource.java index cefd56279..67d0171ca 100644 --- a/src/main/java/org/redkale/source/DataMemorySource.java +++ b/src/main/java/org/redkale/source/DataMemorySource.java @@ -201,6 +201,11 @@ public class DataMemorySource extends DataSqlSource implements SearchSource { return CompletableFuture.completedFuture(0); } + @Override + protected CompletableFuture createTableDBAsync(EntityInfo info, String copyTableSql, Serializable pk, String... sqls) { + return CompletableFuture.completedFuture(0); + } + @Override protected CompletableFuture dropTableDBAsync(EntityInfo info, String[] tables, FilterNode node, String... sqls) { return CompletableFuture.completedFuture(0); diff --git a/src/main/java/org/redkale/source/DataSource.java b/src/main/java/org/redkale/source/DataSource.java index 254709a3a..a8bc38a16 100644 --- a/src/main/java/org/redkale/source/DataSource.java +++ b/src/main/java/org/redkale/source/DataSource.java @@ -97,7 +97,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数 */ default int insert(final Collection entitys) { - if (entitys == null || entitys.isEmpty()) return 0; + if (entitys == null || entitys.isEmpty()) { + return 0; + } return insert(entitys.toArray()); } @@ -110,7 +112,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数 */ default int insert(final Stream entitys) { - if (entitys == null) return 0; + if (entitys == null) { + return 0; + } return insert(entitys.toArray()); } @@ -133,7 +137,9 @@ public interface DataSource extends Resourcable { * @return CompletableFuture */ default CompletableFuture insertAsync(final Collection entitys) { - if (entitys == null || entitys.isEmpty()) return CompletableFuture.completedFuture(0); + if (entitys == null || entitys.isEmpty()) { + return CompletableFuture.completedFuture(0); + } return insertAsync(entitys.toArray()); } @@ -146,7 +152,9 @@ public interface DataSource extends Resourcable { * @return CompletableFuture */ default CompletableFuture insertAsync(final Stream entitys) { - if (entitys == null) return CompletableFuture.completedFuture(0); + if (entitys == null) { + return CompletableFuture.completedFuture(0); + } return insertAsync(entitys.toArray()); } @@ -172,7 +180,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数 */ default int delete(final Collection entitys) { - if (entitys == null || entitys.isEmpty()) return 0; + if (entitys == null || entitys.isEmpty()) { + return 0; + } return delete(entitys.toArray()); } @@ -186,7 +196,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数 */ default int delete(final Stream entitys) { - if (entitys == null) return 0; + if (entitys == null) { + return 0; + } return delete(entitys.toArray()); } @@ -211,7 +223,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数CompletableFuture */ default CompletableFuture deleteAsync(final Collection entitys) { - if (entitys == null || entitys.isEmpty()) return CompletableFuture.completedFuture(0); + if (entitys == null || entitys.isEmpty()) { + return CompletableFuture.completedFuture(0); + } return deleteAsync(entitys.toArray()); } @@ -225,7 +239,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数CompletableFuture */ default CompletableFuture deleteAsync(final Stream entitys) { - if (entitys == null) return CompletableFuture.completedFuture(0); + if (entitys == null) { + return CompletableFuture.completedFuture(0); + } return deleteAsync(entitys.toArray()); } @@ -360,6 +376,31 @@ public interface DataSource extends Resourcable { */ public CompletableFuture clearTableAsync(final Class clazz, final FilterNode node); + //------------------------createTable--------------------------- + /** + * 删除表
+ * 等价SQL: DROP TABLE {table}
+ * + * @param Entity泛型 + * @param clazz Entity类 + * @param pk 主键 + * + * @return 建表结果 + */ + public int createTable(final Class clazz, final Serializable pk); + + /** + * 删除表
+ * 等价SQL: DROP TABLE {table}
+ * + * @param Entity泛型 + * @param clazz Entity类 + * @param pk 主键 + * + * @return 建表结果 + */ + public CompletableFuture createTableAsync(final Class clazz, final Serializable pk); + //------------------------dropAsync--------------------------- /** * 删除表
@@ -439,7 +480,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数 */ default int update(final Collection entitys) { - if (entitys == null || entitys.isEmpty()) return 0; + if (entitys == null || entitys.isEmpty()) { + return 0; + } return update(entitys.toArray()); } @@ -456,7 +499,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数 */ default int update(final Stream entitys) { - if (entitys == null) return 0; + if (entitys == null) { + return 0; + } return update(entitys.toArray()); } @@ -487,7 +532,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数CompletableFuture */ default CompletableFuture updateAsync(final Collection entitys) { - if (entitys == null || entitys.isEmpty()) return CompletableFuture.completedFuture(0); + if (entitys == null || entitys.isEmpty()) { + return CompletableFuture.completedFuture(0); + } return updateAsync(entitys.toArray()); } @@ -504,7 +551,9 @@ public interface DataSource extends Resourcable { * @return 影响的记录条数CompletableFuture */ default CompletableFuture updateAsync(final Stream entitys) { - if (entitys == null) return CompletableFuture.completedFuture(0); + if (entitys == null) { + return CompletableFuture.completedFuture(0); + } return updateAsync(entitys.toArray()); } diff --git a/src/main/java/org/redkale/source/DataSqlSource.java b/src/main/java/org/redkale/source/DataSqlSource.java index a3be1c2bd..331f04e3f 100644 --- a/src/main/java/org/redkale/source/DataSqlSource.java +++ b/src/main/java/org/redkale/source/DataSqlSource.java @@ -673,6 +673,9 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi //清空表 protected abstract CompletableFuture clearTableDBAsync(final EntityInfo info, String[] tables, FilterNode node, final String... sqls); + //建表 + protected abstract CompletableFuture createTableDBAsync(final EntityInfo info, String copyTableSql, Serializable pk, final String... sqls); + //删除表 protected abstract CompletableFuture dropTableDBAsync(final EntityInfo info, String[] tables, FilterNode node, final String... sqls); @@ -721,6 +724,11 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi return clearTableDBAsync(info, tables, node, sqls).join(); } + //建表 + protected int createTableDB(final EntityInfo info, String copyTableSql, Serializable pk, final String... sqls) { + return createTableDBAsync(info, copyTableSql, pk, sqls).join(); + } + //删除表 protected int dropTableDB(final EntityInfo info, String[] tables, FilterNode node, final String... sqls) { return dropTableDBAsync(info, tables, node, sqls).join(); @@ -1215,7 +1223,60 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi return sqls.toArray(new String[sqls.size()]); } - //----------------------------- dropTableCompose ----------------------------- + //----------------------------- dropTable ----------------------------- + @Override + public int createTable(final Class clazz, final Serializable pk) { + final EntityInfo info = loadEntityInfo(clazz); + final String[] sqls = createTableSqls(info); + if (sqls == null) { + return -1; + } + String copyTableSql = info.getTableStrategy() == null ? null : getTableCopySQL(info, info.getTable(pk)); + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls)); + } + if (isAsync()) { + int rs = createTableDBAsync(info, copyTableSql, pk, sqls).join(); + return rs; + } else { + int rs = createTableDB(info, copyTableSql, pk, sqls); + return rs; + } + } + + @Override + public CompletableFuture createTableAsync(final Class clazz, final Serializable pk) { + final EntityInfo info = loadEntityInfo(clazz); + final String[] sqls = createTableSqls(info); + if (sqls == null) { + return CompletableFuture.completedFuture(-1); + } + String copyTableSql = info.getTableStrategy() == null ? null : getTableCopySQL(info, info.getTable(pk)); + if (copyTableSql == null) { + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + Arrays.toString(sqls)); + } + } else { + if (info.isLoggable(logger, Level.FINEST, copyTableSql)) { + logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql); + } + } + if (isAsync()) { + return createTableDBAsync(info, copyTableSql, pk, sqls).whenComplete((rs, t) -> { + if (t != null) { + errorCompleteConsumer.accept(rs, t); + } + }); + } else { + return supplyAsync(() -> createTableDB(info, copyTableSql, pk, sqls)).whenComplete((rs, t) -> { + if (t != null) { + errorCompleteConsumer.accept(rs, t); + } + }); + } + } + + //----------------------------- dropTable ----------------------------- @Override public int dropTable(Class clazz, FilterNode node) { final EntityInfo info = loadEntityInfo(clazz);