diff --git a/src/org/redkale/source/PoolSource.java b/src/org/redkale/source/PoolSource.java index 7cba80f59..492e48faa 100644 --- a/src/org/redkale/source/PoolSource.java +++ b/src/org/redkale/source/PoolSource.java @@ -7,7 +7,7 @@ package org.redkale.source; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import static org.redkale.source.DataSources.*; @@ -31,6 +31,8 @@ public abstract class PoolSource { protected final AtomicLong saveCounter = new AtomicLong(); + protected final Semaphore semaphore; + protected final Logger logger; protected final String rwtype; // "" 或 "read" 或 "write" @@ -71,6 +73,7 @@ public abstract class PoolSource { this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3")); this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3")); this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16))); + this.semaphore = new Semaphore(this.maxconns); String dbtype0 = ""; { //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串 int pos = this.url.indexOf("://"); diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index e80d82e85..826f0fdce 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -11,6 +11,7 @@ import java.nio.channels.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.*; import org.redkale.net.AsyncConnection; import static org.redkale.source.DataSources.*; @@ -38,7 +39,7 @@ public abstract class PoolTcpSource extends PoolSource { this.bufferPool = bufferPool; this.executor = executor; try { - this.group = AsynchronousChannelGroup.withThreadPool(executor); + this.group = AsynchronousChannelGroup.withCachedThreadPool(executor, executor.getCorePoolSize()); } catch (IOException e) { throw new RuntimeException(e); } @@ -94,6 +95,8 @@ public abstract class PoolTcpSource extends PoolSource { return pollAsync(0); } + private static final AtomicLong nowlong = new AtomicLong(); + protected CompletableFuture pollAsync(final int count) { if (count >= 3) { logger.log(Level.WARNING, "create datasource connection error"); @@ -101,22 +104,27 @@ public abstract class PoolTcpSource extends PoolSource { future.completeExceptionally(new SQLException("create datasource connection error")); return future; } + AsyncConnection conn0 = connQueue.poll(); if (conn0 != null && conn0.isOpen()) { cycleCounter.incrementAndGet(); usingCounter.incrementAndGet(); return CompletableFuture.completedFuture(conn0); } - if (usingCounter.get() >= maxconns && count < 2) { + //logqueue.add("-------semaphore: " + semaphore.availablePermits()); + if (!semaphore.tryAcquire()) { return CompletableFuture.supplyAsync(() -> { try { return connQueue.poll(3, TimeUnit.SECONDS); } catch (Exception t) { + System.out.println("超时了"); + t.printStackTrace(); return null; } - }, executor).thenCompose((conn2) -> { + }).thenCompose((conn2) -> { if (conn2 != null && conn2.isOpen()) { cycleCounter.incrementAndGet(); + usingCounter.incrementAndGet(); return CompletableFuture.completedFuture(conn2); } return pollAsync(count + 1); @@ -124,9 +132,13 @@ public abstract class PoolTcpSource extends PoolSource { } return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { - conn.beforeCloseListener((c) -> usingCounter.decrementAndGet()); + conn.beforeCloseListener((c) -> { + semaphore.release(); + usingCounter.decrementAndGet(); + }); CompletableFuture future = new CompletableFuture(); final ByteBuffer buffer = reqConnectBuffer(conn); + if (buffer == null) { final ByteBuffer rbuffer = bufferPool.get(); conn.read(rbuffer, null, new CompletionHandler() { @@ -193,6 +205,8 @@ public abstract class PoolTcpSource extends PoolSource { if (t == null) { creatCounter.incrementAndGet(); usingCounter.incrementAndGet(); + } else { + semaphore.release(); } }); }