diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index 3f4dbb603..0e14d7fcf 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -206,13 +206,13 @@ public final class SncpClient { } private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) { - int bodyLength = 2; Type[] myparamtypes = action.paramTypes; - byte[][] bytesarray = new byte[params.length][]; - for (int i = 0; i < bytesarray.length; i++) { - bytesarray[i] = convert.convertTo(myparamtypes[i], params[i]); - bodyLength += 4 + bytesarray[i].length; + final BsonWriter bw = convert.pollBsonWriter(); + for (int i = 0; i < params.length; i++) { + convert.convertTo(bw, myparamtypes[i], params[i]); } + final int bodyLength = bw.count(); + final long seqid = System.nanoTime(); final DLong actionid = action.actionid; final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null); @@ -225,47 +225,23 @@ public final class SncpClient { if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); int pos = 0; - final byte[] all = new byte[bodyLength]; - { //将二维byte数组转换成一维bye数组 - all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8); - all[pos++] = (byte) (bytesarray.length & 0xff); - for (byte[] bs : bytesarray) { - all[pos++] = (byte) ((bs.length & 0xff000000) >> 24); - all[pos++] = (byte) ((bs.length & 0xff0000) >> 16); - all[pos++] = (byte) ((bs.length & 0xff00) >> 8); - all[pos++] = (byte) (bs.length & 0xff); - System.arraycopy(bs, 0, all, pos, bs.length); - pos += bs.length; - } - } - if (pos != all.length) throw new RuntimeException(this.serviceid + "," + this.nameid + "," + action + " sncp(" + action.method + ") body.length : " + all.length + ", but pos=" + pos); - pos = 0; for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据 - int len = Math.min(buffer.remaining() - HEADER_SIZE, all.length - pos); + int len = Math.min(buffer.remaining() - HEADER_SIZE, bodyLength - pos); fillHeader(buffer, seqid, actionid, bodyLength, frames, i, pos, len); - buffer.put(all, pos, len); - pos += len; + pos += bw.toBuffer(pos, buffer); buffer.flip(); Thread.sleep(1); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); buffer.clear(); } + convert.offerBsonWriter(bw); } else { //只有一帧的数据 - { - //---------------------head---------------------------------- - int len = 2; - for (byte[] bs : bytesarray) { - len += 4 + bs.length; - } - fillHeader(buffer, seqid, actionid, bodyLength, 1, 0, 0, len); - //---------------------body---------------------------------- - buffer.putChar((char) bytesarray.length); //参数数组大小 - for (byte[] bs : bytesarray) { - buffer.putInt(bs.length); - buffer.put(bs); - } - buffer.flip(); - } + //---------------------head---------------------------------- + fillHeader(buffer, seqid, actionid, bodyLength, 1, 0, 0, bodyLength); + //---------------------body---------------------------------- + bw.toBuffer(buffer); + convert.offerBsonWriter(bw); + buffer.flip(); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); buffer.clear(); } diff --git a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java index 5f31fe034..a21c24b26 100644 --- a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java +++ b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java @@ -85,11 +85,15 @@ public final class SncpDynServlet extends SncpServlet { if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid } else { + BsonReader in = action.convert.pollBsonReader(); try { - response.finish(0, action.action(request.getParamBytes())); + in.setBytes(request.getBody()); + response.finish(0, action.action(in)); } catch (Throwable t) { response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); + } finally { + action.convert.offerBsonReader(in); } } } @@ -103,28 +107,28 @@ public final class SncpDynServlet extends SncpServlet { protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type, void的返回参数类型为null - public abstract byte[] action(byte[][] bytes) throws Throwable; + public abstract byte[] action(final BsonReader in) throws Throwable; /* * * public class TestService implements Service { - * public boolean change(TestBean bean, String name, int id) { + * public boolean change(TestBean bean, String name, int id) { * - * } + * } * } * * public class DynActionTestService_change extends SncpServletAction { * - * public TestService service; + * public TestService service; * - * @Override - * public byte[] action(byte[][] bytes) throws Throwable { - * TestBean arg1 = convert.convertFrom(paramTypes[1], bytes[1]); - * String arg2 = convert.convertFrom(paramTypes[2], bytes[2]); - * int arg3 = convert.convertFrom(paramTypes[3], bytes[3]); - * Object rs = service.change(arg1, arg2, arg3); - * return convert.convertTo(paramTypes[0], rs); - * } + * @Override + * public byte[] action(final BsonReader in) throws Throwable { + * TestBean arg1 = convert.convertFrom(in, paramTypes[1]); + * String arg2 = convert.convertFrom(in, paramTypes[2]); + * int arg3 = convert.convertFrom(in, paramTypes[3]); + * Object rs = service.change(arg1, arg2, arg3); + * return convert.convertTo(paramTypes[0], rs); + * } * } */ /** @@ -140,6 +144,7 @@ public final class SncpDynServlet extends SncpServlet { final String supDynName = SncpServletAction.class.getName().replace('.', '/'); final String serviceName = serviceClass.getName().replace('.', '/'); final String convertName = BsonConvert.class.getName().replace('.', '/'); + final String convertReaderDesc = Type.getDescriptor(BsonReader.class); final String serviceDesc = Type.getDescriptor(serviceClass); String newDynName = serviceName.substring(0, serviceName.lastIndexOf('/') + 1) + "DynAction" + serviceClass.getSimpleName() + "_" + method.getName() + "_" + actionid; @@ -173,14 +178,14 @@ public final class SncpDynServlet extends SncpServlet { mv.visitMaxs(1, 1); mv.visitEnd(); } - String convertFromDesc = "(Ljava/lang/reflect/Type;[B)Ljava/lang/Object;"; + String convertFromDesc = "(" + convertReaderDesc + "Ljava/lang/reflect/Type;)Ljava/lang/Object;"; try { - convertFromDesc = Type.getMethodDescriptor(BsonConvert.class.getMethod("convertFrom", java.lang.reflect.Type.class, byte[].class)); + convertFromDesc = Type.getMethodDescriptor(BsonConvert.class.getMethod("convertFrom", BsonReader.class, java.lang.reflect.Type.class)); } catch (Exception ex) { throw new RuntimeException(ex); //不可能会发生 } { // action方法 - mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "([[B)[B", null, new String[]{"java/lang/Throwable"})); + mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + ")[B", null, new String[]{"java/lang/Throwable"})); //mv.setDebug(true); int iconst = ICONST_1; int intconst = 1; @@ -190,6 +195,7 @@ public final class SncpDynServlet extends SncpServlet { for (int i = 0; i < paramClasses.length; i++) { //参数 mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); + mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "paramTypes", "[Ljava/lang/reflect/Type;"); if (iconst > ICONST_5) { @@ -198,13 +204,7 @@ public final class SncpDynServlet extends SncpServlet { mv.visitInsn(iconst); // } mv.visitInsn(AALOAD); - mv.visitVarInsn(ALOAD, 1); - if (iconst > ICONST_5) { - mv.visitIntInsn(BIPUSH, intconst); - } else { - mv.visitInsn(iconst); // - } - mv.visitInsn(AALOAD); + mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false); int load = ALOAD; int v = 0; diff --git a/src/com/wentch/redkale/net/sncp/SncpRequest.java b/src/com/wentch/redkale/net/sncp/SncpRequest.java index 6c16ef564..7f84d0347 100644 --- a/src/com/wentch/redkale/net/sncp/SncpRequest.java +++ b/src/com/wentch/redkale/net/sncp/SncpRequest.java @@ -40,8 +40,7 @@ public final class SncpRequest extends Request { private int bodyoffset; - private byte[][] paramBytes; - + //private byte[][] paramBytes; private boolean ping; private byte[] body; @@ -86,14 +85,8 @@ public final class SncpRequest extends Request { this.framelength = buffer.getChar(); //---------------------body---------------------------------- if (this.framecount == 1) { //只有一帧的数据 - int paramlen = buffer.getChar(); - byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[] - for (int i = 1; i <= paramlen; i++) { - byte[] bytes = new byte[buffer.getInt()]; - buffer.get(bytes); - bbytes[i] = bytes; - } - this.paramBytes = bbytes; + this.body = new byte[this.framelength]; + buffer.get(body); return 0; } //多帧数据 @@ -151,19 +144,6 @@ public final class SncpRequest extends Request { @Override protected void prepare() { this.keepAlive = this.channel.isTCP(); - if (this.body == null) return; - byte[] bytes = this.body; - int pos = 0; - int paramlen = ((0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++])); - byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[] - for (int i = 1; i <= paramlen; i++) { - byte[] bs = new byte[(0xff000000 & (bytes[pos++] << 24)) | (0xff0000 & (bytes[pos++] << 16)) - | (0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++])]; - System.arraycopy(bytes, pos, bs, 0, bs.length); - pos += bs.length; - bbytes[i] = bs; - } - this.paramBytes = bbytes; } @Override @@ -171,7 +151,7 @@ public final class SncpRequest extends Request { return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid + ",serviceid=" + this.serviceid + ",actionid=" + this.actionid + ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",framelength=" + this.framelength - + ",bodylength=" + this.bodylength+ ",bodyoffset=" + this.bodyoffset + ",remoteAddress=" + remoteAddress + "}"; + + ",bodylength=" + this.bodylength + ",bodyoffset=" + this.bodyoffset + ",remoteAddress=" + remoteAddress + "}"; } @Override @@ -185,7 +165,7 @@ public final class SncpRequest extends Request { this.bodylength = 0; this.bodyoffset = 0; this.body = null; - this.paramBytes = null; + //this.paramBytes = null; this.ping = false; this.remoteAddress = null; this.bufferbytes[0] = 0; @@ -196,8 +176,11 @@ public final class SncpRequest extends Request { return ping; } - public byte[][] getParamBytes() { - return paramBytes; +// public byte[][] getParamBytes() { +// return paramBytes; +// } + public byte[] getBody() { + return body; } public long getSeqid() { diff --git a/src/com/wentch/redkale/net/sncp/SncpResponse.java b/src/com/wentch/redkale/net/sncp/SncpResponse.java index 7f82fab6f..9a56a5c71 100644 --- a/src/com/wentch/redkale/net/sncp/SncpResponse.java +++ b/src/com/wentch/redkale/net/sncp/SncpResponse.java @@ -53,7 +53,7 @@ public final class SncpResponse extends Response { final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); if (frames <= 1) { //---------------------head---------------------------------- - fillHeader(buffer, bodyLength, 1, 0, retcode, 0, bytes.length); + fillHeader(buffer, bodyLength, 1, 0, retcode, 0, bytes == null ? 0 : bytes.length); //---------------------body---------------------------------- if (bytes != null) buffer.put(bytes); buffer.flip();