This commit is contained in:
地平线
2015-11-02 18:10:19 +08:00
parent cd64a7aaff
commit 156323d0e5
9 changed files with 145 additions and 62 deletions

View File

@@ -256,11 +256,13 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
protected <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
int end = offset + length - 1;
for (int i = offset; i < offset + length; i++) {
rs += channel.send(srcs[i], remoteAddress);
if (i != end) Thread.sleep(1);
}
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
} catch (Exception e) {
if (handler != null) handler.failed(e, attachment);
}
}

View File

@@ -60,7 +60,9 @@ public final class PrepareRunner implements Runnable {
}
// { //测试
// buffer.flip();
// System.println(new String(buffer.array(), 0, buffer.remaining()));
// byte[] bs = new byte[buffer.remaining()];
// buffer.get(bs);
// System.println(new String(bs));
// }
buffer.flip();
final Response response = responsePool.poll();
@@ -68,7 +70,7 @@ public final class PrepareRunner implements Runnable {
try {
prepare.prepare(buffer, response.request, response);
} catch (Throwable t) { //此处不可 context.offerBuffer(buffer); 以免prepare.prepare内部异常导致重复 offerBuffer
context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t);
context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t);
response.finish(true);
}
}

View File

@@ -42,8 +42,7 @@ public abstract class PrepareServlet<R extends Request, P extends Response<R>> i
@Override
public void completed(Integer result, ByteBuffer attachment) {
buffer.flip();
ai.addAndGet(-buffer.remaining());
request.readBody(buffer);
ai.addAndGet(-request.readBody(buffer));
if (ai.get() > 0) {
buffer.clear();
request.channel.read(buffer, buffer, this);

View File

@@ -34,7 +34,7 @@ public abstract class ProtocolServer {
//---------------------------------------------------------------------
public static ProtocolServer create(String protocol, Context context) {
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolTCPServer(context);
if ("UDP".equalsIgnoreCase(protocol)) return winos ? new ProtocolUDPWinServer(context) : new ProtocolUDPServer(context);
if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context);
throw new RuntimeException("ProtocolServer not support protocol " + protocol);
}

View File

@@ -42,7 +42,12 @@ public abstract class Request {
*/
protected abstract int readHeader(ByteBuffer buffer);
protected abstract void readBody(ByteBuffer buffer);
/**
* 读取buffer并返回读取的有效数据长度
@param buffer
@return
*/
protected abstract int readBody(ByteBuffer buffer);
protected abstract void prepare();

View File

@@ -159,8 +159,10 @@ public class HttpRequest extends Request {
}
@Override
protected void readBody(ByteBuffer buffer) {
array.add(buffer, buffer.remaining());
protected int readBody(ByteBuffer buffer) {
int len = buffer.remaining();
array.add(buffer, len);
return len;
}
@Override

View File

@@ -223,34 +223,41 @@ public final class SncpClient {
try {
if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
int pos = 0;
final byte[] all = new byte[bodyLength];
all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8);
all[pos++] = (byte) (bytesarray.length & 0xff);
for (byte[] bs : bytesarray) {
all[pos++] = (byte) ((bs.length & 0xff000000) >> 24);
all[pos++] = (byte) ((bs.length & 0xff0000) >> 16);
all[pos++] = (byte) ((bs.length & 0xff00) >> 8);
all[pos++] = (byte) (bs.length & 0xff);
System.arraycopy(bs, 0, all, pos, bs.length);
pos += bs.length;
{ //将二维byte数组转换成一维bye数组
all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8);
all[pos++] = (byte) (bytesarray.length & 0xff);
for (byte[] bs : bytesarray) {
all[pos++] = (byte) ((bs.length & 0xff000000) >> 24);
all[pos++] = (byte) ((bs.length & 0xff0000) >> 16);
all[pos++] = (byte) ((bs.length & 0xff00) >> 8);
all[pos++] = (byte) (bs.length & 0xff);
System.arraycopy(bs, 0, all, pos, bs.length);
pos += bs.length;
}
}
if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp(" + action.method + ") body.length : " + all.length + ", but pos=" + pos);
if (pos != all.length) throw new RuntimeException(this.serviceid + "," + this.nameid + "," + action + " sncp(" + action.method + ") body.length : " + all.length + ", but pos=" + pos);
pos = 0;
for (int i = patch - 1; i >= 0; i--) {
fillHeader(buffer, seqid, actionid, patch, i, bodyLength);
int len = Math.min(buffer.remaining(), all.length - pos);
for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据
int len = Math.min(buffer.remaining() - HEADER_SIZE, all.length - pos);
fillHeader(buffer, seqid, actionid, bodyLength, frames, i, pos, len);
buffer.put(all, pos, len);
pos += len;
buffer.flip();
Thread.sleep(1);
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear();
}
} else { //只有一帧的数据
{
//---------------------head----------------------------------
fillHeader(buffer, seqid, actionid, 1, 0, bodyLength);
int len = 2;
for (byte[] bs : bytesarray) {
len += 4 + bs.length;
}
fillHeader(buffer, seqid, actionid, bodyLength, 1, 0, 0, len);
//---------------------body----------------------------------
buffer.putChar((char) bytesarray.length); //参数数组大小
for (byte[] bs : bytesarray) {
@@ -262,7 +269,7 @@ public final class SncpClient {
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear();
}
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); //读取第一帧的结果数据
buffer.flip();
long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive seqid =" + rseqid);
@@ -274,28 +281,48 @@ public final class SncpClient {
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt();
buffer.getInt();
buffer.getInt(); //地址
buffer.getChar(); //端口
final int bodylen = buffer.getInt();
final int frameCount = buffer.get();
if (frameCount < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount);
int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
final int bodylen = buffer.getInt();
int bodyOffset = buffer.getInt();
int frameLength = buffer.getChar();
final byte[] body = new byte[bodylen];
if (frameCount == 1) {
buffer.get(body);
if (frameCount == 1) { //只有一帧的数据
buffer.get(body, bodyOffset, frameLength);
return body;
} else {
} else { //读取多帧结果数据
int received = 0;
int lack = 0;
int lackoffset = 0;
for (int i = 0; i < frameCount; i++) {
received += buffer.remaining();
buffer.get(body, (frameCount - frameIndex - 1) * (buffer.capacity() - HEADER_SIZE), buffer.remaining());
received += frameLength;
if (buffer.remaining() < frameLength) { //一帧缺失部分数据
lack = frameLength - buffer.remaining();
lackoffset = bodyOffset + buffer.remaining();
buffer.get(body, bodyOffset, buffer.remaining());
} else {
lack = 0;
buffer.get(body, bodyOffset, frameLength);
}
if (i == frameCount - 1) break;
buffer.clear();
if (buffer.hasRemaining()) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
buffer.clear();
buffer.put(bytes);
} else {
buffer.clear();
}
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
if (lack > 0) buffer.get(body, lackoffset, lack);
rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive next.seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
@@ -306,16 +333,17 @@ public final class SncpClient {
ractionid1 = buffer.getLong();
ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt();
buffer.getInt();
if (buffer.get() < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but next.frame.count != " + frameCount);
frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount)
throw new RuntimeException("sncp(" + action.method + ") receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex);
int rretcode = buffer.getInt();
if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")");
buffer.getInt(); //地址
buffer.getChar(); //端口
int rbodylen = buffer.getInt();
if (rbodylen != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen);
if (buffer.get() <= 1) throw new RuntimeException("sncp(" + action.method + ") receive count error, but next.frame.count != " + frameCount);
frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp(" + action.method + ") receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex);
int rretcode = buffer.getInt();
if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")");
bodyOffset = buffer.getInt();
frameLength = buffer.getChar();
}
if (received != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received);
return body;
@@ -330,7 +358,7 @@ public final class SncpClient {
}
}
private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int frameCount, int frameIndex, int bodyLength) {
private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int frameCount, int frameIndex, int bodyOffset, int frameLength) {
//---------------------head----------------------------------
buffer.putLong(seqid); //序列号
buffer.putChar((char) HEADER_SIZE); //header长度
@@ -339,10 +367,12 @@ public final class SncpClient {
buffer.putLong(actionid.getFirst());
buffer.putLong(actionid.getSecond());
buffer.put(addrBytes);
buffer.putInt(this.addrPort);
buffer.putChar((char) this.addrPort);
buffer.putInt(bodyLength); //body长度
buffer.put((byte) frameCount); //数据的帧数, 最小值为1
buffer.put((byte) frameIndex); //数据的帧数序号, 从frame.count-1开始, 0表示最后一帧
buffer.putInt(0); //结果码, 请求方固定传0
buffer.putInt(bodyLength); //body长度
buffer.putInt(bodyOffset);
buffer.putChar((char) frameLength); //一帧数据的长度
}
}

View File

@@ -18,7 +18,7 @@ import java.nio.*;
*/
public final class SncpRequest extends Request {
public static final int HEADER_SIZE = 60;
public static final int HEADER_SIZE = 64;
protected final BsonConvert convert;
@@ -28,6 +28,8 @@ public final class SncpRequest extends Request {
private int frameindex;
private int framelength;
private long nameid;
private long serviceid;
@@ -36,6 +38,8 @@ public final class SncpRequest extends Request {
private int bodylength;
private int bodyoffset;
private byte[][] paramBytes;
private boolean ping;
@@ -67,17 +71,19 @@ public final class SncpRequest extends Request {
this.nameid = buffer.getLong();
this.actionid = new DLong(buffer.getLong(), buffer.getLong());
buffer.get(bufferbytes);
int port = buffer.getInt();
int port = buffer.getChar();
if (bufferbytes[0] > 0 && port > 0) {
this.remoteAddress = new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]), port);
}
this.bodylength = buffer.getInt();
this.framecount = buffer.get();
this.frameindex = buffer.get();
if (buffer.getInt() != 0) {
context.getLogger().finest("sncp buffer header.retcode not 0");
return -1;
}
this.bodylength = buffer.getInt();
this.bodyoffset = buffer.getInt();
this.framelength = buffer.getChar();
//---------------------body----------------------------------
if (this.framecount == 1) { //只有一帧的数据
int paramlen = buffer.getChar();
@@ -94,7 +100,7 @@ public final class SncpRequest extends Request {
final SncpContext scontext = (SncpContext) this.context;
RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
entry.add(buffer, (this.framecount - this.frameindex - 1) * (buffer.capacity() - HEADER_SIZE));
entry.add(buffer, this.bodyoffset);
if (entry.isCompleted()) { //数据读取完毕
this.body = entry.body;
@@ -103,11 +109,43 @@ public final class SncpRequest extends Request {
} else {
scontext.expireRequestEntry(10 * 1000); //10秒过期
}
if (this.channel.isTCP()) return this.bodylength - this.framelength;
return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE
}
@Override
protected void readBody(ByteBuffer buffer) {
protected int readBody(ByteBuffer buffer) { // TCP 模式会调用此方法
long rseqid = buffer.getLong();
if (rseqid != this.seqid) throw new RuntimeException("sncp frame receive seqid = " + seqid + ", but first receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != this.serviceid) throw new RuntimeException("sncp frame receive serviceid = " + serviceid + ", but first receive serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != this.nameid) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!this.actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp frame receive actionid = " + actionid + ", but first receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt(); //地址
buffer.getChar(); //端口
final int bodylen = buffer.getInt();
if (bodylen != this.bodylength) throw new RuntimeException("sncp frame receive bodylength = " + bodylen + ", but first bodylength =" + bodylength);
final int frameCount = buffer.get();
if (frameCount != this.framecount) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first frame.count =" + frameCount);
final int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first frame.count =" + frameCount + " & frame.index =" + frameIndex);
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("sncp frame receive retcode error (retcode=" + retcode + ")");
final int bodyOffset = buffer.getInt();
final int framelen = buffer.getChar();
final SncpContext scontext = (SncpContext) this.context;
RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
entry.add(buffer, bodyOffset);
if (entry.isCompleted()) { //数据读取完毕
this.body = entry.body;
scontext.removeRequestEntity(this.seqid);
}
return framelen;
}
@Override
@@ -132,7 +170,8 @@ public final class SncpRequest extends Request {
public String toString() {
return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid + ",actionid=" + this.actionid
+ ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",bodylength=" + this.bodylength + ",remoteAddress=" + remoteAddress + "}";
+ ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",framelength=" + this.framelength
+ ",bodylength=" + this.bodylength+ ",bodyoffset=" + this.bodyoffset + ",remoteAddress=" + remoteAddress + "}";
}
@Override
@@ -140,9 +179,11 @@ public final class SncpRequest extends Request {
this.seqid = 0;
this.framecount = 0;
this.frameindex = 0;
this.framelength = 0;
this.serviceid = 0;
this.actionid = null;
this.bodylength = 0;
this.bodyoffset = 0;
this.body = null;
this.paramBytes = null;
this.ping = false;

View File

@@ -50,22 +50,22 @@ public final class SncpResponse extends Response<SncpRequest> {
public void finish(final int retcode, final byte[] bytes) {
ByteBuffer buffer = context.pollBuffer();
final int bodyLength = (bytes == null ? 0 : bytes.length);
final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
if (patch <= 1) {
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
if (frames <= 1) {
//---------------------head----------------------------------
fillHeader(buffer, retcode, 1, 0, bodyLength);
fillHeader(buffer, bodyLength, 1, 0, retcode, 0, bytes.length);
//---------------------body----------------------------------
if (bytes != null) buffer.put(bytes);
buffer.flip();
finish(buffer);
} else {
final ByteBuffer[] buffers = new ByteBuffer[patch];
final ByteBuffer[] buffers = new ByteBuffer[frames];
int pos = 0;
for (int i = patch - 1; i >= 0; i--) {
if (i != patch - 1) buffer = context.pollBuffer();
fillHeader(buffer, retcode, patch, i, bodyLength);
for (int i = frames - 1; i >= 0; i--) {
if (i != frames - 1) buffer = context.pollBuffer();
int len = Math.min(buffer.remaining() - HEADER_SIZE, bytes.length - pos);
fillHeader(buffer, bodyLength, frames, i, retcode, pos, len);
buffers[i] = buffer;
int len = Math.min(buffer.remaining(), bytes.length - pos);
buffer.put(bytes, pos, len);
pos += len;
buffer.flip();
@@ -74,7 +74,7 @@ public final class SncpResponse extends Response<SncpRequest> {
}
}
private void fillHeader(ByteBuffer buffer, int retcode, int frameCount, int frameIndex, int bodyLength) {
private void fillHeader(ByteBuffer buffer, int bodyLength, int frameCount, int frameIndex, int retcode, int bodyOffset, int framelength) {
//---------------------head----------------------------------
buffer.putLong(request.getSeqid());
buffer.putChar((char) SncpRequest.HEADER_SIZE);
@@ -84,10 +84,12 @@ public final class SncpResponse extends Response<SncpRequest> {
buffer.putLong(actionid.getFirst());
buffer.putLong(actionid.getSecond());
buffer.put(addrBytes);
buffer.putInt(this.addrPort);
buffer.putChar((char) this.addrPort);
buffer.putInt(bodyLength);
buffer.put((byte) frameCount); // frame count
buffer.put((byte) frameIndex); //frame index
buffer.putInt(retcode);
buffer.putInt(bodyLength);
buffer.putInt(bodyOffset);
buffer.putChar((char) framelength);
}
}