This commit is contained in:
Redkale
2018-07-14 00:14:10 +08:00
parent 85d4023fa2
commit 858c6a0aaf
2 changed files with 15 additions and 31 deletions

View File

@@ -23,15 +23,15 @@ import static org.redkale.source.DataSources.*;
*/
public class PoolJdbcSource extends PoolSource<Connection> {
private final ConnectionPoolDataSource source;
protected final ConnectionPoolDataSource source;
private final ArrayBlockingQueue<PooledConnection> queue;
protected final ArrayBlockingQueue<PooledConnection> queue;
private final ConnectionEventListener listener;
protected final ConnectionEventListener listener;
private final String unitName;
protected final String unitName;
private final URL persistxml;
protected final URL persistxml;
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) {
super(rwtype, prop, logger);

View File

@@ -11,7 +11,6 @@ import java.nio.channels.*;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
import org.redkale.net.AsyncConnection;
import static org.redkale.source.DataSources.*;
@@ -29,6 +28,13 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
//线程池
protected ThreadPoolExecutor executor;
//供supplyAsync->poll使用的线程池
protected ExecutorService pollExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4, (r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
//TCP Channel组
protected AsynchronousChannelGroup group;
@@ -39,25 +45,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
this.bufferPool = bufferPool;
this.executor = executor;
try {
final String cname = this.getClass().getSimpleName() + "-" + rwtype;
final Thread.UncaughtExceptionHandler ueh = (t, e) -> {
logger.log(Level.SEVERE, cname + " error", e);
};
final AtomicInteger counter = new AtomicInteger();
ThreadFactory tf = (Runnable r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
String s = "" + counter.incrementAndGet();
if (s.length() == 1) {
s = "00" + s;
} else if (s.length() == 2) {
s = "0" + s;
}
t.setName(cname + "-Thread-" + s);
t.setUncaughtExceptionHandler(ueh);
return t;
};
this.group = AsynchronousChannelGroup.withFixedThreadPool(executor.getCorePoolSize(), tf);
this.group = AsynchronousChannelGroup.withThreadPool(executor);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -113,8 +101,6 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
return pollAsync(0);
}
private static final AtomicLong nowlong = new AtomicLong();
protected CompletableFuture<AsyncConnection> pollAsync(final int count) {
if (count >= 3) {
logger.log(Level.WARNING, "create datasource connection error");
@@ -129,17 +115,15 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
usingCounter.incrementAndGet();
return CompletableFuture.completedFuture(conn0);
}
//logqueue.add("-------semaphore: " + semaphore.availablePermits());
if (!semaphore.tryAcquire()) {
return CompletableFuture.supplyAsync(() -> {
try {
return connQueue.poll(3, TimeUnit.SECONDS);
} catch (Exception t) {
System.out.println("超时了");
t.printStackTrace();
return null;
}
}, executor).thenCompose((conn2) -> {
}, pollExecutor).thenCompose((conn2) -> {
if (conn2 != null && conn2.isOpen()) {
cycleCounter.incrementAndGet();
usingCounter.incrementAndGet();