From ede8f40b0d16adac384ac3fc06d04428cca72119 Mon Sep 17 00:00:00 2001 From: redkale Date: Fri, 21 Jul 2023 14:03:48 +0800 Subject: [PATCH] =?UTF-8?q?DataJdbcSource=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/source/DataJdbcSource.java | 243 ++++++++++-------- 1 file changed, 141 insertions(+), 102 deletions(-) diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 4b4c5591e..75fcd9cc0 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -217,8 +217,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { int c = 0; SourceConnection conn = null; try { - conn = writePool.pollConnection(); - conn.setAutoCommit(false); + conn = writePool.pollTransConnection(); for (BatchAction action : dataBatch.actions) { if (action instanceof InsertBatchAction1) { InsertBatchAction1 act = (InsertBatchAction1) action; @@ -293,7 +292,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { throw new SourceException(e); } finally { if (conn != null) { - writePool.offerConnection(conn); + writePool.offerTransConnection(conn); } } } @@ -355,7 +354,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { c1 += cc; } c = c1; - prestmt.close(); + conn.offerUpdateStatement(prestmt); } else { //分库分表 int c1 = 0; for (PreparedStatement stmt : prestmts) { @@ -365,9 +364,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - //for (PreparedStatement stmt : prestmts) { - // stmt.close(); - //} + for (PreparedStatement stmt : prestmts) { + conn.offerUpdateStatement(stmt); + } } if (!batch) { conn.commit(); @@ -394,7 +393,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); } else { //分库分表 info.disTableLock().lock(); try { @@ -414,7 +413,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { stmt.addBatch(copySql); } stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException sqle) { //多进程并发时可能会出现重复建表 if (isTableNotExist(info, sqle.getSQLState())) { if (newCatalogs.isEmpty()) { //分表的原始表不存在 @@ -430,14 +429,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); //再执行一遍创建分表操作 stmt = conn.createUpdateStatement(); for (String copySql : tableCopys) { stmt.addBatch(copySql); } stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); } } else { //需要先建库 Statement stmt; @@ -447,7 +446,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { stmt.addBatch(("postgresql".equals(dbtype()) ? "CREATE SCHEMA IF NOT EXISTS " : "CREATE DATABASE IF NOT EXISTS ") + newCatalog); } stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException sqle1) { logger.log(Level.SEVERE, "create database " + tableCopys + " error", sqle1); } @@ -458,7 +457,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { stmt.addBatch(copySql); } stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException sqle2) { if (isTableNotExist(info, sqle2.getSQLState())) { String[] tableSqls = createTableSqls(info); @@ -472,14 +471,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); //再执行一遍创建分表操作 stmt = conn.createUpdateStatement(); for (String copySql : tableCopys) { stmt.addBatch(copySql); } stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); } } else { logger.log(Level.SEVERE, "create table2 " + tableCopys + " error", sqle2); @@ -493,7 +492,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } if (info.getTableStrategy() == null) { //单库单表 - prestmt.close(); + conn.offerUpdateStatement(prestmt); prestmt = prepareInsertEntityStatement(conn, presql, info, entitys); int c1 = 0; int[] cs = prestmt.executeBatch(); @@ -501,11 +500,11 @@ public class DataJdbcSource extends AbstractDataSqlSource { c1 += cc; } c = c1; - prestmt.close(); + conn.offerUpdateStatement(prestmt); } else { //分库分表 - //for (PreparedStatement stmt : prestmts) { - // stmt.close(); - //} + for (PreparedStatement stmt : prestmts) { + conn.offerUpdateStatement(stmt); + } prestmts = prepareInsertEntityStatements(conn, info, prepareInfos, entitys); int c1 = 0; for (PreparedStatement stmt : prestmts) { @@ -515,9 +514,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - //for (PreparedStatement stmt : prestmts) { - // stmt.close(); - //} + for (PreparedStatement stmt : prestmts) { + conn.offerUpdateStatement(stmt); + } } } //------------------------------------------------------------ @@ -620,7 +619,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (sqls.length == 1) { final Statement stmt = conn.createUpdateStatement(); int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); + conn.offerUpdateStatement(stmt); c = c1; } else { final Statement stmt = conn.createUpdateStatement(); @@ -629,7 +628,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c1 = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c1 += cc; } @@ -660,7 +659,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); return 0; } //单表结构不存在 @@ -703,7 +702,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c += cc; } @@ -737,7 +736,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (sqls.length == 1) { final Statement stmt = conn.createUpdateStatement(); int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); + conn.offerUpdateStatement(stmt); c = c1; } else { final Statement stmt = conn.createUpdateStatement(); @@ -746,7 +745,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c1 = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c1 += cc; } @@ -798,7 +797,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c += cc; } @@ -843,7 +842,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (sqls.length == 1) { stmt = conn.createUpdateStatement(); int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); + conn.offerUpdateStatement(stmt); c = c1; } else { stmt = conn.createUpdateStatement(); @@ -852,7 +851,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c1 = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c1 += cc; } @@ -879,14 +878,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); //再执行一遍创建分表操作 if (info.isLoggable(logger, Level.FINEST, copyTableSql)) { logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql); } stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(copyTableSql); - stmt.close(); + conn.offerUpdateStatement(stmt); } else { //需要先建库 String newCatalog = newTable.substring(0, newTable.indexOf('.')); @@ -897,7 +896,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt = conn.createUpdateStatement(); stmt.executeUpdate(catalogSql); - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException sqle1) { logger.log(Level.SEVERE, "create database " + copyTableSql + " error", sqle1); } @@ -908,7 +907,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(copyTableSql); - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException sqle2) { if (isTableNotExist(info, sqle2.getSQLState())) { if (info.isLoggable(logger, Level.FINEST, sqls[0])) { @@ -924,14 +923,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); //再执行一遍创建分表操作 if (info.isLoggable(logger, Level.FINEST, copyTableSql)) { logger.finest(info.getType().getSimpleName() + " createTable sql=" + copyTableSql); } stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(copyTableSql); - stmt.close(); + conn.offerUpdateStatement(stmt); } else { throw new SourceException(sqle2); } @@ -970,7 +969,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (sqls.length == 1) { final Statement stmt = conn.createUpdateStatement(); int c1 = stmt.executeUpdate(sqls[0]); - stmt.close(); + conn.offerUpdateStatement(stmt); c = c1; } else { final Statement stmt = conn.createUpdateStatement(); @@ -979,7 +978,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c1 = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c1 += cc; } @@ -1031,7 +1030,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } int c = 0; int[] cs = stmt.executeBatch(); - stmt.close(); + conn.offerUpdateStatement(stmt); for (int cc : cs) { c += cc; } @@ -1120,7 +1119,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - prestmt.close(); + conn.offerUpdateStatement(prestmt); } else { prepareInfos = getUpdateQuestionPrepareInfo(info, entitys); prestmts = prepareUpdateEntityStatements(conn, info, prepareInfos, entitys); @@ -1132,9 +1131,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - //for (PreparedStatement stmt : prestmts) { - // stmt.close(); - //} + for (PreparedStatement stmt : prestmts) { + conn.offerUpdateStatement(stmt); + } } if (!batch) { conn.commit(); @@ -1157,7 +1156,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException e2) { } } @@ -1168,9 +1167,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (prepareInfos == null) { throw se; } - //for (PreparedStatement stmt : prestmts) { - // stmt.close(); - //} + for (PreparedStatement stmt : prestmts) { + conn.offerUpdateStatement(stmt); + } String[] oldTables = prepareInfos.keySet().toArray(new String[prepareInfos.size()]); List notExistTables = checkNotExistTables(conn, oldTables); @@ -1195,9 +1194,9 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } c = c1; - //for (PreparedStatement stmt : prestmts) { - // stmt.close(); - //} + for (PreparedStatement stmt : prestmts) { + conn.offerUpdateStatement(stmt); + } conn.commit(); } } else { @@ -1316,7 +1315,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + sql.sql); } c = prestmt.executeUpdate(); - prestmt.close(); + conn.offerUpdateStatement(prestmt); if (!batch) { conn.commit(); } @@ -1344,12 +1343,12 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); } int c1 = 0; - for (PreparedStatement prestmt : prestmts) { - int[] cs = prestmt.executeBatch(); + for (PreparedStatement stmt : prestmts) { + int[] cs = stmt.executeBatch(); for (int cc : cs) { c1 += cc; } - prestmt.close(); + conn.offerUpdateStatement(stmt); } c = c1; if (!batch) { @@ -1364,7 +1363,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } final Statement stmt = conn.createUpdateStatement(); c = stmt.executeUpdate(sql.sql); - stmt.close(); + conn.offerUpdateStatement(stmt); if (!batch) { conn.commit(); } @@ -1389,7 +1388,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } stmt.executeBatch(); } - stmt.close(); + conn.offerUpdateStatement(stmt); } catch (SQLException e2) { } } @@ -1437,12 +1436,12 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " updateColumn sql=" + Arrays.toString(sqls)); } int c1 = 0; - for (PreparedStatement prestmt : prestmts) { - int[] cs = prestmt.executeBatch(); + for (PreparedStatement stmt : prestmts) { + int[] cs = stmt.executeBatch(); for (int cc : cs) { c1 += cc; } - prestmt.close(); + conn.offerUpdateStatement(stmt); } c = c1; if (!batch) { @@ -1486,7 +1485,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return map; } catch (SQLException e) { @@ -1526,7 +1525,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " getNumberMap sql=" + sql); } if (stmt != null) { - stmt.close(); + conn.offerQueryStatement(stmt); } stmt = conn.createQueryStatement(); ResultSet set = stmt.executeQuery(sql); @@ -1544,7 +1543,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return map; } catch (SQLException se) { @@ -1582,7 +1581,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (SQLException e) { @@ -1621,7 +1620,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " getNumberResult sql=" + sql); } if (stmt != null) { - stmt.close(); + conn.offerQueryStatement(stmt); } stmt = conn.createQueryStatement(); Number rs = defVal; @@ -1633,7 +1632,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (SQLException se) { @@ -1670,7 +1669,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2)); } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (SQLException e) { @@ -1710,7 +1709,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " queryColumnMap sql=" + sql); } if (stmt != null) { - stmt.close(); + conn.offerQueryStatement(stmt); } stmt = conn.createQueryStatement(); ResultSet set = stmt.executeQuery(sql); @@ -1720,7 +1719,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2)); } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (SQLException se) { @@ -1772,7 +1771,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { rs.put(keys, vals); } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (SQLException e) { @@ -1812,7 +1811,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " queryColumnMap sql=" + sql); } if (stmt != null) { - stmt.close(); + conn.offerQueryStatement(stmt); } stmt = conn.createQueryStatement(); ResultSet set = stmt.executeQuery(sql); @@ -1822,7 +1821,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { rs.put((K) (smallint ? set.getShort(1) : set.getObject(1)), (N) set.getObject(2)); } set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (SQLException se) { @@ -1868,11 +1867,10 @@ public class DataJdbcSource extends AbstractDataSqlSource { final DataResultSet set = createDataResultSet(info, prestmt.executeQuery()); T rs = set.next() ? info.getFullEntityValue(set) : null; set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, prepareSQL); return rs; } catch (SQLException e) { - e.printStackTrace(); if (isTableNotExist(info, e.getSQLState())) { return null; } @@ -1901,7 +1899,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final DataResultSet set = createDataResultSet(info, prestmt.executeQuery()); T rs = set.next() ? selects == null ? info.getFullEntityValue(set) : info.getEntityValue(selects, set) : null; set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, sql); return rs; } catch (SQLException e) { @@ -1940,14 +1938,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " find sql=" + sql); } if (prestmt != null) { - prestmt.close(); + conn.offerQueryStatement(prestmt); } prestmt = conn.prepareQueryStatement(sql); prestmt.setFetchSize(1); final DataResultSet set = createDataResultSet(info, prestmt.executeQuery()); T rs = set.next() ? selects == null ? info.getFullEntityValue(set) : info.getEntityValue(selects, set) : null; set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, sql); return rs; } catch (SQLException se) { @@ -1984,7 +1982,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { val = info.getFieldValue(attr, set, 1); } set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, sql); return val == null ? defValue : val; } catch (SQLException e) { @@ -2023,7 +2021,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " findColumn sql=" + sql); } if (prestmt != null) { - prestmt.close(); + conn.offerQueryStatement(prestmt); } prestmt = conn.prepareQueryStatement(sql); prestmt.setFetchSize(1); @@ -2033,7 +2031,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { val = info.getFieldValue(attr, set, 1); } set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, sql); return val == null ? defValue : val; } catch (SQLException se) { @@ -2065,7 +2063,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final ResultSet set = prestmt.executeQuery(); boolean rs = set.next() ? (set.getInt(1) > 0) : false; set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); if (info.isLoggable(logger, Level.FINEST, sql)) { logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql); } @@ -2107,13 +2105,13 @@ public class DataJdbcSource extends AbstractDataSqlSource { logger.finest(info.getType().getSimpleName() + " exists sql=" + sql); } if (prestmt != null) { - // prestmt.close(); + conn.offerQueryStatement(prestmt); } prestmt = conn.prepareQueryStatement(sql); final ResultSet set = prestmt.executeQuery(); boolean rs = set.next() ? (set.getInt(1) > 0) : false; set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); if (info.isLoggable(logger, Level.FINEST, sql)) { logger.finest(info.getType().getSimpleName() + " exists (" + rs + ") sql=" + sql); } @@ -2157,7 +2155,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { } set.close(); } - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, prepareSQL); return list; } catch (SQLException se) { @@ -2205,7 +2203,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { list.add(getEntityValue(info, null, rr)); } set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, prepareSQL); return Sheet.asSheet(list); } catch (SQLException se) { @@ -2311,7 +2309,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { list.add(getEntityValue(info, sels, rr)); } set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); long total = list.size(); if (needTotal) { prestmt = conn.prepareQueryStatement(countSql); @@ -2320,7 +2318,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { total = set.getLong(1); } set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); } slowLog(s, listSql); return new Sheet<>(total, list); @@ -2359,7 +2357,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { total = set.getRow(); } set.close(); - prestmt.close(); + conn.offerQueryStatement(prestmt); slowLog(s, listSql); return new Sheet<>(total, list); } @@ -2494,7 +2492,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { for (String sql : sqls) { rs[++i] = stmt.execute(sql) ? 1 : 0; } - stmt.close(); + conn.offerUpdateStatement(stmt); conn.commit(); slowLog(s, sqls); return rs; @@ -2535,7 +2533,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { final ResultSet set = stmt.executeQuery(sql);// prestmt.executeQuery(); V rs = handler.apply(createDataResultSet(null, set)); set.close(); - stmt.close(); + conn.offerQueryStatement(stmt); slowLog(s, sql); return rs; } catch (Exception ex) { @@ -2692,7 +2690,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected int maxConns; - protected Semaphore canNewSemaphore; + protected Semaphore maxSemaphore; protected String url; @@ -2707,7 +2705,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { defMaxConns = Math.min(1000, Utility.cpus() * 100); } this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns))); - this.canNewSemaphore = new Semaphore(this.maxConns); + this.maxSemaphore = new Semaphore(this.maxConns); this.queue = new ArrayBlockingQueue<>(maxConns); this.url = prop.getProperty(DATA_SOURCE_URL); String username = prop.getProperty(DATA_SOURCE_USER, ""); @@ -2790,21 +2788,21 @@ public class DataJdbcSource extends AbstractDataSqlSource { private void changeMaxConns(int newMaxconns) { ArrayBlockingQueue newQueue = new ArrayBlockingQueue<>(newMaxconns); ArrayBlockingQueue oldQueue = this.queue; - Semaphore oldSemaphore = this.canNewSemaphore; + Semaphore oldSemaphore = this.maxSemaphore; this.queue = newQueue; this.maxConns = newMaxconns; - this.canNewSemaphore = new Semaphore(this.maxConns); + this.maxSemaphore = new Semaphore(this.maxConns); SourceConnection c; while ((c = oldQueue.poll()) != null) { c.version = -1; - offerConnection(c, oldSemaphore); + offerConnection(c, oldSemaphore, this.queue); } } public SourceConnection pollConnection() { SourceConnection conn = queue.poll(); if (conn == null) { - return newConnection(); + return newConnection(this.queue); } usingCounter.increment(); if (checkValid(conn)) { @@ -2814,11 +2812,28 @@ public class DataJdbcSource extends AbstractDataSqlSource { offerConnection(conn); conn = null; } - return newConnection(); + return newConnection(this.queue); } - private SourceConnection newConnection() { - Semaphore semaphore = this.canNewSemaphore; + //用于事务的连接 + public SourceConnection pollTransConnection() { + SourceConnection conn = queue.poll(); + if (conn == null) { + return newConnection(this.queue); + } + usingCounter.increment(); + if (checkValid(conn)) { + cycleCounter.increment(); + return conn; + } else { + offerConnection(conn); + conn = null; + } + return newConnection(this.queue); + } + + private SourceConnection newConnection(ArrayBlockingQueue queue) { + Semaphore semaphore = this.maxSemaphore; SourceConnection conn = null; if (semaphore.tryAcquire()) { try { @@ -2843,10 +2858,14 @@ public class DataJdbcSource extends AbstractDataSqlSource { } public void offerConnection(final C connection) { - offerConnection(connection, this.canNewSemaphore); + offerConnection(connection, this.maxSemaphore, this.queue); } - private void offerConnection(final C connection, Semaphore semaphore) { + public void offerTransConnection(final C connection) { + offerConnection(connection, this.maxSemaphore, this.queue); + } + + private void offerConnection(final C connection, Semaphore semaphore, Queue queue) { SourceConnection conn = (SourceConnection) connection; if (conn != null) { try { @@ -2904,22 +2923,42 @@ public class DataJdbcSource extends AbstractDataSqlSource { return statement; } + public void offerStreamStatement(final Statement stmt) throws SQLException { + stmt.close(); + } + public Statement createQueryStatement() throws SQLException { return conn.createStatement(); } + public void offerQueryStatement(final Statement stmt) throws SQLException { + stmt.close(); + } + public Statement createUpdateStatement() throws SQLException { return conn.createStatement(); } + public void offerUpdateStatement(final Statement stmt) throws SQLException { + stmt.close(); + } + public PreparedStatement prepareQueryStatement(String sql) throws SQLException { return conn.prepareStatement(sql); } + public void offerQueryStatement(final PreparedStatement stmt) throws SQLException { + stmt.close(); + } + public PreparedStatement prepareUpdateStatement(String sql) throws SQLException { return conn.prepareStatement(sql); } + public void offerUpdateStatement(final PreparedStatement stmt) throws SQLException { + stmt.close(); + } + public void setAutoCommit(boolean autoCommit) throws SQLException { conn.setAutoCommit(autoCommit); }