diff --git a/src/org/redkale/source/PoolJdbcSource.java b/src/org/redkale/source/PoolJdbcSource.java index 205dd810e..750990373 100644 --- a/src/org/redkale/source/PoolJdbcSource.java +++ b/src/org/redkale/source/PoolJdbcSource.java @@ -23,15 +23,15 @@ import static org.redkale.source.DataSources.*; */ public class PoolJdbcSource extends PoolSource { - private final ConnectionPoolDataSource source; + protected final ConnectionPoolDataSource source; - private final ArrayBlockingQueue queue; + protected final ArrayBlockingQueue queue; - private final ConnectionEventListener listener; + protected final ConnectionEventListener listener; - private final String unitName; + protected final String unitName; - private final URL persistxml; + protected final URL persistxml; public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) { super(rwtype, prop, logger); diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index 51f49c77a..e75ce2033 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -11,7 +11,6 @@ import java.nio.channels.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; import java.util.logging.*; import org.redkale.net.AsyncConnection; import static org.redkale.source.DataSources.*; @@ -29,6 +28,13 @@ public abstract class PoolTcpSource extends PoolSource { //线程池 protected ThreadPoolExecutor executor; + //供supplyAsync->poll使用的线程池 + protected ExecutorService pollExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4, (r) -> { + Thread t = new Thread(r); + t.setDaemon(true); + return t; + }); + //TCP Channel组 protected AsynchronousChannelGroup group; @@ -39,25 +45,7 @@ public abstract class PoolTcpSource extends PoolSource { this.bufferPool = bufferPool; this.executor = executor; try { - final String cname = this.getClass().getSimpleName() + "-" + rwtype; - final Thread.UncaughtExceptionHandler ueh = (t, e) -> { - logger.log(Level.SEVERE, cname + " error", e); - }; - final AtomicInteger counter = new AtomicInteger(); - ThreadFactory tf = (Runnable r) -> { - Thread t = new Thread(r); - t.setDaemon(true); - String s = "" + counter.incrementAndGet(); - if (s.length() == 1) { - s = "00" + s; - } else if (s.length() == 2) { - s = "0" + s; - } - t.setName(cname + "-Thread-" + s); - t.setUncaughtExceptionHandler(ueh); - return t; - }; - this.group = AsynchronousChannelGroup.withFixedThreadPool(executor.getCorePoolSize(), tf); + this.group = AsynchronousChannelGroup.withThreadPool(executor); } catch (IOException e) { throw new RuntimeException(e); } @@ -113,8 +101,6 @@ public abstract class PoolTcpSource extends PoolSource { return pollAsync(0); } - private static final AtomicLong nowlong = new AtomicLong(); - protected CompletableFuture pollAsync(final int count) { if (count >= 3) { logger.log(Level.WARNING, "create datasource connection error"); @@ -129,17 +115,15 @@ public abstract class PoolTcpSource extends PoolSource { usingCounter.incrementAndGet(); return CompletableFuture.completedFuture(conn0); } - //logqueue.add("-------semaphore: " + semaphore.availablePermits()); + if (!semaphore.tryAcquire()) { return CompletableFuture.supplyAsync(() -> { try { return connQueue.poll(3, TimeUnit.SECONDS); } catch (Exception t) { - System.out.println("超时了"); - t.printStackTrace(); return null; } - }, executor).thenCompose((conn2) -> { + }, pollExecutor).thenCompose((conn2) -> { if (conn2 != null && conn2.isOpen()) { cycleCounter.incrementAndGet(); usingCounter.incrementAndGet();