From 496b165d4261b8246869ee0b2ffefcde17633522 Mon Sep 17 00:00:00 2001 From: Redkale Date: Mon, 2 Jan 2023 10:01:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96DataJdbcSource?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/source/DataJdbcSource.java | 644 +++++++++--------- 1 file changed, 322 insertions(+), 322 deletions(-) diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 7cdfa5918..a4b37befe 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -201,328 +201,6 @@ public class DataJdbcSource extends DataSqlSource { return i; } - @Override - protected CompletableFuture deleteDBAsync(final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, final String... sqls) { - return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls)); - } - - @Override - protected int deleteDB(EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) { - Connection conn = null; - final long s = System.currentTimeMillis(); - try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(false); - int c; - if (sqls.length == 1) { - final Statement stmt = conn.createStatement(); - int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); - c = c1; - } else { - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int c1 = 0; - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c1 += cc; - } - c = c1; - } - conn.commit(); - slowLog(s, sqls); - return c; - } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } - if (isTableNotExist(info, e.getSQLState())) { - if (info.getTableStrategy() == null) { - String[] tableSqls = createTableSqls(info); - if (tableSqls != null) { - try { - Statement st = conn.createStatement(); - if (tableSqls.length == 1) { - st.execute(tableSqls[0]); - } else { - for (String tableSql : tableSqls) { - st.addBatch(tableSql); - } - st.executeBatch(); - } - st.close(); - return 0; - } catch (SQLException e2) { - throw new SourceException(e2); - } - } - //单表结构不存在 - return 0; - } else if (tables != null && tables.length == 1) { - //只查一个不存在的分表 - return 0; - } else if (tables != null && tables.length > 1) { - //多分表查询中一个或多个分表不存在 -// String tableName = parseNotExistTableName(e); -// if (tableName == null) { -// throw new SourceException(e); -// } - String[] oldTables = tables; - List notExistTables = checkNotExistTablesNoThrows(conn, tables); - if (notExistTables.isEmpty()) { - throw new SourceException(e); - } - for (String t : notExistTables) { - if (pkmap != null) { - pkmap.remove(t); - } else { - tables = Utility.remove(tables, t); - } - } - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "deleteDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + (pkmap != null ? pkmap.keySet() : Arrays.toString(tables))); - } - if ((pkmap != null ? pkmap.size() : tables.length) == 0) { //分表全部不存在 - return 0; - } - sqls = pkmap != null ? deleteSql(info, pkmap) : deleteSql(info, tables, flipper, node); - if (info.isLoggable(logger, Level.FINEST, sqls[0])) { - logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls)); - } - try { - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int c = 0; - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c += cc; - } - conn.commit(); - slowLog(s, sqls); - return c; - } catch (SQLException se) { - throw new SourceException(se); - } - } else { - throw new SourceException(e); - } - } - throw new SourceException(e); - } finally { - if (conn != null) { - writePool.offerConnection(conn); - } - } - } - - @Override - protected CompletableFuture clearTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) { - return supplyAsync(() -> clearTableDB(info, tables, node, sqls)); - } - - @Override - protected int clearTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { - Connection conn = null; - final long s = System.currentTimeMillis(); - try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(false); - int c; - if (sqls.length == 1) { - final Statement stmt = conn.createStatement(); - int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); - c = c1; - } else { - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int c1 = 0; - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c1 += cc; - } - c = c1; - } - conn.commit(); - slowLog(s, sqls); - return c; - } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } - if (isTableNotExist(info, e.getSQLState())) { - if (info.getTableStrategy() == null) { - //单表结构不存在 - return 0; - } else if (tables != null && tables.length == 1) { - //只查一个不存在的分表 - return 0; - } else if (tables != null && tables.length > 1) { - //多分表查询中一个或多个分表不存在 -// String tableName = parseNotExistTableName(e); -// if (tableName == null) { -// throw new SourceException(e); -// } - String[] oldTables = tables; - List notExistTables = checkNotExistTablesNoThrows(conn, tables); - if (notExistTables.isEmpty()) { - throw new SourceException(e); - } - for (String t : notExistTables) { - tables = Utility.remove(tables, t); - } - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "clearTableDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables)); - } - if (tables.length == 0) { //分表全部不存在 - return 0; - } - sqls = clearTableSql(info, tables, node); - if (info.isLoggable(logger, Level.FINEST, sqls[0])) { - logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls)); - } - try { - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int c = 0; - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c += cc; - } - conn.commit(); - slowLog(s, sqls); - return c; - } catch (SQLException se) { - throw new SourceException(se); - } - } else { - throw new SourceException(e); - } - } - throw new SourceException(e); - } finally { - if (conn != null) { - writePool.offerConnection(conn); - } - } - } - - @Override - protected CompletableFuture dropTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) { - return supplyAsync(() -> dropTableDB(info, tables, node, sqls)); - } - - @Override - protected int dropTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { - Connection conn = null; - final long s = System.currentTimeMillis(); - try { - conn = writePool.pollConnection(); - conn.setReadOnly(false); - conn.setAutoCommit(false); - int c; - if (sqls.length == 1) { - final Statement stmt = conn.createStatement(); - int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); - c = c1; - } else { - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int c1 = 0; - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c1 += cc; - } - c = c1; - } - conn.commit(); - slowLog(s, sqls); - return c; - } catch (SQLException e) { - try { - conn.rollback(); - } catch (SQLException se) { - } - if (isTableNotExist(info, e.getSQLState())) { - if (info.getTableStrategy() == null) { - //单表结构不存在 - return 0; - } else if (tables != null && tables.length == 1) { - //只查一个不存在的分表 - return 0; - } else if (tables != null && tables.length > 1) { - //多分表查询中一个或多个分表不存在 -// String tableName = parseNotExistTableName(e); -// if (tableName == null) { -// throw new SourceException(e); -// } - String[] oldTables = tables; - List notExistTables = checkNotExistTablesNoThrows(conn, tables); - if (notExistTables.isEmpty()) { - throw new SourceException(e); - } - for (String t : notExistTables) { - tables = Utility.remove(tables, t); - } - if (logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, "dropTableDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables)); - } - if (tables.length == 0) { //分表全部不存在 - return 0; - } - sqls = dropTableSql(info, tables, node); - if (info.isLoggable(logger, Level.FINEST, sqls[0])) { - logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls)); - } - try { - final Statement stmt = conn.createStatement(); - for (String sql : sqls) { - stmt.addBatch(sql); - } - int c = 0; - int[] cs = stmt.executeBatch(); - stmt.close(); - for (int cc : cs) { - c += cc; - } - conn.commit(); - slowLog(s, sqls); - return c; - } catch (SQLException se) { - throw new SourceException(se); - } - } else { - throw new SourceException(e); - } - } - throw new SourceException(e); - } finally { - if (conn != null) { - writePool.offerConnection(conn); - } - } - } - @Override public int batch(final DataBatch batch) { Objects.requireNonNull(batch); @@ -805,6 +483,328 @@ public class DataJdbcSource extends DataSqlSource { } } + @Override + protected CompletableFuture deleteDBAsync(final EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, final String... sqls) { + return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls)); + } + + @Override + protected int deleteDB(EntityInfo info, String[] tables, Flipper flipper, FilterNode node, Map> pkmap, String... sqls) { + Connection conn = null; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + int c; + if (sqls.length == 1) { + final Statement stmt = conn.createStatement(); + int c1 = stmt.executeUpdate(sqls[0]); + stmt.close(); + c = c1; + } else { + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c1 = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c1 += cc; + } + c = c1; + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException se) { + } + if (isTableNotExist(info, e.getSQLState())) { + if (info.getTableStrategy() == null) { + String[] tableSqls = createTableSqls(info); + if (tableSqls != null) { + try { + Statement st = conn.createStatement(); + if (tableSqls.length == 1) { + st.execute(tableSqls[0]); + } else { + for (String tableSql : tableSqls) { + st.addBatch(tableSql); + } + st.executeBatch(); + } + st.close(); + return 0; + } catch (SQLException e2) { + throw new SourceException(e2); + } + } + //单表结构不存在 + return 0; + } else if (tables != null && tables.length == 1) { + //只查一个不存在的分表 + return 0; + } else if (tables != null && tables.length > 1) { + //多分表查询中一个或多个分表不存在 +// String tableName = parseNotExistTableName(e); +// if (tableName == null) { +// throw new SourceException(e); +// } + String[] oldTables = tables; + List notExistTables = checkNotExistTablesNoThrows(conn, tables); + if (notExistTables.isEmpty()) { + throw new SourceException(e); + } + for (String t : notExistTables) { + if (pkmap != null) { + pkmap.remove(t); + } else { + tables = Utility.remove(tables, t); + } + } + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "delete, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + (pkmap != null ? pkmap.keySet() : Arrays.toString(tables))); + } + if ((pkmap != null ? pkmap.size() : tables.length) == 0) { //分表全部不存在 + return 0; + } + sqls = pkmap != null ? deleteSql(info, pkmap) : deleteSql(info, tables, flipper, node); + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls)); + } + try { + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c += cc; + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException se) { + throw new SourceException(se); + } + } else { + throw new SourceException(e); + } + } + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + + @Override + protected CompletableFuture clearTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) { + return supplyAsync(() -> clearTableDB(info, tables, node, sqls)); + } + + @Override + protected int clearTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { + Connection conn = null; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + int c; + if (sqls.length == 1) { + final Statement stmt = conn.createStatement(); + int c1 = stmt.executeUpdate(sqls[0]); + stmt.close(); + c = c1; + } else { + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c1 = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c1 += cc; + } + c = c1; + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException se) { + } + if (isTableNotExist(info, e.getSQLState())) { + if (info.getTableStrategy() == null) { + //单表结构不存在 + return 0; + } else if (tables != null && tables.length == 1) { + //只查一个不存在的分表 + return 0; + } else if (tables != null && tables.length > 1) { + //多分表查询中一个或多个分表不存在 +// String tableName = parseNotExistTableName(e); +// if (tableName == null) { +// throw new SourceException(e); +// } + String[] oldTables = tables; + List notExistTables = checkNotExistTablesNoThrows(conn, tables); + if (notExistTables.isEmpty()) { + throw new SourceException(e); + } + for (String t : notExistTables) { + tables = Utility.remove(tables, t); + } + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "clearTable, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables)); + } + if (tables.length == 0) { //分表全部不存在 + return 0; + } + sqls = clearTableSql(info, tables, node); + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls)); + } + try { + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c += cc; + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException se) { + throw new SourceException(se); + } + } else { + throw new SourceException(e); + } + } + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + + @Override + protected CompletableFuture dropTableDBAsync(EntityInfo info, final String[] tables, FilterNode node, String... sqls) { + return supplyAsync(() -> dropTableDB(info, tables, node, sqls)); + } + + @Override + protected int dropTableDB(EntityInfo info, String[] tables, FilterNode node, String... sqls) { + Connection conn = null; + final long s = System.currentTimeMillis(); + try { + conn = writePool.pollConnection(); + conn.setReadOnly(false); + conn.setAutoCommit(false); + int c; + if (sqls.length == 1) { + final Statement stmt = conn.createStatement(); + int c1 = stmt.executeUpdate(sqls[0]); + stmt.close(); + c = c1; + } else { + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c1 = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c1 += cc; + } + c = c1; + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException se) { + } + if (isTableNotExist(info, e.getSQLState())) { + if (info.getTableStrategy() == null) { + //单表结构不存在 + return 0; + } else if (tables != null && tables.length == 1) { + //只查一个不存在的分表 + return 0; + } else if (tables != null && tables.length > 1) { + //多分表查询中一个或多个分表不存在 +// String tableName = parseNotExistTableName(e); +// if (tableName == null) { +// throw new SourceException(e); +// } + String[] oldTables = tables; + List notExistTables = checkNotExistTablesNoThrows(conn, tables); + if (notExistTables.isEmpty()) { + throw new SourceException(e); + } + for (String t : notExistTables) { + tables = Utility.remove(tables, t); + } + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "dropTable, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables)); + } + if (tables.length == 0) { //分表全部不存在 + return 0; + } + sqls = dropTableSql(info, tables, node); + if (info.isLoggable(logger, Level.FINEST, sqls[0])) { + logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls)); + } + try { + final Statement stmt = conn.createStatement(); + for (String sql : sqls) { + stmt.addBatch(sql); + } + int c = 0; + int[] cs = stmt.executeBatch(); + stmt.close(); + for (int cc : cs) { + c += cc; + } + conn.commit(); + slowLog(s, sqls); + return c; + } catch (SQLException se) { + throw new SourceException(se); + } + } else { + throw new SourceException(e); + } + } + throw new SourceException(e); + } finally { + if (conn != null) { + writePool.offerConnection(conn); + } + } + } + @Override protected CompletableFuture updateEntityDBAsync(EntityInfo info, T... entitys) { return supplyAsync(() -> updateEntityDB(info, entitys));