This commit is contained in:
Redkale
2018-05-31 17:12:18 +08:00
parent bca42ee7b4
commit bcca20dbbb

View File

@@ -296,7 +296,7 @@ public final class SncpClient {
if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null;
final BsonReader reader = bsonConvert.pollBsonReader(); final BsonReader reader = bsonConvert.pollBsonReader();
CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params); CompletableFuture<byte[]> future = remote0(handlerFunc, remoteGroupTransport, null, action, params);
if (action.boolReturnTypeFuture) { if (action.boolReturnTypeFuture) { //与handlerFuncIndex互斥
CompletableFuture result = action.futureCreator.create(); CompletableFuture result = action.futureCreator.create();
future.whenComplete((v, e) -> { future.whenComplete((v, e) -> {
try { try {
@@ -355,7 +355,12 @@ public final class SncpClient {
CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr); CompletableFuture<AsyncConnection> connFuture = transport.pollConnection(addr);
return connFuture.thenCompose(conn0 -> { return connFuture.thenCompose(conn0 -> {
final CompletableFuture<byte[]> future = new CompletableFuture(); final CompletableFuture<byte[]> future = new CompletableFuture();
if (conn0 == null || !conn0.isOpen()) { if (conn0 == null) {
future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect"));
return future;
}
if (!conn0.isOpen()) {
conn0.dispose();
future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect"));
return future; return future;
} }
@@ -396,48 +401,58 @@ public final class SncpClient {
@Override @Override
public void completed(Integer count, Void attachment2) { public void completed(Integer count, Void attachment2) {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 try {
future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
transport.offerBuffer(buffer); future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data"));
transport.offerConnection(true, conn); transport.offerBuffer(buffer);
return; transport.offerConnection(true, conn);
} return;
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 }
conn.read(buffer, attachment2, this); if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
return; conn.read(buffer, attachment2, this);
} return;
buffer.flip(); }
if (received > 0) { buffer.flip();
int offset = this.received; if (received > 0) {
this.received += buffer.remaining(); int offset = this.received;
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); this.received += buffer.remaining();
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.read(buffer, attachment2, this);
} else {
success();
}
return;
}
checkResult(seqid, action, buffer);
final int respBodyLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取
this.body = new byte[respBodyLength];
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear(); buffer.clear();
conn.read(buffer, attachment2, this); conn.read(buffer, attachment2, this);
} else { } else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
success(); success();
} }
return; } catch (Throwable e) {
} future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote response error"));
checkResult(seqid, action, buffer); transport.offerConnection(true, conn);
if (handler != null) {
final int respBodyLength = buffer.getInt(); final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
final int retcode = buffer.getInt(); handler.failed(e, handlerAttach);
if (retcode != 0) { }
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error", e);
throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
}
if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取
this.body = new byte[respBodyLength];
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear();
conn.read(buffer, attachment2, this);
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
success();
} }
} }
@@ -468,7 +483,6 @@ public final class SncpClient {
@Override @Override
public void failed(Throwable exc, Void attachment2) { public void failed(Throwable exc, Void attachment2) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc);
future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(true, conn); transport.offerConnection(true, conn);
@@ -476,15 +490,21 @@ public final class SncpClient {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach); handler.failed(exc, handlerAttach);
} }
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc);
} }
}); });
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer[] attachment) { public void failed(Throwable exc, ByteBuffer[] attachment) {
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc); future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer); transport.offerBuffer(buffer);
transport.offerConnection(true, conn); transport.offerConnection(true, conn);
if (handler != null) {
final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null;
handler.failed(exc, handlerAttach);
}
logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc);
} }
}); });
return future; return future;