DataJdbcSource优化

This commit is contained in:
redkale
2023-07-21 14:03:48 +08:00
parent 7dcaa3eda0
commit ede8f40b0d

View File

@@ -217,8 +217,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
int c = 0;
SourceConnection conn = null;
try {
conn = writePool.pollConnection();
conn.setAutoCommit(false);
conn = writePool.pollTransConnection();
for (BatchAction action : dataBatch.actions) {
if (action instanceof InsertBatchAction1) {
InsertBatchAction1 act = (InsertBatchAction1) action;
@@ -293,7 +292,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
throw new SourceException(e);
} finally {
if (conn != null) {
writePool.offerConnection(conn);
writePool.offerTransConnection(conn);
}
}
}
@@ -355,7 +354,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
c1 += cc;
}
c = c1;
prestmt.close();
conn.offerUpdateStatement(prestmt);
} else { //分库分表
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
@@ -365,9 +364,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
}
if (!batch) {
conn.commit();
@@ -394,7 +393,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
} else { //分库分表
info.disTableLock().lock();
try {
@@ -414,7 +413,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
stmt.addBatch(copySql);
}
stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle) { //多进程并发时可能会出现重复建表
if (isTableNotExist(info, sqle.getSQLState())) {
if (newCatalogs.isEmpty()) { //分表的原始表不存在
@@ -430,14 +429,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
stmt.addBatch(copySql);
}
stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
}
} else { //需要先建库
Statement stmt;
@@ -447,7 +446,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
stmt.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog);
}
stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database " + tableCopys + " error", sqle1);
}
@@ -458,7 +457,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
stmt.addBatch(copySql);
}
stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
String[] tableSqls = createTableSqls(info);
@@ -472,14 +471,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
stmt = conn.createUpdateStatement();
for (String copySql : tableCopys) {
stmt.addBatch(copySql);
}
stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
}
} else {
logger.log(Level.SEVERE, "create table2 " + tableCopys + " error", sqle2);
@@ -493,7 +492,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
if (info.getTableStrategy() == null) { //单库单表
prestmt.close();
conn.offerUpdateStatement(prestmt);
prestmt = prepareInsertEntityStatement(conn, presql, info, entitys);
int c1 = 0;
int[] cs = prestmt.executeBatch();
@@ -501,11 +500,11 @@ public class DataJdbcSource extends AbstractDataSqlSource {
c1 += cc;
}
c = c1;
prestmt.close();
conn.offerUpdateStatement(prestmt);
} else { //分库分表
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys);
int c1 = 0;
for (PreparedStatement stmt : prestmts) {
@@ -515,9 +514,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
}
}
//------------------------------------------------------------
@@ -620,7 +619,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (sqls.length == 1) {
final Statement stmt = conn.createUpdateStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
conn.offerUpdateStatement(stmt);
c = c1;
} else {
final Statement stmt = conn.createUpdateStatement();
@@ -629,7 +628,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c1 += cc;
}
@@ -660,7 +659,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
return 0;
}
//单表结构不存在
@@ -703,7 +702,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c += cc;
}
@@ -737,7 +736,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (sqls.length == 1) {
final Statement stmt = conn.createUpdateStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
conn.offerUpdateStatement(stmt);
c = c1;
} else {
final Statement stmt = conn.createUpdateStatement();
@@ -746,7 +745,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c1 += cc;
}
@@ -798,7 +797,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c += cc;
}
@@ -843,7 +842,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (sqls.length == 1) {
stmt = conn.createUpdateStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
conn.offerUpdateStatement(stmt);
c = c1;
} else {
stmt = conn.createUpdateStatement();
@@ -852,7 +851,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c1 += cc;
}
@@ -879,14 +878,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
stmt.close();
conn.offerUpdateStatement(stmt);
} else { //需要先建库
String newCatalog = newTable.substring(0, newTable.indexOf('.'));
@@ -897,7 +896,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt = conn.createUpdateStatement();
stmt.executeUpdate(catalogSql);
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle1) {
logger.log(Level.SEVERE, "create database " + copyTableSql + " error", sqle1);
}
@@ -908,7 +907,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException sqle2) {
if (isTableNotExist(info, sqle2.getSQLState())) {
if (info.isLoggable(logger, Level.FINEST, sqls[0])) {
@@ -924,14 +923,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
//再执行一遍创建分表操作
if (info.isLoggable(logger, Level.FINEST, copyTableSql)) {
logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql);
}
stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(copyTableSql);
stmt.close();
conn.offerUpdateStatement(stmt);
} else {
throw new SourceException(sqle2);
}
@@ -970,7 +969,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (sqls.length == 1) {
final Statement stmt = conn.createUpdateStatement();
int c1 = stmt.executeUpdate(sqls[0]);
stmt.close();
conn.offerUpdateStatement(stmt);
c = c1;
} else {
final Statement stmt = conn.createUpdateStatement();
@@ -979,7 +978,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c1 = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c1 += cc;
}
@@ -1031,7 +1030,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
int c = 0;
int[] cs = stmt.executeBatch();
stmt.close();
conn.offerUpdateStatement(stmt);
for (int cc : cs) {
c += cc;
}
@@ -1120,7 +1119,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
prestmt.close();
conn.offerUpdateStatement(prestmt);
} else {
prepareInfos = getUpdateQuestionPrepareInfo(info, entitys);
prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys);
@@ -1132,9 +1131,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
}
if (!batch) {
conn.commit();
@@ -1157,7 +1156,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException e2) {
}
}
@@ -1168,9 +1167,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
if (prepareInfos == null) {
throw se;
}
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]);
List<String> notExistTables = checkNotExistTables(conn, oldTables);
@@ -1195,9 +1194,9 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
c = c1;
//for (PreparedStatement stmt : prestmts) {
// stmt.close();
//}
for (PreparedStatement stmt : prestmts) {
conn.offerUpdateStatement(stmt);
}
conn.commit();
}
} else {
@@ -1316,7 +1315,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql);
}
c = prestmt.executeUpdate();
prestmt.close();
conn.offerUpdateStatement(prestmt);
if (!batch) {
conn.commit();
}
@@ -1344,12 +1343,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls));
}
int c1 = 0;
for (PreparedStatement prestmt : prestmts) {
int[] cs = prestmt.executeBatch();
for (PreparedStatement stmt : prestmts) {
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c1 += cc;
}
prestmt.close();
conn.offerUpdateStatement(stmt);
}
c = c1;
if (!batch) {
@@ -1364,7 +1363,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
final Statement stmt = conn.createUpdateStatement();
c = stmt.executeUpdate(sql.sql);
stmt.close();
conn.offerUpdateStatement(stmt);
if (!batch) {
conn.commit();
}
@@ -1389,7 +1388,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
stmt.executeBatch();
}
stmt.close();
conn.offerUpdateStatement(stmt);
} catch (SQLException e2) {
}
}
@@ -1437,12 +1436,12 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls));
}
int c1 = 0;
for (PreparedStatement prestmt : prestmts) {
int[] cs = prestmt.executeBatch();
for (PreparedStatement stmt : prestmts) {
int[] cs = stmt.executeBatch();
for (int cc : cs) {
c1 += cc;
}
prestmt.close();
conn.offerUpdateStatement(stmt);
}
c = c1;
if (!batch) {
@@ -1486,7 +1485,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return map;
} catch (SQLException e) {
@@ -1526,7 +1525,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " getNumberMap sql=" + sql);
}
if (stmt != null) {
stmt.close();
conn.offerQueryStatement(stmt);
}
stmt = conn.createQueryStatement();
ResultSet set = stmt.executeQuery(sql);
@@ -1544,7 +1543,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return map;
} catch (SQLException se) {
@@ -1582,7 +1581,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (SQLException e) {
@@ -1621,7 +1620,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " getNumberResult sql=" + sql);
}
if (stmt != null) {
stmt.close();
conn.offerQueryStatement(stmt);
}
stmt = conn.createQueryStatement();
Number rs = defVal;
@@ -1633,7 +1632,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (SQLException se) {
@@ -1670,7 +1669,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2));
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (SQLException e) {
@@ -1710,7 +1709,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " queryColumnMap sql=" + sql);
}
if (stmt != null) {
stmt.close();
conn.offerQueryStatement(stmt);
}
stmt = conn.createQueryStatement();
ResultSet set = stmt.executeQuery(sql);
@@ -1720,7 +1719,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2));
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (SQLException se) {
@@ -1772,7 +1771,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
rs.put(keys, vals);
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (SQLException e) {
@@ -1812,7 +1811,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " queryColumnMap sql=" + sql);
}
if (stmt != null) {
stmt.close();
conn.offerQueryStatement(stmt);
}
stmt = conn.createQueryStatement();
ResultSet set = stmt.executeQuery(sql);
@@ -1822,7 +1821,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2));
}
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (SQLException se) {
@@ -1868,11 +1867,10 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? info.getFullEntityValue(set) : null;
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, prepareSQL);
return rs;
} catch (SQLException e) {
e.printStackTrace();
if (isTableNotExist(info, e.getSQLState())) {
return null;
}
@@ -1901,7 +1899,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? selects == null ? info.getFullEntityValue(set) : info.getEntityValue(selects, set) : null;
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return rs;
} catch (SQLException e) {
@@ -1940,14 +1938,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " find sql=" + sql);
}
if (prestmt != null) {
prestmt.close();
conn.offerQueryStatement(prestmt);
}
prestmt = conn.prepareQueryStatement(sql);
prestmt.setFetchSize(1);
final DataResultSet set = createDataResultSet(info, prestmt.executeQuery());
T rs = set.next() ? selects == null ? info.getFullEntityValue(set) : info.getEntityValue(selects, set) : null;
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return rs;
} catch (SQLException se) {
@@ -1984,7 +1982,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
val = info.getFieldValue(attr, set, 1);
}
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return val == null ? defValue : val;
} catch (SQLException e) {
@@ -2023,7 +2021,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " findColumn sql=" + sql);
}
if (prestmt != null) {
prestmt.close();
conn.offerQueryStatement(prestmt);
}
prestmt = conn.prepareQueryStatement(sql);
prestmt.setFetchSize(1);
@@ -2033,7 +2031,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
val = info.getFieldValue(attr, set, 1);
}
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, sql);
return val == null ? defValue : val;
} catch (SQLException se) {
@@ -2065,7 +2063,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final ResultSet set = prestmt.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
}
@@ -2107,13 +2105,13 @@ public class DataJdbcSource extends AbstractDataSqlSource {
logger.finest(info.getType().getSimpleName() + " exists sql=" + sql);
}
if (prestmt != null) {
// prestmt.close();
conn.offerQueryStatement(prestmt);
}
prestmt = conn.prepareQueryStatement(sql);
final ResultSet set = prestmt.executeQuery();
boolean rs = set.next() ? (set.getInt(1) > 0) : false;
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
if (info.isLoggable(logger, Level.FINEST, sql)) {
logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql);
}
@@ -2157,7 +2155,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
set.close();
}
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, prepareSQL);
return list;
} catch (SQLException se) {
@@ -2205,7 +2203,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
list.add(getEntityValue(info, null, rr));
}
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, prepareSQL);
return Sheet.asSheet(list);
} catch (SQLException se) {
@@ -2311,7 +2309,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
list.add(getEntityValue(info, sels, rr));
}
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
long total = list.size();
if (needTotal) {
prestmt = conn.prepareQueryStatement(countSql);
@@ -2320,7 +2318,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
total = set.getLong(1);
}
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
}
slowLog(s, listSql);
return new Sheet<>(total, list);
@@ -2359,7 +2357,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
total = set.getRow();
}
set.close();
prestmt.close();
conn.offerQueryStatement(prestmt);
slowLog(s, listSql);
return new Sheet<>(total, list);
}
@@ -2494,7 +2492,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
for (String sql : sqls) {
rs[++i] = stmt.execute(sql) ? 1 : 0;
}
stmt.close();
conn.offerUpdateStatement(stmt);
conn.commit();
slowLog(s, sqls);
return rs;
@@ -2535,7 +2533,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
final ResultSet set = stmt.executeQuery(sql);// prestmt.executeQuery();
V rs = handler.apply(createDataResultSet(null, set));
set.close();
stmt.close();
conn.offerQueryStatement(stmt);
slowLog(s, sql);
return rs;
} catch (Exception ex) {
@@ -2692,7 +2690,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
protected int maxConns;
protected Semaphore canNewSemaphore;
protected Semaphore maxSemaphore;
protected String url;
@@ -2707,7 +2705,7 @@ public class DataJdbcSource extends AbstractDataSqlSource {
defMaxConns = Math.min(1000, Utility.cpus() * 100);
}
this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns)));
this.canNewSemaphore = new Semaphore(this.maxConns);
this.maxSemaphore = new Semaphore(this.maxConns);
this.queue = new ArrayBlockingQueue<>(maxConns);
this.url = prop.getProperty(DATA_SOURCE_URL);
String username = prop.getProperty(DATA_SOURCE_USER, "");
@@ -2790,21 +2788,21 @@ public class DataJdbcSource extends AbstractDataSqlSource {
private void changeMaxConns(int newMaxconns) {
ArrayBlockingQueue<SourceConnection> newQueue = new ArrayBlockingQueue<>(newMaxconns);
ArrayBlockingQueue<SourceConnection> oldQueue = this.queue;
Semaphore oldSemaphore = this.canNewSemaphore;
Semaphore oldSemaphore = this.maxSemaphore;
this.queue = newQueue;
this.maxConns = newMaxconns;
this.canNewSemaphore = new Semaphore(this.maxConns);
this.maxSemaphore = new Semaphore(this.maxConns);
SourceConnection c;
while ((c = oldQueue.poll()) != null) {
c.version = -1;
offerConnection(c, oldSemaphore);
offerConnection(c, oldSemaphore, this.queue);
}
}
public SourceConnection pollConnection() {
SourceConnection conn = queue.poll();
if (conn == null) {
return newConnection();
return newConnection(this.queue);
}
usingCounter.increment();
if (checkValid(conn)) {
@@ -2814,11 +2812,28 @@ public class DataJdbcSource extends AbstractDataSqlSource {
offerConnection(conn);
conn = null;
}
return newConnection();
return newConnection(this.queue);
}
private SourceConnection newConnection() {
Semaphore semaphore = this.canNewSemaphore;
//用于事务的连接
public SourceConnection pollTransConnection() {
SourceConnection conn = queue.poll();
if (conn == null) {
return newConnection(this.queue);
}
usingCounter.increment();
if (checkValid(conn)) {
cycleCounter.increment();
return conn;
} else {
offerConnection(conn);
conn = null;
}
return newConnection(this.queue);
}
private SourceConnection newConnection(ArrayBlockingQueue<SourceConnection> queue) {
Semaphore semaphore = this.maxSemaphore;
SourceConnection conn = null;
if (semaphore.tryAcquire()) {
try {
@@ -2843,10 +2858,14 @@ public class DataJdbcSource extends AbstractDataSqlSource {
}
public <C> void offerConnection(final C connection) {
offerConnection(connection, this.canNewSemaphore);
offerConnection(connection, this.maxSemaphore, this.queue);
}
private <C> void offerConnection(final C connection, Semaphore semaphore) {
public <C> void offerTransConnection(final C connection) {
offerConnection(connection, this.maxSemaphore, this.queue);
}
private <C> void offerConnection(final C connection, Semaphore semaphore, Queue<SourceConnection> queue) {
SourceConnection conn = (SourceConnection) connection;
if (conn != null) {
try {
@@ -2904,22 +2923,42 @@ public class DataJdbcSource extends AbstractDataSqlSource {
return statement;
}
public void offerStreamStatement(final Statement stmt) throws SQLException {
stmt.close();
}
public Statement createQueryStatement() throws SQLException {
return conn.createStatement();
}
public void offerQueryStatement(final Statement stmt) throws SQLException {
stmt.close();
}
public Statement createUpdateStatement() throws SQLException {
return conn.createStatement();
}
public void offerUpdateStatement(final Statement stmt) throws SQLException {
stmt.close();
}
public PreparedStatement prepareQueryStatement(String sql) throws SQLException {
return conn.prepareStatement(sql);
}
public void offerQueryStatement(final PreparedStatement stmt) throws SQLException {
stmt.close();
}
public PreparedStatement prepareUpdateStatement(String sql) throws SQLException {
return conn.prepareStatement(sql);
}
public void offerUpdateStatement(final PreparedStatement stmt) throws SQLException {
stmt.close();
}
public void setAutoCommit(boolean autoCommit) throws SQLException {
conn.setAutoCommit(autoCommit);
}