diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index a40e99a68..d28cdd8ce 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -184,8 +184,8 @@ public final class Transport { } } - public void offerConnection(AsyncConnection conn) { - if (false && conn.isTCP()) { //暂时每次都关闭 + public void offerConnection(final boolean forceClose, AsyncConnection conn) { + if (!forceClose && conn.isTCP()) { //暂时每次都关闭 if (conn.isOpen()) { BlockingQueue queue = connPool.get(conn.getRemoteAddress()); if (queue == null) { @@ -212,13 +212,13 @@ public final class Transport { public void completed(Integer result, ByteBuffer attachment) { if (handler != null) handler.completed(result, att); offerBuffer(buffer); - offerConnection(conn); + offerConnection(false, conn); } @Override public void failed(Throwable exc, ByteBuffer attachment) { offerBuffer(buffer); - offerConnection(conn); + offerConnection(true, conn); } }); @@ -227,26 +227,9 @@ public final class Transport { @Override public void failed(Throwable exc, ByteBuffer attachment) { offerBuffer(buffer); - offerConnection(conn); + offerConnection(true, conn); } }); } - public ByteBuffer send(SocketAddress addr, ByteBuffer buffer) { - AsyncConnection conn = pollConnection(addr); - final int readto = conn.getReadTimeoutSecond(); - final int writeto = conn.getWriteTimeoutSecond(); - try { - conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); - buffer.clear(); - conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); - buffer.flip(); - return buffer; - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - offerConnection(conn); - } - } - } diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index b7f344139..120ce6e56 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -278,7 +278,7 @@ public final class SncpClient { throw new RuntimeException(ex); } finally { transport.offerBuffer(buffer); - transport.offerConnection(conn); + transport.offerConnection(true, conn); } } @@ -335,7 +335,7 @@ public final class SncpClient { if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 future.set(new RuntimeException(action.method + " sncp remote no response data")); transport.offerBuffer(buffer); - transport.offerConnection(conn); + transport.offerConnection(true, conn); return; } if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 @@ -379,16 +379,15 @@ public final class SncpClient { public void success() { future.set(this.body); transport.offerBuffer(buffer); - transport.offerConnection(conn); + transport.offerConnection(false, conn); } @Override public void failed(Throwable exc, Void attachment2) { future.set(new RuntimeException(action.method + " sncp remote exec failed")); transport.offerBuffer(buffer); - transport.offerConnection(conn); + transport.offerConnection(true, conn); } - }); } @@ -396,7 +395,7 @@ public final class SncpClient { public void failed(Throwable exc, ByteBuffer[] attachment) { exc.printStackTrace(); transport.offerBuffer(buffer); - transport.offerConnection(conn); + transport.offerConnection(true, conn); } }); return future;