This commit is contained in:
@@ -7,7 +7,7 @@ package org.redkale.source;
|
|||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import static org.redkale.source.DataSources.*;
|
import static org.redkale.source.DataSources.*;
|
||||||
@@ -31,6 +31,8 @@ public abstract class PoolSource<DBChannel> {
|
|||||||
|
|
||||||
protected final AtomicLong saveCounter = new AtomicLong();
|
protected final AtomicLong saveCounter = new AtomicLong();
|
||||||
|
|
||||||
|
protected final Semaphore semaphore;
|
||||||
|
|
||||||
protected final Logger logger;
|
protected final Logger logger;
|
||||||
|
|
||||||
protected final String rwtype; // "" 或 "read" 或 "write"
|
protected final String rwtype; // "" 或 "read" 或 "write"
|
||||||
@@ -71,6 +73,7 @@ public abstract class PoolSource<DBChannel> {
|
|||||||
this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3"));
|
this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3"));
|
||||||
this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3"));
|
this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3"));
|
||||||
this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)));
|
this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONSMAX, "" + Runtime.getRuntime().availableProcessors() * 16)));
|
||||||
|
this.semaphore = new Semaphore(this.maxconns);
|
||||||
String dbtype0 = "";
|
String dbtype0 = "";
|
||||||
{ //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串
|
{ //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串
|
||||||
int pos = this.url.indexOf("://");
|
int pos = this.url.indexOf("://");
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import java.nio.channels.*;
|
|||||||
import java.sql.*;
|
import java.sql.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import org.redkale.net.AsyncConnection;
|
import org.redkale.net.AsyncConnection;
|
||||||
import static org.redkale.source.DataSources.*;
|
import static org.redkale.source.DataSources.*;
|
||||||
@@ -38,7 +39,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
this.bufferPool = bufferPool;
|
this.bufferPool = bufferPool;
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
try {
|
try {
|
||||||
this.group = AsynchronousChannelGroup.withThreadPool(executor);
|
this.group = AsynchronousChannelGroup.withCachedThreadPool(executor, executor.getCorePoolSize());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -94,6 +95,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
return pollAsync(0);
|
return pollAsync(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final AtomicLong nowlong = new AtomicLong();
|
||||||
|
|
||||||
protected CompletableFuture<AsyncConnection> pollAsync(final int count) {
|
protected CompletableFuture<AsyncConnection> pollAsync(final int count) {
|
||||||
if (count >= 3) {
|
if (count >= 3) {
|
||||||
logger.log(Level.WARNING, "create datasource connection error");
|
logger.log(Level.WARNING, "create datasource connection error");
|
||||||
@@ -101,22 +104,27 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
future.completeExceptionally(new SQLException("create datasource connection error"));
|
future.completeExceptionally(new SQLException("create datasource connection error"));
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncConnection conn0 = connQueue.poll();
|
AsyncConnection conn0 = connQueue.poll();
|
||||||
if (conn0 != null && conn0.isOpen()) {
|
if (conn0 != null && conn0.isOpen()) {
|
||||||
cycleCounter.incrementAndGet();
|
cycleCounter.incrementAndGet();
|
||||||
usingCounter.incrementAndGet();
|
usingCounter.incrementAndGet();
|
||||||
return CompletableFuture.completedFuture(conn0);
|
return CompletableFuture.completedFuture(conn0);
|
||||||
}
|
}
|
||||||
if (usingCounter.get() >= maxconns && count < 2) {
|
//logqueue.add("-------semaphore: " + semaphore.availablePermits());
|
||||||
|
if (!semaphore.tryAcquire()) {
|
||||||
return CompletableFuture.supplyAsync(() -> {
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
return connQueue.poll(3, TimeUnit.SECONDS);
|
return connQueue.poll(3, TimeUnit.SECONDS);
|
||||||
} catch (Exception t) {
|
} catch (Exception t) {
|
||||||
|
System.out.println("超时了");
|
||||||
|
t.printStackTrace();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}, executor).thenCompose((conn2) -> {
|
}).thenCompose((conn2) -> {
|
||||||
if (conn2 != null && conn2.isOpen()) {
|
if (conn2 != null && conn2.isOpen()) {
|
||||||
cycleCounter.incrementAndGet();
|
cycleCounter.incrementAndGet();
|
||||||
|
usingCounter.incrementAndGet();
|
||||||
return CompletableFuture.completedFuture(conn2);
|
return CompletableFuture.completedFuture(conn2);
|
||||||
}
|
}
|
||||||
return pollAsync(count + 1);
|
return pollAsync(count + 1);
|
||||||
@@ -124,9 +132,13 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
|
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
|
||||||
conn.beforeCloseListener((c) -> usingCounter.decrementAndGet());
|
conn.beforeCloseListener((c) -> {
|
||||||
|
semaphore.release();
|
||||||
|
usingCounter.decrementAndGet();
|
||||||
|
});
|
||||||
CompletableFuture<AsyncConnection> future = new CompletableFuture();
|
CompletableFuture<AsyncConnection> future = new CompletableFuture();
|
||||||
final ByteBuffer buffer = reqConnectBuffer(conn);
|
final ByteBuffer buffer = reqConnectBuffer(conn);
|
||||||
|
|
||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
final ByteBuffer rbuffer = bufferPool.get();
|
final ByteBuffer rbuffer = bufferPool.get();
|
||||||
conn.read(rbuffer, null, new CompletionHandler<Integer, Void>() {
|
conn.read(rbuffer, null, new CompletionHandler<Integer, Void>() {
|
||||||
@@ -193,6 +205,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
|||||||
if (t == null) {
|
if (t == null) {
|
||||||
creatCounter.incrementAndGet();
|
creatCounter.incrementAndGet();
|
||||||
usingCounter.incrementAndGet();
|
usingCounter.incrementAndGet();
|
||||||
|
} else {
|
||||||
|
semaphore.release();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user