This commit is contained in:
地平线
2015-11-03 14:10:15 +08:00
parent 8cb3941e4f
commit 62e6196bb4
4 changed files with 48 additions and 89 deletions

View File

@@ -206,13 +206,13 @@ public final class SncpClient {
} }
private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) { private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) {
int bodyLength = 2;
Type[] myparamtypes = action.paramTypes; Type[] myparamtypes = action.paramTypes;
byte[][] bytesarray = new byte[params.length][]; final BsonWriter bw = convert.pollBsonWriter();
for (int i = 0; i < bytesarray.length; i++) { for (int i = 0; i < params.length; i++) {
bytesarray[i] = convert.convertTo(myparamtypes[i], params[i]); convert.convertTo(bw, myparamtypes[i], params[i]);
bodyLength += 4 + bytesarray[i].length;
} }
final int bodyLength = bw.count();
final long seqid = System.nanoTime(); final long seqid = System.nanoTime();
final DLong actionid = action.actionid; final DLong actionid = action.actionid;
final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null); final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null);
@@ -225,47 +225,23 @@ public final class SncpClient {
if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
int pos = 0; int pos = 0;
final byte[] all = new byte[bodyLength];
{ //将二维byte数组转换成一维bye数组
all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8);
all[pos++] = (byte) (bytesarray.length & 0xff);
for (byte[] bs : bytesarray) {
all[pos++] = (byte) ((bs.length & 0xff000000) >> 24);
all[pos++] = (byte) ((bs.length & 0xff0000) >> 16);
all[pos++] = (byte) ((bs.length & 0xff00) >> 8);
all[pos++] = (byte) (bs.length & 0xff);
System.arraycopy(bs, 0, all, pos, bs.length);
pos += bs.length;
}
}
if (pos != all.length) throw new RuntimeException(this.serviceid + "," + this.nameid + "," + action + " sncp(" + action.method + ") body.length : " + all.length + ", but pos=" + pos);
pos = 0;
for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据 for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据
int len = Math.min(buffer.remaining() - HEADER_SIZE, all.length - pos); int len = Math.min(buffer.remaining() - HEADER_SIZE, bodyLength - pos);
fillHeader(buffer, seqid, actionid, bodyLength, frames, i, pos, len); fillHeader(buffer, seqid, actionid, bodyLength, frames, i, pos, len);
buffer.put(all, pos, len); pos += bw.toBuffer(pos, buffer);
pos += len;
buffer.flip(); buffer.flip();
Thread.sleep(1); Thread.sleep(1);
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear(); buffer.clear();
} }
convert.offerBsonWriter(bw);
} else { //只有一帧的数据 } else { //只有一帧的数据
{
//---------------------head---------------------------------- //---------------------head----------------------------------
int len = 2; fillHeader(buffer, seqid, actionid, bodyLength, 1, 0, 0, bodyLength);
for (byte[] bs : bytesarray) {
len += 4 + bs.length;
}
fillHeader(buffer, seqid, actionid, bodyLength, 1, 0, 0, len);
//---------------------body---------------------------------- //---------------------body----------------------------------
buffer.putChar((char) bytesarray.length); //参数数组大小 bw.toBuffer(buffer);
for (byte[] bs : bytesarray) { convert.offerBsonWriter(bw);
buffer.putInt(bs.length);
buffer.put(bs);
}
buffer.flip(); buffer.flip();
}
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear(); buffer.clear();
} }

View File

