From bcca20dbbbd920c999a62bbde049f9635614ff8e Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Thu, 31 May 2018 17:12:18 +0800 Subject: [PATCH] --- src/org/redkale/net/sncp/SncpClient.java | 102 ++++++++++++++--------- 1 file changed, 61 insertions(+), 41 deletions(-) diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index cba650f92..bdadd4f89 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -296,7 +296,7 @@ public final class SncpClient { if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; final BsonReader reader = bsonConvert.pollBsonReader(); CompletableFuture future = remote0(handlerFunc, remoteGroupTransport, null, action, params); - if (action.boolReturnTypeFuture) { + if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥 CompletableFuture result = action.futureCreator.create(); future.whenComplete((v, e) -> { try { @@ -355,7 +355,12 @@ public final class SncpClient { CompletableFuture connFuture = transport.pollConnection(addr); return connFuture.thenCompose(conn0 -> { final CompletableFuture future = new CompletableFuture(); - if (conn0 == null || !conn0.isOpen()) { + if (conn0 == null) { + future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); + return future; + } + if (!conn0.isOpen()) { + conn0.dispose(); future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); return future; } @@ -396,48 +401,58 @@ public final class SncpClient { @Override public void completed(Integer count, Void attachment2) { - if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 - future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); - transport.offerBuffer(buffer); - transport.offerConnection(true, conn); - return; - } - if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 - conn.read(buffer, attachment2, this); - return; - } - buffer.flip(); - if (received > 0) { - int offset = this.received; - this.received += buffer.remaining(); - buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); - if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 + try { + if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 + future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); + transport.offerBuffer(buffer); + transport.offerConnection(true, conn); + return; + } + if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 + conn.read(buffer, attachment2, this); + return; + } + buffer.flip(); + if (received > 0) { + int offset = this.received; + this.received += buffer.remaining(); + buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); + if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 + buffer.clear(); + conn.read(buffer, attachment2, this); + } else { + success(); + } + return; + } + checkResult(seqid, action, buffer); + + final int respBodyLength = buffer.getInt(); + final int retcode = buffer.getInt(); + if (retcode != 0) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + } + + if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 + this.body = new byte[respBodyLength]; + this.received = buffer.remaining(); + buffer.get(body, 0, this.received); buffer.clear(); conn.read(buffer, attachment2, this); } else { + this.body = new byte[respBodyLength]; + buffer.get(body, 0, respBodyLength); success(); } - return; - } - checkResult(seqid, action, buffer); - - final int respBodyLength = buffer.getInt(); - final int retcode = buffer.getInt(); - if (retcode != 0) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); - throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); - } - - if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 - this.body = new byte[respBodyLength]; - this.received = buffer.remaining(); - buffer.get(body, 0, this.received); - buffer.clear(); - conn.read(buffer, attachment2, this); - } else { - this.body = new byte[respBodyLength]; - buffer.get(body, 0, respBodyLength); - success(); + } catch (Throwable e) { + future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error")); + transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(e, handlerAttach); + } + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e); } } @@ -468,7 +483,6 @@ public final class SncpClient { @Override public void failed(Throwable exc, Void attachment2) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc); future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); transport.offerBuffer(buffer); transport.offerConnection(true, conn); @@ -476,15 +490,21 @@ public final class SncpClient { final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; handler.failed(exc, handlerAttach); } + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc); } }); } @Override public void failed(Throwable exc, ByteBuffer[] attachment) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc); + future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); transport.offerBuffer(buffer); transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(exc, handlerAttach); + } + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc); } }); return future;