优化DataJdbcSource

This commit is contained in:
Redkale
2022-12-28 15:28:52 +08:00
parent 65f7692116
commit 34023f6d08
3 changed files with 194 additions and 140 deletions

View File

@@ -622,16 +622,17 @@ public class DataJdbcSource extends DataSqlSource {
List<PreparedStatement> prestmts = null;
Map<String, PrepareInfo<T>> prepareInfos = null;
SQLException ex = null;
try {
conn = writePool.pollConnection();
conn.setReadOnly(false);
conn.setAutoCommit(false);
int c = 0;
int c = -1;
final Attribute<T, Serializable>[] attrs = info.updateAttributes;
int retry = 0;
AGAIN:
while (true) {
while (retry++ < MAX_RETRYS) {
try {
if (info.getTableStrategy() == null) {
presql = info.getUpdateQuestionPrepareSQL(entitys[0]);
@@ -663,6 +664,7 @@ public class DataJdbcSource extends DataSqlSource {
conn.commit();
break;
} catch (SQLException se) {
ex = se;
conn.rollback();
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
@@ -777,7 +779,11 @@ public class DataJdbcSource extends DataSqlSource {
});
slowLog(s, presqls.toArray(new String[presqls.size()]));
}
return CompletableFuture.completedFuture(c);
if (c >= 0) {
return CompletableFuture.completedFuture(c);
} else {
return CompletableFuture.failedFuture(ex);
}
} catch (SQLException e) {
if (conn != null) {
try {
@@ -1211,138 +1217,167 @@ public class DataJdbcSource extends DataSqlSource {
}
@Override
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, final boolean readcache, boolean needtotal, final boolean distinct, SelectColumn selects, Flipper flipper, FilterNode node) {
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, final boolean readcache, boolean needTotal, final boolean distinct, SelectColumn selects, Flipper flipper, FilterNode node) {
Connection conn = null;
final long s = System.currentTimeMillis();
final SelectColumn sels = selects;
final List<T> list = new ArrayList();
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(this, info, joinTabalis);
String[] tables = info.getTables(node);
final String joinAndWhere = (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
final boolean mysqlOrPgsql = "mysql".equals(dbtype()) || "postgresql".equals(dbtype());
SQLException ex = null;
try {
conn = readPool.pollConnection();
PreparedStatement ps = null;
//conn.setReadOnly(true);
final SelectColumn sels = selects;
final List<T> list = new ArrayList();
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
final CharSequence where = node == null ? null : node.createSQLExpress(this, info, joinTabalis);
String[] tables = info.getTables(node);
String joinAndWhere = (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
if ("mysql".equals(dbtype()) || "postgresql".equals(dbtype())) { //sql可以带limit、offset
String listsubsql;
StringBuilder union = new StringBuilder();
if (tables.length == 1) {
listsubsql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
union.append("SELECT ").append(info.getFullQueryColumns(tabalis, selects)).append(" FROM ").append(table).append(" ").append(tabalis).append(joinAndWhere);
}
listsubsql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
final String listsql = listsubsql + createSQLOrderby(info, flipper)
+ (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset()));
if (readcache && info.isLoggable(logger, Level.FINEST, listsql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listsql);
}
PreparedStatement ps = conn.prepareStatement(listsql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet set = ps.executeQuery();
final DataResultSet rr = createDataResultSet(info, set);
while (set.next()) {
list.add(getEntityValue(info, sels, rr));
}
set.close();
ps.close();
long total = list.size();
if (needtotal) {
String countsubsql;
int retry = 0;
AGAIN:
while (retry++ < MAX_RETRYS) {
String listSql = null;
String countSql = null;
{ //组装listSql、countSql
String listSubSql;
StringBuilder union = new StringBuilder();
if (tables.length == 1) {
countsubsql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM " + tables[0] + " a" + joinAndWhere;
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
countsubsql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM (" + (union) + ") a";
int b = 0;
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
union.append("SELECT ").append(info.getFullQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
listSubSql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
final String countsql = countsubsql;
if (readcache && info.isLoggable(logger, Level.FINEST, countsql)) {
logger.finest(info.getType().getSimpleName() + " query countsql=" + countsql);
}
ps = conn.prepareStatement(countsql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
set = ps.executeQuery();
if (set.next()) total = set.getLong(1);
set.close();
ps.close();
}
slowLog(s, listsql);
return CompletableFuture.completedFuture(new Sheet<>(total, list));
}
String listsubsql;
StringBuilder union = new StringBuilder();
if (tables.length == 1) {
listsubsql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
union.append("SELECT ").append(distinct ? "DISTINCT " : "").append(info.getFullQueryColumns(tabalis, selects)).append(" FROM ").append(table).append(" ").append(tabalis).append(joinAndWhere);
}
listsubsql = "SELECT " + (distinct ? "DISTINCT " : "") + info.getFullQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
final String listsql = listsubsql + info.createSQLOrderby(flipper);
if (readcache && info.isLoggable(logger, Level.FINEST, listsql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listsql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset())));
}
//conn.setReadOnly(true);
final PreparedStatement ps = conn.prepareStatement(listsql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
if (flipper != null && flipper.getLimit() > 0) ps.setFetchSize(flipper.getLimit());
final ResultSet set = ps.executeQuery();
if (flipper != null && flipper.getOffset() > 0) set.absolute(flipper.getOffset());
final int limit = flipper == null || flipper.getLimit() < 1 ? Integer.MAX_VALUE : flipper.getLimit();
int i = 0;
final DataResultSet rr = createDataResultSet(info, set);
if (sels == null) {
while (set.next()) {
i++;
list.add(info.getFullEntityValue(rr));
if (limit <= i) break;
}
} else {
while (set.next()) {
i++;
list.add(info.getEntityValue(sels, rr));
if (limit <= i) break;
}
}
long total = list.size();
if (needtotal && flipper != null) {
set.last();
total = set.getRow();
}
set.close();
ps.close();
slowLog(s, listsql);
return CompletableFuture.completedFuture(new Sheet<>(total, list));
} catch (SQLException e) {
if (isTableNotExist(info, e.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
} catch (SQLException e2) {
listSql = listSubSql + createSQLOrderby(info, flipper);
if (mysqlOrPgsql) {
listSql += (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset()));
if (readcache && info.isLoggable(logger, Level.FINEST, listSql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listSql);
}
} else {
if (readcache && info.isLoggable(logger, Level.FINEST, listSql)) {
logger.finest(info.getType().getSimpleName() + " query sql=" + listSql + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset())));
}
}
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else {
return CompletableFuture.failedFuture(e);
if (mysqlOrPgsql && needTotal) {
String countSubSql;
if (tables.length == 1) {
countSubSql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
countSubSql = "SELECT " + (distinct ? "DISTINCT COUNT(" + info.getQueryColumns("a", selects) + ")" : "COUNT(*)") + " FROM (" + (union) + ") a";
}
countSql = countSubSql;
if (readcache && info.isLoggable(logger, Level.FINEST, countSql)) {
logger.finest(info.getType().getSimpleName() + " query countsql=" + countSql);
}
}
}
try {
if (mysqlOrPgsql) { //sql可以带limit、offset
ps = conn.prepareStatement(listSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet set = ps.executeQuery();
final DataResultSet rr = createDataResultSet(info, set);
while (set.next()) {
list.add(getEntityValue(info, sels, rr));
}
set.close();
ps.close();
long total = list.size();
if (needTotal) {
ps = conn.prepareStatement(countSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
set = ps.executeQuery();
if (set.next()) total = set.getLong(1);
set.close();
ps.close();
}
slowLog(s, listSql);
return CompletableFuture.completedFuture(new Sheet<>(total, list));
} else {
//conn.setReadOnly(true);
ps = conn.prepareStatement(listSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
if (flipper != null && flipper.getLimit() > 0) ps.setFetchSize(flipper.getLimit());
ResultSet set = ps.executeQuery();
if (flipper != null && flipper.getOffset() > 0) set.absolute(flipper.getOffset());
final int limit = flipper == null || flipper.getLimit() < 1 ? Integer.MAX_VALUE : flipper.getLimit();
int i = 0;
final DataResultSet rr = createDataResultSet(info, set);
if (sels == null) {
while (set.next()) {
i++;
list.add(info.getFullEntityValue(rr));
if (limit <= i) break;
}
} else {
while (set.next()) {
i++;
list.add(info.getEntityValue(sels, rr));
if (limit <= i) break;
}
}
long total = list.size();
if (needTotal && flipper != null) {
set.last();
total = set.getRow();
}
set.close();
ps.close();
slowLog(s, listSql);
return CompletableFuture.completedFuture(new Sheet<>(total, list));
}
} catch (SQLException se) {
ex = se;
if (isTableNotExist(info, se.getSQLState())) {
if (info.getTableStrategy() == null) {
String[] tableSqls = createTableSqls(info);
if (tableSqls != null) {
try {
Statement st = conn.createStatement();
if (tableSqls.length == 1) {
st.execute(tableSqls[0]);
} else {
for (String tableSql : tableSqls) {
st.addBatch(tableSql);
}
st.executeBatch();
}
st.close();
} catch (SQLException e2) {
}
}
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else if (tables != null && tables.length == 1) {
//只查一个不存在的分表
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
} else if (tables != null && tables.length > 1) {
//多分表查询中一个或多个分表不存在
String tableName = parseNotExistTableName(se);
if (tableName == null) {
return CompletableFuture.failedFuture(se);
}
String minTableName = (tableName.indexOf('.') > 0) ? tableName.substring(tableName.indexOf('.') + 1) : null;
if (ps != null) ps.close();
for (String t : tables) {
if (t.equals(tableName) || (minTableName != null && t.equals(minTableName))) {
tables = Utility.remove(tables, t);
if (tables.length < 1) { //分表都不存在
return CompletableFuture.completedFuture(new Sheet<>(0, new ArrayList()));
}
continue AGAIN;
}
}
} else {
return CompletableFuture.failedFuture(se);
}
}
return CompletableFuture.failedFuture(se);
}
}
throw ex;
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
} finally {
if (conn != null) readPool.offerConnection(conn);

View File

@@ -40,6 +40,9 @@ import org.redkale.util.*;
@ResourceType(DataSource.class)
public abstract class DataSqlSource extends AbstractDataSource implements Function<Class, EntityInfo> {
//不存在分表时最大重试次数
protected static final int MAX_RETRYS = 3;
protected static final Flipper FLIPPER_ONE = new Flipper(1);
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -1948,9 +1951,8 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
StringBuilder union = new StringBuilder();
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
union.append("SELECT ").append(tabalis).append(".").append(keySqlColumn).append(", ").append(funcSqlColumn.replace("a.", tabalis + "."))
.append(" FROM ").append(table).append(" ").append(tabalis).append(joinAndWhere);
union.append("SELECT a.").append(keySqlColumn).append(", ").append(funcSqlColumn)
.append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
sql = "SELECT a." + keySqlColumn + ", " + funcSqlColumn + " FROM (" + (union) + ") a";
}
@@ -2043,14 +2045,12 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (groupBySqlColumns.length() > 0) sql += groupBySqlColumns + ", ";
sql += funcSqlColumns + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
StringBuilder union = new StringBuilder();
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
String subsql = "SELECT ";
if (groupBySqlColumns.length() > 0) subsql += groupBySqlColumns.toString().replace("a.", tabalis + ".") + ", ";
subsql += funcSqlColumns.toString().replace("a.", tabalis + ".") + " FROM " + table + " " + tabalis + joinAndWhere;
if (groupBySqlColumns.length() > 0) subsql += groupBySqlColumns.toString() + ", ";
subsql += funcSqlColumns.toString() + " FROM " + table + " a" + joinAndWhere;
union.append(subsql);
}
sql = "SELECT ";
@@ -2191,12 +2191,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (tables.length == 1) {
sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
StringBuilder union = new StringBuilder();
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
union.append("SELECT ").append(info.getQueryColumns(tabalis, selects)).append(" FROM ").append(table).append(" ").append(tabalis).append(joinAndWhere);
union.append("SELECT ").append(info.getQueryColumns("a", selects)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM (" + (union) + ") a";
}
@@ -2275,12 +2273,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (tables.length == 1) {
sql = "SELECT " + info.getSQLColumn("a", column) + " FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
StringBuilder union = new StringBuilder();
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
union.append("SELECT ").append(info.getSQLColumn(tabalis, column)).append(" FROM ").append(table).append(" ").append(tabalis).append(joinAndWhere);
union.append("SELECT ").append(info.getSQLColumn("a", column)).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
sql = "SELECT " + info.getSQLColumn("a", column) + " FROM (" + (union) + ") a";
}
@@ -2364,12 +2360,10 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
if (tables.length == 1) {
sql = "SELECT COUNT(" + info.getPrimarySQLColumn("a") + ") FROM " + tables[0] + " a" + joinAndWhere;
} else {
int b = 0;
StringBuilder union = new StringBuilder();
for (String table : tables) {
if (!union.isEmpty()) union.append(" UNION ALL ");
String tabalis = "t" + (++b);
union.append("SELECT ").append(info.getPrimarySQLColumn(tabalis)).append(" FROM ").append(table).append(" ").append(tabalis).append(joinAndWhere);
union.append("SELECT ").append(info.getPrimarySQLColumn("a")).append(" FROM ").append(table).append(" a").append(joinAndWhere);
}
sql = "SELECT COUNT(" + info.getPrimarySQLColumn("a") + ") FROM (" + (union) + ") a";
}

View File

@@ -1096,6 +1096,31 @@ public final class Utility {
return rs;
}
/**
* 将符合条件的元素从数组中删除
*
* @param array 原数组
* @param item 元素
*
* @return 新数组
*/
public static String[] remove(final String[] array, final String item) {
if (array == null || array.length == 0) return array;
final String[] news = new String[array.length];
int index = 0;
for (int i = 0; i < news.length; i++) {
if (item != null && !item.equals(array[i])) {
news[index++] = array[i];
} else if (item == null && array[i] != null) {
news[index++] = array[i];
}
}
if (index == array.length) return array;
final String[] rs = new String[index];
System.arraycopy(news, 0, rs, 0, index);
return rs;
}
/**
* 将指定的long元素从数组中删除, 相同的元素会根据items里重复次数来执行删除 <br>
* 例如: <br>