From fe5d934dd7b8bab6c3e21fff7976f909123ac229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Thu, 5 Nov 2015 10:06:35 +0800 Subject: [PATCH] --- .../redkale/convert/bson/BsonWriter.java | 22 +- .../wentch/redkale/net/AsyncConnection.java | 139 +++------- src/com/wentch/redkale/net/Context.java | 9 +- .../wentch/redkale/net/ProtocolServer.java | 65 +---- src/com/wentch/redkale/net/Transport.java | 64 ++--- .../wentch/redkale/net/http/HttpContext.java | 4 +- .../wentch/redkale/net/http/HttpServer.java | 2 +- .../wentch/redkale/net/sncp/SncpClient.java | 249 +++++++++++++++++- .../wentch/redkale/net/sncp/SncpContext.java | 9 +- .../redkale/net/sncp/SncpDynServlet.java | 26 +- .../wentch/redkale/net/sncp/SncpFuture.java | 94 +++++++ .../wentch/redkale/net/sncp/SncpRequest.java | 62 ++--- .../wentch/redkale/net/sncp/SncpResponse.java | 65 +++-- .../wentch/redkale/net/sncp/SncpServer.java | 2 +- 14 files changed, 502 insertions(+), 310 deletions(-) create mode 100644 src/com/wentch/redkale/net/sncp/SncpFuture.java diff --git a/src/com/wentch/redkale/convert/bson/BsonWriter.java b/src/com/wentch/redkale/convert/bson/BsonWriter.java index a244dc59c..eeb22efcd 100644 --- a/src/com/wentch/redkale/convert/bson/BsonWriter.java +++ b/src/com/wentch/redkale/convert/bson/BsonWriter.java @@ -89,26 +89,38 @@ public final class BsonWriter implements Writer { * * @param position * @param chs + * @return */ - public void rewriteTo(int position, byte... chs) { + public int rewriteTo(int position, byte... chs) { System.arraycopy(chs, 0, content, position, chs.length); + return position + chs.length; } - public void rewriteTo(int position, short value) { + public int rewriteTo(int position, short value) { rewriteTo(position, (byte) (value >> 8), (byte) value); + return position + 2; } - public void rewriteTo(int position, char value) { + public int rewriteTo(int position, char value) { rewriteTo(position, (byte) ((value & 0xFF00) >> 8), (byte) (value & 0xFF)); + return position + 2; } - public void rewriteTo(int position, int value) { + public int rewriteTo(int position, int value) { rewriteTo(position, (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value); + return position + 4; } - public void rewriteTo(int position, long value) { + public int rewriteTo(int position, long value) { rewriteTo(position, (byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32), (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value); + return position + 8; + } + + public BsonWriter fillRange(final int len) { + expand(len); + count += len; + return this; } public void writeTo(final byte ch) { diff --git a/src/com/wentch/redkale/net/AsyncConnection.java b/src/com/wentch/redkale/net/AsyncConnection.java index 2de442967..11a905877 100644 --- a/src/com/wentch/redkale/net/AsyncConnection.java +++ b/src/com/wentch/redkale/net/AsyncConnection.java @@ -24,6 +24,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public abstract SocketAddress getRemoteAddress(); + public abstract SocketAddress getLocalAddress(); + public abstract int getReadTimeoutSecond(); public abstract int getWriteTimeoutSecond(); @@ -104,7 +106,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } return create(channel, address, readTimeoutSecond0, writeTimeoutSecond0); } else if ("UDP".equalsIgnoreCase(protocol)) { - AsyncDatagramChannel channel = AsyncDatagramChannel.open(null); + DatagramChannel channel = DatagramChannel.open(); + channel.configureBlocking(true); channel.connect(address); return create(channel, address, true, readTimeoutSecond0, writeTimeoutSecond0); } else { @@ -112,100 +115,6 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } - private static class AIOUDPAsyncConnection extends AsyncConnection { - - private int readTimeoutSecond; - - private int writeTimeoutSecond; - - private final AsyncDatagramChannel channel; - - private final SocketAddress remoteAddress; - - private final boolean client; - - public AIOUDPAsyncConnection(final AsyncDatagramChannel ch, SocketAddress addr, - final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - this.channel = ch; - this.client = client0; - this.readTimeoutSecond = readTimeoutSecond0; - this.writeTimeoutSecond = writeTimeoutSecond0; - this.remoteAddress = addr; - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - if (readTimeoutSecond > 0) { - channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler); - } else { - channel.read(dst, attachment, handler); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - channel.send(src, remoteAddress, attachment, handler); - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - channel.send(srcs, offset, length, remoteAddress, attachment, handler); - } - - @Override - public void setReadTimeoutSecond(int readTimeoutSecond) { - this.readTimeoutSecond = readTimeoutSecond; - } - - @Override - public void setWriteTimeoutSecond(int writeTimeoutSecond) { - this.writeTimeoutSecond = writeTimeoutSecond; - } - - @Override - public int getReadTimeoutSecond() { - return this.readTimeoutSecond; - } - - @Override - public int getWriteTimeoutSecond() { - return this.writeTimeoutSecond; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public final Future read(ByteBuffer dst) { - return channel.read(dst); - } - - @Override - public final Future write(ByteBuffer src) { - return channel.write(src); - } - - @Override - public final void close() throws IOException { - super.close(); - if (client) { - channel.close(); - } - } - - @Override - public final boolean isOpen() { - return channel.isOpen(); - } - - @Override - public final boolean isTCP() { - return false; - } - } - private static class BIOUDPAsyncConnection extends AsyncConnection { private int readTimeoutSecond; @@ -252,14 +161,22 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return remoteAddress; } + @Override + public SocketAddress getLocalAddress() { + try { + return channel.getLocalAddress(); + } catch (IOException e) { + return null; + } + } + @Override protected void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { try { int rs = 0; - int end = offset + length - 1; for (int i = offset; i < offset + length; i++) { rs += channel.send(srcs[i], remoteAddress); - if (i != end) Thread.sleep(1); + if(i != offset) Thread.sleep(10); } if (handler != null) handler.completed(rs, attachment); } catch (Exception e) { @@ -326,15 +243,6 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } - public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) { - return create(ch, addr, client0, 0, 0); - } - - public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, - final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0); - } - public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0); @@ -425,6 +333,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return remoteAddress; } + @Override + public SocketAddress getLocalAddress() { + return socket.getLocalSocketAddress(); + } + @Override public int getReadTimeoutSecond() { return readTimeoutSecond; @@ -510,6 +423,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + /** + * 通常用于 ssl socket + * @param socket + * @return + */ public static AsyncConnection create(final Socket socket) { return create(socket, null, 0, 0); } @@ -604,6 +522,15 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return remoteAddress; } + @Override + public SocketAddress getLocalAddress() { + try { + return channel.getLocalAddress(); + } catch (IOException e) { + return null; + } + } + @Override public final Future read(ByteBuffer dst) { return channel.read(dst); diff --git a/src/com/wentch/redkale/net/Context.java b/src/com/wentch/redkale/net/Context.java index 248cafd62..47102c289 100644 --- a/src/com/wentch/redkale/net/Context.java +++ b/src/com/wentch/redkale/net/Context.java @@ -25,6 +25,8 @@ public class Context { protected final ExecutorService executor; + protected final int bufferCapacity; + protected final ObjectPool bufferPool; protected final ObjectPool responsePool; @@ -45,12 +47,13 @@ public class Context { protected final WatchFactory watch; - public Context(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool bufferPool, ObjectPool responsePool, + public Context(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool bufferPool, ObjectPool responsePool, final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final WatchFactory watch, final int readTimeoutSecond, final int writeTimeoutSecond) { this.serverStartTime = serverStartTime; this.logger = logger; this.executor = executor; + this.bufferCapacity = bufferCapacity; this.bufferPool = bufferPool; this.responsePool = responsePool; this.maxbody = maxbody; @@ -82,6 +85,10 @@ public class Context { executor.submit(r); } + public int getBufferCapacity() { + return bufferCapacity; + } + public ByteBuffer pollBuffer() { return bufferPool.poll(); } diff --git a/src/com/wentch/redkale/net/ProtocolServer.java b/src/com/wentch/redkale/net/ProtocolServer.java index ddb786eda..3ccd6575e 100644 --- a/src/com/wentch/redkale/net/ProtocolServer.java +++ b/src/com/wentch/redkale/net/ProtocolServer.java @@ -40,76 +40,13 @@ public abstract class ProtocolServer { private static final class ProtocolUDPServer extends ProtocolServer { - private final Context context; - - private AsynchronousChannelGroup group; - - private AsyncDatagramChannel serverChannel; - - public ProtocolUDPServer(Context context) { - this.context = context; - } - - @Override - public void open() throws IOException { - this.group = AsynchronousChannelGroup.withCachedThreadPool(context.executor, 1); - this.serverChannel = AsyncDatagramChannel.open(group); - } - - @Override - public void bind(SocketAddress local, int backlog) throws IOException { - this.serverChannel.bind(local); - } - - @Override - public void setOption(SocketOption name, T value) throws IOException { - this.serverChannel.setOption(name, value); - } - - @Override - public void accept() { - final AsyncDatagramChannel serchannel = this.serverChannel; - final ByteBuffer buffer = this.context.pollBuffer(); - serchannel.receive(buffer, buffer, new CompletionHandler() { - - @Override - public void completed(final SocketAddress address, ByteBuffer attachment) { - final ByteBuffer buffer2 = context.pollBuffer(); - serchannel.receive(buffer2, buffer2, this); - attachment.flip(); - AsyncConnection conn = AsyncConnection.create(serchannel, address, false, context.readTimeoutSecond, context.writeTimeoutSecond); - context.submit(new PrepareRunner(context, conn, attachment)); - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - context.offerBuffer(attachment); - //if (exc != null) context.logger.log(Level.FINEST, AsyncDatagramChannel.class.getSimpleName() + " accept erroneous", exc); - } - }); - } - - @Override - public void close() throws IOException { - this.serverChannel.close(); - } - - @Override - public AsynchronousChannelGroup getChannelGroup() { - return this.group; - } - - } - - private static final class ProtocolUDPWinServer extends ProtocolServer { - private boolean running; private final Context context; private DatagramChannel serverChannel; - public ProtocolUDPWinServer(Context context) { + public ProtocolUDPServer(Context context) { this.context = context; } diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index 32490f42d..14cd744c7 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -23,8 +23,6 @@ public final class Transport { protected static final int MAX_POOL_LIMIT = 16; - protected final boolean aio; - protected final String name; protected final int bufferPoolSize; @@ -44,17 +42,12 @@ public final class Transport { protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); public Transport(Transport transport, InetSocketAddress localAddress, Collection transports) { - this(transport.name, transport.protocol, transport.aio, null, transport.bufferPoolSize, parse(localAddress, transports)); + this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports)); } public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection addresses) { - this(name, protocol, false, watch, bufferPoolSize, addresses); - } - - public Transport(String name, String protocol, boolean aio, WatchFactory watch, int bufferPoolSize, Collection addresses) { this.name = name; this.protocol = protocol; - this.aio = aio; this.bufferPoolSize = bufferPoolSize; this.bufferCapacity = 8192; AsynchronousChannelGroup g = null; @@ -114,6 +107,10 @@ public final class Transport { return Transport.class.getSimpleName() + "{name=" + name + ",protocol=" + protocol + ",remoteAddres=" + Arrays.toString(remoteAddres) + "}"; } + public int getBufferCapacity() { + return bufferCapacity; + } + public ByteBuffer pollBuffer() { return bufferPool.poll(); } @@ -126,6 +123,10 @@ public final class Transport { for (ByteBuffer buffer : buffers) offerBuffer(buffer); } + public boolean isTCP() { + return "TCP".equalsIgnoreCase(protocol); + } + public AsyncConnection pollConnection(SocketAddress addr) { final boolean rand = addr == null; try { @@ -144,19 +145,13 @@ public final class Transport { if (conn.isOpen()) return conn; } } - if (aio) { - if (channel == null) channel = AsynchronousSocketChannel.open(group); - } else { - if (socket == null) socket = new Socket(); - } + if (channel == null) channel = AsynchronousSocketChannel.open(group); + try { - if (aio) { - channel.connect(addr).get(1, TimeUnit.SECONDS); - } else { - socket.connect(addr, 1000); - } + channel.connect(addr).get(1, TimeUnit.SECONDS); break; } catch (Exception iex) { + iex.printStackTrace(); if (i == remoteAddres.length - 1) { p = 0; socket = null; @@ -166,29 +161,20 @@ public final class Transport { } index.set(p); } else { - if (aio) { - channel = AsynchronousSocketChannel.open(group); - channel.connect(addr).get(1, TimeUnit.SECONDS); - } else { - socket = new Socket(); - socket.connect(addr, 1000); - } + channel = AsynchronousSocketChannel.open(group); + channel.connect(addr).get(1, TimeUnit.SECONDS); } - if (aio && channel == null) return null; - if (!aio && socket == null) return null; - return aio ? AsyncConnection.create(channel, addr, 3000, 3000) : AsyncConnection.create(socket, addr, 3000, 3000); + if (channel == null) return null; + return AsyncConnection.create(channel, addr, 3000, 3000); } else { // UDP if (rand) addr = remoteAddres[0]; - if (aio) { - AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); - channel.connect(addr); - return AsyncConnection.create(channel, addr, true, 3000, 3000); - } else { - DatagramChannel socket = DatagramChannel.open(); - socket.configureBlocking(true); - socket.connect(addr); - return AsyncConnection.create(socket, addr, true, 3000, 3000); - } + DatagramChannel channel = DatagramChannel.open(); + channel.configureBlocking(true); + channel.connect(addr); + return AsyncConnection.create(channel, addr, true, 3000, 3000); +// AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); +// channel.connect(addr); +// return AsyncConnection.create(channel, addr, true, 3000, 3000); } } catch (Exception ex) { throw new RuntimeException("transport address = " + addr, ex); @@ -196,7 +182,7 @@ public final class Transport { } public void offerConnection(AsyncConnection conn) { - if (conn.isTCP() && false) { //暂时每次都关闭 + if (false && conn.isTCP()) { //暂时每次都关闭 if (conn.isOpen()) { BlockingQueue queue = connPool.get(conn.getRemoteAddress()); if (queue == null) { diff --git a/src/com/wentch/redkale/net/http/HttpContext.java b/src/com/wentch/redkale/net/http/HttpContext.java index e395bdeca..32d015ea0 100644 --- a/src/com/wentch/redkale/net/http/HttpContext.java +++ b/src/com/wentch/redkale/net/http/HttpContext.java @@ -28,10 +28,10 @@ public final class HttpContext extends Context { protected final SecureRandom random = new SecureRandom(); - public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool bufferPool, + public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool bufferPool, ObjectPool responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond, String contextPath) { - super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset, + super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset, address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); this.contextPath = contextPath; this.jsonFactory = JsonFactory.root(); diff --git a/src/com/wentch/redkale/net/http/HttpServer.java b/src/com/wentch/redkale/net/http/HttpServer.java index 43919ce1b..10dc353de 100644 --- a/src/com/wentch/redkale/net/http/HttpServer.java +++ b/src/com/wentch/redkale/net/http/HttpServer.java @@ -118,7 +118,7 @@ public final class HttpServer extends Server { AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); ObjectPool responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); - HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, + HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath); responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory, addrHeader), addHeaders, setHeaders, defCookie)); return httpcontext; diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index f8ddb9acb..17cb2c7b6 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -7,12 +7,13 @@ package com.wentch.redkale.net.sncp; import com.wentch.redkale.convert.bson.*; import com.wentch.redkale.net.*; -import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; +import static com.wentch.redkale.net.sncp.SncpRequest.*; import com.wentch.redkale.util.*; import java.lang.annotation.*; import java.lang.reflect.*; import java.net.*; import java.nio.*; +import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.function.*; @@ -177,34 +178,237 @@ public final class SncpClient { } public T remote(final BsonConvert convert, Transport transport, final int index, final Object... params) { - return convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params)); + Future future = transport.isTCP() ? remoteTCP(convert, transport, actions[index], params) : remoteUDP(convert, transport, actions[index], params); + try { + return convert.convertFrom(actions[index].resultTypes, future.get(5, TimeUnit.SECONDS)); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(actions[index].method + " sncp remote error", e); + } } public void remote(final BsonConvert convert, Transport[] transports, boolean run, final int index, final Object... params) { if (!run) return; - this.remote(false, convert, transports, run, index, params); + for (Transport transport : transports) { + if (transport.isTCP()) { + remoteTCP(convert, transport, actions[index], params); + } else { + remoteUDP(convert, transport, actions[index], params); + } + } } public void asyncRemote(final BsonConvert convert, Transport[] transports, boolean run, final int index, final Object... params) { if (!run) return; - this.remote(true, convert, transports, run, index, params); - } - - private void remote(final boolean async, final BsonConvert convert, final Transport[] transports, final boolean run, final int index, final Object... params) { - if (!run) return; - if (async && executor != null) { + if (executor != null) { executor.accept(() -> { for (Transport transport : transports) { - convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params)); + if (transport.isTCP()) { + remoteTCP(convert, transport, actions[index], params); + } else { + remoteUDP(convert, transport, actions[index], params); + } } }); } else { for (Transport transport : transports) { - convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params)); + if (transport.isTCP()) { + remoteTCP(convert, transport, actions[index], params); + } else { + remoteUDP(convert, transport, actions[index], params); + } } } } + private Future remoteUDP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) { + Type[] myparamtypes = action.paramTypes; + final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入 + for (int i = 0; i < params.length; i++) { + convert.convertTo(bw, myparamtypes[i], params[i]); + } + final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null; + final AsyncConnection conn = transport.pollConnection(addr); + if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect"); + + final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度 + final long seqid = System.nanoTime(); + final DLong actionid = action.actionid; + final int readto = conn.getReadTimeoutSecond(); + final int writeto = conn.getWriteTimeoutSecond(); + final ByteBuffer buffer = transport.pollBuffer(); + try { + //------------------------------ 发送请求 --------------------------------------------------- + if (transport.getBufferCapacity() >= bw.count()) { //只有一帧数据 + fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength); + conn.write(bw.toBuffer()).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); + } else { + final int bufsize = transport.getBufferCapacity() - HEADER_SIZE; + final int frames = (reqBodyLength / bufsize) + (reqBodyLength % bufsize > 0 ? 1 : 0); + int pos = 0; + for (int i = 0; i < frames; i++) { + int len = Math.min(bufsize, reqBodyLength - pos); + fillHeader(buffer, seqid, actionid, reqBodyLength, pos, len); + bw.toBuffer(pos + HEADER_SIZE, buffer); + pos += len; + buffer.flip(); + if (i != 0) Thread.sleep(10); + conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); + buffer.clear(); + } + } + //------------------------------ 接收响应 --------------------------------------------------- + int received = 0; + int respBodyLength = 1; + byte[] respBody = null; + while (received < respBodyLength) { + buffer.clear(); + conn.read(buffer).get(readto > 0 ? readto : 3, TimeUnit.SECONDS); + buffer.flip(); + checkResult(seqid, action, buffer); + int respbodylen = buffer.getInt(); + if (respBody == null) { + respBodyLength = respbodylen; + respBody = new byte[respBodyLength]; + } + int bodyOffset = buffer.getInt(); // + int frameLength = buffer.getInt(); // + final int retcode = buffer.getInt(); + if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + int len = Math.min(buffer.remaining(), frameLength); + buffer.get(respBody, bodyOffset, len); + received += len; + } + return new SncpFuture<>(respBody); + } catch (RuntimeException e) { + throw e; + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + transport.offerBuffer(buffer); + transport.offerConnection(conn); + } + } + + private Future remoteTCP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) { + Type[] myparamtypes = action.paramTypes; + final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入 + for (int i = 0; i < params.length; i++) { + convert.convertTo(bw, myparamtypes[i], params[i]); + } + final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度 + final long seqid = System.nanoTime(); + final DLong actionid = action.actionid; + final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null; + final AsyncConnection conn = transport.pollConnection(addr); + if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect"); + fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength); + + final ByteBuffer buffer = transport.pollBuffer(); + final ByteBuffer sendbuf = bw.toBuffer(); + final SncpFuture future = new SncpFuture(); + conn.write(sendbuf, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + if (sendbuf.hasRemaining()) { //buffer没有传输完 + conn.write(sendbuf, attachment, this); + return; + } + //----------------------- 读取返回结果 ------------------------------------- + buffer.clear(); + conn.read(buffer, null, new CompletionHandler() { + + private byte[] body; + + private int received; + + @Override + public void completed(Integer count, Void attachment) { + if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 + future.set(new RuntimeException(action.method + " sncp remote no response data")); + transport.offerBuffer(buffer); + transport.offerConnection(conn); + return; + } + if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 + conn.read(buffer, attachment, this); + return; + } + buffer.flip(); + if (received > 0) { + int offset = this.received; + this.received += buffer.remaining(); + buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); + if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 + buffer.clear(); + conn.read(buffer, attachment, this); + } else { + success(); + } + return; + } + checkResult(seqid, action, buffer); + + final int respBodyLength = buffer.getInt(); + buffer.getInt(); // bodyOffset + buffer.getInt(); // frameLength + final int retcode = buffer.getInt(); + if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + + if (respBodyLength > buffer.remaining()) { // 数据不全,需要继续读取 + this.body = new byte[respBodyLength]; + this.received = buffer.remaining(); + buffer.get(body, 0, this.received); + buffer.clear(); + conn.read(buffer, attachment, this); + } else { + this.body = new byte[respBodyLength]; + buffer.get(body, 0, respBodyLength); + success(); + } + } + + public void success() { + future.set(this.body); + transport.offerBuffer(buffer); + transport.offerConnection(conn); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.set(new RuntimeException(action.method + " sncp remote exec failed")); + transport.offerBuffer(buffer); + transport.offerConnection(conn); + } + + }); + } + + @Override + public void failed(Throwable exc, Void attachment) { + exc.printStackTrace(); + transport.offerBuffer(buffer); + transport.offerConnection(conn); + } + }); + return future; + } + + private void checkResult(long seqid, final SncpAction action, ByteBuffer buffer) { + long rseqid = buffer.getLong(); + if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") response.seqid = " + seqid + ", but request.seqid =" + rseqid); + if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); + long rserviceid = buffer.getLong(); + if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") response.serviceid = " + serviceid + ", but request.serviceid =" + rserviceid); + long rnameid = buffer.getLong(); + if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") response.nameid = " + nameid + ", but receive nameid =" + rnameid); + long ractionid1 = buffer.getLong(); + long ractionid2 = buffer.getLong(); + if (!action.actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") response.actionid = " + action.actionid + ", but request.actionid =(" + ractionid1 + "_" + ractionid2 + ")"); + buffer.getInt(); //地址 + buffer.getChar(); //端口 + } + private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) { Type[] myparamtypes = action.paramTypes; final BsonWriter bw = convert.pollBsonWriter(); @@ -222,7 +426,7 @@ public final class SncpClient { final int writeto = conn.getWriteTimeoutSecond(); try { if ((HEADER_SIZE + bodyLength) > buffer.limit()) { - //if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); + //if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + reqBodyLength)); final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); int pos = 0; for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据 @@ -230,7 +434,7 @@ public final class SncpClient { fillHeader(buffer, seqid, actionid, bodyLength, pos, len); pos += bw.toBuffer(pos, buffer); buffer.flip(); - conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); + conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); buffer.clear(); } convert.offerBsonWriter(bw); @@ -241,7 +445,7 @@ public final class SncpClient { bw.toBuffer(buffer); convert.offerBsonWriter(bw); buffer.flip(); - conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); + conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); buffer.clear(); } conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); //读取第一帧的结果数据 @@ -328,6 +532,23 @@ public final class SncpClient { } } + private void fillHeader(BsonWriter writer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) { + //---------------------head---------------------------------- + int pos = 0; + pos = writer.rewriteTo(pos, seqid); //序列号 + pos = writer.rewriteTo(pos, (char) HEADER_SIZE); //header长度 + pos = writer.rewriteTo(pos, this.serviceid); + pos = writer.rewriteTo(pos, this.nameid); + pos = writer.rewriteTo(pos, actionid.getFirst()); + pos = writer.rewriteTo(pos, actionid.getSecond()); + pos = writer.rewriteTo(pos, addrBytes); + pos = writer.rewriteTo(pos, (char) this.addrPort); + pos = writer.rewriteTo(pos, bodyLength); //body长度 + pos = writer.rewriteTo(pos, bodyOffset); + pos = writer.rewriteTo(pos, frameLength); //一帧数据的长度 + writer.rewriteTo(pos, 0); //结果码, 请求方固定传0 + } + private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) { //---------------------head---------------------------------- buffer.putLong(seqid); //序列号 diff --git a/src/com/wentch/redkale/net/sncp/SncpContext.java b/src/com/wentch/redkale/net/sncp/SncpContext.java index 784772623..9860a027b 100644 --- a/src/com/wentch/redkale/net/sncp/SncpContext.java +++ b/src/com/wentch/redkale/net/sncp/SncpContext.java @@ -38,8 +38,9 @@ public final class SncpContext extends Context { } public void add(ByteBuffer buffer, int pos) { - this.received += buffer.remaining(); - buffer.get(body, pos, buffer.remaining()); + int len = Math.min(buffer.remaining(), this.body.length - this.received); + this.received += len; + buffer.get(body, pos, len); } public boolean isCompleted() { @@ -52,10 +53,10 @@ public final class SncpContext extends Context { protected final BsonFactory bsonFactory; - public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool bufferPool, + public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, int bufferCapacity, ObjectPool bufferPool, ObjectPool responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) { - super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset, + super(serverStartTime, logger, executor, bufferCapacity, bufferPool, responsePool, maxbody, charset, address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); this.bsonFactory = BsonFactory.root(); } diff --git a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java index ec5045748..8487cb3f7 100644 --- a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java +++ b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java @@ -6,6 +6,7 @@ package com.wentch.redkale.net.sncp; import com.wentch.redkale.convert.bson.*; +import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; import com.wentch.redkale.service.*; import com.wentch.redkale.util.*; import java.io.*; @@ -85,17 +86,18 @@ public final class SncpDynServlet extends SncpServlet { if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid } else { + BsonWriter out = action.convert.pollBsonWriter().fillRange(HEADER_SIZE); BsonReader in = action.convert.pollBsonReader(); try { in.setBytes(request.getBody()); - BsonWriter bw = action.action(in); - response.finish(0, bw); - if (bw != null) action.convert.offerBsonWriter(bw); + action.action(in, out); + response.finish(0, out); } catch (Throwable t) { response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); } finally { action.convert.offerBsonReader(in); + action.convert.offerBsonWriter(out); } } } @@ -109,7 +111,7 @@ public final class SncpDynServlet extends SncpServlet { protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type, void的返回参数类型为null - public abstract BsonWriter action(final BsonReader in) throws Throwable; + public abstract void action(final BsonReader in, final BsonWriter out) throws Throwable; /* * @@ -124,12 +126,12 @@ public final class SncpDynServlet extends SncpServlet { * public TestService service; * * @Override - * public BsonWriter action(final BsonReader in) throws Throwable { + * public void action(final BsonReader in, final BsonWriter out) throws Throwable { * TestBean arg1 = convert.convertFrom(in, paramTypes[1]); * String arg2 = convert.convertFrom(in, paramTypes[2]); * int arg3 = convert.convertFrom(in, paramTypes[3]); * Object rs = service.change(arg1, arg2, arg3); - * return convert.convertTo(paramTypes[0], rs); + * convert.convertTo(out, paramTypes[0], rs); * } * } */ @@ -188,11 +190,11 @@ public final class SncpDynServlet extends SncpServlet { throw new RuntimeException(ex); //不可能会发生 } { // action方法 - mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + ")" + convertWriterDesc, null, new String[]{"java/lang/Throwable"})); + mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + ")V", null, new String[]{"java/lang/Throwable"})); //mv.setDebug(true); int iconst = ICONST_1; int intconst = 1; - int store = 2; + int store = 3; //action的参数个数+1 final Class[] paramClasses = method.getParameterTypes(); int[][] codes = new int[paramClasses.length][2]; for (int i = 0; i < paramClasses.length; i++) { //参数 @@ -259,8 +261,7 @@ public final class SncpDynServlet extends SncpServlet { int maxStack = codes.length > 0 ? codes[codes.length - 1][1] : 1; Class returnClass = method.getReturnType(); if (method.getReturnType() == void.class) { //返回 - mv.visitInsn(ACONST_NULL); - mv.visitInsn(ARETURN); + mv.visitInsn(RETURN); maxStack = 8; } else { if (returnClass.isPrimitive()) { @@ -275,13 +276,14 @@ public final class SncpDynServlet extends SncpServlet { mv.visitVarInsn(ASTORE, store); //11 mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); + mv.visitVarInsn(ALOAD, 2); mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "paramTypes", "[Ljava/lang/reflect/Type;"); mv.visitInsn(ICONST_0); mv.visitInsn(AALOAD); mv.visitVarInsn(ALOAD, store); - mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertToWriter", "(Ljava/lang/reflect/Type;Ljava/lang/Object;)" + convertWriterDesc, false); - mv.visitInsn(ARETURN); + mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertTo", "(" + convertWriterDesc + "Ljava/lang/reflect/Type;Ljava/lang/Object;)V", false); + mv.visitInsn(RETURN); store++; if (maxStack < 10) maxStack = 10; } diff --git a/src/com/wentch/redkale/net/sncp/SncpFuture.java b/src/com/wentch/redkale/net/sncp/SncpFuture.java new file mode 100644 index 000000000..68cf07b8a --- /dev/null +++ b/src/com/wentch/redkale/net/sncp/SncpFuture.java @@ -0,0 +1,94 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.sncp; + +import java.util.concurrent.*; + +/** + * 简单的Future实现, set、get方法均只能一个线程调用 + * + * @author zhangjx + * @param + */ +public class SncpFuture implements Future { + + private volatile boolean done; + + private T result; + + private RuntimeException ex; + + public SncpFuture() { + } + + public SncpFuture(T result) { + this.result = result; + this.done = true; + } + + public void set(T result) { + this.result = result; + this.done = true; + synchronized (this) { + notifyAll(); + } + } + + public void set(RuntimeException ex) { + this.ex = ex; + this.done = true; + synchronized (this) { + notifyAll(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + if (done) { + if (ex != null) throw ex; + return result; + } + synchronized (this) { + if (!done) wait(10_000); + } + if (done) { + if (ex != null) throw ex; + return result; + } + throw new InterruptedException(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (done) { + if (ex != null) throw ex; + return result; + } + synchronized (this) { + if (!done) wait(unit.toMillis(timeout)); + } + if (done) { + if (ex != null) throw ex; + return result; + } + throw new TimeoutException(); + } +} diff --git a/src/com/wentch/redkale/net/sncp/SncpRequest.java b/src/com/wentch/redkale/net/sncp/SncpRequest.java index cab061e3a..50ac7f79b 100644 --- a/src/com/wentch/redkale/net/sncp/SncpRequest.java +++ b/src/com/wentch/redkale/net/sncp/SncpRequest.java @@ -36,14 +36,11 @@ public final class SncpRequest extends Request { private int framelength; - //private byte[][] paramBytes; private boolean ping; private byte[] body; - private byte[] bufferbytes = new byte[4]; - - private InetSocketAddress remoteAddress; + private byte[] bufferbytes = new byte[6]; protected SncpRequest(SncpContext context, BsonFactory factory) { super(context); @@ -66,10 +63,6 @@ public final class SncpRequest extends Request { this.nameid = buffer.getLong(); this.actionid = new DLong(buffer.getLong(), buffer.getLong()); buffer.get(bufferbytes); - int port = buffer.getChar(); - if (bufferbytes[0] > 0 && port > 0) { - this.remoteAddress = new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]), port); - } this.bodylength = buffer.getInt(); this.bodyoffset = buffer.getInt(); this.framelength = buffer.getInt(); @@ -79,7 +72,18 @@ public final class SncpRequest extends Request { return -1; } //---------------------body---------------------------------- + if (this.channel.isTCP()) { // TCP模式, 不管数据包大小 只传一帧数据 + this.body = new byte[this.bodylength]; + int len = Math.min(this.bodylength, buffer.remaining()); + buffer.get(body, 0, len); + this.bodyoffset = len; + return bodylength - len; + } + //--------------------- UDP 模式 ---------------------------- if (this.bodylength == this.framelength) { //只有一帧的数据 + if (this.framelength > buffer.remaining()) { //缺失一部分数据 + throw new RuntimeException(SncpRequest.class.getSimpleName() + " data need " + this.framelength + " bytes, but only " + buffer.remaining() + " bytes"); + } this.body = new byte[this.framelength]; buffer.get(body); return 0; @@ -102,33 +106,10 @@ public final class SncpRequest extends Request { } @Override - protected int readBody(ByteBuffer buffer) { // TCP 模式会调用此方法 - long rseqid = buffer.getLong(); - if (rseqid != this.seqid) throw new RuntimeException("sncp frame receive seqid = " + seqid + ", but first receive seqid =" + rseqid); - if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); - long rserviceid = buffer.getLong(); - if (rserviceid != this.serviceid) throw new RuntimeException("sncp frame receive serviceid = " + serviceid + ", but first receive serviceid =" + rserviceid); - long rnameid = buffer.getLong(); - if (rnameid != this.nameid) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first receive nameid =" + rnameid); - long ractionid1 = buffer.getLong(); - long ractionid2 = buffer.getLong(); - if (!this.actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp frame receive actionid = " + actionid + ", but first receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); - buffer.getInt(); //地址 - buffer.getChar(); //端口 - final int bodylen = buffer.getInt(); - if (bodylen != this.bodylength) throw new RuntimeException("sncp frame receive bodylength = " + bodylen + ", but first bodylength =" + bodylength); - final int bodyOffset = buffer.getInt(); - final int framelen = buffer.getInt(); - final int retcode = buffer.getInt(); - if (retcode != 0) throw new RuntimeException("sncp frame receive retcode error (retcode=" + retcode + ")"); - final SncpContext scontext = (SncpContext) this.context; - RequestEntry entry = scontext.getRequestEntity(this.seqid); - if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]); - entry.add(buffer, bodyOffset); - if (entry.isCompleted()) { //数据读取完毕 - this.body = entry.body; - scontext.removeRequestEntity(this.seqid); - } + protected int readBody(ByteBuffer buffer) { // 只有 TCP 模式会调用此方法 + final int framelen = buffer.remaining(); + buffer.get(this.body, this.bodyoffset, framelen); + this.bodyoffset += framelen; return framelen; } @@ -137,12 +118,16 @@ public final class SncpRequest extends Request { this.keepAlive = this.channel.isTCP(); } + protected boolean isTCP() { + return this.channel.isTCP(); + } + @Override public String toString() { return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid + ",serviceid=" + this.serviceid + ",actionid=" + this.actionid + ",bodylength=" + this.bodylength + ",bodyoffset=" + this.bodyoffset - + ",framelength=" + this.framelength + ",remoteAddress=" + remoteAddress + "}"; + + ",framelength=" + this.framelength + ",remoteAddress=" + getRemoteAddress() + "}"; } @Override @@ -155,7 +140,6 @@ public final class SncpRequest extends Request { this.bodyoffset = 0; this.body = null; this.ping = false; - this.remoteAddress = null; this.bufferbytes[0] = 0; super.recycle(); } @@ -185,7 +169,9 @@ public final class SncpRequest extends Request { } public InetSocketAddress getRemoteAddress() { - return remoteAddress; + if (bufferbytes[0] == 0) return null; + return new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]), + ((0xff00 & (bufferbytes[4] << 8)) | (0xff & bufferbytes[5]))); } } diff --git a/src/com/wentch/redkale/net/sncp/SncpResponse.java b/src/com/wentch/redkale/net/sncp/SncpResponse.java index 2a7ea4330..5b0658236 100644 --- a/src/com/wentch/redkale/net/sncp/SncpResponse.java +++ b/src/com/wentch/redkale/net/sncp/SncpResponse.java @@ -49,31 +49,50 @@ public final class SncpResponse extends Response { } public void finish(final int retcode, final BsonWriter out) { - ByteBuffer buffer = context.pollBuffer(); - final int bodyLength = (out == null ? 0 : out.count()); - final int bufsize = buffer.capacity() - HEADER_SIZE; - if (bufsize > bodyLength) { //只需一帧 - //---------------------head---------------------------------- - fillHeader(buffer, bodyLength, 0, bodyLength, retcode); - //---------------------body---------------------------------- - out.toBuffer(buffer); - buffer.flip(); + if (out == null) { + final ByteBuffer buffer = context.pollBuffer(); + fillHeader(buffer, 0, 0, 0, retcode); finish(buffer); - } else { - final int frames = (bodyLength / bufsize) + (bodyLength % bufsize > 0 ? 1 : 0); - final ByteBuffer[] buffers = new ByteBuffer[frames]; - int pos = 0; - for (int i = 0; i < frames; i++) { - if (i != 0) buffer = context.pollBuffer(); - int len = Math.min(bufsize, bodyLength - pos); - fillHeader(buffer, bodyLength, pos, len, retcode); - buffers[i] = buffer; - out.toBuffer(pos, buffer); - pos += len; - buffer.flip(); - } - finish(buffers); + return; } + final int respBodyLength = out.count() - HEADER_SIZE; //body总长度 + if (this.channel.isTCP() || out.count() <= context.getBufferCapacity()) { //TCP模式 或者 一帧数据 + fillHeader(out, respBodyLength, 0, respBodyLength, retcode); + finish(out.toBuffer()); + return; + } + final int bufsize = context.getBufferCapacity() - HEADER_SIZE; + final int frames = (respBodyLength / bufsize) + (respBodyLength % bufsize > 0 ? 1 : 0); + final ByteBuffer[] buffers = new ByteBuffer[frames]; + int pos = 0; + for (int i = 0; i < frames; i++) { + final ByteBuffer buffer = context.pollBuffer(); + int len = Math.min(bufsize, respBodyLength - pos); + fillHeader(buffer, respBodyLength, pos, len, retcode); + buffers[i] = buffer; + out.toBuffer(pos + HEADER_SIZE, buffer); + pos += len; + buffer.flip(); + } + finish(buffers); + } + + private void fillHeader(BsonWriter writer, int bodyLength, int bodyOffset, int framelength, int retcode) { + //---------------------head---------------------------------- + int pos = 0; + pos = writer.rewriteTo(pos, request.getSeqid()); + pos = writer.rewriteTo(pos, (char) SncpRequest.HEADER_SIZE); + pos = writer.rewriteTo(pos, request.getServiceid()); + pos = writer.rewriteTo(pos, request.getNameid()); + DLong actionid = request.getActionid(); + pos = writer.rewriteTo(pos, actionid.getFirst()); + pos = writer.rewriteTo(pos, actionid.getSecond()); + pos = writer.rewriteTo(pos, addrBytes); + pos = writer.rewriteTo(pos, (char) this.addrPort); + pos = writer.rewriteTo(pos, bodyLength); + pos = writer.rewriteTo(pos, bodyOffset); + pos = writer.rewriteTo(pos, framelength); + writer.rewriteTo(pos, retcode); } private void fillHeader(ByteBuffer buffer, int bodyLength, int bodyOffset, int framelength, int retcode) { diff --git a/src/com/wentch/redkale/net/sncp/SncpServer.java b/src/com/wentch/redkale/net/sncp/SncpServer.java index 56a90a536..a30e9a260 100644 --- a/src/com/wentch/redkale/net/sncp/SncpServer.java +++ b/src/com/wentch/redkale/net/sncp/SncpServer.java @@ -57,7 +57,7 @@ public final class SncpServer extends Server { AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter"); ObjectPool responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); - SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, + SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond); responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext, sncpcontext.bsonFactory))); return sncpcontext;