diff --git a/src/com/wentch/redkale/convert/bson/BsonByteBufferWriter.java b/src/com/wentch/redkale/convert/bson/BsonByteBufferWriter.java index 247f07279..8d4a40062 100644 --- a/src/com/wentch/redkale/convert/bson/BsonByteBufferWriter.java +++ b/src/com/wentch/redkale/convert/bson/BsonByteBufferWriter.java @@ -25,6 +25,7 @@ public final class BsonByteBufferWriter extends BsonWriter { this.supplier = supplier; } + @Override public ByteBuffer[] toBuffers() { if (buffers == null) return new ByteBuffer[0]; for (int i = index; i < this.buffers.length; i++) { @@ -48,32 +49,9 @@ public final class BsonByteBufferWriter extends BsonWriter { return bytes; } - @Override - public ByteBuffer toBuffer() { - if (buffers == null) return null; - if (buffers.length == 1) return buffers[0]; - final ByteBuffer rs = ByteBuffer.allocate(count); - for (ByteBuffer buf : toBuffers()) { - rs.put(buf); - buf.flip(); - } - rs.flip(); - return rs; - } - - @Override - public void toBuffer(ByteBuffer buffer) { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int toBuffer(int offset, ByteBuffer buffer) { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override public String toString() { - return "bytes[" + count() + "]"; + return this.getClass().getSimpleName() + "[count=" + this.count + "]"; } @Override @@ -83,29 +61,7 @@ public final class BsonByteBufferWriter extends BsonWriter { } @Override - public BsonByteBufferWriter fillRange(final int len) { - ByteBuffer buffer = this.buffers[index]; - if (expand(len) == 0) { - buffer.position(buffer.position() + len); - } else { - int remain = len; //还剩多少没有写 - while (remain > 0) { - final int br = buffer.remaining(); - if (remain > br) { //一个buffer写不完 - buffer.position(buffer.position() + br); - buffer = nextByteBuffer(); - remain -= br; - } else { - buffer.position(buffer.position() + remain); - remain = 0; - } - } - } - this.count += len; - return this; - } - - private int expand(final int byteLength) { + protected int expand(final int byteLength) { if (this.buffers == null) { this.index = 0; this.buffers = new ByteBuffer[]{supplier.get()}; @@ -134,11 +90,6 @@ public final class BsonByteBufferWriter extends BsonWriter { return size; } - @Override - public int rewriteTo(int position, byte... chs) { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override public void writeTo(final byte[] chs, final int start, final int len) { if (expand(len) == 0) { diff --git a/src/com/wentch/redkale/convert/bson/BsonConvert.java b/src/com/wentch/redkale/convert/bson/BsonConvert.java index 1f5750a6e..5fd4c1bb0 100644 --- a/src/com/wentch/redkale/convert/bson/BsonConvert.java +++ b/src/com/wentch/redkale/convert/bson/BsonConvert.java @@ -141,26 +141,6 @@ public final class BsonConvert extends Convert { return convertTo(value.getClass(), value); } - public ByteBuffer convertToBuffer(final Type type, Object value) { - if (type == null) return null; - final BsonWriter out = writerPool.get().setTiny(tiny); - factory.loadEncoder(type).convertTo(out, value); - ByteBuffer result = out.toBuffer(); - writerPool.offer(out); - return result; - } - - public ByteBuffer convertToBuffer(Object value) { - if (value == null) { - final BsonWriter out = writerPool.get().setTiny(tiny); - out.writeNull(); - ByteBuffer result = out.toBuffer(); - writerPool.offer(out); - return result; - } - return convertToBuffer(value.getClass(), value); - } - public BsonWriter convertToWriter(final Type type, Object value) { if (type == null) return null; final BsonWriter out = writerPool.get().setTiny(tiny); diff --git a/src/com/wentch/redkale/convert/bson/BsonWriter.java b/src/com/wentch/redkale/convert/bson/BsonWriter.java index 781ba5c1a..52b9081d6 100644 --- a/src/com/wentch/redkale/convert/bson/BsonWriter.java +++ b/src/com/wentch/redkale/convert/bson/BsonWriter.java @@ -34,19 +34,8 @@ public class BsonWriter implements Writer { return newdata; } - public ByteBuffer toBuffer() { - return ByteBuffer.wrap(content, 0, count); - } - - public void toBuffer(ByteBuffer buffer) { - buffer.put(content, 0, count); - } - - public int toBuffer(int offset, ByteBuffer buffer) { - int len = Math.min(count - offset, buffer.remaining()); - if (len < 1) return 0; - buffer.put(content, offset, len); - return len; + public ByteBuffer[] toBuffers() { + return new ByteBuffer[]{ByteBuffer.wrap(content, 0, count)}; } protected BsonWriter(byte[] bs) { @@ -74,57 +63,18 @@ public class BsonWriter implements Writer { //----------------------------------------------------------------------- //----------------------------------------------------------------------- /** - * 返回指定至少指定长度的缓冲区 + * 扩充指定长度的缓冲区 * * @param len - * @return + * @return */ - private byte[] expand(int len) { + protected int expand(int len) { int newcount = count + len; - if (newcount <= content.length) return content; + if (newcount <= content.length) return 0; byte[] newdata = new byte[Math.max(content.length * 3 / 2, newcount)]; System.arraycopy(content, 0, newdata, 0, count); this.content = newdata; - return newdata; - } - - /** - * 往指定的位置写入字节 - * - * @param position - * @param chs - * @return - */ - public int rewriteTo(int position, byte... chs) { - System.arraycopy(chs, 0, content, position, chs.length); - return position + chs.length; - } - - public final int rewriteTo(int position, short value) { - rewriteTo(position, (byte) (value >> 8), (byte) value); - return position + 2; - } - - public final int rewriteTo(int position, char value) { - rewriteTo(position, (byte) ((value & 0xFF00) >> 8), (byte) (value & 0xFF)); - return position + 2; - } - - public final int rewriteTo(int position, int value) { - rewriteTo(position, (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value); - return position + 4; - } - - public final int rewriteTo(int position, long value) { - rewriteTo(position, (byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32), - (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value); - return position + 8; - } - - public BsonWriter fillRange(final int len) { - expand(len); - count += len; - return this; + return 0; } public void writeTo(final byte ch) { @@ -152,7 +102,7 @@ public class BsonWriter implements Writer { @Override public String toString() { - return new String(content, 0, count); + return this.getClass().getSimpleName() + "[count=" + this.count + "]"; } //------------------------------------------------------------------------ diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index 17cb2c7b6..b7f344139 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -222,7 +222,8 @@ public final class SncpClient { private Future remoteUDP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) { Type[] myparamtypes = action.paramTypes; - final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入 + final Supplier supplier = transport.getBufferSupplier(); + final BsonWriter bw = convert.pollBsonWriter(() -> supplier.get().put(DEFAULT_HEADER)); // 将head写入 for (int i = 0; i < params.length; i++) { convert.convertTo(bw, myparamtypes[i], params[i]); } @@ -230,7 +231,7 @@ public final class SncpClient { final AsyncConnection conn = transport.pollConnection(addr); if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect"); - final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度 + final int reqBodyLength = bw.count(); //body总长度 final long seqid = System.nanoTime(); final DLong actionid = action.actionid; final int readto = conn.getReadTimeoutSecond(); @@ -238,23 +239,14 @@ public final class SncpClient { final ByteBuffer buffer = transport.pollBuffer(); try { //------------------------------ 发送请求 --------------------------------------------------- - if (transport.getBufferCapacity() >= bw.count()) { //只有一帧数据 - fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength); - conn.write(bw.toBuffer()).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); - } else { - final int bufsize = transport.getBufferCapacity() - HEADER_SIZE; - final int frames = (reqBodyLength / bufsize) + (reqBodyLength % bufsize > 0 ? 1 : 0); - int pos = 0; - for (int i = 0; i < frames; i++) { - int len = Math.min(bufsize, reqBodyLength - pos); - fillHeader(buffer, seqid, actionid, reqBodyLength, pos, len); - bw.toBuffer(pos + HEADER_SIZE, buffer); - pos += len; - buffer.flip(); - if (i != 0) Thread.sleep(10); - conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); - buffer.clear(); - } + int pos = 0; + for (ByteBuffer buf : bw.toBuffers()) { + int len = buf.remaining() - HEADER_SIZE; + fillHeader(buf, seqid, actionid, reqBodyLength, pos, len); + pos += len; + Thread.sleep(20); + conn.write(buf).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); + transport.offerBuffer(buf); } //------------------------------ 接收响应 --------------------------------------------------- int received = 0; @@ -264,6 +256,7 @@ public final class SncpClient { buffer.clear(); conn.read(buffer).get(readto > 0 ? readto : 3, TimeUnit.SECONDS); buffer.flip(); + checkResult(seqid, action, buffer); int respbodylen = buffer.getInt(); if (respBody == null) { @@ -291,27 +284,42 @@ public final class SncpClient { private Future remoteTCP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) { Type[] myparamtypes = action.paramTypes; - final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入 + final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 + writer.writeTo(DEFAULT_HEADER); for (int i = 0; i < params.length; i++) { - convert.convertTo(bw, myparamtypes[i], params[i]); + convert.convertTo(writer, myparamtypes[i], params[i]); } - final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度 + final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度 final long seqid = System.nanoTime(); final DLong actionid = action.actionid; final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null; final AsyncConnection conn = transport.pollConnection(addr); if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect"); - fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength); + final ByteBuffer[] sendBuffers = writer.toBuffers(); + fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength, 0, reqBodyLength); final ByteBuffer buffer = transport.pollBuffer(); - final ByteBuffer sendbuf = bw.toBuffer(); final SncpFuture future = new SncpFuture(); - conn.write(sendbuf, null, new CompletionHandler() { + conn.write(sendBuffers, sendBuffers, new CompletionHandler() { @Override - public void completed(Integer result, Void attachment) { - if (sendbuf.hasRemaining()) { //buffer没有传输完 - conn.write(sendbuf, attachment, this); + public void completed(Integer result, ByteBuffer[] attachments) { + int index = -1; + for (int i = 0; i < attachments.length; i++) { + if (attachments[i].hasRemaining()) { + index = i; + break; + } else { + transport.offerBuffer(attachments[i]); + } + } + if (index == 0) { + conn.write(attachments, attachments, this); + return; + } else if (index > 0) { + ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index]; + System.arraycopy(attachments, index, newattachs, 0, newattachs.length); + conn.write(newattachs, newattachs, this); return; } //----------------------- 读取返回结果 ------------------------------------- @@ -323,7 +331,7 @@ public final class SncpClient { private int received; @Override - public void completed(Integer count, Void attachment) { + public void completed(Integer count, Void attachment2) { if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 future.set(new RuntimeException(action.method + " sncp remote no response data")); transport.offerBuffer(buffer); @@ -331,7 +339,7 @@ public final class SncpClient { return; } if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全 - conn.read(buffer, attachment, this); + conn.read(buffer, attachment2, this); return; } buffer.flip(); @@ -341,7 +349,7 @@ public final class SncpClient { buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset)); if (this.received < this.body.length) {// 数据仍然不全,需要继续读取 buffer.clear(); - conn.read(buffer, attachment, this); + conn.read(buffer, attachment2, this); } else { success(); } @@ -360,7 +368,7 @@ public final class SncpClient { this.received = buffer.remaining(); buffer.get(body, 0, this.received); buffer.clear(); - conn.read(buffer, attachment, this); + conn.read(buffer, attachment2, this); } else { this.body = new byte[respBodyLength]; buffer.get(body, 0, respBodyLength); @@ -375,7 +383,7 @@ public final class SncpClient { } @Override - public void failed(Throwable exc, Void attachment) { + public void failed(Throwable exc, Void attachment2) { future.set(new RuntimeException(action.method + " sncp remote exec failed")); transport.offerBuffer(buffer); transport.offerConnection(conn); @@ -385,7 +393,7 @@ public final class SncpClient { } @Override - public void failed(Throwable exc, Void attachment) { + public void failed(Throwable exc, ByteBuffer[] attachment) { exc.printStackTrace(); transport.offerBuffer(buffer); transport.offerConnection(conn); @@ -409,159 +417,25 @@ public final class SncpClient { buffer.getChar(); //端口 } - private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) { - Type[] myparamtypes = action.paramTypes; - final BsonWriter bw = convert.pollBsonWriter(); - for (int i = 0; i < params.length; i++) { - convert.convertTo(bw, myparamtypes[i], params[i]); - } - final int bodyLength = bw.count(); - - final long seqid = System.nanoTime(); - final DLong actionid = action.actionid; - final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null); - if (conn == null || !conn.isOpen()) return null; - final ByteBuffer buffer = transport.pollBuffer(); - final int readto = conn.getReadTimeoutSecond(); - final int writeto = conn.getWriteTimeoutSecond(); - try { - if ((HEADER_SIZE + bodyLength) > buffer.limit()) { - //if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + reqBodyLength)); - final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); - int pos = 0; - for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据 - int len = Math.min(buffer.remaining() - HEADER_SIZE, bodyLength - pos); - fillHeader(buffer, seqid, actionid, bodyLength, pos, len); - pos += bw.toBuffer(pos, buffer); - buffer.flip(); - conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); - buffer.clear(); - } - convert.offerBsonWriter(bw); - } else { //只有一帧的数据 - //---------------------head---------------------------------- - fillHeader(buffer, seqid, actionid, bodyLength, 0, bodyLength); - //---------------------body---------------------------------- - bw.toBuffer(buffer); - convert.offerBsonWriter(bw); - buffer.flip(); - conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS); - buffer.clear(); - } - conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); //读取第一帧的结果数据 - buffer.flip(); - long rseqid = buffer.getLong(); - if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive seqid =" + rseqid); - if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); - long rserviceid = buffer.getLong(); - if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid); - long rnameid = buffer.getLong(); - if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive nameid =" + rnameid); - long ractionid1 = buffer.getLong(); - long ractionid2 = buffer.getLong(); - if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); - buffer.getInt(); //地址 - buffer.getChar(); //端口 - final int bodylen = buffer.getInt(); - int bodyOffset = buffer.getInt(); - int frameLength = buffer.getInt(); - final int retcode = buffer.getInt(); - if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); - - final byte[] body = new byte[bodylen]; - if (bodylen == frameLength) { //只有一帧的数据 - buffer.get(body, bodyOffset, frameLength); - return body; - } else { //读取多帧结果数据 - int received = 0; - int lack = 0; - int lackoffset = 0; - while (received < bodylen) { - if (buffer.remaining() < frameLength) { //一帧缺失部分数据 - lack = frameLength - buffer.remaining(); - lackoffset = bodyOffset + buffer.remaining(); - received += buffer.remaining(); - buffer.get(body, bodyOffset, buffer.remaining()); - } else { - lack = 0; - received += frameLength; - buffer.get(body, bodyOffset, frameLength); - } - if (received >= bodylen) break; - if (buffer.hasRemaining()) { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - buffer.clear(); - buffer.put(bytes); - } else { - buffer.clear(); - } - conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); - buffer.flip(); - - if (lack > 0) buffer.get(body, lackoffset, lack); - rseqid = buffer.getLong(); - if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive next.seqid =" + rseqid); - if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); - rserviceid = buffer.getLong(); - if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid); - rnameid = buffer.getLong(); - if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive next.nameid =" + rnameid); - ractionid1 = buffer.getLong(); - ractionid2 = buffer.getLong(); - if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")"); - buffer.getInt(); //地址 - buffer.getChar(); //端口 - int rbodylen = buffer.getInt(); - if (rbodylen != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen); - bodyOffset = buffer.getInt(); - frameLength = buffer.getInt(); - int rretcode = buffer.getInt(); - if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")"); - } - if (received != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received); - return body; - } - } catch (RuntimeException ex) { - throw ex; - } catch (Exception e) { - throw new RuntimeException("sncp(" + action.method + ") " + conn.getRemoteAddress() + " connect failed.", e); - } finally { - transport.offerBuffer(buffer); - transport.offerConnection(conn); - } - } - - private void fillHeader(BsonWriter writer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) { - //---------------------head---------------------------------- - int pos = 0; - pos = writer.rewriteTo(pos, seqid); //序列号 - pos = writer.rewriteTo(pos, (char) HEADER_SIZE); //header长度 - pos = writer.rewriteTo(pos, this.serviceid); - pos = writer.rewriteTo(pos, this.nameid); - pos = writer.rewriteTo(pos, actionid.getFirst()); - pos = writer.rewriteTo(pos, actionid.getSecond()); - pos = writer.rewriteTo(pos, addrBytes); - pos = writer.rewriteTo(pos, (char) this.addrPort); - pos = writer.rewriteTo(pos, bodyLength); //body长度 - pos = writer.rewriteTo(pos, bodyOffset); - pos = writer.rewriteTo(pos, frameLength); //一帧数据的长度 - writer.rewriteTo(pos, 0); //结果码, 请求方固定传0 - } - private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) { //---------------------head---------------------------------- + final int currentpos = buffer.position(); + buffer.position(0); buffer.putLong(seqid); //序列号 buffer.putChar((char) HEADER_SIZE); //header长度 buffer.putLong(this.serviceid); buffer.putLong(this.nameid); buffer.putLong(actionid.getFirst()); buffer.putLong(actionid.getSecond()); - buffer.put(addrBytes); + buffer.put(addrBytes[0]); + buffer.put(addrBytes[1]); + buffer.put(addrBytes[2]); + buffer.put(addrBytes[3]); buffer.putChar((char) this.addrPort); buffer.putInt(bodyLength); //body长度 buffer.putInt(bodyOffset); buffer.putInt(frameLength); //一帧数据的长度 buffer.putInt(0); //结果码, 请求方固定传0 + buffer.position(currentpos); } } diff --git a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java index 8487cb3f7..68d69baa6 100644 --- a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java +++ b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java @@ -6,12 +6,14 @@ package com.wentch.redkale.net.sncp; import com.wentch.redkale.convert.bson.*; -import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; +import static com.wentch.redkale.net.sncp.SncpRequest.DEFAULT_HEADER; import com.wentch.redkale.service.*; import com.wentch.redkale.util.*; import java.io.*; import java.lang.reflect.*; +import java.nio.*; import java.util.*; +import java.util.function.*; import java.util.logging.*; import javax.annotation.*; import jdk.internal.org.objectweb.asm.*; @@ -38,6 +40,8 @@ public final class SncpDynServlet extends SncpServlet { private final HashMap actions = new HashMap<>(); + private Supplier bufferSupplier; + public SncpDynServlet(final BsonConvert convert, final String serviceName, final Service service, final AnyValue conf) { this.conf = conf; this.serviceName = serviceName; @@ -81,12 +85,22 @@ public final class SncpDynServlet extends SncpServlet { @Override public void execute(SncpRequest request, SncpResponse response) throws IOException { + final boolean tcp = request.isTCP(); + if (bufferSupplier == null) { + if (tcp) { + bufferSupplier = request.getContext().getBufferSupplier(); + } else { //UDP 需要分包 + final Supplier supplier = request.getContext().getBufferSupplier(); + bufferSupplier = () -> supplier.get().put(DEFAULT_HEADER); + } + } SncpServletAction action = actions.get(request.getActionid()); //if (finest) logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid } else { - BsonWriter out = action.convert.pollBsonWriter().fillRange(HEADER_SIZE); + BsonWriter out = action.convert.pollBsonWriter(bufferSupplier); + if (tcp) out.writeTo(DEFAULT_HEADER); BsonReader in = action.convert.pollBsonReader(); try { in.setBytes(request.getBody()); diff --git a/src/com/wentch/redkale/net/sncp/SncpRequest.java b/src/com/wentch/redkale/net/sncp/SncpRequest.java index 50ac7f79b..f5e4eb8c6 100644 --- a/src/com/wentch/redkale/net/sncp/SncpRequest.java +++ b/src/com/wentch/redkale/net/sncp/SncpRequest.java @@ -20,6 +20,8 @@ public final class SncpRequest extends Request { public static final int HEADER_SIZE = 64; + public static final byte[] DEFAULT_HEADER = new byte[HEADER_SIZE]; + protected final BsonConvert convert; private long seqid; diff --git a/src/com/wentch/redkale/net/sncp/SncpResponse.java b/src/com/wentch/redkale/net/sncp/SncpResponse.java index 5b0658236..d65d76d0b 100644 --- a/src/com/wentch/redkale/net/sncp/SncpResponse.java +++ b/src/com/wentch/redkale/net/sncp/SncpResponse.java @@ -55,48 +55,25 @@ public final class SncpResponse extends Response { finish(buffer); return; } - final int respBodyLength = out.count() - HEADER_SIZE; //body总长度 - if (this.channel.isTCP() || out.count() <= context.getBufferCapacity()) { //TCP模式 或者 一帧数据 - fillHeader(out, respBodyLength, 0, respBodyLength, retcode); - finish(out.toBuffer()); - return; - } - final int bufsize = context.getBufferCapacity() - HEADER_SIZE; - final int frames = (respBodyLength / bufsize) + (respBodyLength % bufsize > 0 ? 1 : 0); - final ByteBuffer[] buffers = new ByteBuffer[frames]; - int pos = 0; - for (int i = 0; i < frames; i++) { - final ByteBuffer buffer = context.pollBuffer(); - int len = Math.min(bufsize, respBodyLength - pos); - fillHeader(buffer, respBodyLength, pos, len, retcode); - buffers[i] = buffer; - out.toBuffer(pos + HEADER_SIZE, buffer); - pos += len; - buffer.flip(); + final int respBodyLength = out.count(); //body总长度 + final ByteBuffer[] buffers = out.toBuffers(); + if (this.channel.isTCP()) { //TCP模式 或者 一帧数据 TCP的总长度需要减去第一个buffer的header长度 + fillHeader(buffers[0], respBodyLength - HEADER_SIZE, 0, respBodyLength - HEADER_SIZE, retcode); + } else { + int pos = 0; + for (ByteBuffer buffer : buffers) { + int len = buffer.remaining() - HEADER_SIZE; + fillHeader(buffer, respBodyLength, pos, len, retcode); + pos += len; + } } finish(buffers); } - private void fillHeader(BsonWriter writer, int bodyLength, int bodyOffset, int framelength, int retcode) { - //---------------------head---------------------------------- - int pos = 0; - pos = writer.rewriteTo(pos, request.getSeqid()); - pos = writer.rewriteTo(pos, (char) SncpRequest.HEADER_SIZE); - pos = writer.rewriteTo(pos, request.getServiceid()); - pos = writer.rewriteTo(pos, request.getNameid()); - DLong actionid = request.getActionid(); - pos = writer.rewriteTo(pos, actionid.getFirst()); - pos = writer.rewriteTo(pos, actionid.getSecond()); - pos = writer.rewriteTo(pos, addrBytes); - pos = writer.rewriteTo(pos, (char) this.addrPort); - pos = writer.rewriteTo(pos, bodyLength); - pos = writer.rewriteTo(pos, bodyOffset); - pos = writer.rewriteTo(pos, framelength); - writer.rewriteTo(pos, retcode); - } - private void fillHeader(ByteBuffer buffer, int bodyLength, int bodyOffset, int framelength, int retcode) { //---------------------head---------------------------------- + final int currentpos = buffer.position(); + buffer.position(0); buffer.putLong(request.getSeqid()); buffer.putChar((char) SncpRequest.HEADER_SIZE); buffer.putLong(request.getServiceid()); @@ -110,5 +87,6 @@ public final class SncpResponse extends Response { buffer.putInt(bodyOffset); buffer.putInt(framelength); buffer.putInt(retcode); + buffer.position(currentpos); } }