VertxSqlDataSource优化
This commit is contained in:
@@ -243,12 +243,8 @@ public class AsyncIOGroup extends AsyncGroup {
|
||||
public void completed(Void result, Void attachment) {
|
||||
conn.setReadTimeoutSeconds(readTimeoutSeconds);
|
||||
conn.setWriteTimeoutSeconds(writeTimeoutSeconds);
|
||||
if (connCreateCounter != null) {
|
||||
connCreateCounter.increment();
|
||||
}
|
||||
if (connLivingCounter != null) {
|
||||
connLivingCounter.increment();
|
||||
}
|
||||
connCreateCounter.increment();
|
||||
connLivingCounter.increment();
|
||||
if (conn.sslEngine == null) {
|
||||
future.complete(conn);
|
||||
} else {
|
||||
|
||||
@@ -32,8 +32,6 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
||||
|
||||
private AsyncIOGroup ioGroup;
|
||||
|
||||
private Thread acceptThread;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
private Supplier<Response> responseSupplier;
|
||||
@@ -117,13 +115,14 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
||||
ObjectPool<Response> pool = localResponsePool.get();
|
||||
(pool == null ? safeResponsePool : pool).accept(v);
|
||||
};
|
||||
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
|
||||
final String threadNameFormat = Utility.isEmpty(server.name) ? "Redkale-IOServletThread-%s"
|
||||
: ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
|
||||
if (this.ioGroup == null) {
|
||||
this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool);
|
||||
this.ioGroup.start();
|
||||
}
|
||||
|
||||
this.acceptThread = new Thread() {
|
||||
Thread acceptThread = new Thread() {
|
||||
{
|
||||
setName(String.format(threadNameFormat, "Accept"));
|
||||
}
|
||||
@@ -164,7 +163,7 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
||||
}
|
||||
}
|
||||
};
|
||||
this.acceptThread.start();
|
||||
acceptThread.start();
|
||||
}
|
||||
|
||||
private void accept(AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
|
||||
@@ -177,7 +176,8 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
|
||||
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
||||
ioGroup.connCreateCounter.increment();
|
||||
ioGroup.connLivingCounter.increment();
|
||||
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup, ioReadThread, ioWriteThread, channel, context.getSSLBuilder(), context.getSSLContext(), null);
|
||||
AsyncNioTcpConnection conn = new AsyncNioTcpConnection(false, ioGroup,
|
||||
ioReadThread, ioWriteThread, channel, context.getSSLBuilder(), context.getSSLContext(), null);
|
||||
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
|
||||
conn.protocolCodec = codec;
|
||||
if (conn.sslEngine == null) {
|
||||
|
||||
@@ -34,8 +34,6 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
|
||||
|
||||
private AsyncIOGroup ioGroup;
|
||||
|
||||
private Thread acceptThread;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
private Supplier<Response> responseSupplier;
|
||||
@@ -106,17 +104,18 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
|
||||
ObjectPool<Response> pool = localResponsePool.get();
|
||||
return pool == null ? safeResponsePool.get() : pool.get();
|
||||
};
|
||||
this.responseConsumer = (v) -> {
|
||||
this.responseConsumer = v -> {
|
||||
ObjectPool<Response> pool = localResponsePool.get();
|
||||
(pool == null ? safeResponsePool : pool).accept(v);
|
||||
};
|
||||
final String threadNameFormat = server.name == null || server.name.isEmpty() ? "Redkale-IOServletThread-%s" : ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
|
||||
final String threadNameFormat = Utility.isEmpty(server.name) ? "Redkale-IOServletThread-%s"
|
||||
: ("Redkale-" + server.name.replace("Server-", "") + "-IOServletThread-%s");
|
||||
if (this.ioGroup == null) {
|
||||
this.ioGroup = new AsyncIOGroup(threadNameFormat, null, safeBufferPool);
|
||||
this.ioGroup.start();
|
||||
}
|
||||
udpServerChannel.serverChannel.register(this.selector, SelectionKey.OP_READ);
|
||||
this.acceptThread = new Thread() {
|
||||
Thread acceptThread = new Thread() {
|
||||
{
|
||||
setName(String.format(threadNameFormat, "Accept"));
|
||||
}
|
||||
@@ -175,13 +174,14 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
|
||||
}
|
||||
}
|
||||
};
|
||||
this.acceptThread.start();
|
||||
acceptThread.start();
|
||||
}
|
||||
|
||||
private void accept(SocketAddress address, ByteBuffer buffer, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread) throws IOException {
|
||||
ioGroup.connCreateCounter.increment();
|
||||
ioGroup.connLivingCounter.increment();
|
||||
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup, ioReadThread, ioWriteThread, udpServerChannel.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address);
|
||||
AsyncNioUdpConnection conn = new AsyncNioUdpConnection(false, ioGroup,
|
||||
ioReadThread, ioWriteThread, udpServerChannel.serverChannel, context.getSSLBuilder(), context.getSSLContext(), address);
|
||||
conn.udpServerChannel = udpServerChannel;
|
||||
udpServerChannel.connections.put(address, conn);
|
||||
ProtocolCodec codec = new ProtocolCodec(context, responseSupplier, responseConsumer, conn);
|
||||
|
||||
@@ -45,7 +45,7 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
|
||||
private final ReentrantLock executorLock = new ReentrantLock();
|
||||
|
||||
private int sourceThreads = Utility.cpus();
|
||||
protected int sourceThreads = Utility.cpus();
|
||||
|
||||
@Resource(name = RESNAME_APP_EXECUTOR, required = false)
|
||||
private ExecutorService sourceExecutor;
|
||||
@@ -70,6 +70,10 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
@ResourceListener
|
||||
public abstract void onResourceChange(ResourceEvent[] events);
|
||||
|
||||
protected void setSourceExecutor(ExecutorService executor) {
|
||||
this.sourceExecutor = executor;
|
||||
}
|
||||
|
||||
protected SourceUrlInfo parseSourceUrl(final String url) {
|
||||
final SourceUrlInfo info = new SourceUrlInfo();
|
||||
info.url = url;
|
||||
@@ -146,11 +150,11 @@ public abstract class AbstractDataSource extends AbstractService implements Data
|
||||
return executor;
|
||||
}
|
||||
|
||||
protected <T> void complete(CompletableFuture<T> future, T value) {
|
||||
protected <T> void complete(WorkThread workThread, CompletableFuture<T> future, T value) {
|
||||
getExecutor().execute(() -> future.complete(value));
|
||||
}
|
||||
|
||||
protected <T> void completeExceptionally(CompletableFuture<T> future, Throwable exp) {
|
||||
protected <T> void completeExceptionally(WorkThread workThread, CompletableFuture<T> future, Throwable exp) {
|
||||
getExecutor().execute(() -> future.completeExceptionally(exp));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user