优化DataJdbcSource

This commit is contained in:
Redkale
2022-12-28 21:26:12 +08:00
parent 1622cb0285
commit 43a665497d
2 changed files with 280 additions and 230 deletions

View File

@@ -445,7 +445,6 @@ public class DataJdbcSource extends DataSqlSource {
st.close();
} else { //分库分表
synchronized (info.disTableLock()) {
final String catalog = conn.getCatalog();
final Set<String> newCatalogs = new LinkedHashSet<>();
final List<String> tableCopys = new ArrayList<>();
prepareInfos.forEach((t, p) -> {
@@ -647,17 +646,12 @@ public class DataJdbcSource extends DataSqlSource {
List<PreparedStatement> prestmts = null;
Map<String, PrepareInfo<T>> prepareInfos = null;
SQLException ex = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = -1;
final Attribute<T, Serializable>[] attrs = info.updateAttributes;
int retry = 0;
AGAIN:
while (retry++ < MAX_RETRYS) {
try {
if (info.getTableStrategy() == null) {
presql = info.getUpdateQuestionPrepareSQL(entitys[0]);
@@ -670,9 +664,7 @@ public class DataJdbcSource extends DataSqlSource {
c = c1;
prestmt.close();
} else {
if (prepareInfos == null) {
prepareInfos = getUpdateQuestionPrepareInfo(info, entitys);
}
prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
@@ -687,9 +679,7 @@ public class DataJdbcSource extends DataSqlSource {
}
}
conn.commit();
break;
} catch (SQLException se) {
ex = se;
conn.rollback();
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
@@ -709,40 +699,46 @@ public class DataJdbcSource extends DataSqlSource {
} catch (SQLException e2) {
}
}
//表不存在更新条数为0
//表不存在更新条数为0
return CompletableFuture.completedFuture(0);
} else {
String tableName = parseNotExistTableName(se);
if (tableName == null || prepareInfos == null) {
return CompletableFuture.failedFuture(se);
}
String minTableName = (tableName.indexOf('.') > 0) ? tableName.substring(tableName.indexOf('.') + 1) : null;
for (String t : prepareInfos.keySet()) {
if (t.equals(tableName)) {
prepareInfos.remove(t);
if (info.getTableStrategy() == null) {
prestmt.close();
} else {
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
}
continue AGAIN;
} else if (minTableName != null && t.equals(minTableName)) {
prepareInfos.remove(t);
if (info.getTableStrategy() == null) {
prestmt.close();
} else {
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
}
continue AGAIN;
}
}
String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]);
List<String> notExistTables = checkNotExistTables(conn, oldTables, tableName);
if (notExistTables.isEmpty()) {
return CompletableFuture.failedFuture(se);
}
for (String t : notExistTables) {
prepareInfos.remove(t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "update entitys, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + prepareInfos.keySet());
}
if (prepareInfos.isEmpty()) { //分表全部不存在
return CompletableFuture.completedFuture(0);
}
prestmts = createUpdatePreparedStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c1 += cc;
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
conn.commit();
}
} else {
throw se;
}
}
@@ -804,11 +800,7 @@ public class DataJdbcSource extends DataSqlSource {
});
slowLog(s, presqls.toArray(new String[presqls.size()]));
}
if (c >= 0) {
return CompletableFuture.completedFuture(c);
} else {
return CompletableFuture.failedFuture(ex);
}
} catch (SQLException e) {
if (conn != null) {
try {
@@ -1250,68 +1242,90 @@ public class DataJdbcSource extends DataSqlSource {
}
@Override
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, final boolean readcache, boolean needTotal, final boolean distinct, SelectColumn selects, Flipper flipper, FilterNode node) {
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, final boolean readCache, boolean needTotal, final boolean distinct, SelectColumn selects, Flipper flipper, FilterNode node) {
Connection conn = null;
final long s = System.currentTimeMillis();
final SelectColumn sels = selects;
final List<T> list = new ArrayList();
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(this, info, joinTabalis);
String[] tables = info.getTables(node);
final String joinAndWhere = (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
final boolean mysqlOrPgsql = "mysql".equals(dbtype()) || "postgresql".equals(dbtype());
SQLException ex = null;
try {
conn = readPool.pollConnection();
PreparedStatement ps = null;
//conn.setReadOnly(true);
int retry = 0;
AGAIN:
while (retry++ < MAX_RETRYS) {
String listSql = null;
String countSql = null;
{ //组装listSql、countSql
String listSubSql;
StringBuilder union = new StringBuilder();
if (tables.length == 1) {
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
union.append("SELECT ").append(info.getFullQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
listSql = listSubSql + createSQLOrderby(info, flipper);
if (mysqlOrPgsql) {
listSql += (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset()));
if (readcache && info.isLoggable(logger, Level.FINEST, listSql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listSql);
}
} else {
if (readcache && info.isLoggable(logger, Level.FINEST, listSql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listSql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset())));
}
}
if (mysqlOrPgsql && needTotal) {
String countSubSql;
if (tables.length == 1) {
countSubSql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
countSubSql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM (" + (union) + ") a";
}
countSql = countSubSql;
if (readcache && info.isLoggable(logger, Level.FINEST, countSql)) {
logger.finest(info.getType().getSimpleName() + " query countsql=" + countSql);
}
}
}
String[] sqls = createSheetListAndCountSql(info, readCache, needTotal, distinct, selects, flipper, mysqlOrPgsql, tables, joinAndWhere);
String listSql = sqls[0];
String countSql = sqls[1];
try {
return executeQuerySheet(info, needTotal, flipper, sels, s, conn, mysqlOrPgsql, listSql, countSql);
} catch (SQLException se) {
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
} catch (SQLException e2) {
}
}
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
String tableName = parseNotExistTableName(se);
if (tableName == null) {
return CompletableFuture.failedFuture(se);
}
String[] oldTables = tables;
List<String> notExistTables = checkNotExistTables(conn, tables, tableName);
if (notExistTables.isEmpty()) {
return CompletableFuture.failedFuture(se);
}
for (String t : notExistTables) {
tables = Utility.remove(tables, t);
}
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "query sheet, old-tables: " + Arrays.toString(oldTables) + ", new-tables: " + Arrays.toString(tables));
}
if (tables.length == 0) { //分表全部不存在
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
}
//重新查询一次
sqls = createSheetListAndCountSql(info, readCache, needTotal, distinct, selects, flipper, mysqlOrPgsql, tables, joinAndWhere);
listSql = sqls[0];
countSql = sqls[1];
return executeQuerySheet(info, needTotal, flipper, sels, s, conn, mysqlOrPgsql, listSql, countSql);
} else {
return CompletableFuture.failedFuture(se);
}
}
return CompletableFuture.failedFuture(se);
}
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
}
private <T> CompletableFuture<Sheet<T>> executeQuerySheet(EntityInfo<T> info, boolean needTotal, Flipper flipper, SelectColumn sels,
long s, Connection conn, boolean mysqlOrPgsql, String listSql, String countSql) throws SQLException {
final List<T> list = new ArrayList();
if (mysqlOrPgsql) { //sql可以带limit、offset
ps = conn.prepareStatement(listSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
PreparedStatement ps = conn.prepareStatement(listSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet set = ps.executeQuery();
final DataResultSet rr = createDataResultSet(info, set);
while (set.next()) {
@@ -1331,7 +1345,7 @@ public class DataJdbcSource extends DataSqlSource {
return CompletableFuture.completedFuture(new Sheet<>(total, list));
} else {
//conn.setReadOnly(true);
ps = conn.prepareStatement(listSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
PreparedStatement ps = conn.prepareStatement(listSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
if (flipper != null && flipper.getLimit() > 0) ps.setFetchSize(flipper.getLimit());
ResultSet set = ps.executeQuery();
if (flipper != null && flipper.getOffset() > 0) set.absolute(flipper.getOffset());
@@ -1361,60 +1375,85 @@ public class DataJdbcSource extends DataSqlSource {
slowLog(s, listSql);
return CompletableFuture.completedFuture(new Sheet<>(total, list));
}
} catch (SQLException se) {
ex = se;
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
}
private <T> String[] createSheetListAndCountSql(EntityInfo<T> info, final boolean readCache, boolean needTotal,
final boolean distinct, SelectColumn selects, Flipper flipper, boolean mysqlOrPgsql, String[] tables, String joinAndWhere) {
String listSql = null;
String countSql = null;
{ //组装listSql、countSql
String listSubSql;
StringBuilder union = new StringBuilder();
if (tables.length == 1) {
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
int b = 0;
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
union.append("SELECT ").append(info.getFullQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
st.executeBatch();
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
st.close();
} catch (SQLException e2) {
listSql = listSubSql + createSQLOrderby(info, flipper);
if (mysqlOrPgsql) {
listSql += (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset()));
if (readCache && info.isLoggable(logger, Level.FINEST, listSql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listSql);
}
} else {
if (readCache && info.isLoggable(logger, Level.FINEST, listSql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listSql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset())));
}
}
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
String tableName = parseNotExistTableName(se);
if (tableName == null) {
return CompletableFuture.failedFuture(se);
if (mysqlOrPgsql && needTotal) {
String countSubSql;
if (tables.length == 1) {
countSubSql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
countSubSql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM (" + (union) + ") a";
}
String minTableName = (tableName.indexOf('.') > 0) ? tableName.substring(tableName.indexOf('.') + 1) : null;
if (ps != null) ps.close();
countSql = countSubSql;
if (readCache && info.isLoggable(logger, Level.FINEST, countSql)) {
logger.finest(info.getType().getSimpleName() + " query countsql=" + countSql);
}
}
}
return new String[]{listSql, countSql};
}
protected List<String> checkNotExistTables(Connection conn, String[] tables, String firstNotExistTable) throws SQLException {
String minTableName = (firstNotExistTable.indexOf('.') > 0) ? firstNotExistTable.substring(firstNotExistTable.indexOf('.') + 1) : null;
List<String> maybeNoTables = new ArrayList<>();
for (String t : tables) {
if (t.equals(tableName) || (minTableName != null && t.equals(minTableName))) {
tables = Utility.remove(tables, t);
if (tables.length < 1) { //分表都不存在
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
if (!maybeNoTables.isEmpty()) {
maybeNoTables.add(t);
}
continue AGAIN;
if (t.equals(firstNotExistTable) || (minTableName != null && t.equals(minTableName))) {
maybeNoTables.add(t);
}
}
} else {
return CompletableFuture.failedFuture(se);
if (maybeNoTables.isEmpty()) {
return maybeNoTables;
}
String[] tableTypes = new String[]{"TABLE"};
DatabaseMetaData dmd = conn.getMetaData();
List<String> rs = new ArrayList<>();
for (String t : maybeNoTables) {
String catalog = null;
String table = t;
int pos = t.indexOf('.');
if (pos > 0) {
catalog = t.substring(0, pos);
table = t.substring(pos + 1);
}
return CompletableFuture.failedFuture(se);
ResultSet dmdrs = dmd.getTables(catalog, null, table, tableTypes);
if (!dmdrs.next()) { //不存在表
rs.add(t);
}
dmdrs.close();
}
throw ex;
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);
}
return rs;
}
/**
@@ -1511,21 +1550,31 @@ public class DataJdbcSource extends DataSqlSource {
@Override
public <T> Serializable getObject(Attribute<T, Serializable> attr, int index, String column) {
Class t = attr.type();
if (t == java.util.Date.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
if (val == null) return null;
return new java.util.Date(((java.sql.Date) val).getTime());
if (val
== null) return null;
return new java.util.Date(
((java.sql.Date) val).getTime());
} else if (t == java.time.LocalDate.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
if (val == null) return null;
if (val
== null) return null;
return ((java.sql.Date) val).toLocalDate();
} else if (t == java.time.LocalTime.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
if (val == null) return null;
if (val
== null) return null;
return ((java.sql.Time) val).toLocalTime();
} else if (t == java.time.LocalDateTime.class) {
Object val = index > 0 ? getObject(index) : getObject(column);
if (val == null) return null;
if (val
== null) return null;
return ((java.sql.Timestamp) val).toLocalDateTime();
} else if (t.getName().startsWith("java.sql.")) {
return index > 0 ? (Serializable) getObject(index) : (Serializable) getObject(column);
@@ -1599,6 +1648,7 @@ public class DataJdbcSource extends DataSqlSource {
}
};
}
protected class ConnectionPool implements AutoCloseable {

View File

@@ -537,7 +537,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
@Local
protected <T> Map<String, PrepareInfo<T>> getInsertQuestionPrepareInfo(EntityInfo<T> info, T... entitys) {
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();//一定要是LinkedHashMap
for (T entity : entitys) {
String table = info.getTable(entity);
map.computeIfAbsent(table, t -> new PrepareInfo(info.getInsertQuestionPrepareSQL(entity))).addEntity(entity);
@@ -547,7 +547,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
@Local
protected <T> Map<String, PrepareInfo<T>> getInsertDollarPrepareInfo(EntityInfo<T> info, T... entitys) {
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();//一定要是LinkedHashMap
for (T entity : entitys) {
String table = info.getTable(entity);
map.computeIfAbsent(table, t -> new PrepareInfo(info.getInsertDollarPrepareSQL(entity))).addEntity(entity);
@@ -557,7 +557,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
@Local
protected <T> Map<String, PrepareInfo<T>> getUpdateQuestionPrepareInfo(EntityInfo<T> info, T... entitys) {
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>(); //一定要是LinkedHashMap
for (T entity : entitys) {
String table = info.getTable(entity);
map.computeIfAbsent(table, t -> new PrepareInfo(info.getUpdateQuestionPrepareSQL(entity))).addEntity(entity);
@@ -567,7 +567,7 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
@Local
protected <T> Map<String, PrepareInfo<T>> getUpdateDollarPrepareInfo(EntityInfo<T> info, T... entitys) {
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();
Map<String, PrepareInfo<T>> map = new LinkedHashMap<>();//一定要是LinkedHashMap
for (T entity : entitys) {
String table = info.getTable(entity);
map.computeIfAbsent(table, t -> new PrepareInfo(info.getUpdateDollarPrepareSQL(entity))).addEntity(entity);