diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index bccf790c6..669ab8270 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -135,11 +135,34 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl */ public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SocketAddress address, final int readTimeoutSecond0, final int writeTimeoutSecond0) throws IOException { + return createTCP(group, address, false, readTimeoutSecond0, writeTimeoutSecond0); + } + + /** + * 创建TCP协议客户端连接 + * + * @param address 连接点子 + * @param group 连接AsynchronousChannelGroup + * @param noDelay TcpNoDelay + * @param readTimeoutSecond0 读取超时秒数 + * @param writeTimeoutSecond0 写入超时秒数 + * + * @return 连接CompletableFuture + * @throws java.io.IOException 异常 + */ + public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SocketAddress address, + final boolean noDelay, final int readTimeoutSecond0, final int writeTimeoutSecond0) throws IOException { final CompletableFuture future = new CompletableFuture(); final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); channel.connect(address, null, new CompletionHandler() { @Override public void completed(Void result, Void attachment) { + if (noDelay) { + try { + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + } catch (IOException e) { + } + } future.complete(create(channel, address, readTimeoutSecond0, writeTimeoutSecond0)); } diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 632a2ab94..b805d331f 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -12,6 +12,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.function.Supplier; +import java.util.logging.Level; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; import org.redkale.util.*; @@ -188,7 +189,7 @@ public final class Transport { return tcp; } - public AsyncConnection pollConnection(SocketAddress addr) { + public CompletableFuture pollConnection(SocketAddress addr) { if (this.strategy != null) return strategy.pollConnection(addr, this); if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address; final boolean rand = addr == null; @@ -207,7 +208,7 @@ public final class Transport { if (!queue.isEmpty()) { AsyncConnection conn; while ((conn = queue.poll()) != null) { - if (conn.isOpen()) return conn; + if (conn.isOpen()) return CompletableFuture.completedFuture(conn); } } tryed = true; @@ -247,14 +248,14 @@ public final class Transport { if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.connect(addr).get(2, TimeUnit.SECONDS); } - if (channel == null) return null; - return AsyncConnection.create(channel, addr, 3000, 3000); + if (channel == null) return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(AsyncConnection.create(channel, addr, 3000, 3000)); } else { // UDP if (rand) addr = this.transportAddres[0].address; DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(true); channel.connect(addr); - return AsyncConnection.create(channel, addr, true, 3000, 3000); + return CompletableFuture.completedFuture(AsyncConnection.create(channel, addr, true, 3000, 3000)); // AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); // channel.connect(addr); // return AsyncConnection.create(channel, addr, true, 3000, 3000); @@ -280,35 +281,40 @@ public final class Transport { } public void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler handler) { - final AsyncConnection conn = pollConnection(addr); - conn.write(buffer, buffer, new CompletionHandler() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - buffer.clear(); - conn.read(buffer, buffer, new CompletionHandler() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - if (handler != null) handler.completed(result, att); - offerBuffer(buffer); - offerConnection(false, conn); - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - offerBuffer(buffer); - offerConnection(true, conn); - } - }); - + pollConnection(addr).whenComplete((conn, ex) -> { + if (ex != null) { + factory.getLogger().log(Level.WARNING, Transport.class.getSimpleName() + " async error", ex); + return; } + conn.write(buffer, buffer, new CompletionHandler() { - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - offerBuffer(buffer); - offerConnection(true, conn); - } + @Override + public void completed(Integer result, ByteBuffer attachment) { + buffer.clear(); + conn.read(buffer, buffer, new CompletionHandler() { + + @Override + public void completed(Integer result, ByteBuffer attachment) { + if (handler != null) handler.completed(result, att); + offerBuffer(buffer); + offerConnection(false, conn); + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + offerBuffer(buffer); + offerConnection(true, conn); + } + }); + + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + offerBuffer(buffer); + offerConnection(true, conn); + } + }); }); } diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 8561392ef..488420763 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -248,6 +248,10 @@ public class TransportFactory { return new ArrayList<>(this.groupInfos.values()); } + public Logger getLogger() { + return logger; + } + public void addSncpService(Service service) { if (service == null) return; services.add(new WeakReference<>(service)); diff --git a/src/org/redkale/net/TransportStrategy.java b/src/org/redkale/net/TransportStrategy.java index f086cbd32..78efe774b 100644 --- a/src/org/redkale/net/TransportStrategy.java +++ b/src/org/redkale/net/TransportStrategy.java @@ -6,6 +6,7 @@ package org.redkale.net; import java.net.SocketAddress; +import java.util.concurrent.CompletableFuture; /** * 远程请求的负载均衡策略 @@ -17,5 +18,5 @@ import java.net.SocketAddress; */ public interface TransportStrategy { - public AsyncConnection pollConnection(SocketAddress addr, Transport transport); + public CompletableFuture pollConnection(SocketAddress addr, Transport transport); } diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 7114cb5f0..8a582a337 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -342,7 +342,7 @@ public final class SncpClient { final Type[] myparamtypes = action.paramTypes; final Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; - final BsonWriter writer = bsonConvert.pollBsonWriter(transport == null ? bufferSupplier : transport.getBufferSupplier()); // 将head写入 + final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 writer.writeTo(DEFAULT_HEADER); for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]); @@ -351,146 +351,142 @@ public final class SncpClient { final long seqid = System.nanoTime(); final DLong actionid = action.actionid; final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0; - final CompletableFuture future = new CompletableFuture(); - AsyncConnection conn0; - try { - conn0 = transport.pollConnection(addr); - } catch (Exception e) { - future.completeExceptionally(e); - return future; - } - if (conn0 == null || !conn0.isOpen()) { - future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); - return future; - } - final AsyncConnection conn = conn0; - final ByteBuffer[] sendBuffers = writer.toBuffers(); - fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength); + CompletableFuture connFuture = transport.pollConnection(addr); + return connFuture.thenCompose(conn0 -> { + final CompletableFuture future = new CompletableFuture(); + if (conn0 == null || !conn0.isOpen()) { + future.completeExceptionally(new RuntimeException("sncp " + (conn0 == null ? addr : conn0.getRemoteAddress()) + " cannot connect")); + return future; + } + final AsyncConnection conn = conn0; + final ByteBuffer[] sendBuffers = writer.toBuffers(); + fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength); - final ByteBuffer buffer = transport.pollBuffer(); - conn.write(sendBuffers, sendBuffers, new CompletionHandler() { + final ByteBuffer buffer = transport.pollBuffer(); + conn.write(sendBuffers, sendBuffers, new CompletionHandler() { - @Override - public void completed(Integer result, ByteBuffer[] attachments) { - int index = -1; - for (int i = 0; i < attachments.length; i++) { - if (attachments[i].hasRemaining()) { - index = i; - break; - } else { - transport.offerBuffer(attachments[i]); + @Override + public void completed(Integer result, ByteBuffer[] attachments) { + int index = -1; + for (int i = 0; i < attachments.length; i++) { + if (attachments[i].hasRemaining()) { + index = i; + break; + } else { + transport.offerBuffer(attachments[i]); + } } - } - if (index == 0) { - conn.write(attachments, attachments, this); - return; - } else if (index > 0) { - ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index]; - System.arraycopy(attachments, index, newattachs, 0, newattachs.length); - conn.write(newattachs, newattachs, this); - return; - } - //----------------------- 读取返回结果 ------------------------------------- - buffer.clear(); - conn.read(buffer, null, new CompletionHandler() { + if (index == 0) { + conn.write(attachments, attachments, this); + return; + } else if (index > 0) { + ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index]; + System.arraycopy(attachments, index, newattachs, 0, newattachs.length); + conn.write(newattachs, newattachs, this); + return; + } + //----------------------- 读取返回结果 ------------------------------------- + buffer.clear(); + conn.read(buffer, null, new CompletionHandler() { - private byte[] body; + private byte[] body; - private int received; + private int received; - @Override - public void completed(Integer count, Void attachment2) { - if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 - future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); - transport.offerBuffer(buffer); - transport.offerConnection(true, conn); - return; - } - if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 - conn.read(buffer, attachment2, this); - return; - } - buffer.flip(); - if (received > 0) { - int offset = this.received; - this.received += buffer.remaining(); - buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); - if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 + @Override + public void completed(Integer count, Void attachment2) { + if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 + future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); + transport.offerBuffer(buffer); + transport.offerConnection(true, conn); + return; + } + if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 + conn.read(buffer, attachment2, this); + return; + } + buffer.flip(); + if (received > 0) { + int offset = this.received; + this.received += buffer.remaining(); + buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); + if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 + buffer.clear(); + conn.read(buffer, attachment2, this); + } else { + success(); + } + return; + } + checkResult(seqid, action, buffer); + + final int respBodyLength = buffer.getInt(); + final int retcode = buffer.getInt(); + if (retcode != 0) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + } + + if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 + this.body = new byte[respBodyLength]; + this.received = buffer.remaining(); + buffer.get(body, 0, this.received); buffer.clear(); conn.read(buffer, attachment2, this); } else { + this.body = new byte[respBodyLength]; + buffer.get(body, 0, respBodyLength); success(); } - return; - } - checkResult(seqid, action, buffer); - - final int respBodyLength = buffer.getInt(); - final int retcode = buffer.getInt(); - if (retcode != 0) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); - throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); } - if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 - this.body = new byte[respBodyLength]; - this.received = buffer.remaining(); - buffer.get(body, 0, this.received); - buffer.clear(); - conn.read(buffer, attachment2, this); - } else { - this.body = new byte[respBodyLength]; - buffer.get(body, 0, respBodyLength); - success(); - } - } - - public void success() { - future.complete(this.body); - transport.offerBuffer(buffer); - transport.offerConnection(false, conn); - if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - final BsonReader reader = bsonConvert.pollBsonReader(); - try { - reader.setBytes(this.body); - int i; - while ((i = (reader.readByte() & 0xff)) != 0) { - final Attribute attr = action.paramAttrs[i]; - attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader)); + public void success() { + future.complete(this.body); + transport.offerBuffer(buffer); + transport.offerConnection(false, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + final BsonReader reader = bsonConvert.pollBsonReader(); + try { + reader.setBytes(this.body); + int i; + while ((i = (reader.readByte() & 0xff)) != 0) { + final Attribute attr = action.paramAttrs[i]; + attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader)); + } + Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); + handler.completed(rs, handlerAttach); + } catch (Exception e) { + handler.failed(e, handlerAttach); + } finally { + bsonConvert.offerBsonReader(reader); } - Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); - handler.completed(rs, handlerAttach); - } catch (Exception e) { - handler.failed(e, handlerAttach); - } finally { - bsonConvert.offerBsonReader(reader); } } - } - @Override - public void failed(Throwable exc, Void attachment2) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc); - future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); - transport.offerBuffer(buffer); - transport.offerConnection(true, conn); - if (handler != null) { - final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; - handler.failed(exc, handlerAttach); + @Override + public void failed(Throwable exc, Void attachment2) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc); + future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); + transport.offerBuffer(buffer); + transport.offerConnection(true, conn); + if (handler != null) { + final Object handlerAttach = action.handlerAttachParamIndex >= 0 ? params[action.handlerAttachParamIndex] : null; + handler.failed(exc, handlerAttach); + } } - } - }); - } + }); + } - @Override - public void failed(Throwable exc, ByteBuffer[] attachment) { - logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc); - transport.offerBuffer(buffer); - transport.offerConnection(true, conn); - } + @Override + public void failed(Throwable exc, ByteBuffer[] attachment) { + logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote write exec failed", exc); + transport.offerBuffer(buffer); + transport.offerConnection(true, conn); + } + }); + return future; }); - return future; } private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) {