DataJdbcSource增加PreparedStatement缓存

This commit is contained in:
redkale
2023-04-22 14:37:48 +08:00
parent 331047957a
commit 0d9216dd35

View File

@@ -132,7 +132,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return false;
}
protected <T> List<PreparedStatement> prepareInsertEntityStatements(Connection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
protected <T> List<PreparedStatement> prepareInsertEntityStatements(SourceConnection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
for (Map.Entry<String, PrepareInfo<T>> en : prepareInfos.entrySet()) {
@@ -147,7 +147,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmts;
}
protected <T> PreparedStatement prepareInsertEntityStatement(Connection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
protected <T> PreparedStatement prepareInsertEntityStatement(SourceConnection conn, String sql, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = conn.prepareStatement(sql);
for (final T value : entitys) {
@@ -157,7 +157,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmt;
}
protected <T> List<PreparedStatement> prepareUpdateEntityStatements(Connection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
protected <T> List<PreparedStatement> prepareUpdateEntityStatements(SourceConnection conn, EntityInfo<T> info, Map<String, PrepareInfo<T>> prepareInfos, T... entitys) throws SQLException {
Attribute<T, Serializable> primary = info.primary;
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final List<PreparedStatement> prestmts = new ArrayList<>();
@@ -174,7 +174,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmts;
}
protected <T> PreparedStatement prepareUpdateEntityStatement(Connection conn, String prepareSQL, EntityInfo<T> info, T... entitys) throws SQLException {
protected <T> PreparedStatement prepareUpdateEntityStatement(SourceConnection conn, String prepareSQL, EntityInfo<T> info, T... entitys) throws SQLException {
Attribute<T, Serializable> primary = info.primary;
Attribute<T, Serializable>[] attrs = info.updateAttributes;
final PreparedStatement prestmt = conn.prepareStatement(prepareSQL);
@@ -186,7 +186,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return prestmt;
}
protected <T> int bindStatementParameters(Connection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
protected <T> int bindStatementParameters(SourceConnection conn, PreparedStatement prestmt, EntityInfo<T> info, Attribute<T, Serializable>[] attrs, T entity) throws SQLException {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object val = getEntityAttrValue(info, attr, entity);
@@ -215,7 +215,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return 0;
}
int c = 0;
Connection conn = null;
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
@@ -310,7 +310,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int insertDB(EntityInfo<T> info, T... entitys) {
Connection conn = null;
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
@@ -332,7 +332,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
private <T> int insertDB(final boolean batch, final Connection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
private <T> int insertDB(final boolean batch, final SourceConnection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
final long s = System.currentTimeMillis();
int c = 0;
String presql = null;
@@ -355,7 +355,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
c1 += cc;
}
c = c1;
prestmt.close();
//prestmt.close();
} else { //分库分表
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
@@ -365,9 +365,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
}
if (!batch) {
conn.commit();
@@ -493,7 +493,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
if (info.getTableStrategy() == null) { //单库单表
prestmt.close();
//prestmt.close();
prestmt = prepareInsertEntityStatement(conn, presql, info, entitys);
int c1 = 0;
int[] cs = prestmt.executeBatch();
@@ -501,11 +501,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
c1 += cc;
}
c = c1;
prestmt.close();
//prestmt.close();
} else { //分库分表
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
@@ -515,9 +515,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
}
}
//------------------------------------------------------------
@@ -591,7 +591,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int deleteDB(EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) {
Connection conn = null;
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
@@ -613,7 +613,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
private <T> int deleteDB(final boolean batch, final Connection conn, final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) throws SQLException {
private <T> int deleteDB(final boolean batch, final SourceConnection conn, final EntityInfo<T> info, String[] tables, Flipper flipper, FilterNode node, Map<String, List<Serializable>> pkmap, String... sqls) throws SQLException {
final long s = System.currentTimeMillis();
try {
int c;
@@ -728,7 +728,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int clearTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
@@ -832,7 +832,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int createTableDB(EntityInfo<T> info, String copyTableSql, Serializable pk, String... sqls) {
Connection conn = null;
SourceConnection conn = null;
Statement stmt;
final long s = System.currentTimeMillis();
try {
@@ -961,7 +961,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int dropTableDB(EntityInfo<T> info, String[] tables, FilterNode node, String... sqls) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = writePool.pollConnection();
@@ -1060,7 +1060,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int updateEntityDB(EntityInfo<T> info, T... entitys) {
Connection conn = null;
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
@@ -1082,7 +1082,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
private <T> int updateEntityDB(final boolean batch, final Connection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
private <T> int updateEntityDB(final boolean batch, final SourceConnection conn, final EntityInfo<T> info, T... entitys) throws SQLException {
final long s = System.currentTimeMillis();
String presql = null;
String caseSql = null;
@@ -1120,7 +1120,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
prestmt.close();
//prestmt.close();
} else {
prepareInfos = getUpdateQuestionPrepareInfo(info, entitys);
prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys);
@@ -1132,9 +1132,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
}
if (!batch) {
conn.commit();
@@ -1168,9 +1168,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (prepareInfos == null) {
throw se;
}
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]);
List<String> notExistTables = checkNotExistTables(conn, oldTables);
@@ -1195,9 +1195,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
for (PreparedStatement stmt : prestmts) {
stmt.close();
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
conn.commit();
}
} else {
@@ -1276,7 +1276,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> int updateColumnDB(EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) {
Connection conn = null;
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
@@ -1298,7 +1298,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
private <T> int updateColumnDB(final boolean batch, final Connection conn, final EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) {
private <T> int updateColumnDB(final boolean batch, final SourceConnection conn, final EntityInfo<T> info, Flipper flipper, UpdateSqlInfo sql) throws SQLException { //String sql, boolean prepared, Object... blobs) {
final long s = System.currentTimeMillis();
int c = -1;
String firstTable = null;
@@ -1316,7 +1316,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql);
}
c = prestmt.executeUpdate();
prestmt.close();
//prestmt.close();
if (!batch) {
conn.commit();
}
@@ -1349,7 +1349,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
for (int cc : cs) {
c1 += cc;
}
prestmt.close();
//prestmt.close();
}
c = c1;
if (!batch) {
@@ -1442,7 +1442,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
for (int cc : cs) {
c1 += cc;
}
prestmt.close();
//prestmt.close();
}
c = c1;
if (!batch) {
@@ -1464,7 +1464,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T, N extends Number> Map<String, N> getNumberMapDB(EntityInfo<T> info, String[] tables, String sql, FilterNode node, FilterFuncColumn... columns) {
Connection conn = null;
SourceConnection conn = null;
final Map map = new HashMap<>();
final long s = System.currentTimeMillis();
Statement stmt = null;
@@ -1567,7 +1567,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> Number getNumberResultDB(EntityInfo<T> info, String[] tables, String sql, FilterFunc func, Number defVal, String column, FilterNode node) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
Statement stmt = null;
try {
@@ -1656,7 +1656,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T, K extends Serializable, N extends Number> Map<K, N> queryColumnMapDB(EntityInfo<T> info, String[] tables, String sql, String keyColumn, FilterFunc func, String funcColumn, FilterNode node) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
Map<K, N> rs = new LinkedHashMap<>();
Statement stmt = null;
@@ -1743,7 +1743,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T, K extends Serializable, N extends Number> Map<K[], N[]> queryColumnMapDB(EntityInfo<T> info, String[] tables, String sql, final ColumnNode[] funcNodes, final String[] groupByColumns, final FilterNode node) {
Connection conn = null;
SourceConnection conn = null;
Map rs = new LinkedHashMap<>();
final long s = System.currentTimeMillis();
Statement stmt = null;
@@ -1857,7 +1857,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
protected <T> T findDB(EntityInfo<T> info, Serializable pk) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement prestmt = null;
try {
@@ -1869,7 +1869,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? info.getFullEntityValue(set) : null;
set.close();
prestmt.close();
//prestmt.close();
slowLog(s, prepareSQL);
return rs;
} catch (SQLException e) {
@@ -1892,17 +1892,17 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> T findDB(EntityInfo<T> info, String[] tables, String sql, boolean onlypk, SelectColumn selects, Serializable pk, FilterNode node) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement ps = null;
PreparedStatement prestmt = null;
try {
conn = readPool.pollConnection();
ps = conn.prepareStatement(sql);
ps.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, ps.executeQuery());
prestmt = conn.prepareStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? selects == null ? info.getFullEntityValue(set) : info.getEntityValue(selects, set) : null;
set.close();
ps.close();
//prestmt.close();
slowLog(s, sql);
return rs;
} catch (SQLException e) {
@@ -1940,15 +1940,15 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " find sql=" + sql);
}
if (ps != null) {
ps.close();
if (prestmt != null) {
// prestmt.close();
}
ps = conn.prepareStatement(sql);
ps.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, ps.executeQuery());
prestmt = conn.prepareStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? selects == null ? info.getFullEntityValue(set) : info.getEntityValue(selects, set) : null;
set.close();
ps.close();
//prestmt.close();
slowLog(s, sql);
return rs;
} catch (SQLException se) {
@@ -1971,21 +1971,21 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> Serializable findColumnDB(EntityInfo<T> info, String[] tables, String sql, boolean onlypk, String column, Serializable defValue, Serializable pk, FilterNode node) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement ps = null;
PreparedStatement prestmt = null;
final Attribute<T, Serializable> attr = info.getAttribute(column);
try {
conn = readPool.pollConnection();
ps = conn.prepareStatement(sql);
ps.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, ps.executeQuery());
prestmt = conn.prepareStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
Serializable val = defValue;
if (set.next()) {
val = info.getFieldValue(attr, set, 1);
}
set.close();
ps.close();
//prestmt.close();
slowLog(s, sql);
return val == null ? defValue : val;
} catch (SQLException e) {
@@ -2023,18 +2023,18 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " findColumn sql=" + sql);
}
if (ps != null) {
ps.close();
if (prestmt != null) {
// prestmt.close();
}
ps = conn.prepareStatement(sql);
ps.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, ps.executeQuery());
prestmt = conn.prepareStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
Serializable val = defValue;
if (set.next()) {
val = info.getFieldValue(attr, set, 1);
}
set.close();
ps.close();
//prestmt.close();
slowLog(s, sql);
return val == null ? defValue : val;
} catch (SQLException se) {
@@ -2057,16 +2057,16 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
protected <T> boolean existsDB(EntityInfo<T> info, String[] tables, String sql, boolean onlypk, Serializable pk, FilterNode node) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
PreparedStatement ps = null;
PreparedStatement prestmt = null;
try {
conn = readPool.pollConnection();
ps = conn.prepareStatement(sql);
final ResultSet set = ps.executeQuery();
prestmt = conn.prepareStatement(sql);
final ResultSet set = prestmt.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
set.close();
ps.close();
//prestmt.close();
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
}
@@ -2107,14 +2107,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " exists sql=" + sql);
}
if (ps != null) {
ps.close();
if (prestmt != null) {
// prestmt.close();
}
ps = conn.prepareStatement(sql);
final ResultSet set = ps.executeQuery();
prestmt = conn.prepareStatement(sql);
final ResultSet set = prestmt.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
set.close();
ps.close();
//prestmt.close();
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
}
@@ -2138,18 +2138,18 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final EntityInfo<T> info = loadEntityInfo(clazz);
Serializable[] ids = pks.toArray(serialArrayFunc);
if (info.getTableStrategy() == null) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = readPool.pollConnection();
final List<T> list = new ArrayList();
try {
String prepareSQL = info.getFindQuestionPrepareSQL(ids[0]);
PreparedStatement ps = conn.prepareStatement(prepareSQL);
PreparedStatement prestmt = conn.prepareStatement(prepareSQL);
DataJdbcResultSet rr = new DataJdbcResultSet(info);
for (Serializable pk : ids) {
ps.setObject(1, pk);
ResultSet set = ps.executeQuery();
prestmt.setObject(1, pk);
ResultSet set = prestmt.executeQuery();
rr.resultSet(set);
if (set.next()) {
list.add(getEntityValue(info, null, rr));
@@ -2157,7 +2157,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
list.add(null);
}
}
ps.close();
//prestmt.close();
slowLog(s, prepareSQL);
return list;
} catch (SQLException se) {
@@ -2191,21 +2191,21 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
protected <T> Sheet<T> querySheetFullListDB(EntityInfo<T> info) {
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
try {
conn = readPool.pollConnection();
final List<T> list = new ArrayList();
try {
String prepareSQL = info.getAllQueryPrepareSQL();
PreparedStatement ps = conn.prepareStatement(prepareSQL);
ResultSet set = ps.executeQuery();
PreparedStatement prestmt = conn.prepareStatement(prepareSQL);
ResultSet set = prestmt.executeQuery();
final DataResultSet rr = createDataResultSet(info, set);
while (set.next()) {
list.add(getEntityValue(info, null, rr));
}
set.close();
ps.close();
//prestmt.close();
slowLog(s, prepareSQL);
return Sheet.asSheet(list);
} catch (SQLException se) {
@@ -2230,7 +2230,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (!needTotal && !distinct && selects == null && flipper == null && node == null && info.getTableStrategy() == null) {
return querySheetFullListDB(info);
}
Connection conn = null;
SourceConnection conn = null;
final long s = System.currentTimeMillis();
final SelectColumn sels = selects;
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
@@ -2301,35 +2301,35 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
private <T> Sheet<T> executeQuerySheet(EntityInfo<T> info, boolean needTotal, Flipper flipper, SelectColumn sels,
long s, Connection conn, boolean mysqlOrPgsql, String listSql, String countSql) throws SQLException {
long s, SourceConnection conn, boolean mysqlOrPgsql, String listSql, String countSql) throws SQLException {
final List<T> list = new ArrayList();
if (mysqlOrPgsql) { //sql可以带limit、offset
PreparedStatement ps = conn.prepareStatement(listSql);
ResultSet set = ps.executeQuery();
PreparedStatement prestmt = conn.prepareStatement(listSql);
ResultSet set = prestmt.executeQuery();
final DataResultSet rr = createDataResultSet(info, set);
while (set.next()) {
list.add(getEntityValue(info, sels, rr));
}
set.close();
ps.close();
//prestmt.close();
long total = list.size();
if (needTotal) {
ps = conn.prepareStatement(countSql);
set = ps.executeQuery();
prestmt = conn.prepareStatement(countSql);
set = prestmt.executeQuery();
if (set.next()) {
total = set.getLong(1);
}
set.close();
ps.close();
//prestmt.close();
}
slowLog(s, listSql);
return new Sheet<>(total, list);
} else {
PreparedStatement ps = conn.prepareStatement(listSql);
PreparedStatement prestmt = conn.prepareStatement(listSql);
if (flipper != null && flipper.getLimit() > 0) {
ps.setFetchSize(flipper.getLimit());
prestmt.setFetchSize(flipper.getLimit());
}
ResultSet set = ps.executeQuery();
ResultSet set = prestmt.executeQuery();
if (flipper != null && flipper.getOffset() > 0) {
set.absolute(flipper.getOffset());
}
@@ -2359,7 +2359,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
total = set.getRow();
}
set.close();
ps.close();
//prestmt.close();
slowLog(s, listSql);
return new Sheet<>(total, list);
}
@@ -2411,7 +2411,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return new String[]{listSql, countSql};
}
protected List<String> checkNotExistTablesNoThrows(Connection conn, String[] tables) {
protected List<String> checkNotExistTablesNoThrows(SourceConnection conn, String[] tables) {
try {
return checkNotExistTables(conn, tables); //, firstNotExistTable
} catch (SQLException e) {
@@ -2419,7 +2419,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
protected List<String> checkNotExistTables(Connection conn, String[] tables) throws SQLException { //, String firstNotExistTable
protected List<String> checkNotExistTables(SourceConnection conn, String[] tables) throws SQLException { //, String firstNotExistTable
// 数据库不一定要按批量提交的SQL顺序执行 所以第一个不存在的表不一定在tables的第一位,
// 比如 DELETE FROM table1; DELETE FROM table2; 如果table1、table2都不存在SQL可能会抛出table2不存在的异常
// List<String> maybeNoTables = new ArrayList<>();
@@ -2485,7 +2485,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return new int[0];
}
final long s = System.currentTimeMillis();
Connection conn = writePool.pollConnection();
SourceConnection conn = writePool.pollConnection();
try {
conn.setAutoCommit(false);
final Statement stmt = conn.createStatement();
@@ -2525,14 +2525,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
@Override
public <V> V nativeQuery(String sql, Function<DataResultSet, V> handler) {
final long s = System.currentTimeMillis();
final Connection conn = readPool.pollConnection();
final SourceConnection conn = readPool.pollConnection();
try {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("direct query sql=" + sql);
}
final Statement statement = conn.createStatement();
//final PreparedStatement statement = conn.prepareStatement(sql);
final ResultSet set = statement.executeQuery(sql);// ps.executeQuery();
//final PreparedStatement prestmt = conn.prepareStatement(sql);
final ResultSet set = statement.executeQuery(sql);// prestmt.executeQuery();
V rs = handler.apply(createDataResultSet(null, set));
set.close();
statement.close();
@@ -2706,7 +2706,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected final Properties connectAttrs;
protected ArrayBlockingQueue<Connection> queue;
protected ArrayBlockingQueue<SourceConnection> queue;
protected int connectTimeoutSeconds;
@@ -2814,13 +2814,13 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
private void changeMaxConns(int newMaxconns) {
ArrayBlockingQueue<Connection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<Connection> oldQueue = this.queue;
ArrayBlockingQueue<SourceConnection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<SourceConnection> oldQueue = this.queue;
Semaphore oldSemaphore = this.canNewSemaphore;
this.queue = newQueue;
this.maxConns = newMaxconns;
this.canNewSemaphore = new Semaphore(this.maxConns);
Connection c;
SourceConnection c;
while ((c = oldQueue.poll()) != null) {
try {
if (c.getClientInfo() != null) {
@@ -2832,8 +2832,8 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
public Connection pollConnection() {
Connection conn = queue.poll();
public SourceConnection pollConnection() {
SourceConnection conn = queue.poll();
if (conn == null) {
return newConnection();
}
@@ -2848,12 +2848,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return newConnection();
}
private Connection newConnection() {
private SourceConnection newConnection() {
Semaphore semaphore = this.canNewSemaphore;
Connection conn = null;
SourceConnection conn = null;
if (semaphore.tryAcquire()) {
try {
conn = driver.connect(url, connectAttrs);
conn = new SourceConnection(driver.connect(url, connectAttrs));
if (conn.getClientInfo() != null) {
conn.getClientInfo().put("version", clientInfo.getProperty("version"));
} else {
@@ -2883,7 +2883,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
private <C> void offerConnection(final C connection, Semaphore semaphore) {
Connection conn = (Connection) connection;
SourceConnection conn = (SourceConnection) connection;
if (conn == null) {
return;
}
@@ -2901,9 +2901,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
protected boolean checkValid(Connection conn) {
protected boolean checkValid(SourceConnection conn) {
try {
boolean rs = !conn.isClosed() && conn.isValid(1);
boolean rs = !conn.conn.isClosed() && conn.conn.isValid(1);
if (!rs) {
return rs;
}
@@ -2930,4 +2930,67 @@ public class DataJdbcSource extends AbstractDataSqlSource {
});
}
}
protected class SourceConnection {
public final Connection conn;
private final Map<String, PreparedStatement> prestms = new ConcurrentHashMap<>();
public SourceConnection(Connection conn) {
Objects.requireNonNull(conn);
this.conn = conn;
}
public Properties getClientInfo() throws SQLException {
return conn.getClientInfo();
}
public void setClientInfo(Properties clientInfo) throws SQLException {
conn.setClientInfo(clientInfo);
}
public Statement createStatement() throws SQLException {
return conn.createStatement();
}
public PreparedStatement prepareStatement(String sql) throws SQLException {
PreparedStatement rs = prestms.computeIfAbsent(sql, s -> {
try {
return conn.prepareStatement(sql);
} catch (SQLException e) {
throw new RedkaleException(e);
}
});
rs.clearParameters();
rs.clearBatch();
return rs;
}
public void setAutoCommit(boolean autoCommit) throws SQLException {
conn.setAutoCommit(autoCommit);
}
public void commit() throws SQLException {
conn.commit();
}
public void rollback() throws SQLException {
conn.rollback();
}
public DatabaseMetaData getMetaData() throws SQLException {
return conn.getMetaData();
}
public Blob createBlob() throws SQLException {
return conn.createBlob();
}
public void close() throws SQLException {
conn.close();
}
}
}