diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 33113ab81..b13a7ea20 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import javax.net.ssl.SSLContext; import static org.redkale.net.ProtocolServer.*; @@ -40,6 +41,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl //关闭数 protected AtomicLong closedCounter; + protected Consumer beforeCloseListener; + public final long getLastReadTime() { return readtime; } @@ -89,6 +92,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + public AsyncConnection beforeCloseListener(Consumer beforeCloseListener) { + this.beforeCloseListener = beforeCloseListener; + return this; + } + @Override public void close() throws IOException { if (closedCounter != null) { @@ -99,6 +107,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl livingCounter.decrementAndGet(); livingCounter = null; } + if (beforeCloseListener != null) beforeCloseListener.accept(this); if (attributes == null) return; try { for (Object obj : attributes.values()) { diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 2a0000104..5ea7a10a0 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -95,12 +95,6 @@ public abstract class DataSqlSource extends AbstractService implements Dat protected abstract PoolSource createWritePoolSource(DataSource source, String stype, Properties prop); - @Local - protected abstract void closeReadConnection(final Conn conn); - - @Local - protected abstract void closeWriteConnection(final Conn conn); - @Override protected ExecutorService getExecutor() { return executor; diff --git a/src/org/redkale/source/PoolJdbcSource.java b/src/org/redkale/source/PoolJdbcSource.java index ed89aef66..a1ae4ec18 100644 --- a/src/org/redkale/source/PoolJdbcSource.java +++ b/src/org/redkale/source/PoolJdbcSource.java @@ -250,6 +250,16 @@ public class PoolJdbcSource extends PoolSource { } } + @Override + public void closeConnection(final Connection conn) { + if (conn == null) return; + try { + conn.close(); + } catch (Exception e) { + logger.log(Level.WARNING, "closeSQLConnection abort", e); + } + } + @Override public Connection poll() { return poll(0, null); diff --git a/src/org/redkale/source/PoolSource.java b/src/org/redkale/source/PoolSource.java index c50242167..1c5c96df6 100644 --- a/src/org/redkale/source/PoolSource.java +++ b/src/org/redkale/source/PoolSource.java @@ -70,7 +70,7 @@ public abstract class PoolSource { this.connectTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_CONNECTTIMEOUT_SECONDS, "3")); this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3")); this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3")); - this.maxconns = Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)); + this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16))); String dbtype0 = ""; { //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串 int pos = this.url.indexOf("://"); @@ -141,6 +141,8 @@ public abstract class PoolSource { public abstract void close(); + public abstract void closeConnection(final T conn); + public final String getDbtype() { return dbtype; } diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index 0eef340c5..7e71536af 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -8,10 +8,10 @@ package org.redkale.source; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.*; -import java.sql.SQLException; +import java.sql.*; import java.util.Properties; import java.util.concurrent.*; -import java.util.logging.Logger; +import java.util.logging.*; import org.redkale.net.AsyncConnection; import org.redkale.util.ObjectPool; @@ -30,6 +30,8 @@ public abstract class PoolTcpSource extends PoolSource { //TCP Channel组 protected AsynchronousChannelGroup group; + protected final ArrayBlockingQueue connQueue; + public PoolTcpSource(String rwtype, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { super(rwtype, prop, logger); this.bufferPool = bufferPool; @@ -39,6 +41,19 @@ public abstract class PoolTcpSource extends PoolSource { } catch (IOException e) { throw new RuntimeException(e); } + this.connQueue = new ArrayBlockingQueue<>(this.maxconns); + } + + @Override + public void closeConnection(final AsyncConnection conn) { + if (conn == null) return; + if (connQueue.offer(conn)) { + saveCounter.incrementAndGet(); + usingCounter.decrementAndGet(); + } else { + //usingCounter 会在close方法中执行 + conn.dispose(); + } } @Override @@ -52,7 +67,40 @@ public abstract class PoolTcpSource extends PoolSource { @Override public CompletableFuture pollAsync() { + return pollAsync(0); + } + + protected CompletableFuture pollAsync(final int count) { + if (count >= 3) { + logger.log(Level.WARNING, "create datasource connection error"); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new SQLException("create datasource connection error")); + return future; + } + AsyncConnection conn0 = connQueue.poll(); + if (conn0 != null) { + cycleCounter.incrementAndGet(); + usingCounter.incrementAndGet(); + return CompletableFuture.completedFuture(conn0); + } + if (usingCounter.get() >= maxconns && count < 2) { + return CompletableFuture.supplyAsync(() -> { + try { + return connQueue.poll(3, TimeUnit.SECONDS); + } catch (Exception t) { + return null; + } + }, executor).thenCompose((conn2) -> { + if (conn2 != null) { + cycleCounter.incrementAndGet(); + return CompletableFuture.completedFuture(conn2); + } + return pollAsync(count + 1); + }); + } + return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { + conn.beforeCloseListener((c) -> usingCounter.decrementAndGet()); CompletableFuture future = new CompletableFuture(); final ByteBuffer buffer = reqConnectBuffer(conn); conn.write(buffer, null, new CompletionHandler() { @@ -95,6 +143,21 @@ public abstract class PoolTcpSource extends PoolSource { } }); return future; + }).whenComplete((c, t) -> { + if (t == null) { + creatCounter.incrementAndGet(); + usingCounter.incrementAndGet(); + } + }); + } + + @Override + public void close() { + connQueue.stream().forEach(x -> { + try { + x.close(); + } catch (Exception e) { + } }); } }