From 63f4a6ee0d1a391088b2dc489baf009e35aa8a88 Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 4 Feb 2023 14:59:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0@NonBlocking?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/annotation/NonBlocking.java | 7 +- .../java/org/redkale/boot/NodeHttpServer.java | 2 +- src/main/java/org/redkale/net/Context.java | 8 +- .../org/redkale/net/DispatcherServlet.java | 8 +- src/main/java/org/redkale/net/Filter.java | 18 ++- src/main/java/org/redkale/net/Response.java | 54 ++++++- src/main/java/org/redkale/net/Servlet.java | 7 +- .../net/http/HttpDispatcherServlet.java | 8 +- .../org/redkale/net/http/HttpResponse.java | 26 +++- .../org/redkale/net/http/HttpServlet.java | 127 +++++++++++++--- src/main/java/org/redkale/net/http/Rest.java | 140 ++++++++++++++---- .../redkale/net/http/WebSocketServlet.java | 2 + .../redkale/net/sncp/SncpAsyncHandler.java | 1 - .../redkale/net/sncp/SncpClientRequest.java | 3 +- .../net/sncp/SncpDispatcherServlet.java | 4 + .../org/redkale/net/sncp/SncpDynServlet.java | 9 +- .../java/org/redkale/net/sncp/SncpHeader.java | 37 ----- .../org/redkale/net/sncp/SncpResponse.java | 21 +++ .../org/redkale/net/sncp/SncpServlet.java | 1 + 19 files changed, 374 insertions(+), 109 deletions(-) diff --git a/src/main/java/org/redkale/annotation/NonBlocking.java b/src/main/java/org/redkale/annotation/NonBlocking.java index 84ccd92da..73189e915 100644 --- a/src/main/java/org/redkale/annotation/NonBlocking.java +++ b/src/main/java/org/redkale/annotation/NonBlocking.java @@ -6,13 +6,12 @@ package org.redkale.annotation; import java.lang.annotation.*; /** - * 非阻塞模式标记, 标记在Service类和方法、HttpServlet类上
+ * 非阻塞模式标记, 标记在Service类和方法、Filter类、HttpServlet类上
* 一般情况下,没有显注此注解的方法视为阻塞时, 以下两种情况除外:
- * 1、返回类型是CompletableFuture
+ * 1、返回类型是CompletionStage
* 2、返回类型是void且参数存在CompletionHandler类型
* 阻塞模式的方法会在work线程池中运行, 非阻塞在IO线程中运行。 * - * *

* 详情见: https://redkale.org * @@ -22,7 +21,7 @@ import java.lang.annotation.*; */ @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) -public @interface NonBlocking { +public @interface NonBlocking { //不可使用@Inherited,防止被继承, 见HttpServlet.preExecute/authenticate/execute boolean value() default true; } diff --git a/src/main/java/org/redkale/boot/NodeHttpServer.java b/src/main/java/org/redkale/boot/NodeHttpServer.java index 1af72964d..2e24b7e54 100644 --- a/src/main/java/org/redkale/boot/NodeHttpServer.java +++ b/src/main/java/org/redkale/boot/NodeHttpServer.java @@ -14,6 +14,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.stream.Stream; import org.redkale.annotation.*; +import static org.redkale.boot.Application.RESNAME_SNCP_ADDRESS; import org.redkale.boot.ClassFilter.FilterEntry; import org.redkale.cluster.ClusterAgent; import org.redkale.mq.MessageAgent; @@ -24,7 +25,6 @@ import org.redkale.service.Service; import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.*; import org.redkale.watch.*; -import static org.redkale.boot.Application.RESNAME_SNCP_ADDRESS; /** * HTTP Server节点的配置Server diff --git a/src/main/java/org/redkale/net/Context.java b/src/main/java/org/redkale/net/Context.java index 16a23b2db..7b69858ec 100644 --- a/src/main/java/org/redkale/net/Context.java +++ b/src/main/java/org/redkale/net/Context.java @@ -111,13 +111,7 @@ public class Context { } protected void executeDispatch(Request request, Response response) { - if (workHashExecutor != null) { - workHashExecutor.execute(request.getHashid(), () -> dispatcher.dispatch(request, response)); - } else if (workExecutor != null) { - workExecutor.execute(() -> dispatcher.dispatch(request, response)); - } else { - dispatcher.dispatch(request, response); - } + dispatcher.dispatch(request, response); } public void execute(Servlet servlet, Request request, Response response) { diff --git a/src/main/java/org/redkale/net/DispatcherServlet.java b/src/main/java/org/redkale/net/DispatcherServlet.java index e21623de8..edac8e798 100644 --- a/src/main/java/org/redkale/net/DispatcherServlet.java +++ b/src/main/java/org/redkale/net/DispatcherServlet.java @@ -52,6 +52,10 @@ public abstract class DispatcherServlet headFilter; + protected DispatcherServlet() { + this._nonBlocking = true; + } + protected void incrExecuteCounter() { executeCounter.increment(); } @@ -290,8 +294,8 @@ public abstract class DispatcherServlet * javax.servlet.Filter方法doFilter是同步操作,此Filter.doFilter则是异步操作,方法return前需要调用Response.nextEvent()方可执行下一个Filter
- * 通过给Filter标记注解@Priority来确定执行的顺序, Priority.value值越大越先执行 + * 通过给Filter标记注解@Priority来确定执行的顺序, Priority.value值越大越先执行
+ * 如果doFilter方法是非阻塞的,需要在Filter类上标记@NonBlocking * *

* 详情见: https://redkale.org @@ -26,15 +27,20 @@ public abstract class Filter, P extends AnyValue _conf; //当前Filter的配置 + final boolean _nonBlocking; //当前Filter.doFilter方法是否为阻塞模式 + Filter _next; //下一个Filter + protected Filter() { + NonBlocking a = getClass().getAnnotation(NonBlocking.class); + this._nonBlocking = a != null && a.value(); + } + public void init(C context, AnyValue config) { } public abstract void doFilter(R request, P response) throws IOException; - public abstract boolean isNonBlocking(); - public void destroy(C context, AnyValue config) { } @@ -47,4 +53,8 @@ public abstract class Filter, P extends Priority p2 = o.getClass().getAnnotation(Priority.class); return (p2 == null ? 0 : p2.value()) - (p1 == null ? 0 : p1.value()); } + + protected boolean isNonBlocking() { + return _nonBlocking; + } } diff --git a/src/main/java/org/redkale/net/Response.java b/src/main/java/org/redkale/net/Response.java index 84ef082df..ff2afa166 100644 --- a/src/main/java/org/redkale/net/Response.java +++ b/src/main/java/org/redkale/net/Response.java @@ -8,6 +8,7 @@ package org.redkale.net; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; +import java.util.concurrent.*; import java.util.function.*; import java.util.logging.Level; import org.redkale.util.*; @@ -31,6 +32,8 @@ public abstract class Response> { protected Consumer responseConsumer; //虚拟构建的Response可能不存在responseConsumer + protected final ExecutorService workExecutor; + protected final R request; protected final WorkThread thread; @@ -118,6 +121,7 @@ public abstract class Response> { this.request = request; this.thread = WorkThread.currWorkThread(); this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null; + this.workExecutor = context == null || context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor; } protected AsyncConnection removeChannel() { @@ -154,6 +158,14 @@ public abstract class Response> { return true; } + protected ExecutorService getWorkExecutor() { + return workExecutor; + } + + protected void updateNonBlocking(boolean nonBlocking) { + this.inNonBlocking = nonBlocking; + } + protected boolean inNonBlocking() { return inNonBlocking; } @@ -181,13 +193,45 @@ public abstract class Response> { if (this.filter != null) { Filter runner = this.filter; this.filter = this.filter._next; - runner.doFilter(request, this); + if (inNonBlocking) { + if (runner.isNonBlocking()) { + runner.doFilter(request, this); + } else { + inNonBlocking = false; + workExecutor.execute(() -> { + try { + runner.doFilter(request, Response.this); + } catch (Throwable t) { + context.getLogger().log(Level.WARNING, "Filter occur exception. request = " + request, t); + finishError(t); + } + }); + } + } else { + runner.doFilter(request, this); + } return; } if (this.servlet != null) { Servlet s = this.servlet; this.servlet = null; - s.execute(request, this); + if (inNonBlocking) { + if (s.isNonBlocking()) { + s.execute(request, this); + } else { + inNonBlocking = false; + workExecutor.execute(() -> { + try { + s.execute(request, Response.this); + } catch (Throwable t) { + context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t); + finishError(t); + } + }); + } + } else { + s.execute(request, this); + } } } @@ -212,6 +256,12 @@ public abstract class Response> { this.completeInIOThread(false); } + //被重载后kill不一定为true + protected void finishError(Throwable t) { + error(t); + } + + //kill=true protected void error(Throwable t) { completeInIOThread(true); } diff --git a/src/main/java/org/redkale/net/Servlet.java b/src/main/java/org/redkale/net/Servlet.java index 0efcf3537..b835de31b 100644 --- a/src/main/java/org/redkale/net/Servlet.java +++ b/src/main/java/org/redkale/net/Servlet.java @@ -5,8 +5,8 @@ */ package org.redkale.net; -import org.redkale.util.AnyValue; import java.io.IOException; +import org.redkale.util.AnyValue; /** * 协议请求处理类 @@ -23,6 +23,8 @@ public abstract class Servlet, P extends AnyValue _conf; //当前Servlet的配置 + protected boolean _nonBlocking; //当前Servlet.execute方法是否为非阻塞模式 + //Server执行start时运行此方法 public void init(C context, AnyValue config) { } @@ -33,4 +35,7 @@ public abstract class Servlet, P extends public void destroy(C context, AnyValue config) { } + protected boolean isNonBlocking() { + return _nonBlocking; + } } diff --git a/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java b/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java index 978854fa8..24405128a 100644 --- a/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/http/HttpDispatcherServlet.java @@ -55,6 +55,10 @@ public class HttpDispatcherServlet extends DispatcherServlet removeHttpServlet(final Predicate predicateEntry, final Predicate> predicateFilter) { List servlets = new ArrayList<>(); allMapLock.lock(); @@ -354,8 +358,8 @@ public class HttpDispatcherServlet extends DispatcherServlet * @@ -620,7 +640,7 @@ public class HttpResponse extends Response { public void finishFuture(final Convert convert, Type valueType, CompletionStage future) { future.whenComplete((v, e) -> { if (e != null) { - context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletionStage", (Throwable) e); + context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e); if (e instanceof TimeoutException) { finish504(); } else { @@ -652,7 +672,7 @@ public class HttpResponse extends Response { public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) { future.whenComplete((v, e) -> { if (e != null) { - context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletionStage", (Throwable) e); + context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e); if (e instanceof TimeoutException) { finish504(); } else { diff --git a/src/main/java/org/redkale/net/http/HttpServlet.java b/src/main/java/org/redkale/net/http/HttpServlet.java index a44a36be3..502b36f2a 100644 --- a/src/main/java/org/redkale/net/http/HttpServlet.java +++ b/src/main/java/org/redkale/net/http/HttpServlet.java @@ -7,10 +7,12 @@ package org.redkale.net.http; import java.io.IOException; import java.lang.annotation.Annotation; -import java.lang.reflect.Method; +import java.lang.reflect.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; +import java.util.logging.Level; +import org.redkale.annotation.NonBlocking; import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; @@ -48,6 +50,10 @@ public class HttpServlet extends Servlet //这里不能直接使用HttpServlet,会造成死循环初始化HttpServlet private final Servlet authSuccessServlet = new Servlet() { + { + this._nonBlocking = true; + } + @Override public void execute(HttpRequest request, HttpResponse response) throws IOException { ActionEntry entry = request.actionEntry; @@ -72,12 +78,32 @@ public class HttpServlet extends Servlet } response.setCacheHandler(entry.cacheHandler); } - entry.servlet.execute(request, response); + if (response.inNonBlocking()) { + if (entry.nonBlocking) { + entry.servlet.execute(request, response); + } else { + response.updateNonBlocking(false); + response.getWorkExecutor().execute(() -> { + try { + entry.servlet.execute(request, response); + } catch (Throwable t) { + response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t); + response.finishError(t); + } + }); + } + } else { + entry.servlet.execute(request, response); + } } }; //preExecute运行完后执行的Servlet private final Servlet preSuccessServlet = new Servlet() { + { + this._nonBlocking = true; + } + @Override public void execute(HttpRequest request, HttpResponse response) throws IOException { if (request.actionEntry != null) { @@ -134,7 +160,10 @@ public class HttpServlet extends Servlet if (ws != null && !ws.repair()) { path = ""; } - HashMap map = this._actionmap != null ? this._actionmap : loadActionEntry(); + //设置整个HttpServlet是否非阻塞式 + this._nonBlocking = isNonBlocking(getClass()); + //RestServlet会填充_actionmap + HashMap map = this._actionmap != null ? this._actionmap : loadActionEntry(this._nonBlocking); this.mappings = new Map.Entry[map.size()]; int i = -1; for (Map.Entry en : map.entrySet()) { @@ -201,6 +230,7 @@ public class HttpServlet extends Servlet * * @throws IOException IOException */ + @NonBlocking protected void preExecute(HttpRequest request, HttpResponse response) throws IOException { response.nextEvent(); } @@ -227,17 +257,74 @@ public class HttpServlet extends Servlet * * @throws IOException IOException */ + @NonBlocking protected void authenticate(HttpRequest request, HttpResponse response) throws IOException { response.nextEvent(); } @Override + @NonBlocking public void execute(HttpRequest request, HttpResponse response) throws IOException { response.thenEvent(preSuccessServlet); preExecute(request, response); } - private HashMap loadActionEntry() { + static Boolean isNonBlocking(Class servletClass) { + Class clz = servletClass; + Boolean preNonBlocking = null; + Boolean authNonBlocking = null; + Boolean exeNonBlocking = null; + do { + if (java.lang.reflect.Modifier.isAbstract(clz.getModifiers())) { + break; + } + RedkaleClassLoader.putReflectionDeclaredMethods(clz.getName()); + for (final Method method : clz.getDeclaredMethods()) { + String methodName = method.getName(); + //----------------------------------------------- + Class[] paramTypes = method.getParameterTypes(); + if (paramTypes.length != 2 || paramTypes[0] != HttpRequest.class || paramTypes[1] != HttpResponse.class) { + continue; + } + //----------------------------------------------- + Class[] exps = method.getExceptionTypes(); + if (exps.length > 0 && (exps.length != 1 || exps[0] != IOException.class)) { + continue; + } + //----------------------------------------------- + if ("preExecute".equals(methodName)) { + if (preNonBlocking == null) { + NonBlocking non = method.getAnnotation(NonBlocking.class); + preNonBlocking = non != null && non.value(); + } + continue; + } + if ("authenticate".equals(methodName)) { + if (authNonBlocking == null) { + NonBlocking non = method.getAnnotation(NonBlocking.class); + authNonBlocking = non != null && non.value(); + } + continue; + } + if ("execute".equals(methodName)) { + if (exeNonBlocking == null) { + NonBlocking non = method.getAnnotation(NonBlocking.class); + exeNonBlocking = non != null && non.value(); + } + continue; + } + } + } while ((clz = clz.getSuperclass()) != HttpServlet.class); + //设置整个HttpServlet是否非阻塞式 + NonBlocking non = servletClass.getAnnotation(NonBlocking.class); + if (non == null) { + return (preNonBlocking != null && preNonBlocking) && (authNonBlocking != null && authNonBlocking) && (exeNonBlocking != null && exeNonBlocking); + } else { + return non.value(); + } + } + + private HashMap loadActionEntry(boolean typeNonBlocking) { WebServlet module = this.getClass().getAnnotation(WebServlet.class); final int serviceid = module == null ? 0 : module.moduleid(); final HashMap map = new HashMap<>(); @@ -248,13 +335,9 @@ public class HttpServlet extends Servlet if (java.lang.reflect.Modifier.isAbstract(clz.getModifiers())) { break; } - RedkaleClassLoader.putReflectionPublicMethods(clz.getName()); - for (final Method method : clz.getMethods()) { - //----------------------------------------------- - String methodname = method.getName(); - if ("service".equals(methodname) || "preExecute".equals(methodname) || "execute".equals(methodname) || "authenticate".equals(methodname)) { - continue; - } + RedkaleClassLoader.putReflectionDeclaredMethods(clz.getName()); + for (final Method method : clz.getDeclaredMethods()) { + String methodName = method.getName(); //----------------------------------------------- Class[] paramTypes = method.getParameterTypes(); if (paramTypes.length != 2 || paramTypes[0] != HttpRequest.class || paramTypes[1] != HttpResponse.class) { @@ -266,6 +349,14 @@ public class HttpServlet extends Servlet continue; } //----------------------------------------------- + if ("preExecute".equals(methodName) || "authenticate".equals(methodName) + || "execute".equals(methodName) || "service".equals(methodName)) { + continue; + } + if (!Modifier.isPublic(method.getModifiers())) { + continue; + } + //----------------------------------------------- final HttpMapping mapping = method.getAnnotation(HttpMapping.class); if (mapping == null) { @@ -285,7 +376,7 @@ public class HttpServlet extends Servlet throw new HttpException(this.getClass().getSimpleName() + " have two same " + HttpMapping.class.getSimpleName() + "(" + name + ")"); } nameset.put(name, clz); - map.put(name, new ActionEntry(serviceid, actionid, name, methods, method, createActionServlet(method))); + map.put(name, new ActionEntry(serviceid, actionid, name, methods, method, createActionServlet(typeNonBlocking, method))); } } while ((clz = clz.getSuperclass()) != HttpServlet.class); return map; @@ -331,6 +422,7 @@ public class HttpServlet extends Servlet oneCache = new CacheEntry(response.getStatus(), response.getContentType(), content); } : null; } + this.nonBlocking = servlet._nonBlocking; } protected static boolean auth(Method method) { @@ -353,10 +445,6 @@ public class HttpServlet extends Servlet return method.getAnnotations(); } - boolean isNeedCheck() { - return this.moduleid != 0 || this.actionid != 0; - } - boolean checkMethod(final String reqMethod) { if (methods.length == 0) { return true; @@ -379,6 +467,8 @@ public class HttpServlet extends Servlet final boolean rpconly; + final boolean nonBlocking; + final boolean auth; final int moduleid; @@ -398,7 +488,7 @@ public class HttpServlet extends Servlet Annotation[] annotations; } - private HttpServlet createActionServlet(final Method method) { + private HttpServlet createActionServlet(final boolean typeNonBlocking, final Method method) { //------------------------------------------------------------------------------ final String supDynName = HttpServlet.class.getName().replace('.', '/'); final String interName = this.getClass().getName().replace('.', '/'); @@ -478,6 +568,8 @@ public class HttpServlet extends Servlet java.lang.reflect.Field field = instance.getClass().getField(factfield); field.set(instance, this); RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), field); + NonBlocking non = method.getAnnotation(NonBlocking.class); + instance._nonBlocking = typeNonBlocking ? (non == null ? typeNonBlocking : non.value()) : false; return instance; } catch (Exception ex) { throw new HttpException(ex); @@ -517,6 +609,7 @@ public class HttpServlet extends Servlet if (actionSimpleMappingUrl != null && !Utility.contains(actionSimpleMappingUrl, '*', '{', '[', '(', '|', '^', '$', '+', '?', '\\')) { this._actionSimpleMappingUrl = actionSimpleMappingUrl; } + this._nonBlocking = actionEntry.nonBlocking; } @Override diff --git a/src/main/java/org/redkale/net/http/Rest.java b/src/main/java/org/redkale/net/http/Rest.java index 9e9bfb291..bd2008b9f 100644 --- a/src/main/java/org/redkale/net/http/Rest.java +++ b/src/main/java/org/redkale/net/http/Rest.java @@ -984,17 +984,61 @@ public final class Rest { if (!HttpServlet.class.isAssignableFrom(baseServletType)) { throw new RestException(baseServletType + " is not HttpServlet Class on createRestServlet"); } - int mod = baseServletType.getModifiers(); - if (!java.lang.reflect.Modifier.isPublic(mod)) { + int parentMod = baseServletType.getModifiers(); + if (!java.lang.reflect.Modifier.isPublic(parentMod)) { throw new RestException(baseServletType + " is not Public Class on createRestServlet"); } - if (java.lang.reflect.Modifier.isAbstract(mod)) { - for (Method m : baseServletType.getDeclaredMethods()) { - if (java.lang.reflect.Modifier.isAbstract(m.getModifiers())) { //@since 2.4.0 - throw new RestException(baseServletType + " cannot contains a abstract Method on " + baseServletType); + Boolean parentNon0 = null; + { + NonBlocking snon = serviceType.getAnnotation(NonBlocking.class); + parentNon0 = snon == null ? null : snon.value(); + if (HttpServlet.class != baseServletType) { + Boolean preNonBlocking = null; + Boolean authNonBlocking = null; + RedkaleClassLoader.putReflectionDeclaredMethods(baseServletType.getName()); + for (Method m : baseServletType.getDeclaredMethods()) { + if (java.lang.reflect.Modifier.isAbstract(parentMod) && java.lang.reflect.Modifier.isAbstract(m.getModifiers())) { //@since 2.4.0 + throw new RestException(baseServletType + " cannot contains a abstract Method on " + baseServletType); + } + Class[] paramTypes = m.getParameterTypes(); + if (paramTypes.length != 2 || paramTypes[0] != HttpRequest.class || paramTypes[1] != HttpResponse.class) { + continue; + } + //----------------------------------------------- + Class[] exps = m.getExceptionTypes(); + if (exps.length > 0 && (exps.length != 1 || exps[0] != IOException.class)) { + continue; + } + //----------------------------------------------- + String methodName = m.getName(); + if ("preExecute".equals(methodName)) { + if (preNonBlocking == null) { + NonBlocking non = m.getAnnotation(NonBlocking.class); + preNonBlocking = non != null && non.value(); + } + continue; + } + if ("authenticate".equals(methodName)) { + if (authNonBlocking == null) { + NonBlocking non = m.getAnnotation(NonBlocking.class); + authNonBlocking = non != null && non.value(); + } + continue; + } + } + if (preNonBlocking != null && !preNonBlocking) { + parentNon0 = false; + } else if (authNonBlocking != null && !authNonBlocking) { + parentNon0 = false; + } else { + NonBlocking bnon = baseServletType.getAnnotation(NonBlocking.class); + if (bnon != null && !bnon.value()) { + parentNon0 = false; + } } } } + final String restInternalName = Type.getInternalName(Rest.class); final String serviceDesc = Type.getDescriptor(serviceType); final String webServletDesc = Type.getDescriptor(WebServlet.class); @@ -1002,6 +1046,7 @@ public final class Rest { final String reqDesc = Type.getDescriptor(HttpRequest.class); final String respDesc = Type.getDescriptor(HttpResponse.class); final String convertDesc = Type.getDescriptor(Convert.class); + final String nonblockDesc = Type.getDescriptor(NonBlocking.class); final String typeDesc = Type.getDescriptor(java.lang.reflect.Type.class); final String retDesc = Type.getDescriptor(RetResult.class); final String httpResultDesc = Type.getDescriptor(HttpResult.class); @@ -1041,6 +1086,8 @@ public final class Rest { throw new RestException(serviceType + " is ignore Rest Service Class"); //标记为ignore=true不创建Servlet } final boolean serRpcOnly = controller != null && controller.rpconly(); + final Boolean parentNonBlocking = parentNon0; + ClassLoader loader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader; String stname = serviceType.getSimpleName(); if (stname.startsWith("Service")) { //类似ServiceWatchService这样的类保留第一个Service字样 @@ -1125,11 +1172,11 @@ public final class Rest { paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType)); retvalTypes.add(formatRestReturnType(method, serviceType)); if (mappings.length == 0) { //没有Mapping,设置一个默认值 - MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, null, bigModuleName, method); + MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, parentNonBlocking, null, bigModuleName, method); entrys.add(entry); } else { for (RestMapping mapping : mappings) { - MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, mapping, defModuleName, method); + MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, parentNonBlocking, mapping, defModuleName, method); entrys.add(entry); } } @@ -1465,8 +1512,8 @@ public final class Rest { Method restactMethod = newClazz.getDeclaredMethod("_createRestActionEntry"); restactMethod.setAccessible(true); - Field tmpentrysfield = HttpServlet.class.getDeclaredField("_actionmap"); - tmpentrysfield.setAccessible(true); + Field tmpEntrysField = HttpServlet.class.getDeclaredField("_actionmap"); + tmpEntrysField.setAccessible(true); HashMap innerEntryMap = (HashMap) restactMethod.invoke(obj); for (Map.Entry en : innerEntryMap.entrySet()) { Method m = mappingUrlToMethod.get(en.getKey()); @@ -1474,7 +1521,10 @@ public final class Rest { en.getValue().annotations = HttpServlet.ActionEntry.annotations(m); } } - tmpentrysfield.set(obj, innerEntryMap); + tmpEntrysField.set(obj, innerEntryMap); + Field nonblockField = Servlet.class.getDeclaredField("_nonBlocking"); + nonblockField.setAccessible(true); + nonblockField.set(obj, parentNonBlocking == null ? true : parentNonBlocking); return obj; } catch (ClassNotFoundException e) { } catch (Throwable e) { @@ -1587,14 +1637,14 @@ public final class Rest { paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType)); retvalTypes.add(formatRestReturnType(method, serviceType)); if (mappings.length == 0) { //没有Mapping,设置一个默认值 - MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, null, bigmodulename, method); + MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, parentNonBlocking, null, bigmodulename, method); if (entrys.contains(entry)) { throw new RestException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat"); } entrys.add(entry); } else { for (RestMapping mapping : mappings) { - MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, mapping, defmodulename, method); + MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, parentNonBlocking, mapping, defmodulename, method); if (entrys.contains(entry)) { throw new RestException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat"); } @@ -1667,6 +1717,11 @@ public final class Rest { classMap.put("repair", repair); //classMap.put("comment", comment); //不显示太多信息 } + { //NonBlocking + av0 = cw.visitAnnotation(nonblockDesc, true); + av0.visit("value", true); + av0.visitEnd(); + } { //内部类 cw.visitInnerClass(actionEntryName, httpServletName, HttpServlet.ActionEntry.class.getSimpleName(), ACC_PROTECTED + ACC_FINAL + ACC_STATIC); @@ -2102,8 +2157,7 @@ public final class Rest { Map mappingMap = new LinkedHashMap<>(); java.lang.reflect.Type returnGenericNoFutureType = TypeToken.getGenericType(method.getGenericReturnType(), serviceType); - { // 设置 Annotation - //设置 HttpMapping + { //设置 Annotation HttpMapping boolean reqpath = false; for (Object[] ps : paramlist) { if ("#".equals((String) ps[1])) { @@ -2168,7 +2222,12 @@ public final class Rest { mappingMap.put("result", returnGenericNoFutureType == returnType ? returnType.getName() : String.valueOf(returnGenericNoFutureType)); entry.mappingurl = url; } - if (rcs != null && rcs.length > 0) { // 设置 Annotation + { //设置 Annotation NonBlocking + av0 = mv.visitAnnotation(nonblockDesc, true); + av0.visit("value", entry.nonBlocking); + av0.visitEnd(); + } + if (rcs != null && rcs.length > 0) { // 设置 Annotation RestConvert av0 = mv.visitAnnotation(restConvertsDesc, true); AnnotationVisitor av1 = av0.visitArray("value"); //设置 RestConvert @@ -2197,7 +2256,7 @@ public final class Rest { av1.visitEnd(); av0.visitEnd(); } - if (rcc != null && rcc.length > 0) { // 设置 Annotation + if (rcc != null && rcc.length > 0) { // 设置 Annotation RestConvertCoder av0 = mv.visitAnnotation(restConvertCodersDesc, true); AnnotationVisitor av1 = av0.visitArray("value"); //设置 RestConvertCoder @@ -3184,8 +3243,13 @@ public final class Rest { cw2.visit(V11, ACC_SUPER, newDynName + "$" + entry.newActionClassName, null, httpServletName, null); cw2.visitInnerClass(newDynName + "$" + entry.newActionClassName, newDynName, entry.newActionClassName, ACC_PRIVATE + ACC_STATIC); + { //设置 Annotation NonBlocking + av0 = cw2.visitAnnotation(nonblockDesc, true); + av0.visit("value", entry.nonBlocking); + av0.visitEnd(); + } { - fv = cw2.visitField(0, "servlet", "L" + newDynName + ";", null, null); + fv = cw2.visitField(0, "_parentServlet", "L" + newDynName + ";", null, null); fv.visitEnd(); } { @@ -3194,7 +3258,10 @@ public final class Rest { mv.visitMethodInsn(INVOKESPECIAL, httpServletName, "", "()V", false); mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 1); - mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "servlet", "L" + newDynName + ";"); + mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "_parentServlet", "L" + newDynName + ";"); + mv.visitVarInsn(ALOAD, 0); + mv.visitInsn(entry.nonBlocking ? ICONST_1 : ICONST_0); + mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "_nonBlocking", "Z"); mv.visitInsn(RETURN); mv.visitMaxs(2, 2); mv.visitEnd(); @@ -3211,7 +3278,7 @@ public final class Rest { { mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(" + reqDesc + respDesc + ")V", null, new String[]{"java/io/IOException"})); mv.visitVarInsn(ALOAD, 0); - mv.visitFieldInsn(GETFIELD, newDynName + "$" + entry.newActionClassName, "servlet", "L" + newDynName + ";"); + mv.visitFieldInsn(GETFIELD, newDynName + "$" + entry.newActionClassName, "_parentServlet", "L" + newDynName + ";"); mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 2); mv.visitMethodInsn(INVOKEVIRTUAL, newDynName, entry.newMethodName, "(" + reqDesc + respDesc + ")V", false); @@ -3416,8 +3483,8 @@ public final class Rest { Method restactMethod = newClazz.getDeclaredMethod("_createRestActionEntry"); restactMethod.setAccessible(true); RedkaleClassLoader.putReflectionMethod(newDynName.replace('/', '.'), restactMethod); - Field tmpentrysfield = HttpServlet.class.getDeclaredField("_actionmap"); - tmpentrysfield.setAccessible(true); + Field tmpEntrysField = HttpServlet.class.getDeclaredField("_actionmap"); + tmpEntrysField.setAccessible(true); HashMap innerEntryMap = (HashMap) restactMethod.invoke(obj); for (Map.Entry en : innerEntryMap.entrySet()) { Method m = mappingurlToMethod.get(en.getKey()); @@ -3425,8 +3492,13 @@ public final class Rest { en.getValue().annotations = HttpServlet.ActionEntry.annotations(m); } } - tmpentrysfield.set(obj, innerEntryMap); - RedkaleClassLoader.putReflectionField(HttpServlet.class.getName(), tmpentrysfield); + tmpEntrysField.set(obj, innerEntryMap); + RedkaleClassLoader.putReflectionField(HttpServlet.class.getName(), tmpEntrysField); + + Field nonblockField = Servlet.class.getDeclaredField("_nonBlocking"); + nonblockField.setAccessible(true); + nonblockField.set(obj, parentNonBlocking == null ? true : parentNonBlocking); + RedkaleClassLoader.putReflectionField(Servlet.class.getName(), nonblockField); return obj; } catch (Throwable e) { throw new RestException(e); @@ -3557,7 +3629,7 @@ public final class Rest { return normal ? name : Utility.md5Hex(name); } - public MappingEntry(final boolean serRpcOnly, int methodIndex, RestMapping mapping, final String defModuleName, Method method) { + public MappingEntry(final boolean serRpcOnly, int methodIndex, Boolean typeNonBlocking, RestMapping mapping, final String defModuleName, Method method) { if (mapping == null) { mapping = DEFAULT__MAPPING; } @@ -3588,6 +3660,22 @@ public final class Rest { this.existsPound = pound; this.newMethodName = formatMappingName(this.name.replace('/', '$').replace('.', '_').replace('-', '_')); this.newActionClassName = "_Dyn_" + this.newMethodName + "_ActionHttpServlet"; + + NonBlocking non = method.getAnnotation(NonBlocking.class); + Boolean nonFlag = non == null ? typeNonBlocking : (Boolean) non.value(); //显注在方法优先级大于类 + if (nonFlag == null) { + if (CompletionStage.class.isAssignableFrom(method.getReturnType())) { + nonFlag = true; + } else { + for (Parameter mp : method.getParameters()) { + if (CompletionHandler.class.isAssignableFrom(mp.getType())) { + nonFlag = true; + break; + } + } + } + } + this.nonBlocking = nonFlag == null ? false : nonFlag; } public final int methodIdx; // _paramtypes 的下标,从0开始 @@ -3608,6 +3696,8 @@ public final class Rest { public final String[] methods; + public final boolean nonBlocking; + public final boolean rpconly; public final boolean auth; diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index 2418b279d..1089565b0 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -148,6 +148,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override final void preInit(Application application, HttpContext context, AnyValue conf) { + this._nonBlocking = true; if (this.textConvert == null) { this.textConvert = jsonConvert; } @@ -218,6 +219,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } @Override //在IOThread中执行 + @NonBlocking public final void execute(final HttpRequest request, final HttpResponse response) throws IOException { final boolean debug = logger.isLoggable(Level.FINEST); if (!request.isWebSocket()) { diff --git a/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java b/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java index 1d847b1d0..d09e7a050 100644 --- a/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java +++ b/src/main/java/org/redkale/net/sncp/SncpAsyncHandler.java @@ -9,7 +9,6 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.CompletableFuture; import java.util.logging.*; import org.redkale.annotation.ConstructorParameters; -import static org.redkale.asm.Opcodes.*; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; import org.redkale.convert.bson.*; diff --git a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java index 2da07737e..82aded86b 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java @@ -57,8 +57,7 @@ public class SncpClientRequest extends ClientRequest { @Override public String toString() { return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{" - + "header=" + header - + ", seqid =" + seqid + + "header=" + header + ", seqid =" + seqid + ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]" + "}"; } diff --git a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java index 590b4bb24..60b1e3e4a 100644 --- a/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpDispatcherServlet.java @@ -24,6 +24,10 @@ public class SncpDispatcherServlet extends DispatcherServlet handlerCreator; + protected boolean nonBlocking; + @Resource protected BsonConvert convert; @@ -619,9 +621,14 @@ public final class SncpDynServlet extends SncpServlet { } } } + NonBlocking non = method.getAnnotation(NonBlocking.class); + if (non == null) { + non = service.getClass().getAnnotation(NonBlocking.class); + } try { SncpServletAction instance = (SncpServletAction) newClazz.getDeclaredConstructor().newInstance(); instance.method = method; + instance.nonBlocking = non == null ? false : non.value(); java.lang.reflect.Type[] types = new java.lang.reflect.Type[originalParamTypes.length + 1]; types[0] = originalReturnType; System.arraycopy(originalParamTypes, 0, types, 1, originalParamTypes.length); diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index c17cbba06..17e498d9a 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -42,11 +42,6 @@ public class SncpHeader { public SncpHeader() { } - public SncpHeader(InetSocketAddress clientSncpAddress) { - this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); - this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); - } - public SncpHeader(InetSocketAddress clientSncpAddress, Uint128 serviceid, Uint128 actionid) { this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); @@ -145,64 +140,32 @@ public class SncpHeader { return seqid; } - public void setSeqid(Long seqid) { - this.seqid = seqid; - } - public Uint128 getServiceid() { return serviceid; } - public void setServiceid(Uint128 serviceid) { - this.serviceid = serviceid; - } - public int getServiceVersion() { return serviceVersion; } - public void setServiceVersion(int serviceVersion) { - this.serviceVersion = serviceVersion; - } - public Uint128 getActionid() { return actionid; } - public void setActionid(Uint128 actionid) { - this.actionid = actionid; - } - public byte[] getAddrBytes() { return addrBytes; } - public void setAddrBytes(byte[] addrBytes) { - this.addrBytes = addrBytes; - } - public int getAddrPort() { return addrPort; } - public void setAddrPort(int addrPort) { - this.addrPort = addrPort; - } - public int getBodyLength() { return bodyLength; } - public void setBodyLength(int bodyLength) { - this.bodyLength = bodyLength; - } - public int getRetcode() { return retcode; } - public void setRetcode(int retcode) { - this.retcode = retcode; - } - } diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index d2a814cba..268b3c314 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -5,6 +5,7 @@ */ package org.redkale.net.sncp; +import java.util.concurrent.ExecutorService; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; @@ -66,6 +67,26 @@ public class SncpResponse extends Response { return super.recycle(); } + @Override + protected ExecutorService getWorkExecutor() { + return super.getWorkExecutor(); + } + + @Override + protected void updateNonBlocking(boolean nonBlocking) { + super.updateNonBlocking(nonBlocking); + } + + @Override + protected boolean inNonBlocking() { + return super.inNonBlocking(); + } + + @Override + protected void finishError(Throwable t) { + finish(RETCODE_THROWEXCEPTION, null); + } + public void finish(final int retcode, final BsonWriter out) { if (out == null) { final ByteArray buffer = new ByteArray(HEADER_SIZE); diff --git a/src/main/java/org/redkale/net/sncp/SncpServlet.java b/src/main/java/org/redkale/net/sncp/SncpServlet.java index 636813115..3284a770d 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpServlet.java @@ -30,6 +30,7 @@ public abstract class SncpServlet extends Servlet