@@ -85,11 +85,15 @@ public final class SncpDynServlet extends SncpServlet {
if (action == null) { if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else { } else {
BsonReader in = action.convert.pollBsonReader();
try { try {
response.finish(0, action.action(request.getParamBytes())); in.setBytes(request.getBody());
response.finish(0, action.action(in));
} catch (Throwable t) { } catch (Throwable t) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
} finally {
action.convert.offerBsonReader(in);
} }
} }
} }
@@ -103,7 +107,7 @@ public final class SncpDynServlet extends SncpServlet {
protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null
public abstract byte[] action(byte[][] bytes) throws Throwable; public abstract byte[] action(final BsonReader in) throws Throwable;
/* /*
* *
@@ -118,10 +122,10 @@ public final class SncpDynServlet extends SncpServlet {
* public TestService service; * public TestService service;
* *
* @Override * @Override
* public byte[] action(byte[][] bytes) throws Throwable { * public byte[] action(final BsonReader in) throws Throwable {
* TestBean arg1 = convert.convertFrom(paramTypes[1], bytes[1]); * TestBean arg1 = convert.convertFrom(in, paramTypes[1]);
* String arg2 = convert.convertFrom(paramTypes[2], bytes[2]); * String arg2 = convert.convertFrom(in, paramTypes[2]);
* int arg3 = convert.convertFrom(paramTypes[3], bytes[3]); * int arg3 = convert.convertFrom(in, paramTypes[3]);
* Object rs = service.change(arg1, arg2, arg3); * Object rs = service.change(arg1, arg2, arg3);
* return convert.convertTo(paramTypes[0], rs); * return convert.convertTo(paramTypes[0], rs);
* } * }
@@ -140,6 +144,7 @@ public final class SncpDynServlet extends SncpServlet {
final String supDynName = SncpServletAction.class.getName().replace('.', '/'); final String supDynName = SncpServletAction.class.getName().replace('.', '/');
final String serviceName = serviceClass.getName().replace('.', '/'); final String serviceName = serviceClass.getName().replace('.', '/');
final String convertName = BsonConvert.class.getName().replace('.', '/'); final String convertName = BsonConvert.class.getName().replace('.', '/');
final String convertReaderDesc = Type.getDescriptor(BsonReader.class);
final String serviceDesc = Type.getDescriptor(serviceClass); final String serviceDesc = Type.getDescriptor(serviceClass);
String newDynName = serviceName.substring(0, serviceName.lastIndexOf('/') + 1) String newDynName = serviceName.substring(0, serviceName.lastIndexOf('/') + 1)
+ "DynAction" + serviceClass.getSimpleName() + "_" + method.getName() + "_" + actionid; + "DynAction" + serviceClass.getSimpleName() + "_" + method.getName() + "_" + actionid;
@@ -173,14 +178,14 @@ public final class SncpDynServlet extends SncpServlet {
mv.visitMaxs(1, 1); mv.visitMaxs(1, 1);
mv.visitEnd(); mv.visitEnd();
} }
String convertFromDesc = "(Ljava/lang/reflect/Type;[B)Ljava/lang/Object;"; String convertFromDesc = "(" + convertReaderDesc + "Ljava/lang/reflect/Type;)Ljava/lang/Object;";
try { try {
convertFromDesc = Type.getMethodDescriptor(BsonConvert.class.getMethod("convertFrom", java.lang.reflect.Type.class, byte[].class)); convertFromDesc = Type.getMethodDescriptor(BsonConvert.class.getMethod("convertFrom", BsonReader.class, java.lang.reflect.Type.class));
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException(ex); //不可能会发生 throw new RuntimeException(ex); //不可能会发生
} }
{ // action方法 { // action方法
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "([[B)[B", null, new String[]{"java/lang/Throwable"})); mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + ")[B", null, new String[]{"java/lang/Throwable"}));
//mv.setDebug(true); //mv.setDebug(true);
int iconst = ICONST_1; int iconst = ICONST_1;
int intconst = 1; int intconst = 1;
@@ -190,6 +195,7 @@ public final class SncpDynServlet extends SncpServlet {
for (int i = 0; i < paramClasses.length; i++) { //参数 for (int i = 0; i < paramClasses.length; i++) { //参数
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName, "paramTypes", "[Ljava/lang/reflect/Type;"); mv.visitFieldInsn(GETFIELD, newDynName, "paramTypes", "[Ljava/lang/reflect/Type;");
if (iconst > ICONST_5) { if (iconst > ICONST_5) {
@@ -198,13 +204,7 @@ public final class SncpDynServlet extends SncpServlet {
mv.visitInsn(iconst); // mv.visitInsn(iconst); //
} }
mv.visitInsn(AALOAD); mv.visitInsn(AALOAD);
mv.visitVarInsn(ALOAD, 1);
if (iconst > ICONST_5) {
mv.visitIntInsn(BIPUSH, intconst);
} else {
mv.visitInsn(iconst); //
}
mv.visitInsn(AALOAD);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false); mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false);
int load = ALOAD; int load = ALOAD;
int v = 0; int v = 0;

View File

@@ -40,8 +40,7 @@ public final class SncpRequest extends Request {
private int bodyoffset; private int bodyoffset;
private byte[][] paramBytes; //private byte[][] paramBytes;
private boolean ping; private boolean ping;
private byte[] body; private byte[] body;
@@ -86,14 +85,8 @@ public final class SncpRequest extends Request {
this.framelength = buffer.getChar(); this.framelength = buffer.getChar();
//---------------------body---------------------------------- //---------------------body----------------------------------
if (this.framecount == 1) { //只有一帧的数据 if (this.framecount == 1) { //只有一帧的数据
int paramlen = buffer.getChar(); this.body = new byte[this.framelength];
byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[] buffer.get(body);
for (int i = 1; i <= paramlen; i++) {
byte[] bytes = new byte[buffer.getInt()];
buffer.get(bytes);
bbytes[i] = bytes;
}
this.paramBytes = bbytes;
return 0; return 0;
} }
//多帧数据 //多帧数据
@@ -151,19 +144,6 @@ public final class SncpRequest extends Request {
@Override @Override
protected void prepare() { protected void prepare() {
this.keepAlive = this.channel.isTCP(); this.keepAlive = this.channel.isTCP();
if (this.body == null) return;
byte[] bytes = this.body;
int pos = 0;
int paramlen = ((0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++]));
byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[]
for (int i = 1; i <= paramlen; i++) {
byte[] bs = new byte[(0xff000000 & (bytes[pos++] << 24)) | (0xff0000 & (bytes[pos++] << 16))
| (0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++])];
System.arraycopy(bytes, pos, bs, 0, bs.length);
pos += bs.length;
bbytes[i] = bs;
}
this.paramBytes = bbytes;
} }
@Override @Override
@@ -185,7 +165,7 @@ public final class SncpRequest extends Request {
this.bodylength = 0; this.bodylength = 0;
this.bodyoffset = 0; this.bodyoffset = 0;
this.body = null; this.body = null;
this.paramBytes = null; //this.paramBytes = null;
this.ping = false; this.ping = false;
this.remoteAddress = null; this.remoteAddress = null;
this.bufferbytes[0] = 0; this.bufferbytes[0] = 0;
@@ -196,8 +176,11 @@ public final class SncpRequest extends Request {
return ping; return ping;
} }
public byte[][] getParamBytes() { // public byte[][] getParamBytes() {
return paramBytes; // return paramBytes;
// }
public byte[] getBody() {
return body;
} }
public long getSeqid() { public long getSeqid() {

View File

@@ -53,7 +53,7 @@ public final class SncpResponse extends Response<SncpRequest> {
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
if (frames <= 1) { if (frames <= 1) {
//---------------------head---------------------------------- //---------------------head----------------------------------
fillHeader(buffer, bodyLength, 1, 0, retcode, 0, bytes.length); fillHeader(buffer, bodyLength, 1, 0, retcode, 0, bytes == null ? 0 : bytes.length);
//---------------------body---------------------------------- //---------------------body----------------------------------
if (bytes != null) buffer.put(bytes); if (bytes != null) buffer.put(bytes);
buffer.flip(); buffer.flip();