This commit is contained in:
kamhung
2015-11-11 11:22:13 +08:00
parent 7bceab104b
commit a727dff93e
7 changed files with 92 additions and 343 deletions

View File

@@ -25,6 +25,7 @@ public final class BsonByteBufferWriter extends BsonWriter {
this.supplier = supplier;
}
@Override
public ByteBuffer[] toBuffers() {
if (buffers == null) return new ByteBuffer[0];
for (int i = index; i < this.buffers.length; i++) {
@@ -48,32 +49,9 @@ public final class BsonByteBufferWriter extends BsonWriter {
return bytes;
}
@Override
public ByteBuffer toBuffer() {
if (buffers == null) return null;
if (buffers.length == 1) return buffers[0];
final ByteBuffer rs = ByteBuffer.allocate(count);
for (ByteBuffer buf : toBuffers()) {
rs.put(buf);
buf.flip();
}
rs.flip();
return rs;
}
@Override
public void toBuffer(ByteBuffer buffer) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int toBuffer(int offset, ByteBuffer buffer) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String toString() {
return "bytes[" + count() + "]";
return this.getClass().getSimpleName() + "[count=" + this.count + "]";
}
@Override
@@ -83,29 +61,7 @@ public final class BsonByteBufferWriter extends BsonWriter {
}
@Override
public BsonByteBufferWriter fillRange(final int len) {
ByteBuffer buffer = this.buffers[index];
if (expand(len) == 0) {
buffer.position(buffer.position() + len);
} else {
int remain = len; //还剩多少没有写
while (remain > 0) {
final int br = buffer.remaining();
if (remain > br) { //一个buffer写不完
buffer.position(buffer.position() + br);
buffer = nextByteBuffer();
remain -= br;
} else {
buffer.position(buffer.position() + remain);
remain = 0;
}
}
}
this.count += len;
return this;
}
private int expand(final int byteLength) {
protected int expand(final int byteLength) {
if (this.buffers == null) {
this.index = 0;
this.buffers = new ByteBuffer[]{supplier.get()};
@@ -134,11 +90,6 @@ public final class BsonByteBufferWriter extends BsonWriter {
return size;
}
@Override
public int rewriteTo(int position, byte... chs) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void writeTo(final byte[] chs, final int start, final int len) {
if (expand(len) == 0) {

View File

@@ -141,26 +141,6 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
return convertTo(value.getClass(), value);
}
public ByteBuffer convertToBuffer(final Type type, Object value) {
if (type == null) return null;
final BsonWriter out = writerPool.get().setTiny(tiny);
factory.loadEncoder(type).convertTo(out, value);
ByteBuffer result = out.toBuffer();
writerPool.offer(out);
return result;
}
public ByteBuffer convertToBuffer(Object value) {
if (value == null) {
final BsonWriter out = writerPool.get().setTiny(tiny);
out.writeNull();
ByteBuffer result = out.toBuffer();
writerPool.offer(out);
return result;
}
return convertToBuffer(value.getClass(), value);
}
public BsonWriter convertToWriter(final Type type, Object value) {
if (type == null) return null;
final BsonWriter out = writerPool.get().setTiny(tiny);

View File

@@ -34,19 +34,8 @@ public class BsonWriter implements Writer {
return newdata;
}
public ByteBuffer toBuffer() {
return ByteBuffer.wrap(content, 0, count);
}
public void toBuffer(ByteBuffer buffer) {
buffer.put(content, 0, count);
}
public int toBuffer(int offset, ByteBuffer buffer) {
int len = Math.min(count - offset, buffer.remaining());
if (len < 1) return 0;
buffer.put(content, offset, len);
return len;
public ByteBuffer[] toBuffers() {
return new ByteBuffer[]{ByteBuffer.wrap(content, 0, count)};
}
protected BsonWriter(byte[] bs) {
@@ -74,57 +63,18 @@ public class BsonWriter implements Writer {
//-----------------------------------------------------------------------
//-----------------------------------------------------------------------
/**
* 返回指定至少指定长度的缓冲区
* 扩充指定长度的缓冲区
*
* @param len
* @return
* @return
*/
private byte[] expand(int len) {
protected int expand(int len) {
int newcount = count + len;
if (newcount <= content.length) return content;
if (newcount <= content.length) return 0;
byte[] newdata = new byte[Math.max(content.length * 3 / 2, newcount)];
System.arraycopy(content, 0, newdata, 0, count);
this.content = newdata;
return newdata;
}
/**
* 往指定的位置写入字节
*
* @param position
* @param chs
* @return
*/
public int rewriteTo(int position, byte... chs) {
System.arraycopy(chs, 0, content, position, chs.length);
return position + chs.length;
}
public final int rewriteTo(int position, short value) {
rewriteTo(position, (byte) (value >> 8), (byte) value);
return position + 2;
}
public final int rewriteTo(int position, char value) {
rewriteTo(position, (byte) ((value & 0xFF00) >> 8), (byte) (value & 0xFF));
return position + 2;
}
public final int rewriteTo(int position, int value) {
rewriteTo(position, (byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value);
return position + 4;
}
public final int rewriteTo(int position, long value) {
rewriteTo(position, (byte) (value >> 56), (byte) (value >> 48), (byte) (value >> 40), (byte) (value >> 32),
(byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value);
return position + 8;
}
public BsonWriter fillRange(final int len) {
expand(len);
count += len;
return this;
return 0;
}
public void writeTo(final byte ch) {
@@ -152,7 +102,7 @@ public class BsonWriter implements Writer {
@Override
public String toString() {
return new String(content, 0, count);
return this.getClass().getSimpleName() + "[count=" + this.count + "]";
}
//------------------------------------------------------------------------

View File

@@ -222,7 +222,8 @@ public final class SncpClient {
private Future<byte[]> remoteUDP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes;
final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入
final Supplier<ByteBuffer> supplier = transport.getBufferSupplier();
final BsonWriter bw = convert.pollBsonWriter(() -> supplier.get().put(DEFAULT_HEADER)); // 将head写入
for (int i = 0; i < params.length; i++) {
convert.convertTo(bw, myparamtypes[i], params[i]);
}
@@ -230,7 +231,7 @@ public final class SncpClient {
final AsyncConnection conn = transport.pollConnection(addr);
if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect");
final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度
final int reqBodyLength = bw.count(); //body总长度
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final int readto = conn.getReadTimeoutSecond();
@@ -238,23 +239,14 @@ public final class SncpClient {
final ByteBuffer buffer = transport.pollBuffer();
try {
//------------------------------ 发送请求 ---------------------------------------------------
if (transport.getBufferCapacity() >= bw.count()) { //只有一帧数据
fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength);
conn.write(bw.toBuffer()).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
} else {
final int bufsize = transport.getBufferCapacity() - HEADER_SIZE;
final int frames = (reqBodyLength / bufsize) + (reqBodyLength % bufsize > 0 ? 1 : 0);
int pos = 0;
for (int i = 0; i < frames; i++) {
int len = Math.min(bufsize, reqBodyLength - pos);
fillHeader(buffer, seqid, actionid, reqBodyLength, pos, len);
bw.toBuffer(pos + HEADER_SIZE, buffer);
pos += len;
buffer.flip();
if (i != 0) Thread.sleep(10);
conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
buffer.clear();
}
int pos = 0;
for (ByteBuffer buf : bw.toBuffers()) {
int len = buf.remaining() - HEADER_SIZE;
fillHeader(buf, seqid, actionid, reqBodyLength, pos, len);
pos += len;
Thread.sleep(20);
conn.write(buf).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
transport.offerBuffer(buf);
}
//------------------------------ 接收响应 ---------------------------------------------------
int received = 0;
@@ -264,6 +256,7 @@ public final class SncpClient {
buffer.clear();
conn.read(buffer).get(readto > 0 ? readto : 3, TimeUnit.SECONDS);
buffer.flip();
checkResult(seqid, action, buffer);
int respbodylen = buffer.getInt();
if (respBody == null) {
@@ -291,27 +284,42 @@ public final class SncpClient {
private Future<byte[]> remoteTCP(final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes;
final BsonWriter bw = convert.pollBsonWriter().fillRange(HEADER_SIZE); // 将head写入
final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) {
convert.convertTo(bw, myparamtypes[i], params[i]);
convert.convertTo(writer, myparamtypes[i], params[i]);
}
final int reqBodyLength = bw.count() - HEADER_SIZE; //body总长度
final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final SocketAddress addr = action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null;
final AsyncConnection conn = transport.pollConnection(addr);
if (conn == null || !conn.isOpen()) throw new RuntimeException("sncp " + (conn == null ? addr : conn.getRemoteAddress()) + " cannot connect");
fillHeader(bw, seqid, actionid, reqBodyLength, 0, reqBodyLength);
final ByteBuffer[] sendBuffers = writer.toBuffers();
fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength, 0, reqBodyLength);
final ByteBuffer buffer = transport.pollBuffer();
final ByteBuffer sendbuf = bw.toBuffer();
final SncpFuture<byte[]> future = new SncpFuture();
conn.write(sendbuf, null, new CompletionHandler<Integer, Void>() {
conn.write(sendBuffers, sendBuffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@Override
public void completed(Integer result, Void attachment) {
if (sendbuf.hasRemaining()) { //buffer没有传输完
conn.write(sendbuf, attachment, this);
public void completed(Integer result, ByteBuffer[] attachments) {
int index = -1;
for (int i = 0; i < attachments.length; i++) {
if (attachments[i].hasRemaining()) {
index = i;
break;
} else {
transport.offerBuffer(attachments[i]);
}
}
if (index == 0) {
conn.write(attachments, attachments, this);
return;
} else if (index > 0) {
ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index];
System.arraycopy(attachments, index, newattachs, 0, newattachs.length);
conn.write(newattachs, newattachs, this);
return;
}
//----------------------- 读取返回结果 -------------------------------------
@@ -323,7 +331,7 @@ public final class SncpClient {
private int received;
@Override
public void completed(Integer count, Void attachment) {
public void completed(Integer count, Void attachment2) {
if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读
future.set(new RuntimeException(action.method + " sncp remote no response data"));
transport.offerBuffer(buffer);
@@ -331,7 +339,7 @@ public final class SncpClient {
return;
}
if (received < 1 && buffer.limit() < buffer.remaining() + HEADER_SIZE) { //header都没读全
conn.read(buffer, attachment, this);
conn.read(buffer, attachment2, this);
return;
}
buffer.flip();
@@ -341,7 +349,7 @@ public final class SncpClient {
buffer.get(body, offset, Math.min(buffer.remaining(), this.body.length - offset));
if (this.received < this.body.length) {// 数据仍然不全,需要继续读取
buffer.clear();
conn.read(buffer, attachment, this);
conn.read(buffer, attachment2, this);
} else {
success();
}
@@ -360,7 +368,7 @@ public final class SncpClient {
this.received = buffer.remaining();
buffer.get(body, 0, this.received);
buffer.clear();
conn.read(buffer, attachment, this);
conn.read(buffer, attachment2, this);
} else {
this.body = new byte[respBodyLength];
buffer.get(body, 0, respBodyLength);
@@ -375,7 +383,7 @@ public final class SncpClient {
}
@Override
public void failed(Throwable exc, Void attachment) {
public void failed(Throwable exc, Void attachment2) {
future.set(new RuntimeException(action.method + " sncp remote exec failed"));
transport.offerBuffer(buffer);
transport.offerConnection(conn);
@@ -385,7 +393,7 @@ public final class SncpClient {
}
@Override
public void failed(Throwable exc, Void attachment) {
public void failed(Throwable exc, ByteBuffer[] attachment) {
exc.printStackTrace();
transport.offerBuffer(buffer);
transport.offerConnection(conn);
@@ -409,159 +417,25 @@ public final class SncpClient {
buffer.getChar(); //端口
}
private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) {
Type[] myparamtypes = action.paramTypes;
final BsonWriter bw = convert.pollBsonWriter();
for (int i = 0; i < params.length; i++) {
convert.convertTo(bw, myparamtypes[i], params[i]);
}
final int bodyLength = bw.count();
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null);
if (conn == null || !conn.isOpen()) return null;
final ByteBuffer buffer = transport.pollBuffer();
final int readto = conn.getReadTimeoutSecond();
final int writeto = conn.getWriteTimeoutSecond();
try {
if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
//if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + reqBodyLength));
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
int pos = 0;
for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据
int len = Math.min(buffer.remaining() - HEADER_SIZE, bodyLength - pos);
fillHeader(buffer, seqid, actionid, bodyLength, pos, len);
pos += bw.toBuffer(pos, buffer);
buffer.flip();
conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
buffer.clear();
}
convert.offerBsonWriter(bw);
} else { //只有一帧的数据
//---------------------head----------------------------------
fillHeader(buffer, seqid, actionid, bodyLength, 0, bodyLength);
//---------------------body----------------------------------
bw.toBuffer(buffer);
convert.offerBsonWriter(bw);
buffer.flip();
conn.write(buffer).get(writeto > 0 ? writeto : 3, TimeUnit.SECONDS);
buffer.clear();
}
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); //读取第一帧的结果数据
buffer.flip();
long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt(); //地址
buffer.getChar(); //端口
final int bodylen = buffer.getInt();
int bodyOffset = buffer.getInt();
int frameLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
final byte[] body = new byte[bodylen];
if (bodylen == frameLength) { //只有一帧的数据
buffer.get(body, bodyOffset, frameLength);
return body;
} else { //读取多帧结果数据
int received = 0;
int lack = 0;
int lackoffset = 0;
while (received < bodylen) {
if (buffer.remaining() < frameLength) { //一帧缺失部分数据
lack = frameLength - buffer.remaining();
lackoffset = bodyOffset + buffer.remaining();
received += buffer.remaining();
buffer.get(body, bodyOffset, buffer.remaining());
} else {
lack = 0;
received += frameLength;
buffer.get(body, bodyOffset, frameLength);
}
if (received >= bodylen) break;
if (buffer.hasRemaining()) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
buffer.put(bytes);
} else {
buffer.clear();
}
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
if (lack > 0) buffer.get(body, lackoffset, lack);
rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive next.seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid);
rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive next.nameid =" + rnameid);
ractionid1 = buffer.getLong();
ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt(); //地址
buffer.getChar(); //端口
int rbodylen = buffer.getInt();
if (rbodylen != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen);
bodyOffset = buffer.getInt();
frameLength = buffer.getInt();
int rretcode = buffer.getInt();
if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")");
}
if (received != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received);
return body;
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception e) {
throw new RuntimeException("sncp(" + action.method + ") " + conn.getRemoteAddress() + " connect failed.", e);
} finally {
transport.offerBuffer(buffer);
transport.offerConnection(conn);
}
}
private void fillHeader(BsonWriter writer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) {
//---------------------head----------------------------------
int pos = 0;
pos = writer.rewriteTo(pos, seqid); //序列号
pos = writer.rewriteTo(pos, (char) HEADER_SIZE); //header长度
pos = writer.rewriteTo(pos, this.serviceid);
pos = writer.rewriteTo(pos, this.nameid);
pos = writer.rewriteTo(pos, actionid.getFirst());
pos = writer.rewriteTo(pos, actionid.getSecond());
pos = writer.rewriteTo(pos, addrBytes);
pos = writer.rewriteTo(pos, (char) this.addrPort);
pos = writer.rewriteTo(pos, bodyLength); //body长度
pos = writer.rewriteTo(pos, bodyOffset);
pos = writer.rewriteTo(pos, frameLength); //一帧数据的长度
writer.rewriteTo(pos, 0); //结果码, 请求方固定传0
}
private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) {
//---------------------head----------------------------------
final int currentpos = buffer.position();
buffer.position(0);
buffer.putLong(seqid); //序列号
buffer.putChar((char) HEADER_SIZE); //header长度
buffer.putLong(this.serviceid);
buffer.putLong(this.nameid);
buffer.putLong(actionid.getFirst());
buffer.putLong(actionid.getSecond());
buffer.put(addrBytes);
buffer.put(addrBytes[0]);
buffer.put(addrBytes[1]);
buffer.put(addrBytes[2]);
buffer.put(addrBytes[3]);
buffer.putChar((char) this.addrPort);
buffer.putInt(bodyLength); //body长度
buffer.putInt(bodyOffset);
buffer.putInt(frameLength); //一帧数据的长度
buffer.putInt(0); //结果码, 请求方固定传0
buffer.position(currentpos);
}
}

View File

@@ -6,12 +6,14 @@
package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.*;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import static com.wentch.redkale.net.sncp.SncpRequest.DEFAULT_HEADER;
import com.wentch.redkale.service.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.lang.reflect.*;
import java.nio.*;
import java.util.*;
import java.util.function.*;
import java.util.logging.*;
import javax.annotation.*;
import jdk.internal.org.objectweb.asm.*;
@@ -38,6 +40,8 @@ public final class SncpDynServlet extends SncpServlet {
private final HashMap<DLong, SncpServletAction> actions = new HashMap<>();
private Supplier<ByteBuffer> bufferSupplier;
public SncpDynServlet(final BsonConvert convert, final String serviceName, final Service service, final AnyValue conf) {
this.conf = conf;
this.serviceName = serviceName;
@@ -81,12 +85,22 @@ public final class SncpDynServlet extends SncpServlet {
@Override
public void execute(SncpRequest request, SncpResponse response) throws IOException {
final boolean tcp = request.isTCP();
if (bufferSupplier == null) {
if (tcp) {
bufferSupplier = request.getContext().getBufferSupplier();
} else { //UDP 需要分包
final Supplier<ByteBuffer> supplier = request.getContext().getBufferSupplier();
bufferSupplier = () -> supplier.get().put(DEFAULT_HEADER);
}
}
SncpServletAction action = actions.get(request.getActionid());
//if (finest) logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method));
if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else {
BsonWriter out = action.convert.pollBsonWriter().fillRange(HEADER_SIZE);
BsonWriter out = action.convert.pollBsonWriter(bufferSupplier);
if (tcp) out.writeTo(DEFAULT_HEADER);
BsonReader in = action.convert.pollBsonReader();
try {
in.setBytes(request.getBody());

View File

@@ -20,6 +20,8 @@ public final class SncpRequest extends Request {
public static final int HEADER_SIZE = 64;
public static final byte[] DEFAULT_HEADER = new byte[HEADER_SIZE];
protected final BsonConvert convert;
private long seqid;

View File

@@ -55,48 +55,25 @@ public final class SncpResponse extends Response<SncpRequest> {
finish(buffer);
return;
}
final int respBodyLength = out.count() - HEADER_SIZE; //body总长度
if (this.channel.isTCP() || out.count() <= context.getBufferCapacity()) { //TCP模式 或者 一帧数据
fillHeader(out, respBodyLength, 0, respBodyLength, retcode);
finish(out.toBuffer());
return;
}
final int bufsize = context.getBufferCapacity() - HEADER_SIZE;
final int frames = (respBodyLength / bufsize) + (respBodyLength % bufsize > 0 ? 1 : 0);
final ByteBuffer[] buffers = new ByteBuffer[frames];
int pos = 0;
for (int i = 0; i < frames; i++) {
final ByteBuffer buffer = context.pollBuffer();
int len = Math.min(bufsize, respBodyLength - pos);
fillHeader(buffer, respBodyLength, pos, len, retcode);
buffers[i] = buffer;
out.toBuffer(pos + HEADER_SIZE, buffer);
pos += len;
buffer.flip();
final int respBodyLength = out.count(); //body总长度
final ByteBuffer[] buffers = out.toBuffers();
if (this.channel.isTCP()) { //TCP模式 或者 一帧数据 TCP的总长度需要减去第一个buffer的header长度
fillHeader(buffers[0], respBodyLength - HEADER_SIZE, 0, respBodyLength - HEADER_SIZE, retcode);
} else {
int pos = 0;
for (ByteBuffer buffer : buffers) {
int len = buffer.remaining() - HEADER_SIZE;
fillHeader(buffer, respBodyLength, pos, len, retcode);
pos += len;
}
}
finish(buffers);
}
private void fillHeader(BsonWriter writer, int bodyLength, int bodyOffset, int framelength, int retcode) {
//---------------------head----------------------------------
int pos = 0;
pos = writer.rewriteTo(pos, request.getSeqid());
pos = writer.rewriteTo(pos, (char) SncpRequest.HEADER_SIZE);
pos = writer.rewriteTo(pos, request.getServiceid());
pos = writer.rewriteTo(pos, request.getNameid());
DLong actionid = request.getActionid();
pos = writer.rewriteTo(pos, actionid.getFirst());
pos = writer.rewriteTo(pos, actionid.getSecond());
pos = writer.rewriteTo(pos, addrBytes);
pos = writer.rewriteTo(pos, (char) this.addrPort);
pos = writer.rewriteTo(pos, bodyLength);
pos = writer.rewriteTo(pos, bodyOffset);
pos = writer.rewriteTo(pos, framelength);
writer.rewriteTo(pos, retcode);
}
private void fillHeader(ByteBuffer buffer, int bodyLength, int bodyOffset, int framelength, int retcode) {
//---------------------head----------------------------------
final int currentpos = buffer.position();
buffer.position(0);
buffer.putLong(request.getSeqid());
buffer.putChar((char) SncpRequest.HEADER_SIZE);
buffer.putLong(request.getServiceid());
@@ -110,5 +87,6 @@ public final class SncpResponse extends Response<SncpRequest> {
buffer.putInt(bodyOffset);
buffer.putInt(framelength);
buffer.putInt(retcode);
buffer.position(currentpos);
}
}