diff --git a/src/main/java/org/redkale/source/AbstractDataSqlSource.java b/src/main/java/org/redkale/source/AbstractDataSqlSource.java index b18e92c13..51f53682e 100644 --- a/src/main/java/org/redkale/source/AbstractDataSqlSource.java +++ b/src/main/java/org/redkale/source/AbstractDataSqlSource.java @@ -300,7 +300,7 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement if (pos > 0) { url = url.substring(0, pos) + "..."; } - return getClass().getSimpleName() + "{url=" + url + "}"; + return getClass().getSimpleName() + "{url=" + url + ", maxconns=" + readMaxConns() + "}"; } else { String readUrl = readConfProps.getProperty(DATA_SOURCE_URL); int pos = readUrl.indexOf('?'); @@ -312,10 +312,14 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource implement if (pos > 0) { writeUrl = writeUrl.substring(0, pos) + "..."; } - return getClass().getSimpleName() + "{readurl=" + readUrl + ",writeurl=" + writeUrl + "}"; + return getClass().getSimpleName() + "{read-url=" + readUrl + ", read-maxconns=" + readMaxConns() + ",write-url=" + writeUrl + ", write-maxconns=" + writeMaxConns() + "}"; } } + protected abstract int readMaxConns(); + + protected abstract int writeMaxConns(); + //生成创建表的SQL protected String[] createTableSqls(EntityInfo info) { if (info == null || !autoDDL) { diff --git a/src/main/java/org/redkale/source/DataJdbcSource.java b/src/main/java/org/redkale/source/DataJdbcSource.java index 2fd0b37a3..5883bb470 100644 --- a/src/main/java/org/redkale/source/DataJdbcSource.java +++ b/src/main/java/org/redkale/source/DataJdbcSource.java @@ -67,6 +67,16 @@ public class DataJdbcSource extends AbstractDataSqlSource { this.writePool.onResourceChange(events); } + @Override + protected int readMaxConns() { + return this.readPool.maxConns; + } + + @Override + protected int writeMaxConns() { + return this.writePool.maxConns; + } + @Override public void destroy(AnyValue config) { if (readPool != null) { @@ -2694,8 +2704,6 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected final LongAdder cycleCounter = new LongAdder(); //已复用连接数 - protected final LongAdder waitingCounter = new LongAdder(); //可用中连接数 - protected final java.sql.Driver driver; protected final Properties connectAttrs; @@ -2706,6 +2714,8 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected int maxConns; + protected Semaphore canNewSemaphore; + protected String url; protected int urlVersion; @@ -2713,7 +2723,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { protected Properties clientInfo = new Properties(); public ConnectionPool(Properties prop) { - this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "6")); + this.connectTimeoutSeconds = Integer.decode(prop.getProperty(DATA_SOURCE_CONNECTTIMEOUT_SECONDS, "30")); int defMaxConns = Utility.cpus() * 4; if (workExecutor instanceof ThreadPoolExecutor) { defMaxConns = ((ThreadPoolExecutor) workExecutor).getCorePoolSize(); @@ -2723,6 +2733,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { defMaxConns = Math.min(1000, Utility.cpus() * 100); } this.maxConns = Math.max(1, Integer.decode(prop.getProperty(DATA_SOURCE_MAXCONNS, "" + defMaxConns))); + this.canNewSemaphore = new Semaphore(this.maxConns); this.queue = new ArrayBlockingQueue<>(maxConns); this.url = prop.getProperty(DATA_SOURCE_URL); String username = prop.getProperty(DATA_SOURCE_USER, ""); @@ -2740,6 +2751,7 @@ public class DataJdbcSource extends AbstractDataSqlSource { throw new SourceException(e); } clientInfo.put("version", String.valueOf(urlVersion)); + resetMaxConnection(); } @ResourceListener @@ -2776,52 +2788,101 @@ public class DataJdbcSource extends AbstractDataSqlSource { if (newMaxconns != this.maxConns) { ArrayBlockingQueue newQueue = new ArrayBlockingQueue<>(newMaxconns); ArrayBlockingQueue oldQueue = this.queue; + Semaphore oldSemaphore = this.canNewSemaphore; this.queue = newQueue; this.maxConns = newMaxconns; + this.canNewSemaphore = new Semaphore(this.maxConns); Connection conn; while ((conn = oldQueue.poll()) != null) { + offerConnection(conn, oldSemaphore); + } + } + } + + private void resetMaxConnection() { + if ("mysql".equals(dbtype()) || "postgresql".equals(dbtype())) { + int newMaxconns = this.maxConns; + Connection conn = pollConnection(); + try { + Statement stmt = conn.createStatement(); + if ("mysql".equals(dbtype())) { + ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'max_connections'"); + if (rs.next()) { + newMaxconns = rs.getInt(2); + } + } else if ("postgresql".equals(dbtype())) { + ResultSet rs = stmt.executeQuery("SHOW max_connections"); + if (rs.next()) { + newMaxconns = rs.getInt(1); + } + } + stmt.close(); + } catch (SQLException e) { + } finally { offerConnection(conn); } + if (this.maxConns > newMaxconns) { //配置连接数过大 + ArrayBlockingQueue newQueue = new ArrayBlockingQueue<>(newMaxconns); + ArrayBlockingQueue oldQueue = this.queue; + Semaphore oldSemaphore = this.canNewSemaphore; + this.queue = newQueue; + this.maxConns = newMaxconns; + this.canNewSemaphore = new Semaphore(this.maxConns); + Connection c; + while ((c = oldQueue.poll()) != null) { + offerConnection(c, oldSemaphore); + } + } } } public Connection pollConnection() { Connection conn = queue.poll(); if (conn == null) { - if (usingCounter.intValue() >= maxConns) { - try { - conn = queue.poll(connectTimeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException t) { - logger.log(Level.WARNING, "take pooled connection error", t); - } - if (conn == null) { - throw new SourceException("create pooled connection timeout"); - } - } - } - if (conn != null) { - usingCounter.increment(); - waitingCounter.decrement(); - if (checkValid(conn)) { - cycleCounter.increment(); - return conn; - } else { - offerConnection(conn); - conn = null; - } - } - try { - conn = driver.connect(url, connectAttrs); - conn.setClientInfo(clientInfo); - } catch (SQLException ex) { - throw new SourceException(ex); + return newConnection(); } usingCounter.increment(); - creatCounter.increment(); - return conn; + if (checkValid(conn)) { + cycleCounter.increment(); + return conn; + } else { + offerConnection(conn); + conn = null; + } + return newConnection(); + } + + private Connection newConnection() { + Semaphore semaphore = this.canNewSemaphore; + Connection conn = null; + if (semaphore.tryAcquire()) { + try { + conn = driver.connect(url, connectAttrs); + conn.setClientInfo(clientInfo); + } catch (SQLException ex) { + throw new SourceException(ex); + } + usingCounter.increment(); + creatCounter.increment(); + return conn; + } else { + try { + conn = queue.poll(connectTimeoutSeconds, TimeUnit.SECONDS); + } catch (InterruptedException t) { + logger.log(Level.WARNING, "take pooled connection error", t); + } + if (conn == null) { + throw new SourceException("create pooled connection timeout"); + } + return conn; + } } public void offerConnection(final C connection) { + offerConnection(connection, this.canNewSemaphore); + } + + private void offerConnection(final C connection, Semaphore semaphore) { Connection conn = (Connection) connection; if (conn == null) { return; @@ -2829,10 +2890,10 @@ public class DataJdbcSource extends AbstractDataSqlSource { try { if (checkValid(conn) && queue.offer(conn)) { usingCounter.decrement(); - waitingCounter.increment(); } else { usingCounter.decrement(); closeCounter.increment(); + semaphore.release(); conn.close(); } } catch (Exception e) { diff --git a/src/main/java/org/redkale/source/DataMemorySource.java b/src/main/java/org/redkale/source/DataMemorySource.java index d370ea88c..6bc84e74f 100644 --- a/src/main/java/org/redkale/source/DataMemorySource.java +++ b/src/main/java/org/redkale/source/DataMemorySource.java @@ -40,6 +40,16 @@ public class DataMemorySource extends AbstractDataSqlSource implements SearchSou this.cacheForbidden = false; } + @Override + protected int readMaxConns() { + return -1; + } + + @Override + protected int writeMaxConns() { + return -1; + } + @Local @Override public String getType() {