This commit is contained in:
kamhung
2015-11-12 11:04:47 +08:00
parent e7cad6b5a0
commit 9004c9e0ba
2 changed files with 10 additions and 28 deletions

View File

@@ -184,8 +184,8 @@ public final class Transport {
} }
} }
public void offerConnection(AsyncConnection conn) { public void offerConnection(final boolean forceClose, AsyncConnection conn) {
if (false && conn.isTCP()) { //暂时每次都关闭 if (!forceClose && conn.isTCP()) { //暂时每次都关闭
if (conn.isOpen()) { if (conn.isOpen()) {
BlockingQueue<AsyncConnection> queue = connPool.get(conn.getRemoteAddress()); BlockingQueue<AsyncConnection> queue = connPool.get(conn.getRemoteAddress());
if (queue == null) { if (queue == null) {
@@ -212,13 +212,13 @@ public final class Transport {
public void completed(Integer result, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {
if (handler != null) handler.completed(result, att); if (handler != null) handler.completed(result, att);
offerBuffer(buffer); offerBuffer(buffer);
offerConnection(conn); offerConnection(false, conn);
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer); offerBuffer(buffer);
offerConnection(conn); offerConnection(true, conn);
} }
}); });
@@ -227,26 +227,9 @@ public final class Transport {
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
offerBuffer(buffer); offerBuffer(buffer);
offerConnection(conn); offerConnection(true, conn);
} }
}); });
} }
public ByteBuffer send(SocketAddress addr, ByteBuffer buffer) {
AsyncConnection conn = pollConnection(addr);
final int readto = conn.getReadTimeoutSecond();
final int writeto = conn.getWriteTimeoutSecond();
try {
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear();
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
return buffer;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
offerConnection(conn);
}
}
} }

View File

@@ -278,7 +278,7 @@ public final class SncpClient {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} finally { } finally {
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(conn); transport.offerConnection(true, conn);
} }
} }
@@ -335,7 +335,7 @@ public final class SncpClient {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.set(new RuntimeException(action.method + " sncp remote no response data")); future.set(new RuntimeException(action.method + " sncp remote no response data"));
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(conn); transport.offerConnection(true, conn);
return; return;
} }
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
@@ -379,16 +379,15 @@ public final class SncpClient {
public void success() { public void success() {
future.set(this.body); future.set(this.body);
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(conn); transport.offerConnection(false, conn);
} }
@Override @Override
public void failed(Throwable exc, Void attachment2) { public void failed(Throwable exc, Void attachment2) {
future.set(new RuntimeException(action.method + " sncp remote exec failed")); future.set(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(conn); transport.offerConnection(true, conn);
} }
}); });
} }
@@ -396,7 +395,7 @@ public final class SncpClient {
public void failed(Throwable exc, ByteBuffer[] attachment) { public void failed(Throwable exc, ByteBuffer[] attachment) {
exc.printStackTrace(); exc.printStackTrace();
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(conn); transport.offerConnection(true, conn);
} }
}); });
return future; return future;