优化DataJdbcSource

This commit is contained in:
Redkale
2023-01-02 10:01:24 +08:00
parent 3a306a6235
commit 496b165d42

View File

@@ -201,328 +201,6 @@ public class DataJdbcSource extends DataSqlSource {
return i;
}
@Override
protected <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls) {
return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls));
}
@Override
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
return 0;
} catch (SQLException e2) {
throw new SourceException(e2);
}
}
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
if (pkmap != null) {
pkmap.remove(t);
} else {
tables = Utility.remove(tables, t);
}
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "deleteDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + (pkmap != null ? pkmap.keySet() : Arrays.toString(tables)));
}
if ((pkmap != null ? pkmap.size() : tables.length) == 0) { //分表全部不存在
return 0;
}
sqls = pkmap != null ? deleteSql(info, pkmap) : deleteSql(info, tables, flipper, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected <T> CompletableFuture<Integer> clearTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> clearTableDB(info, tables, node, sqls));
}
@Override
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "clearTableDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = clearTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> dropTableDB(info, tables, node, sqls));
}
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "dropTableDB, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = dropTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
public int batch(final DataBatch batch) {
Objects.requireNonNull(batch);
@@ -805,6 +483,328 @@ public class DataJdbcSource extends DataSqlSource {
}
}
@Override
protected <T> CompletableFuture<Integer> deleteDBAsync(final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, final String... sqls) {
return supplyAsync(() -> deleteDB(info, tables, flipper, node, pkmap, sqls));
}
@Override
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
return 0;
} catch (SQLException e2) {
throw new SourceException(e2);
}
}
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
if (pkmap != null) {
pkmap.remove(t);
} else {
tables = Utility.remove(tables, t);
}
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "delete, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + (pkmap != null ? pkmap.keySet() : Arrays.toString(tables)));
}
if ((pkmap != null ? pkmap.size() : tables.length) == 0) { //分表全部不存在
return 0;
}
sqls = pkmap != null ? deleteSql(info, pkmap) : deleteSql(info, tables, flipper, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " delete sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected <T> CompletableFuture<Integer> clearTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> clearTableDB(info, tables, node, sqls));
}
@Override
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "clearTable, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = clearTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " clearTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected <T> CompletableFuture<Integer> dropTableDBAsync(EntityInfo<T> info, final String[] tables, FilterNode node, String... sqls) {
return supplyAsync(() -> dropTableDB(info, tables, node, sqls));
}
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c;
if (sqls.length == 1) {
final Statement stmt = conn.createStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
c = c1;
} else {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c1 += cc;
}
c = c1;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException se) {
}
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
//单表结构不存在
return 0;
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return 0;
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
// String tableName = parseNotExistTableName(e);
// if (tableName == null) {
// throw new SourceException(e);
// }
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTablesNoThrows(conn, tables);
if (notExistTables.isEmpty()) {
throw new SourceException(e);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "dropTable, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return 0;
}
sqls = dropTableSql(info, tables, node);
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
logger.finest(info.getType().getSimpleName() + " dropTable sql=" + Arrays.toString(sqls));
}
try {
final Statement stmt = conn.createStatement();
for (String sql : sqls) {
stmt.addBatch(sql);
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
for (int cc : cs) {
c += cc;
}
conn.commit();
slowLog(s, sqls);
return c;
} catch (SQLException se) {
throw new SourceException(se);
}
} else {
throw new SourceException(e);
}
}
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
}
}
}
@Override
protected <T> CompletableFuture<Integer> updateEntityDBAsync(EntityInfo<T> info, T... entitys) {
return supplyAsync(() -> updateEntityDB(info, entitys));