diff --git a/src/org/redkale/net/sncp/SncpAsyncHandler.java b/src/org/redkale/net/sncp/SncpAsyncHandler.java index a549ecf27..3abf70cac 100644 --- a/src/org/redkale/net/sncp/SncpAsyncHandler.java +++ b/src/org/redkale/net/sncp/SncpAsyncHandler.java @@ -5,6 +5,7 @@ */ package org.redkale.net.sncp; +import java.util.concurrent.CompletableFuture; import java.util.logging.Level; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.Opcodes.*; @@ -31,6 +32,10 @@ public interface SncpAsyncHandler extends AsyncHandler { public void sncp_setParams(Object... params); + public void sncp_setFuture(CompletableFuture future); + + public CompletableFuture sncp_getFuture(); + static class Factory { /** @@ -45,6 +50,8 @@ public interface SncpAsyncHandler extends AsyncHandler { * * private SncpAsyncHandler sncphandler; * + * private CompletableFuture sncpfuture; + * * @java.beans.ConstructorProperties({"sncphandler"}) * public XXXAsyncHandler_DyncSncpAsyncHandler_4323(SncpAsyncHandler sncphandler) { * super(); @@ -70,6 +77,16 @@ public interface SncpAsyncHandler extends AsyncHandler { * public void sncp_setParams(Object... params) { * sncphandler.sncp_setParams(params); * } + * + * @Override + * public void sncp_setFuture(CompletableFuture future) { + * this.sncpfuture = future; + * } + * + * @Override + * public CompletableFuture sncp_getFuture() { + * return this.sncpfuture; + * } * } * * @@ -82,22 +99,27 @@ public interface SncpAsyncHandler extends AsyncHandler { //------------------------------------------------------------- final boolean handlerinterface = handlerClass.isInterface(); final String handlerClassName = handlerClass.getName().replace('.', '/'); - final String sncyHandlerName = SncpAsyncHandler.class.getName().replace('.', '/'); - final String sncyHandlerDesc = Type.getDescriptor(SncpAsyncHandler.class); + final String sncpHandlerName = SncpAsyncHandler.class.getName().replace('.', '/'); + final String sncpHandlerDesc = Type.getDescriptor(SncpAsyncHandler.class); + final String sncpFutureDesc = Type.getDescriptor(CompletableFuture.class); final String newDynName = handlerClass.getName().replace('.', '/') + "_Dync" + SncpAsyncHandler.class.getSimpleName() + "_" + (System.currentTimeMillis() % 10000); ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES); FieldVisitor fv; AsmMethodVisitor mv; AnnotationVisitor av0; - cw.visit(V1_8, ACC_PUBLIC + ACC_SUPER, newDynName, null, handlerinterface ? "java/lang/Object" : handlerClassName, handlerinterface ? new String[]{handlerClassName, sncyHandlerName} : new String[]{sncyHandlerName}); + cw.visit(V1_8, ACC_PUBLIC + ACC_SUPER, newDynName, null, handlerinterface ? "java/lang/Object" : handlerClassName, handlerinterface ? new String[]{handlerClassName, sncpHandlerName} : new String[]{sncpHandlerName}); { //handler 属性 - fv = cw.visitField(ACC_PRIVATE, "sncphandler", sncyHandlerDesc, null, null); + fv = cw.visitField(ACC_PRIVATE, "sncphandler", sncpHandlerDesc, null, null); + fv.visitEnd(); + } + { //future 属性 + fv = cw.visitField(ACC_PRIVATE, "sncpfuture", sncpFutureDesc, null, null); fv.visitEnd(); } {//构造方法 - mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "", "(" + sncyHandlerDesc + ")V", null, null)); + mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "", "(" + sncpHandlerDesc + ")V", null, null)); //mv.setDebug(true); { av0 = mv.visitAnnotation("Ljava/beans/ConstructorProperties;", true); @@ -112,7 +134,7 @@ public interface SncpAsyncHandler extends AsyncHandler { mv.visitMethodInsn(INVOKESPECIAL, handlerinterface ? "java/lang/Object" : handlerClassName, "", "()V", false); mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 1); - mv.visitFieldInsn(PUTFIELD, newDynName, "sncphandler", sncyHandlerDesc); + mv.visitFieldInsn(PUTFIELD, newDynName, "sncphandler", sncpHandlerDesc); mv.visitInsn(RETURN); mv.visitMaxs(2, 2); mv.visitEnd(); @@ -122,20 +144,20 @@ public interface SncpAsyncHandler extends AsyncHandler { if ("completed".equals(method.getName()) && method.getParameterCount() == 2) { mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "completed", Type.getMethodDescriptor(method), null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncyHandlerDesc); + mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncpHandlerDesc); mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 2); - mv.visitMethodInsn(INVOKEINTERFACE, sncyHandlerName, "completed", "(Ljava/lang/Object;Ljava/lang/Object;)V", true); + mv.visitMethodInsn(INVOKEINTERFACE, sncpHandlerName, "completed", "(Ljava/lang/Object;Ljava/lang/Object;)V", true); mv.visitInsn(RETURN); mv.visitMaxs(3, 3); mv.visitEnd(); } else if ("failed".equals(method.getName()) && method.getParameterCount() == 2) { mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "failed", Type.getMethodDescriptor(method), null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncyHandlerDesc); + mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncpHandlerDesc); mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 2); - mv.visitMethodInsn(INVOKEINTERFACE, sncyHandlerName, "failed", "(Ljava/lang/Throwable;Ljava/lang/Object;)V", true); + mv.visitMethodInsn(INVOKEINTERFACE, sncpHandlerName, "failed", "(Ljava/lang/Throwable;Ljava/lang/Object;)V", true); mv.visitInsn(RETURN); mv.visitMaxs(3, 3); mv.visitEnd(); @@ -171,8 +193,8 @@ public interface SncpAsyncHandler extends AsyncHandler { { // sncp_getParams mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "sncp_getParams", "()[Ljava/lang/Object;", null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncyHandlerDesc); - mv.visitMethodInsn(INVOKEINTERFACE, sncyHandlerName, "sncp_getParams", "()[Ljava/lang/Object;", true); + mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncpHandlerDesc); + mv.visitMethodInsn(INVOKEINTERFACE, sncpHandlerName, "sncp_getParams", "()[Ljava/lang/Object;", true); mv.visitInsn(ARETURN); mv.visitMaxs(1, 1); mv.visitEnd(); @@ -180,13 +202,30 @@ public interface SncpAsyncHandler extends AsyncHandler { { // sncp_setParams mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC + ACC_VARARGS, "sncp_setParams", "([Ljava/lang/Object;)V", null, null)); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncyHandlerDesc); + mv.visitFieldInsn(GETFIELD, newDynName, "sncphandler", sncpHandlerDesc); mv.visitVarInsn(ALOAD, 1); - mv.visitMethodInsn(INVOKEINTERFACE, sncyHandlerName, "sncp_setParams", "([Ljava/lang/Object;)V", true); + mv.visitMethodInsn(INVOKEINTERFACE, sncpHandlerName, "sncp_setParams", "([Ljava/lang/Object;)V", true); mv.visitInsn(RETURN); mv.visitMaxs(2, 2); mv.visitEnd(); } + { // sncp_setFuture + mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "sncp_setFuture", "(" + sncpFutureDesc + ")V", null, null)); + mv.visitVarInsn(ALOAD, 0); + mv.visitVarInsn(ALOAD, 1); + mv.visitFieldInsn(PUTFIELD, newDynName, "sncpfuture", sncpFutureDesc); + mv.visitInsn(RETURN); + mv.visitMaxs(2, 2); + mv.visitEnd(); + } + { // sncp_getFuture + mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "sncp_getFuture", "()" + sncpFutureDesc, null, null)); + mv.visitVarInsn(ALOAD, 0); + mv.visitFieldInsn(GETFIELD, newDynName, "sncpfuture", sncpFutureDesc); + mv.visitInsn(ARETURN); + mv.visitMaxs(1, 1); + mv.visitEnd(); + } cw.visitEnd(); byte[] bytes = cw.toByteArray(); Class newHandlerClazz = (Class) new ClassLoader(handlerClass.getClassLoader()) { @@ -214,6 +253,8 @@ public interface SncpAsyncHandler extends AsyncHandler { protected SncpResponse response; + protected CompletableFuture future; + public DefaultSncpAsyncHandler(SncpServletAction action, BsonReader in, BsonWriter out, SncpRequest request, SncpResponse response) { this.action = action; this.in = in; @@ -252,5 +293,15 @@ public interface SncpAsyncHandler extends AsyncHandler { this.params = params; } + @Override + public void sncp_setFuture(CompletableFuture future) { + this.future = future; + } + + @Override + public CompletableFuture sncp_getFuture() { + return this.future; + } + } } diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index a08d13fb8..439cbde08 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -53,6 +53,10 @@ public final class SncpClient { protected final int addressSourceParamIndex; + protected final boolean boolReturnTypeFuture; // 返回结果类型是否为 CompletableFuture + + protected final Creator futureCreator; + public SncpAction(final Class clazz, Method method, DLong actionid) { this.actionid = actionid == null ? Sncp.hash(method) : actionid; Type rt = method.getGenericReturnType(); @@ -61,6 +65,8 @@ public final class SncpClient { if (tv.getBounds().length == 1) rt = tv.getBounds()[0]; } this.resultTypes = rt == void.class ? null : rt; + this.boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); + this.futureCreator = boolReturnTypeFuture ? Creator.create((Class) method.getReturnType()) : null; this.paramTypes = method.getGenericParameterTypes(); this.paramClass = method.getParameterTypes(); this.method = method; @@ -75,6 +81,9 @@ public final class SncpClient { Class[] params = method.getParameterTypes(); for (int i = 0; i < params.length; i++) { if (AsyncHandler.class.isAssignableFrom(params[i])) { + if (boolReturnTypeFuture) { + throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture"); + } if (handlerFuncIndex >= 0) { throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); } @@ -292,9 +301,32 @@ public final class SncpClient { final SncpAction action = actions[index]; final AsyncHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (AsyncHandler) params[action.handlerFuncParamIndex] : null; if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; - SncpFuture future = remote0(handlerFunc, bsonConvert, jsonConvert, transport, null, action, params); - if (handlerFunc != null) return null; final BsonReader reader = bsonConvert.pollBsonReader(); + CompletableFuture future = remote0(handlerFunc, bsonConvert, jsonConvert, transport, null, action, params); + if (action.boolReturnTypeFuture) { + CompletableFuture result = action.futureCreator.create(); + future.whenCompleteAsync((v, e) -> { + try { + if (e != null) { + result.completeExceptionally(e); + } else { + reader.setBytes(v); + byte i; + while ((i = reader.readByte()) != 0) { + final Attribute attr = action.paramAttrs[i]; + attr.set(params[i - 1], bsonConvert.convertFrom(attr.type(), reader)); + } + Object rs = bsonConvert.convertFrom(Object.class, reader); + + result.complete(rs); + } + } finally { + bsonConvert.offerBsonReader(reader); + } + }); //需要获取 Executor + return (T) result; + } + if (handlerFunc != null) return null; try { reader.setBytes(future.get(5, TimeUnit.SECONDS)); byte i; @@ -319,7 +351,7 @@ public final class SncpClient { } } - private SncpFuture remote0(final AsyncHandler handler, final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remote0(final AsyncHandler handler, final BsonConvert bsonConvert, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { if ("rest".equalsIgnoreCase(transport.getSubprotocol())) { return remoteRest0(handler, jsonConvert, transport, addr0, action, params); } @@ -327,11 +359,11 @@ public final class SncpClient { } //尚未实现 - private SncpFuture remoteRest0(final AsyncHandler handler, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remoteRest0(final AsyncHandler handler, final JsonConvert jsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { return null; } - private SncpFuture remoteSncp0(final AsyncHandler handler, final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { + private CompletableFuture remoteSncp0(final AsyncHandler handler, final BsonConvert bsonConvert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { Type[] myparamtypes = action.paramTypes; Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; @@ -353,7 +385,7 @@ public final class SncpClient { fillHeader(sendBuffers[0], seqid, actionid, reqBodyLength); final ByteBuffer buffer = transport.pollBuffer(); - final SncpFuture future = new SncpFuture(false); + final CompletableFuture future = new CompletableFuture(); conn.write(sendBuffers, sendBuffers, new CompletionHandler() { @Override @@ -387,7 +419,7 @@ public final class SncpClient { @Override public void completed(Integer count, Void attachment2) { if (count < 1 && buffer.remaining() == buffer.limit()) { //没有数据可读 - future.set(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); + future.completeExceptionally(new RuntimeException(action.method + " sncp[" + conn.getRemoteAddress() + "] remote no response data")); transport.offerBuffer(buffer); transport.offerConnection(true, conn); return; @@ -432,7 +464,7 @@ public final class SncpClient { } public void success() { - future.set(this.body); + future.complete(this.body); transport.offerBuffer(buffer); transport.offerConnection(false, conn); if (handler != null) { @@ -458,7 +490,7 @@ public final class SncpClient { @Override public void failed(Throwable exc, Void attachment2) { logger.log(Level.SEVERE, action.method + " sncp (params: " + convert.convertTo(params) + ") remote read exec failed", exc); - future.set(new RuntimeException(action.method + " sncp remote exec failed")); + future.completeExceptionally(new RuntimeException(action.method + " sncp remote exec failed")); transport.offerBuffer(buffer); transport.offerConnection(true, conn); if (handler != null) { @@ -510,91 +542,4 @@ public final class SncpClient { buffer.position(currentpos); } - protected static final class SncpFuture implements Future { - - private volatile boolean done; - - private T result; - - private RuntimeException ex; - - private final boolean rest; - - public SncpFuture(boolean rest) { - this.rest = rest; - } - - public SncpFuture(boolean rest, T result) { - this.rest = rest; - this.result = result; - this.done = true; - } - - public boolean isRest() { - return this.rest; - } - - public void set(T result) { - this.result = result; - this.done = true; - synchronized (this) { - notifyAll(); - } - } - - public void set(RuntimeException ex) { - this.ex = ex; - this.done = true; - synchronized (this) { - notifyAll(); - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return done; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - if (done) { - if (ex != null) throw ex; - return result; - } - synchronized (this) { - if (!done) wait(10_000); - } - if (done) { - if (ex != null) throw ex; - return result; - } - throw new InterruptedException(); - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (done) { - if (ex != null) throw ex; - return result; - } - synchronized (this) { - if (!done) wait(unit.toMillis(timeout)); - } - if (done) { - if (ex != null) throw ex; - return result; - } - throw new TimeoutException(); - } - } } diff --git a/src/org/redkale/net/sncp/SncpDynServlet.java b/src/org/redkale/net/sncp/SncpDynServlet.java index b55ed98ff..c6862d3f0 100644 --- a/src/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/org/redkale/net/sncp/SncpDynServlet.java @@ -11,6 +11,7 @@ import java.lang.annotation.*; import java.lang.reflect.*; import java.nio.*; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.function.*; import java.util.logging.*; import javax.annotation.*; @@ -113,7 +114,7 @@ public final class SncpDynServlet extends SncpServlet { if (bufferSupplier == null) { bufferSupplier = request.getContext().getBufferSupplier(); } - SncpServletAction action = actions.get(request.getActionid()); + final SncpServletAction action = actions.get(request.getActionid()); //if (finest) logger.log(Level.FINEST, "sncpdyn.execute: " + request + ", " + (action == null ? "null" : action.method)); if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid @@ -134,6 +135,8 @@ public final class SncpDynServlet extends SncpServlet { } handler = creator.create(new DefaultSncpAsyncHandler(action, in, out, request, response)); } + } else if (action.boolReturnTypeFuture) { + handler = new DefaultSncpAsyncHandler(action, in, out, request, response); } in.setBytes(request.getBody()); action.action(in, out, handler); @@ -141,6 +144,26 @@ public final class SncpDynServlet extends SncpServlet { response.finish(0, out); action.convert.offerBsonReader(in); action.convert.offerBsonWriter(out); + } else if (action.boolReturnTypeFuture) { + CompletableFuture future = handler.sncp_getFuture(); + if (future == null) { + action._callParameter(out, handler.sncp_getParams()); + action.convert.convertTo(out, Object.class, null); + } else { + Object[] sncpParams = handler.sncp_getParams(); + future.whenCompleteAsync((v, e) -> { + if (e != null) { + response.getContext().getLogger().log(Level.INFO, "sncp CompleteAsync error(" + request + ")", e); + response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); + return; + } + action._callParameter(out, sncpParams); + action.convert.convertTo(out, Object.class, v); + response.finish(0, out); + action.convert.offerBsonReader(in); + action.convert.offerBsonWriter(out); + }, getExecutor()); + } } } catch (Throwable t) { response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); @@ -164,10 +187,13 @@ public final class SncpDynServlet extends SncpServlet { protected int handlerFuncParamIndex = -1; //handlerFuncParamIndex>=0表示存在AsyncHandler参数 + protected boolean boolReturnTypeFuture = false; // 返回结果类型是否为 CompletableFuture + protected Class handlerFuncParamClass; //AsyncHandler参数的类型 public abstract void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable; + //只有同步方法才调用 (没有AsyncHandler、CompletableFuture) public final void _callParameter(final BsonWriter out, final Object... params) { if (paramAttrs != null) { for (int i = 1; i < paramAttrs.length; i++) { @@ -193,6 +219,10 @@ public final class SncpDynServlet extends SncpServlet { * * public void update(long show, short v2, AsyncHandler<Boolean, TestBean> handler, TestBean bean, String name, int id) { * } + * + * public CompletableFuture<String> changeName(TestBean bean, String name, int id) { + * return null; + * } * } * * @@ -245,6 +275,22 @@ public final class SncpDynServlet extends SncpServlet { * } * } * + * + * class DynActionTestService_changeName extends SncpServletAction { + * + * public TestService service; + * + * @Override + * public void action(final BsonReader in, final BsonWriter out, final SncpAsyncHandler handler) throws Throwable { + * TestBean arg1 = convert.convertFrom(paramTypes[1], in); + * String arg2 = convert.convertFrom(paramTypes[2], in); + * int arg3 = convert.convertFrom(paramTypes[3], in); + * handler.sncp_setParams(arg1, arg2, arg3); + * CompletableFuture future = service.changeName(arg1, arg2, arg3); + * handler.sncp_setFuture(future); + * } + * } + * * * * @param service Service @@ -264,6 +310,7 @@ public final class SncpDynServlet extends SncpServlet { final String convertReaderDesc = Type.getDescriptor(BsonReader.class); final String convertWriterDesc = Type.getDescriptor(BsonWriter.class); final String serviceDesc = Type.getDescriptor(serviceClass); + final boolean boolReturnTypeFuture = CompletableFuture.class.isAssignableFrom(method.getReturnType()); String newDynName = serviceName.substring(0, serviceName.lastIndexOf('/') + 1) + "DynAction" + serviceClass.getSimpleName() + "_" + method.getName() + "_" + actionid; while (true) { @@ -312,8 +359,11 @@ public final class SncpDynServlet extends SncpServlet { int store = 4; //action的参数个数+1 final Class[] paramClasses = method.getParameterTypes(); int[][] codes = new int[paramClasses.length][2]; - for (int i = 0; i < paramClasses.length; i++) { //参数 + for (int i = 0; i < paramClasses.length; i++) { //反序列化方法的每个参数 if (AsyncHandler.class.isAssignableFrom(paramClasses[i])) { + if (boolReturnTypeFuture) { + throw new RuntimeException(method + " have both AsyncHandler and CompletableFuture"); + } if (handlerFuncIndex >= 0) { throw new RuntimeException(method + " have more than one AsyncHandler type parameter"); } @@ -386,7 +436,7 @@ public final class SncpDynServlet extends SncpServlet { intconst++; store++; } - if (handlerFuncIndex >= 0) { //调用SncpAsyncHandler.setParams(Object... params) + if (boolReturnTypeFuture || handlerFuncIndex >= 0) { //调用SncpAsyncHandler.setParams(Object... params) mv.visitVarInsn(ALOAD, 3); if (paramClasses.length > 5) { mv.visitIntInsn(BIPUSH, paramClasses.length); @@ -444,8 +494,13 @@ public final class SncpDynServlet extends SncpServlet { } } mv.visitVarInsn(ASTORE, store); //11 + if (boolReturnTypeFuture) { + mv.visitVarInsn(ALOAD, 3); + mv.visitVarInsn(ALOAD, store); + mv.visitMethodInsn(INVOKEINTERFACE, handlerName, "sncp_setFuture", "(Ljava/util/concurrent/CompletableFuture;)V", true); + } } - if (handlerFuncIndex < 0) { + if (!boolReturnTypeFuture && handlerFuncIndex < 0) { //同步方法 //------------------------- _callParameter 方法 -------------------------------- mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 2); @@ -486,10 +541,10 @@ public final class SncpDynServlet extends SncpServlet { } //-------------------------直接返回 或者 调用convertTo方法 -------------------------------- int maxStack = codes.length > 0 ? codes[codes.length - 1][1] : 1; - if (returnClass == void.class) { //返回 + if (boolReturnTypeFuture || returnClass == void.class) { //返回 mv.visitInsn(RETURN); maxStack = 8; - } else { + } else { //同步方法调用 mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class)); mv.visitVarInsn(ALOAD, 2); @@ -528,6 +583,7 @@ public final class SncpDynServlet extends SncpServlet { instance.paramTypes = types; instance.handlerFuncParamIndex = handlerFuncIndex; instance.handlerFuncParamClass = handlerFuncClass; + instance.boolReturnTypeFuture = boolReturnTypeFuture; org.redkale.util.Attribute[] atts = new org.redkale.util.Attribute[ptypes.length + 1]; Annotation[][] anns = method.getParameterAnnotations(); diff --git a/src/org/redkale/net/sncp/SncpServlet.java b/src/org/redkale/net/sncp/SncpServlet.java index e7bc7b9f3..89ce86cd6 100644 --- a/src/org/redkale/net/sncp/SncpServlet.java +++ b/src/org/redkale/net/sncp/SncpServlet.java @@ -6,6 +6,7 @@ package org.redkale.net.sncp; import java.util.Objects; +import java.util.concurrent.*; import org.redkale.net.*; import org.redkale.util.*; @@ -20,6 +21,14 @@ public abstract class SncpServlet extends Servlet extends return !entry.isExpired(); } + //@Override + public CompletableFuture existsAsync(final K key) { + CompletableFuture future = new CompletableFuture(); + future.complete(exists(key)); + return future; + } + @Override public void exists(final AsyncHandler handler, @RpcAttachment final K key) { boolean rs = exists(key); @@ -211,6 +218,13 @@ public class CacheMemorySource extends return (V) entry.getValue(); } + //@Override + public CompletableFuture getAsync(final K key) { + CompletableFuture future = new CompletableFuture(); + future.complete(get(key)); + return future; + } + @Override public void get(final AsyncHandler handler, @RpcAttachment final K key) { V rs = get(key); @@ -230,6 +244,13 @@ public class CacheMemorySource extends return (V) entry.getValue(); } + //@Override + public CompletableFuture getAndRefreshAsync(final K key, final int expireSeconds) { + CompletableFuture future = new CompletableFuture(); + future.complete(getAndRefresh(key, expireSeconds)); + return future; + } + @Override public void getAndRefresh(final AsyncHandler handler, @RpcAttachment final K key, final int expireSeconds) { V rs = getAndRefresh(key, expireSeconds); diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index 9637b47ae..80b8702f7 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -52,7 +52,43 @@ public interface CacheSource { public void removeSetItem(final K key, final V value); - //----------------------异步版--------------------------------- + //---------------------- CompletableFuture 异步版 --------------------------------- + /** + public CompletableFuture existsAsync(final K key); + + public CompletableFuture getAsync(final K key); + + public CompletableFuture getAndRefreshAsync(final K key, final int expireSeconds); + + public CompletableFuture refreshAsync(final K key, final int expireSeconds); + + public CompletableFuture setAsync(final K key, final V value); + + public CompletableFuture setAsync(final int expireSeconds, final K key, final V value); + + public CompletableFuture setExpireSecondsAsync(final K key, final int expireSeconds); + + public CompletableFuture removeAsync(final K key); + + public CompletableFuture> getCollectionAsync(final K key); + + public CompletableFuture> getCollectionAndRefreshAsync(final K key, final int expireSeconds); + + public CompletableFuture appendListItemAsync(final K key, final V value); + + public CompletableFuture removeListItemAsync(final K key, final V value); + + public CompletableFuture appendSetItemAsync(final K key, final V value); + + public CompletableFuture removeSetItemAsync(final K key, final V value); + + default CompletableFuture isOpenAsync() { + CompletableFuture future = new CompletableFuture(); + future.complete(true); + return future; + } + */ + //---------------------- AsyncHandler 异步版 --------------------------------- public void exists(final AsyncHandler handler, final K key); public void get(final AsyncHandler handler, final K key); diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index 5670d6c10..931186e58 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -132,6 +132,13 @@ public class SncpTest { }.start(); } cld.await(); + final CountDownLatch cld2 = new CountDownLatch(1); + final CompletableFuture future = service.queryResultAsync(callbean); + future.whenComplete((v, e) -> { + cld2.countDown(); + System.out.println("异步执行完毕: " + v + ", 异常为: " + e); + }); + cld2.await(); System.out.println("---全部运行完毕---"); System.exit(0); } diff --git a/test/org/redkale/test/sncp/SncpTestIService.java b/test/org/redkale/test/sncp/SncpTestIService.java index 1deec776d..91d69588c 100644 --- a/test/org/redkale/test/sncp/SncpTestIService.java +++ b/test/org/redkale/test/sncp/SncpTestIService.java @@ -5,6 +5,7 @@ */ package org.redkale.test.sncp; +import java.util.concurrent.CompletableFuture; import org.redkale.service.*; import org.redkale.source.DataCallArrayAttribute; @@ -16,6 +17,8 @@ public interface SncpTestIService extends Service { public String queryResult(SncpTestBean bean); + public CompletableFuture queryResultAsync(SncpTestBean bean); + public void insert(@RpcCall(DataCallArrayAttribute.class) SncpTestBean... beans); public String updateBean(@RpcCall(SncpTestServiceImpl.CallAttribute.class) SncpTestBean bean); diff --git a/test/org/redkale/test/sncp/SncpTestServiceImpl.java b/test/org/redkale/test/sncp/SncpTestServiceImpl.java index 825e7b170..eddfa6d14 100644 --- a/test/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/test/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -7,6 +7,7 @@ package org.redkale.test.sncp; import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; import org.redkale.net.sncp.*; import org.redkale.service.*; import org.redkale.source.DataCallArrayAttribute; @@ -19,6 +20,25 @@ import org.redkale.util.*; @ResourceType({SncpTestIService.class}) public class SncpTestServiceImpl implements SncpTestIService { + @Override + public CompletableFuture queryResultAsync(SncpTestBean bean) { + final CompletableFuture future = new CompletableFuture<>(); + new Thread() { + @Override + public void run() { + try { + Thread.sleep(1000); + System.out.println(Thread.currentThread().getName() + " 运行了异步方法-----------queryResultAsync方法"); + future.complete("异步result: " + bean); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + return future; + + } + public static class CallAttribute implements Attribute { @Override @@ -50,12 +70,14 @@ public class SncpTestServiceImpl implements SncpTestIService { } + @Override public void insert(@RpcCall(DataCallArrayAttribute.class) SncpTestBean... beans) { for (SncpTestBean bean : beans) { bean.setId(System.currentTimeMillis()); } } + @Override public String queryResult(SncpTestBean bean) { System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法"); return "result: " + bean; @@ -67,6 +89,7 @@ public class SncpTestServiceImpl implements SncpTestIService { } @RpcMultiRun + @Override public String updateBean(@RpcCall(CallAttribute.class) SncpTestBean bean) { bean.setId(System.currentTimeMillis()); System.out.println(Thread.currentThread().getName() + " 运行了updateBean方法");