diff --git a/src/org/redkale/net/sncp/SncpAsyncHandler.java b/src/org/redkale/net/sncp/SncpAsyncHandler.java new file mode 100644 index 000000000..7541e9ff6 --- /dev/null +++ b/src/org/redkale/net/sncp/SncpAsyncHandler.java @@ -0,0 +1,33 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net.sncp; + +import org.redkale.util.AsyncHandler; + +/** + * 异步回调函数 + * + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 结果对象的泛型 + * @param 附件对象的泛型 + */ +public abstract class SncpAsyncHandler implements AsyncHandler { + + protected Object[] params; + + public Object[] getParams() { + return params; + } + + public void setParams(Object... params) { + this.params = params; + } + +} diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index c23e728dc..8345db22c 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -296,7 +296,7 @@ public final class SncpClient { final Attribute attr = action.paramAttrs[i]; attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader)); } - return bsonConvert.convertFrom(action.resultTypes, reader); + return bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.log(Level.SEVERE, actions[index].method + " sncp (params: " + jsonConvert.convertTo(params) + ") remote error", e); throw new RuntimeException(actions[index].method + " sncp remote error", e); @@ -438,7 +438,7 @@ public final class SncpClient { final Attribute attr = action.paramAttrs[i]; attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader)); } - Object rs = bsonConvert.convertFrom(action.resultTypes, reader); + Object rs = bsonConvert.convertFrom(action.handlerFuncParamIndex >= 0 ? Object.class : action.resultTypes, reader); handler.completed(rs, handlerAttach); } catch (Exception e) { handler.failed(e, handlerAttach); diff --git a/src/org/redkale/net/sncp/SncpDynServlet.java b/src/org/redkale/net/sncp/SncpDynServlet.java index 3173c591f..521b51092 100644 --- a/src/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/org/redkale/net/sncp/SncpDynServlet.java @@ -120,32 +120,39 @@ public final class SncpDynServlet extends SncpServlet { BsonWriter out = action.convert.pollBsonWriter(bufferSupplier); out.writeTo(DEFAULT_HEADER); BsonReader in = action.convert.pollBsonReader(); + SncpAsyncHandler handler; try { - AsyncHandler handler = action.handlerFuncParamIndex >= 0 ? AsyncHandler.create((v, a) -> { - try { - action.convert.convertTo(out, Object.class, v); - response.finish(0, out); - } catch (Exception e) { - response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", e); - response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); - } finally { - action.convert.offerBsonReader(in); - action.convert.offerBsonWriter(out); + handler = action.handlerFuncParamIndex >= 0 ? new SncpAsyncHandler() { + @Override + public void completed(Object result, Object attachment) { + try { + action._callParameter(out, params); + action.convert.convertTo(out, Object.class, result); + response.finish(0, out); + } catch (Exception e) { + failed(e, attachment); + } finally { + action.convert.offerBsonReader(in); + action.convert.offerBsonWriter(out); + } } - }, (t, a) -> { - response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); - response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); - }) : null; + @Override + public void failed(Throwable exc, Object attachment) { + response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", exc); + response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); + } + } : null; in.setBytes(request.getBody()); - action.action(in, out); - response.finish(0, out); + action.action(in, out, handler); + if (handler == null) { + response.finish(0, out); + action.convert.offerBsonReader(in); + action.convert.offerBsonWriter(out); + } } catch (Throwable t) { response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); - } finally { - action.convert.offerBsonReader(in); - action.convert.offerBsonWriter(out); } } } @@ -163,7 +170,7 @@ public final class SncpDynServlet extends SncpServlet { protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在AsyncHandler参数 - public abstract void action(final BsonReader in, final BsonWriter out) throws Throwable; + public abstract void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable; public final void _callParameter(final BsonWriter out, final Object... params) { if (paramAttrs != null) { @@ -213,6 +220,8 @@ public final class SncpDynServlet extends SncpServlet { final String supDynName = SncpServletAction.class.getName().replace('.', '/'); final String serviceName = serviceClass.getName().replace('.', '/'); final String convertName = BsonConvert.class.getName().replace('.', '/'); + final String handlerName = SncpAsyncHandler.class.getName().replace('.', '/'); + final String asyncHandlerDesc = Type.getDescriptor(SncpAsyncHandler.class); final String convertReaderDesc = Type.getDescriptor(BsonReader.class); final String convertWriterDesc = Type.getDescriptor(BsonWriter.class); final String serviceDesc = Type.getDescriptor(serviceClass); @@ -256,11 +265,11 @@ public final class SncpDynServlet extends SncpServlet { } int handlerFuncIndex = -1; { // action方法 - mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + ")V", null, new String[]{"java/lang/Throwable"})); + mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + convertWriterDesc + asyncHandlerDesc + ")V", null, new String[]{"java/lang/Throwable"})); //mv.setDebug(true); int iconst = ICONST_1; int intconst = 1; - int store = 3; //action的参数个数+1 + int store = 4; //action的参数个数+1 final Class[] paramClasses = method.getParameterTypes(); int[][] codes = new int[paramClasses.length][2]; for (int i = 0; i < paramClasses.length; i++) { //参数 @@ -269,6 +278,19 @@ public final class SncpDynServlet extends SncpServlet { throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); } handlerFuncIndex = i; + mv.visitVarInsn(ALOAD, 3); + mv.visitVarInsn(ASTORE, store); + codes[i] = new int[]{ALOAD, store}; + store++; + iconst++; + intconst++; + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); + mv.visitLdcInsn(Type.getType(Type.getDescriptor(AsyncHandler.class))); + mv.visitVarInsn(ALOAD, 1); + mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false); + mv.visitInsn(POP); + continue; } mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); @@ -321,6 +343,43 @@ public final class SncpDynServlet extends SncpServlet { intconst++; store++; } + if (handlerFuncIndex >= 0) { //调用SncpAsyncHandler.setParams(Object... params) + mv.visitVarInsn(ALOAD, 3); + if (paramClasses.length > 5) { + mv.visitIntInsn(BIPUSH, paramClasses.length); + } else { + mv.visitInsn(paramClasses.length + ICONST_0); + } + mv.visitTypeInsn(ANEWARRAY, "java/lang/Object"); + int insn = 3; //action的参数个数 + for (int j = 0; j < paramClasses.length; j++) { + final Class pt = paramClasses[j]; + mv.visitInsn(DUP); + insn++; + if (j <= 5) { + mv.visitInsn(ICONST_0 + j); + } else { + mv.visitIntInsn(BIPUSH, j); + } + if (pt.isPrimitive()) { + if (pt == long.class) { + mv.visitVarInsn(LLOAD, insn++); + } else if (pt == float.class) { + mv.visitVarInsn(FLOAD, insn++); + } else if (pt == double.class) { + mv.visitVarInsn(DLOAD, insn++); + } else { + mv.visitVarInsn(ILOAD, insn); + } + Class bigclaz = java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(pt, 1), 0).getClass(); + mv.visitMethodInsn(INVOKESTATIC, bigclaz.getName().replace('.', '/'), "valueOf", "(" + Type.getDescriptor(pt) + ")" + Type.getDescriptor(bigclaz), false); + } else { + mv.visitVarInsn(ALOAD, insn); + } + mv.visitInsn(AASTORE); + } + mv.visitMethodInsn(INVOKEVIRTUAL, handlerName, "setParams", "([Ljava/lang/Object;)V", false); + } { //调用service mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "service", serviceDesc); @@ -343,7 +402,7 @@ public final class SncpDynServlet extends SncpServlet { } mv.visitVarInsn(ASTORE, store); //11 } - if (true) { + if (handlerFuncIndex < 0) { //------------------------- _callParameter 方法 -------------------------------- mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 2); @@ -353,7 +412,7 @@ public final class SncpDynServlet extends SncpServlet { mv.visitIntInsn(BIPUSH, paramClasses.length); } mv.visitTypeInsn(ANEWARRAY, "java/lang/Object"); - int insn = 2; + int insn = 3;//action的参数个数 for (int j = 0; j < paramClasses.length; j++) { final Class pt = paramClasses[j]; mv.visitInsn(DUP); diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java index d1a176fbc..a00b3e860 100644 --- a/test/org/redkale/test/service/ABMainService.java +++ b/test/org/redkale/test/service/ABMainService.java @@ -50,7 +50,7 @@ public class ABMainService implements Service { //------------------------ 初始化 BCService ------------------------------------ final Transport bctransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577))); BCService bcservice = Sncp.createLocalService("", null, ResourceFactory.root(), BCService.class, new InetSocketAddress("127.0.0.1", 5588), bctransport, null); - CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), bctransport); + CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), bctransport); factory.inject(remoteCService); factory.register("", remoteCService); SncpServer bcserver = new SncpServer(); @@ -61,7 +61,7 @@ public class ABMainService implements Service { //------------------------ 初始化 ABMainService ------------------------------------ final Transport abtransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588))); ABMainService service = Sncp.createLocalService("", null, ResourceFactory.root(), ABMainService.class, new InetSocketAddress("127.0.0.1", 5599), bctransport, null); - BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), abtransport); + BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), abtransport); factory.inject(remoteBCService); factory.register("", remoteBCService); @@ -157,8 +157,9 @@ public class ABMainService implements Service { bcService.bcCurrentTime(AsyncHandler.create((v, a) -> { System.out.println("执行了 ABMainService.abCurrentTime----异步方法"); String rs = "异步abCurrentTime: " + v; - if (handler != null) handler.completed(rs, null); + if (handler != null) handler.completed(rs, a); }, (t, a) -> { + if (handler != null) handler.failed(t, a); }), name); } } diff --git a/test/org/redkale/test/service/BCService.java b/test/org/redkale/test/service/BCService.java index a77acd295..046109369 100644 --- a/test/org/redkale/test/service/BCService.java +++ b/test/org/redkale/test/service/BCService.java @@ -27,9 +27,10 @@ public class BCService implements Service { public void bcCurrentTime(final AsyncHandler handler, final String name) { cService.ccCurrentTime(AsyncHandler.create((v, a) -> { System.out.println("执行了 BCService.bcCurrentTime----异步方法"); - String rs = "异步bcCurrentTime: " + v.getResult(); + String rs = "异步bcCurrentTime: " + (v == null ? null : v.getResult()); if (handler != null) handler.completed(rs, null); }, (t, a) -> { + if (handler != null) handler.failed(t, a); }), name); } }