From 7b9c71dcda2d2f59cadcbb127920f46919ed0c52 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 14 Apr 2018 13:13:13 +0800 Subject: [PATCH] --- src/org/redkale/source/PoolSource.java | 39 ++++++++++++++-- src/org/redkale/source/PoolTcpSource.java | 56 +++++++++++++++++++++-- 2 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/org/redkale/source/PoolSource.java b/src/org/redkale/source/PoolSource.java index cc94a080e..1e6f34843 100644 --- a/src/org/redkale/source/PoolSource.java +++ b/src/org/redkale/source/PoolSource.java @@ -47,7 +47,7 @@ public abstract class PoolSource { protected String url; - protected InetSocketAddress addr; + protected InetSocketAddress servaddr; protected String user; @@ -118,9 +118,9 @@ public abstract class PoolSource { } pos = url0.indexOf(':'); if (pos > 0) { - this.addr = new InetSocketAddress(url0.substring(0, pos), Integer.parseInt(url0.substring(pos + 1))); + this.servaddr = new InetSocketAddress(url0.substring(0, pos), Integer.parseInt(url0.substring(pos + 1))); } else { - this.addr = new InetSocketAddress(url0, getDefaultPort()); + this.servaddr = new InetSocketAddress(url0, getDefaultPort()); } } @@ -160,4 +160,37 @@ public abstract class PoolSource { public final long getSaveCount() { return saveCounter.longValue(); } + + public final int getMaxconns() { + return maxconns; + } + + public final int getConnectTimeoutSeconds() { + return connectTimeoutSeconds; + } + + public final int getReadTimeoutSeconds() { + return readTimeoutSeconds; + } + + public final int getWriteTimeoutSeconds() { + return writeTimeoutSeconds; + } + + public final String getUrl() { + return url; + } + + public final InetSocketAddress getServaddr() { + return servaddr; + } + + public final String getUser() { + return user; + } + + public final String getPassword() { + return password; + } + } diff --git a/src/org/redkale/source/PoolTcpSource.java b/src/org/redkale/source/PoolTcpSource.java index 490e51fab..647d4b4e4 100644 --- a/src/org/redkale/source/PoolTcpSource.java +++ b/src/org/redkale/source/PoolTcpSource.java @@ -7,7 +7,8 @@ package org.redkale.source; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.*; +import java.sql.SQLException; import java.util.Properties; import java.util.concurrent.*; import java.util.logging.Logger; @@ -29,8 +30,8 @@ public abstract class PoolTcpSource extends PoolSource { //TCP Channel组 protected AsynchronousChannelGroup group; - public PoolTcpSource(String stype, Properties prop, Logger logger, ObjectPool bufferPool,ThreadPoolExecutor executor) { - super(stype, prop, logger); + public PoolTcpSource(String rwtype, Properties prop, Logger logger, ObjectPool bufferPool, ThreadPoolExecutor executor) { + super(rwtype, prop, logger); this.bufferPool = bufferPool; this.executor = executor; try { @@ -50,8 +51,55 @@ public abstract class PoolTcpSource extends PoolSource { return pollAsync().join(); } + protected abstract ByteBuffer reqConnectBuffer(); + + protected abstract void respConnectBuffer(final ByteBuffer buffer, CompletableFuture future, AsyncConnection conn); + @Override public CompletableFuture pollAsync() { - return AsyncConnection.createTCP(group, this.addr, this.readTimeoutSeconds, this.writeTimeoutSeconds); + return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> { + CompletableFuture future = new CompletableFuture(); + final ByteBuffer buffer = reqConnectBuffer(); + conn.write(buffer, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment1) { + if (result < 0) { + failed(new SQLException("Write Buffer Error"), attachment1); + return; + } + if (buffer.hasRemaining()) { + conn.write(buffer, attachment1, this); + return; + } + buffer.clear(); + conn.read(buffer, null, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment2) { + if (result < 0) { + failed(new SQLException("Read Buffer Error"), attachment2); + return; + } + buffer.flip(); + respConnectBuffer(buffer, future, conn); + } + + @Override + public void failed(Throwable exc, Void attachment2) { + bufferPool.accept(buffer); + future.completeExceptionally(exc); + conn.dispose(); + } + }); + } + + @Override + public void failed(Throwable exc, Void attachment1) { + bufferPool.accept(buffer); + future.completeExceptionally(exc); + conn.dispose(); + } + }); + return future; + }); } }