优化DataJdbcSource

This commit is contained in:
Redkale
2022-12-29 14:56:24 +08:00
parent 8701450940
commit 2c861a5ed4
3 changed files with 291 additions and 152 deletions

View File

@@ -202,66 +202,44 @@ public class DataJdbcSource extends DataSqlSource {
}
@Override
protected <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, final String... sqls) {
return supplyAsync(() -> deleteDB(info, tables, flipper, sqls));
protected <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls) {
return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls));
}
@Override
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, String... sqls) {
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = 0;
int c;
if (sqls.length == 1) {
String sql = sqls[0];
sql += ((flipper == null || flipper.getLimit() < 1) ? "" : (" LIMIT " + flipper.getLimit()));
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
}
final Statement stmt = conn.createStatement();
c = stmt.executeUpdate(sql);
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
if (flipper == null || flipper.getLimit() < 1) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sqls=" + Arrays.toString(sqls));
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
} else {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " limit " + flipper.getLimit() + " delete sqls=" + Arrays.toString(sqls));
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql + " LIMIT " + flipper.getLimit());
}
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
@@ -283,6 +261,57 @@ public class DataJdbcSource extends DataSqlSource {
throw new SourceException(e2);
}
}
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
String tableName = parseNotExistTableName(e);
if (tableName == null) {
throw new SourceException(e);
}
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables, tableName);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
System.out.println(tableName + " notExistTables : " + notExistTables);
for (String t : notExistTables) {
if (pkmap != null) {
pkmap.remove(t);
} else {
tables = Utility.remove(tables, t);
}
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "deleteDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + (pkmap != null ? pkmap.keySet() : Arrays.toString(tables)));
}
if ((pkmap != null ? pkmap.size() : tables.length) == 0) { //分表全部不存在
return 0;
}
sqls = pkmap != null ? deleteSql(info, pkmap) : deleteSql(info, tables, flipper, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
@@ -296,45 +325,96 @@ public class DataJdbcSource extends DataSqlSource {
}
@Override
protected <T> CompletableFuture<Integer> clearTableDBAsync(EntityInfo<T> info, final String[] tables, String... sqls) {
return supplyAsync(() -> clearTableDB(info, tables, sqls));
protected <T> CompletableFuture<Integer> clearTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> clearTableDB(info, tables, node, sqls));
}
@Override
protected <T> int clearTableDB(EntityInfo<T> info, final String[] tables, String... sqls) {
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = 0;
final Statement stmt = conn.createStatement();
int c;
if (sqls.length == 1) {
String sql = sqls[0];
c = stmt.executeUpdate(sql);
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
c1 += cc;
}
c = c1;
}
stmt.close();
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
return -1;
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
String tableName = parseNotExistTableName(e);
if (tableName == null) {
throw new SourceException(e);
}
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables, tableName);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "clearTableDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = clearTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
@@ -345,51 +425,96 @@ public class DataJdbcSource extends DataSqlSource {
}
@Override
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, final String[] tables, String... sqls) {
return supplyAsync(() -> dropTableDB(info, tables, sqls));
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> dropTableDB(info, tables, node, sqls));
}
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, String... sqls) {
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = 0;
final Statement stmt = conn.createStatement();
int c;
if (sqls.length == 1) {
String sql = sqls[0];
c = stmt.executeUpdate(sql);
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
c1 += cc;
}
c = c1;
}
stmt.close();
conn.commit();
if (info.getTableStrategy() != null) {
for (String table : tables) {
String tablekey = table.indexOf('.') > 0 ? table : (conn.getCatalog() + '.' + table);
info.removeDisTable(tablekey);
}
}
slowLog(s, sqls);
return c;
} catch (SQLException e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException se) {
}
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
return -1;
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
String tableName = parseNotExistTableName(e);
if (tableName == null) {
throw new SourceException(e);
}
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables, tableName);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "dropTableDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = dropTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
@@ -1565,25 +1690,35 @@ public class DataJdbcSource extends DataSqlSource {
return new String[]{listSql, countSql};
}
protected List<String> checkNotExistTablesNoThrows(Connection conn, String[] tables, String firstNotExistTable) {
try {
return checkNotExistTables(conn, tables, firstNotExistTable);
} catch (SQLException e) {
throw new SourceException(e);
}
}
protected List<String> checkNotExistTables(Connection conn, String[] tables, String firstNotExistTable) throws SQLException {
String minTableName = (firstNotExistTable.indexOf('.') > 0) ? firstNotExistTable.substring(firstNotExistTable.indexOf('.') + 1) : null;
List<String> maybeNoTables = new ArrayList<>();
for (String t : tables) {
if (!maybeNoTables.isEmpty()) {
maybeNoTables.add(t);
}
if (t.equals(firstNotExistTable) || (minTableName != null && t.equals(minTableName))) {
maybeNoTables.add(t);
}
}
if (maybeNoTables.isEmpty()) {
return maybeNoTables;
}
// 数据库不一定要按批量提交的SQL顺序执行 所以第一个不存在的表不一定再tables的第一位,
// 比如 DELETE FROM table1; DELETE FROM table2; 如果table1、table2都不存在SQL可能会抛出table2不存在的异常
// List<String> maybeNoTables = new ArrayList<>();
// String minTableName = (firstNotExistTable.indexOf('.') > 0) ? firstNotExistTable.substring(firstNotExistTable.indexOf('.') + 1) : null;
// for (String t : tables) {
// if (!maybeNoTables.isEmpty()) {
// maybeNoTables.add(t);
// }
// if (t.equals(firstNotExistTable) || (minTableName != null && t.equals(minTableName))) {
// maybeNoTables.add(t);
// }
// }
// if (maybeNoTables.isEmpty()) {
// return maybeNoTables;
// }
String[] tableTypes = new String[]{"TABLE"};
DatabaseMetaData dmd = conn.getMetaData();
List<String> rs = new ArrayList<>();
for (String t : maybeNoTables) {
for (String t : tables) { //maybeNoTables
String catalog = null;
String table = t;
int pos = t.indexOf('.');

View File

@@ -122,17 +122,17 @@ public class DataMemorySource extends DataSqlSource implements SearchSource {
}
@Override
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, String... sqls) {
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
return 0;
}
@Override
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, String... sqls) {
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
return 0;
}
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, String... sqls) {
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
return 0;
}
@@ -192,17 +192,17 @@ public class DataMemorySource extends DataSqlSource implements SearchSource {
}
@Override
protected <T> CompletableFuture<Integer> deleteDBAsync(EntityInfo<T> info, String[] tables, Flipper flipper, String... sqls) {
protected <T> CompletableFuture<Integer> deleteDBAsync(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
return CompletableFuture.completedFuture(0);
}
@Override
protected <T> CompletableFuture<Integer> clearTableDBAsync(EntityInfo<T> info, String[] tables, String... sqls) {
protected <T> CompletableFuture<Integer> clearTableDBAsync(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
return CompletableFuture.completedFuture(0);
}
@Override
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, String[] tables, String... sqls) {
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
return CompletableFuture.completedFuture(0);
}

View File

@@ -667,13 +667,13 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
protected abstract <T> CompletableFuture<Integer> insertDBAsync(final EntityInfo<T> info, T... entitys);
//删除记录
protected abstract <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, final String... sqls);
protected abstract <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls);
//清空表
protected abstract <T> CompletableFuture<Integer> clearTableDBAsync(final EntityInfo<T> info, String[] tables, final String... sqls);
protected abstract <T> CompletableFuture<Integer> clearTableDBAsync(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls);
//删除表
protected abstract <T> CompletableFuture<Integer> dropTableDBAsync(final EntityInfo<T> info, String[] tables, final String... sqls);
protected abstract <T> CompletableFuture<Integer> dropTableDBAsync(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls);
//更新纪录
protected abstract <T> CompletableFuture<Integer> updateEntityDBAsync(final EntityInfo<T> info, T... entitys);
@@ -711,18 +711,18 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
//删除记录
protected <T> int deleteDB(final EntityInfo<T> info, String[] tables, Flipper flipper, final String... sqls) {
return deleteDBAsync(info, tables, flipper, sqls).join();
protected <T> int deleteDB(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls) {
return deleteDBAsync(info, tables, flipper, node, pkmap, sqls).join();
}
//清空表
protected <T> int clearTableDB(final EntityInfo<T> info, String[] tables, final String... sqls) {
return clearTableDBAsync(info, tables, sqls).join();
protected <T> int clearTableDB(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls) {
return clearTableDBAsync(info, tables, node, sqls).join();
}
//删除表
protected <T> int dropTableDB(final EntityInfo<T> info, String[] tables, final String... sqls) {
return dropTableDBAsync(info, tables, sqls).join();
protected <T> int dropTableDB(final EntityInfo<T> info, String[] tables, FilterNode node, final String... sqls) {
return dropTableDBAsync(info, tables, node, sqls).join();
}
//更新纪录
@@ -974,17 +974,18 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) {
return deleteCache(info, -1, pks);
}
String[] tables = new String[]{info.getTable(pks[0])};
String[] sqls = deleteSql(info, pks);
Map<String, List<Serializable>> pkmap = info.getTableMap(pks);
String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]);
String[] sqls = deleteSql(info, pkmap);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
int rs = deleteDBAsync(info, tables, null, sqls).join();
int rs = deleteDBAsync(info, tables, null, null, pkmap, sqls).join();
deleteCache(info, rs, pks);
return rs;
} else {
int rs = deleteDB(info, tables, null, sqls);
int rs = deleteDB(info, tables, null, null, pkmap, sqls);
deleteCache(info, rs, pks);
return rs;
}
@@ -999,13 +1000,14 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (isOnlyCache(info)) {
return CompletableFuture.completedFuture(deleteCache(info, -1, pks));
}
String[] tables = new String[]{info.getTable(pks[0])};
String[] sqls = deleteSql(info, pks);
Map<String, List<Serializable>> pkmap = info.getTableMap(pks);
String[] tables = pkmap.keySet().toArray(new String[pkmap.size()]);
String[] sqls = deleteSql(info, pkmap);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
return deleteDBAsync(info, tables, null, sqls).whenComplete((rs, t) -> {
return deleteDBAsync(info, tables, null, null, pkmap, sqls).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1013,7 +1015,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
});
} else {
return supplyAsync(() -> deleteDB(info, tables, null, sqls)).whenComplete((rs, t) -> {
return supplyAsync(() -> deleteDB(info, tables, null, null, pkmap, sqls)).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1045,11 +1047,11 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
int rs = deleteDBAsync(info, tables, flipper, sqls).join();
int rs = deleteDBAsync(info, tables, flipper, node, null, sqls).join();
deleteCache(info, rs, flipper, sqls);
return rs;
} else {
int rs = deleteDB(info, tables, flipper, sqls);
int rs = deleteDB(info, tables, flipper, node, null, sqls);
deleteCache(info, rs, flipper, sqls);
return rs;
}
@@ -1067,7 +1069,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
return deleteDBAsync(info, tables, flipper, sqls).whenComplete((rs, t) -> {
return deleteDBAsync(info, tables, flipper, node, null, sqls).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1075,7 +1077,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
});
} else {
return supplyAsync(() -> deleteDB(info, tables, flipper, sqls)).whenComplete((rs, t) -> {
return supplyAsync(() -> deleteDB(info, tables, flipper, node, null, sqls)).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1085,25 +1087,29 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
}
protected <T> String[] deleteSql(final EntityInfo<T> info, final Serializable... pks) {
if (pks.length == 1) {
String sql = "DELETE FROM " + info.getTable(pks[0]) + " WHERE " + info.getPrimarySQLColumn() + "=" + info.formatSQLValue(info.getPrimarySQLColumn(), pks[0], sqlFormatter);
return new String[]{sql};
} else {
String sql = "DELETE FROM " + info.getTable(pks[0]) + " WHERE " + info.getPrimarySQLColumn() + " IN (";
for (int i = 0; i < pks.length; i++) {
if (i > 0) {
sql += ',';
protected <T> String[] deleteSql(final EntityInfo<T> info, final Map<String, List<Serializable>> pkmap) {
List<String> sqls = new ArrayList<>();
final String pkSQLColumn = info.getPrimarySQLColumn();
pkmap.forEach((table, pks) -> {
String sql;
if (pks.size() == 1) {
sql = "DELETE FROM " + table + " WHERE " + pkSQLColumn + " = " + info.formatSQLValue(pkSQLColumn, pks.get(0), sqlFormatter);
} else {
sql = "DELETE FROM " + table + " WHERE " + pkSQLColumn + " IN (";
for (int i = 0; i < pks.size(); i++) {
if (i > 0) {
sql += ',';
}
sql += info.formatSQLValue(pkSQLColumn, pks.get(i), sqlFormatter);
}
sql += info.formatSQLValue(info.getPrimarySQLColumn(), pks[i], sqlFormatter);
sql += ")";
}
sql += ")";
return new String[]{sql};
}
sqls.add(sql);
});
return sqls.toArray(new String[sqls.size()]);
}
protected <T> String[] deleteSql(final EntityInfo<T> info, String[] tables, final Flipper flipper, final FilterNode node) {
boolean pgsql = "postgresql".equals(dbtype());
Map<Class, String> joinTabalis = null;
CharSequence join = null;
CharSequence where = null;
@@ -1119,25 +1125,23 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
join1 = multiSplit('[', ']', ",", new StringBuilder(), joinstr, 0);
join2 = multiSplit('{', '}', " AND ", new StringBuilder(), joinstr, 0);
}
final String join2AndWhere = ((where == null || where.length() == 0) ? (join2 == null ? ""
: (" WHERE " + join2)) : (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
if (pgsql && flipper != null && flipper.getLimit() > 0) {
String wherestr = ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2))));
if ("postgresql".equals(dbtype()) && flipper != null && flipper.getLimit() > 0) {
List<String> sqls = new ArrayList<>();
for (String table : tables) {
String sql = "DELETE FROM " + table + " a" + (join1 == null ? "" : (", " + join1))
+ " WHERE " + info.getPrimarySQLColumn() + " IN (SELECT " + info.getPrimaryColumn() + " FROM " + table
+ wherestr + info.createSQLOrderby(flipper) + " OFFSET 0 LIMIT " + flipper.getLimit() + ")";
String sql = "DELETE FROM " + table + " a" + (join1 == null ? "" : (", " + join1)) + " WHERE " + info.getPrimarySQLColumn() + " IN (SELECT " + info.getPrimaryColumn() + " FROM " + table
+ join2AndWhere + info.createSQLOrderby(flipper) + " OFFSET 0 LIMIT " + flipper.getLimit() + ")";
sqls.add(sql);
}
return sqls.toArray(new String[sqls.size()]);
} else {
boolean mysql = "mysql".equals(dbtype());
List<String> sqls = new ArrayList<>();
for (String table : tables) {
String sql = "DELETE " + ("mysql".equals(dbtype()) ? "a" : "") + " FROM " + table + " a" + (join1 == null ? "" : (", " + join1))
+ ((where == null || where.length() == 0) ? (join2 == null ? "" : (" WHERE " + join2))
: (" WHERE " + where + (join2 == null ? "" : (" AND " + join2)))) + info.createSQLOrderby(flipper)
+ (("mysql".equals(dbtype()) && flipper != null && flipper.getLimit() > 0) ? (" LIMIT " + flipper.getLimit()) : "");
String sql = "DELETE " + (mysql ? "a" : "") + " FROM " + table + " a" + (join1 == null ? "" : (", " + join1)) + join2AndWhere + info.createSQLOrderby(flipper)
+ ((mysql && flipper != null && flipper.getLimit() > 0) ? (" LIMIT " + flipper.getLimit()) : "");
sqls.add(sql);
}
return sqls.toArray(new String[sqls.size()]);
@@ -1162,11 +1166,11 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
int rs = clearTableDBAsync(info, tables, sqls).join();
int rs = clearTableDBAsync(info, tables, node, sqls).join();
clearTableCache(info, node);
return rs;
} else {
int rs = clearTableDB(info, tables, sqls);
int rs = clearTableDB(info, tables, node, sqls);
clearTableCache(info, node);
return rs;
}
@@ -1184,7 +1188,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
return clearTableDBAsync(info, tables, sqls).whenComplete((rs, t) -> {
return clearTableDBAsync(info, tables, node, sqls).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1192,7 +1196,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
});
} else {
return supplyAsync(() -> clearTableDB(info, tables, sqls)).whenComplete((rs, t) -> {
return supplyAsync(() -> clearTableDB(info, tables, node, sqls)).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1218,16 +1222,16 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
return dropTableCache(info, node);
}
final String[] tables = info.getTables(node);
String[] sqls = clearTableSql(info, tables, node);
String[] sqls = dropTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
int rs = dropTableDBAsync(info, tables, sqls).join();
int rs = dropTableDBAsync(info, tables, node, sqls).join();
dropTableCache(info, node);
return rs;
} else {
int rs = dropTableDB(info, tables, sqls);
int rs = dropTableDB(info, tables, node, sqls);
dropTableCache(info, node);
return rs;
}
@@ -1240,12 +1244,12 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
return CompletableFuture.completedFuture(dropTableCache(info, node));
}
final String[] tables = info.getTables(node);
String[] sqls = clearTableSql(info, tables, node);
String[] sqls = dropTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls));
}
if (isAsync()) {
return dropTableDBAsync(info, tables, sqls).whenComplete((rs, t) -> {
return dropTableDBAsync(info, tables, node, sqls).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {
@@ -1253,7 +1257,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
}
});
} else {
return supplyAsync(() -> dropTableDB(info, tables, sqls)).whenComplete((rs, t) -> {
return supplyAsync(() -> dropTableDB(info, tables, node, sqls)).whenComplete((rs, t) -> {
if (t != null) {
errorCompleteConsumer.accept(rs, t);
} else {