实现@NonBlocking

This commit is contained in:
redkale
2023-02-04 14:59:30 +08:00
parent 894ea65a35
commit 63f4a6ee0d
19 changed files with 374 additions and 109 deletions

View File

@@ -6,13 +6,12 @@ package org.redkale.annotation;
import java.lang.annotation.*;
/**
* 非阻塞模式标记, 标记在Service类和方法、HttpServlet类上 <br>
* 非阻塞模式标记, 标记在Service类和方法、Filter类、HttpServlet类上 <br>
* 一般情况下,没有显注此注解的方法视为阻塞时, 以下两种情况除外: <br>
* 1、返回类型是CompletableFuture <br>
* 1、返回类型是CompletionStage <br>
* 2、返回类型是void且参数存在CompletionHandler类型 <br>
* 阻塞模式的方法会在work线程池中运行 非阻塞在IO线程中运行。
*
*
* <p>
* 详情见: https://redkale.org
*
@@ -22,7 +21,7 @@ import java.lang.annotation.*;
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface NonBlocking {
public @interface NonBlocking { //不可使用@Inherited防止被继承, 见HttpServlet.preExecute/authenticate/execute
boolean value() default true;
}

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.stream.Stream;
import org.redkale.annotation.*;
import static org.redkale.boot.Application.RESNAME_SNCP_ADDRESS;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.cluster.ClusterAgent;
import org.redkale.mq.MessageAgent;
@@ -24,7 +25,6 @@ import org.redkale.service.Service;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.util.*;
import org.redkale.watch.*;
import static org.redkale.boot.Application.RESNAME_SNCP_ADDRESS;
/**
* HTTP Server节点的配置Server

View File

@@ -111,13 +111,7 @@ public class Context {
}
protected void executeDispatch(Request request, Response response) {
if (workHashExecutor != null) {
workHashExecutor.execute(request.getHashid(), () -> dispatcher.dispatch(request, response));
} else if (workExecutor != null) {
workExecutor.execute(() -> dispatcher.dispatch(request, response));
} else {
dispatcher.dispatch(request, response);
}
dispatcher.dispatch(request, response);
}
public void execute(Servlet servlet, Request request, Response response) {

View File

@@ -52,6 +52,10 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
protected Filter<C, R, P> headFilter;
protected DispatcherServlet() {
this._nonBlocking = true;
}
protected void incrExecuteCounter() {
executeCounter.increment();
}
@@ -290,8 +294,8 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
response.inNonBlocking = true;
response.nextEvent();
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "prepare servlet abort, force to close channel ", t);
response.error(t);
response.context.logger.log(Level.WARNING, "Dispatch servlet occur exception", t);
response.finishError(t);
}
}

View File

@@ -6,13 +6,14 @@
package org.redkale.net;
import java.io.IOException;
import org.redkale.annotation.Priority;
import org.redkale.annotation.*;
import org.redkale.util.AnyValue;
/**
* 协议拦截器类, 类似JavaEE中的javax.servlet.Filter <br>
* javax.servlet.Filter方法doFilter是同步操作此Filter.doFilter则是异步操作方法return前需要调用Response.nextEvent()方可执行下一个Filter <br>
* 通过给Filter标记注解&#064;Priority来确定执行的顺序, Priority.value值越大越先执行
* 通过给Filter标记注解&#064;Priority来确定执行的顺序, Priority.value值越大越先执行 <br>
* 如果doFilter方法是非阻塞的需要在Filter类上标记&#064;NonBlocking
*
* <p>
* 详情见: https://redkale.org
@@ -26,15 +27,20 @@ public abstract class Filter<C extends Context, R extends Request<C>, P extends
AnyValue _conf; //当前Filter的配置
final boolean _nonBlocking; //当前Filter.doFilter方法是否为阻塞模式
Filter<C, R, P> _next; //下一个Filter
protected Filter() {
NonBlocking a = getClass().getAnnotation(NonBlocking.class);
this._nonBlocking = a != null && a.value();
}
public void init(C context, AnyValue config) {
}
public abstract void doFilter(R request, P response) throws IOException;
public abstract boolean isNonBlocking();
public void destroy(C context, AnyValue config) {
}
@@ -47,4 +53,8 @@ public abstract class Filter<C extends Context, R extends Request<C>, P extends
Priority p2 = o.getClass().getAnnotation(Priority.class);
return (p2 == null ? 0 : p2.value()) - (p1 == null ? 0 : p1.value());
}
protected boolean isNonBlocking() {
return _nonBlocking;
}
}

View File

@@ -8,6 +8,7 @@ package org.redkale.net;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.Level;
import org.redkale.util.*;
@@ -31,6 +32,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Consumer<Response> responseConsumer; //虚拟构建的Response可能不存在responseConsumer
protected final ExecutorService workExecutor;
protected final R request;
protected final WorkThread thread;
@@ -118,6 +121,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.request = request;
this.thread = WorkThread.currWorkThread();
this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null;
this.workExecutor = context == null || context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
}
protected AsyncConnection removeChannel() {
@@ -154,6 +158,14 @@ public abstract class Response<C extends Context, R extends Request<C>> {
return true;
}
protected ExecutorService getWorkExecutor() {
return workExecutor;
}
protected void updateNonBlocking(boolean nonBlocking) {
this.inNonBlocking = nonBlocking;
}
protected boolean inNonBlocking() {
return inNonBlocking;
}
@@ -181,13 +193,45 @@ public abstract class Response<C extends Context, R extends Request<C>> {
if (this.filter != null) {
Filter runner = this.filter;
this.filter = this.filter._next;
runner.doFilter(request, this);
if (inNonBlocking) {
if (runner.isNonBlocking()) {
runner.doFilter(request, this);
} else {
inNonBlocking = false;
workExecutor.execute(() -> {
try {
runner.doFilter(request, Response.this);
} catch (Throwable t) {
context.getLogger().log(Level.WARNING, "Filter occur exception. request = " + request, t);
finishError(t);
}
});
}
} else {
runner.doFilter(request, this);
}
return;
}
if (this.servlet != null) {
Servlet s = this.servlet;
this.servlet = null;
s.execute(request, this);
if (inNonBlocking) {
if (s.isNonBlocking()) {
s.execute(request, this);
} else {
inNonBlocking = false;
workExecutor.execute(() -> {
try {
s.execute(request, Response.this);
} catch (Throwable t) {
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t);
finishError(t);
}
});
}
} else {
s.execute(request, this);
}
}
}
@@ -212,6 +256,12 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.completeInIOThread(false);
}
//被重载后kill不一定为true
protected void finishError(Throwable t) {
error(t);
}
//kill=true
protected void error(Throwable t) {
completeInIOThread(true);
}

View File

@@ -5,8 +5,8 @@
*/
package org.redkale.net;
import org.redkale.util.AnyValue;
import java.io.IOException;
import org.redkale.util.AnyValue;
/**
* 协议请求处理类
@@ -23,6 +23,8 @@ public abstract class Servlet<C extends Context, R extends Request<C>, P extends
AnyValue _conf; //当前Servlet的配置
protected boolean _nonBlocking; //当前Servlet.execute方法是否为非阻塞模式
//Server执行start时运行此方法
public void init(C context, AnyValue config) {
}
@@ -33,4 +35,7 @@ public abstract class Servlet<C extends Context, R extends Request<C>, P extends
public void destroy(C context, AnyValue config) {
}
protected boolean isNonBlocking() {
return _nonBlocking;
}
}

View File

@@ -55,6 +55,10 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
private HttpServlet lastRunServlet;
protected HttpDispatcherServlet() {
super();
}
private List<HttpServlet> removeHttpServlet(final Predicate<MappingEntry> predicateEntry, final Predicate<Map.Entry<String, WebSocketServlet>> predicateFilter) {
List<HttpServlet> servlets = new ArrayList<>();
allMapLock.lock();
@@ -354,8 +358,8 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
}
servlet.execute(request, response);
} catch (Exception e) {
request.getContext().getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request, e);
response.finish(500, null);
request.getContext().getLogger().log(Level.WARNING, "Dispatch servlet occur exception. request = " + request, e);
response.finishError(e);
}
}

View File

@@ -236,6 +236,21 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return super.recycle();
}
@Override
protected ExecutorService getWorkExecutor() {
return super.getWorkExecutor();
}
@Override
protected void updateNonBlocking(boolean nonBlocking) {
super.updateNonBlocking(nonBlocking);
}
@Override
protected boolean inNonBlocking() {
return super.inNonBlocking();
}
// protected Supplier<ByteBuffer> getBodyBufferSupplier() {
// return bodyBufferSupplier;
// }
@@ -308,10 +323,15 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
finish(v);
}, (t, a) -> {
context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletionHandler", (Throwable) t);
finish(500, null);
finishError(t);
});
}
@Override
protected void finishError(Throwable t) {
finish(500, null);
}
/**
* 创建CompletionHandler子类的实例 <br>
*
@@ -620,7 +640,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishFuture(final Convert convert, Type valueType, CompletionStage future) {
future.whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletionStage", (Throwable) e);
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e);
if (e instanceof TimeoutException) {
finish504();
} else {
@@ -652,7 +672,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) {
future.whenComplete((v, e) -> {
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur, force to close channel. request = " + request + ", result is CompletionStage", (Throwable) e);
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e);
if (e instanceof TimeoutException) {
finish504();
} else {

View File

@@ -7,10 +7,12 @@ package org.redkale.net.http;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import org.redkale.annotation.NonBlocking;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
@@ -48,6 +50,10 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
//这里不能直接使用HttpServlet会造成死循环初始化HttpServlet
private final Servlet<HttpContext, HttpRequest, HttpResponse> authSuccessServlet = new Servlet<HttpContext, HttpRequest, HttpResponse>() {
{
this._nonBlocking = true;
}
@Override
public void execute(HttpRequest request, HttpResponse response) throws IOException {
ActionEntry entry = request.actionEntry;
@@ -72,12 +78,32 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
}
response.setCacheHandler(entry.cacheHandler);
}
entry.servlet.execute(request, response);
if (response.inNonBlocking()) {
if (entry.nonBlocking) {
entry.servlet.execute(request, response);
} else {
response.updateNonBlocking(false);
response.getWorkExecutor().execute(() -> {
try {
entry.servlet.execute(request, response);
} catch (Throwable t) {
response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t);
response.finishError(t);
}
});
}
} else {
entry.servlet.execute(request, response);
}
}
};
//preExecute运行完后执行的Servlet
private final Servlet<HttpContext, HttpRequest, HttpResponse> preSuccessServlet = new Servlet<HttpContext, HttpRequest, HttpResponse>() {
{
this._nonBlocking = true;
}
@Override
public void execute(HttpRequest request, HttpResponse response) throws IOException {
if (request.actionEntry != null) {
@@ -134,7 +160,10 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
if (ws != null && !ws.repair()) {
path = "";
}
HashMap<String, ActionEntry> map = this._actionmap != null ? this._actionmap : loadActionEntry();
//设置整个HttpServlet是否非阻塞式
this._nonBlocking = isNonBlocking(getClass());
//RestServlet会填充_actionmap
HashMap<String, ActionEntry> map = this._actionmap != null ? this._actionmap : loadActionEntry(this._nonBlocking);
this.mappings = new Map.Entry[map.size()];
int i = -1;
for (Map.Entry<String, ActionEntry> en : map.entrySet()) {
@@ -201,6 +230,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
*
* @throws IOException IOException
*/
@NonBlocking
protected void preExecute(HttpRequest request, HttpResponse response) throws IOException {
response.nextEvent();
}
@@ -227,17 +257,74 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
*
* @throws IOException IOException
*/
@NonBlocking
protected void authenticate(HttpRequest request, HttpResponse response) throws IOException {
response.nextEvent();
}
@Override
@NonBlocking
public void execute(HttpRequest request, HttpResponse response) throws IOException {
response.thenEvent(preSuccessServlet);
preExecute(request, response);
}
private HashMap<String, ActionEntry> loadActionEntry() {
static Boolean isNonBlocking(Class<?> servletClass) {
Class clz = servletClass;
Boolean preNonBlocking = null;
Boolean authNonBlocking = null;
Boolean exeNonBlocking = null;
do {
if (java.lang.reflect.Modifier.isAbstract(clz.getModifiers())) {
break;
}
RedkaleClassLoader.putReflectionDeclaredMethods(clz.getName());
for (final Method method : clz.getDeclaredMethods()) {
String methodName = method.getName();
//-----------------------------------------------
Class[] paramTypes = method.getParameterTypes();
if (paramTypes.length != 2 || paramTypes[0] != HttpRequest.class || paramTypes[1] != HttpResponse.class) {
continue;
}
//-----------------------------------------------
Class[] exps = method.getExceptionTypes();
if (exps.length > 0 && (exps.length != 1 || exps[0] != IOException.class)) {
continue;
}
//-----------------------------------------------
if ("preExecute".equals(methodName)) {
if (preNonBlocking == null) {
NonBlocking non = method.getAnnotation(NonBlocking.class);
preNonBlocking = non != null && non.value();
}
continue;
}
if ("authenticate".equals(methodName)) {
if (authNonBlocking == null) {
NonBlocking non = method.getAnnotation(NonBlocking.class);
authNonBlocking = non != null && non.value();
}
continue;
}
if ("execute".equals(methodName)) {
if (exeNonBlocking == null) {
NonBlocking non = method.getAnnotation(NonBlocking.class);
exeNonBlocking = non != null && non.value();
}
continue;
}
}
} while ((clz = clz.getSuperclass()) != HttpServlet.class);
//设置整个HttpServlet是否非阻塞式
NonBlocking non = servletClass.getAnnotation(NonBlocking.class);
if (non == null) {
return (preNonBlocking != null && preNonBlocking) && (authNonBlocking != null && authNonBlocking) && (exeNonBlocking != null && exeNonBlocking);
} else {
return non.value();
}
}
private HashMap<String, ActionEntry> loadActionEntry(boolean typeNonBlocking) {
WebServlet module = this.getClass().getAnnotation(WebServlet.class);
final int serviceid = module == null ? 0 : module.moduleid();
final HashMap<String, ActionEntry> map = new HashMap<>();
@@ -248,13 +335,9 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
if (java.lang.reflect.Modifier.isAbstract(clz.getModifiers())) {
break;
}
RedkaleClassLoader.putReflectionPublicMethods(clz.getName());
for (final Method method : clz.getMethods()) {
//-----------------------------------------------
String methodname = method.getName();
if ("service".equals(methodname) || "preExecute".equals(methodname) || "execute".equals(methodname) || "authenticate".equals(methodname)) {
continue;
}
RedkaleClassLoader.putReflectionDeclaredMethods(clz.getName());
for (final Method method : clz.getDeclaredMethods()) {
String methodName = method.getName();
//-----------------------------------------------
Class[] paramTypes = method.getParameterTypes();
if (paramTypes.length != 2 || paramTypes[0] != HttpRequest.class || paramTypes[1] != HttpResponse.class) {
@@ -266,6 +349,14 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
continue;
}
//-----------------------------------------------
if ("preExecute".equals(methodName) || "authenticate".equals(methodName)
|| "execute".equals(methodName) || "service".equals(methodName)) {
continue;
}
if (!Modifier.isPublic(method.getModifiers())) {
continue;
}
//-----------------------------------------------
final HttpMapping mapping = method.getAnnotation(HttpMapping.class);
if (mapping == null) {
@@ -285,7 +376,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
throw new HttpException(this.getClass().getSimpleName() + " have two same " + HttpMapping.class.getSimpleName() + "(" + name + ")");
}
nameset.put(name, clz);
map.put(name, new ActionEntry(serviceid, actionid, name, methods, method, createActionServlet(method)));
map.put(name, new ActionEntry(serviceid, actionid, name, methods, method, createActionServlet(typeNonBlocking, method)));
}
} while ((clz = clz.getSuperclass()) != HttpServlet.class);
return map;
@@ -331,6 +422,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
oneCache = new CacheEntry(response.getStatus(), response.getContentType(), content);
} : null;
}
this.nonBlocking = servlet._nonBlocking;
}
protected static boolean auth(Method method) {
@@ -353,10 +445,6 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
return method.getAnnotations();
}
boolean isNeedCheck() {
return this.moduleid != 0 || this.actionid != 0;
}
boolean checkMethod(final String reqMethod) {
if (methods.length == 0) {
return true;
@@ -379,6 +467,8 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
final boolean rpconly;
final boolean nonBlocking;
final boolean auth;
final int moduleid;
@@ -398,7 +488,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
Annotation[] annotations;
}
private HttpServlet createActionServlet(final Method method) {
private HttpServlet createActionServlet(final boolean typeNonBlocking, final Method method) {
//------------------------------------------------------------------------------
final String supDynName = HttpServlet.class.getName().replace('.', '/');
final String interName = this.getClass().getName().replace('.', '/');
@@ -478,6 +568,8 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
java.lang.reflect.Field field = instance.getClass().getField(factfield);
field.set(instance, this);
RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), field);
NonBlocking non = method.getAnnotation(NonBlocking.class);
instance._nonBlocking = typeNonBlocking ? (non == null ? typeNonBlocking : non.value()) : false;
return instance;
} catch (Exception ex) {
throw new HttpException(ex);
@@ -517,6 +609,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
if (actionSimpleMappingUrl != null && !Utility.contains(actionSimpleMappingUrl, '*', '{', '[', '(', '|', '^', '$', '+', '?', '\\')) {
this._actionSimpleMappingUrl = actionSimpleMappingUrl;
}
this._nonBlocking = actionEntry.nonBlocking;
}
@Override

View File

@@ -984,17 +984,61 @@ public final class Rest {
if (!HttpServlet.class.isAssignableFrom(baseServletType)) {
throw new RestException(baseServletType + " is not HttpServlet Class on createRestServlet");
}
int mod = baseServletType.getModifiers();
if (!java.lang.reflect.Modifier.isPublic(mod)) {
int parentMod = baseServletType.getModifiers();
if (!java.lang.reflect.Modifier.isPublic(parentMod)) {
throw new RestException(baseServletType + " is not Public Class on createRestServlet");
}
if (java.lang.reflect.Modifier.isAbstract(mod)) {
for (Method m : baseServletType.getDeclaredMethods()) {
if (java.lang.reflect.Modifier.isAbstract(m.getModifiers())) { //@since 2.4.0
throw new RestException(baseServletType + " cannot contains a abstract Method on " + baseServletType);
Boolean parentNon0 = null;
{
NonBlocking snon = serviceType.getAnnotation(NonBlocking.class);
parentNon0 = snon == null ? null : snon.value();
if (HttpServlet.class != baseServletType) {
Boolean preNonBlocking = null;
Boolean authNonBlocking = null;
RedkaleClassLoader.putReflectionDeclaredMethods(baseServletType.getName());
for (Method m : baseServletType.getDeclaredMethods()) {
if (java.lang.reflect.Modifier.isAbstract(parentMod) && java.lang.reflect.Modifier.isAbstract(m.getModifiers())) { //@since 2.4.0
throw new RestException(baseServletType + " cannot contains a abstract Method on " + baseServletType);
}
Class[] paramTypes = m.getParameterTypes();
if (paramTypes.length != 2 || paramTypes[0] != HttpRequest.class || paramTypes[1] != HttpResponse.class) {
continue;
}
//-----------------------------------------------
Class[] exps = m.getExceptionTypes();
if (exps.length > 0 && (exps.length != 1 || exps[0] != IOException.class)) {
continue;
}
//-----------------------------------------------
String methodName = m.getName();
if ("preExecute".equals(methodName)) {
if (preNonBlocking == null) {
NonBlocking non = m.getAnnotation(NonBlocking.class);
preNonBlocking = non != null && non.value();
}
continue;
}
if ("authenticate".equals(methodName)) {
if (authNonBlocking == null) {
NonBlocking non = m.getAnnotation(NonBlocking.class);
authNonBlocking = non != null && non.value();
}
continue;
}
}
if (preNonBlocking != null && !preNonBlocking) {
parentNon0 = false;
} else if (authNonBlocking != null && !authNonBlocking) {
parentNon0 = false;
} else {
NonBlocking bnon = baseServletType.getAnnotation(NonBlocking.class);
if (bnon != null && !bnon.value()) {
parentNon0 = false;
}
}
}
}
final String restInternalName = Type.getInternalName(Rest.class);
final String serviceDesc = Type.getDescriptor(serviceType);
final String webServletDesc = Type.getDescriptor(WebServlet.class);
@@ -1002,6 +1046,7 @@ public final class Rest {
final String reqDesc = Type.getDescriptor(HttpRequest.class);
final String respDesc = Type.getDescriptor(HttpResponse.class);
final String convertDesc = Type.getDescriptor(Convert.class);
final String nonblockDesc = Type.getDescriptor(NonBlocking.class);
final String typeDesc = Type.getDescriptor(java.lang.reflect.Type.class);
final String retDesc = Type.getDescriptor(RetResult.class);
final String httpResultDesc = Type.getDescriptor(HttpResult.class);
@@ -1041,6 +1086,8 @@ public final class Rest {
throw new RestException(serviceType + " is ignore Rest Service Class"); //标记为ignore=true不创建Servlet
}
final boolean serRpcOnly = controller != null && controller.rpconly();
final Boolean parentNonBlocking = parentNon0;
ClassLoader loader = classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader;
String stname = serviceType.getSimpleName();
if (stname.startsWith("Service")) { //类似ServiceWatchService这样的类保留第一个Service字样
@@ -1125,11 +1172,11 @@ public final class Rest {
paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
retvalTypes.add(formatRestReturnType(method, serviceType));
if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, null, bigModuleName, method);
MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, parentNonBlocking, null, bigModuleName, method);
entrys.add(entry);
} else {
for (RestMapping mapping : mappings) {
MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, mapping, defModuleName, method);
MappingEntry entry = new MappingEntry(serRpcOnly, methodIdex, parentNonBlocking, mapping, defModuleName, method);
entrys.add(entry);
}
}
@@ -1465,8 +1512,8 @@ public final class Rest {
Method restactMethod = newClazz.getDeclaredMethod("_createRestActionEntry");
restactMethod.setAccessible(true);
Field tmpentrysfield = HttpServlet.class.getDeclaredField("_actionmap");
tmpentrysfield.setAccessible(true);
Field tmpEntrysField = HttpServlet.class.getDeclaredField("_actionmap");
tmpEntrysField.setAccessible(true);
HashMap<String, HttpServlet.ActionEntry> innerEntryMap = (HashMap) restactMethod.invoke(obj);
for (Map.Entry<String, HttpServlet.ActionEntry> en : innerEntryMap.entrySet()) {
Method m = mappingUrlToMethod.get(en.getKey());
@@ -1474,7 +1521,10 @@ public final class Rest {
en.getValue().annotations = HttpServlet.ActionEntry.annotations(m);
}
}
tmpentrysfield.set(obj, innerEntryMap);
tmpEntrysField.set(obj, innerEntryMap);
Field nonblockField = Servlet.class.getDeclaredField("_nonBlocking");
nonblockField.setAccessible(true);
nonblockField.set(obj, parentNonBlocking == null ? true : parentNonBlocking);
return obj;
} catch (ClassNotFoundException e) {
} catch (Throwable e) {
@@ -1587,14 +1637,14 @@ public final class Rest {
paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
retvalTypes.add(formatRestReturnType(method, serviceType));
if (mappings.length == 0) { //没有Mapping设置一个默认值
MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, null, bigmodulename, method);
MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, parentNonBlocking, null, bigmodulename, method);
if (entrys.contains(entry)) {
throw new RestException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat");
}
entrys.add(entry);
} else {
for (RestMapping mapping : mappings) {
MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, mapping, defmodulename, method);
MappingEntry entry = new MappingEntry(serRpcOnly, methodidex, parentNonBlocking, mapping, defmodulename, method);
if (entrys.contains(entry)) {
throw new RestException(serviceType.getName() + " on " + method.getName() + " 's mapping(" + entry.name + ") is repeat");
}
@@ -1667,6 +1717,11 @@ public final class Rest {
classMap.put("repair", repair);
//classMap.put("comment", comment); //不显示太多信息
}
{ //NonBlocking
av0 = cw.visitAnnotation(nonblockDesc, true);
av0.visit("value", true);
av0.visitEnd();
}
{ //内部类
cw.visitInnerClass(actionEntryName, httpServletName, HttpServlet.ActionEntry.class.getSimpleName(), ACC_PROTECTED + ACC_FINAL + ACC_STATIC);
@@ -2102,8 +2157,7 @@ public final class Rest {
Map<String, Object> mappingMap = new LinkedHashMap<>();
java.lang.reflect.Type returnGenericNoFutureType = TypeToken.getGenericType(method.getGenericReturnType(), serviceType);
{ // 设置 Annotation
//设置 HttpMapping
{ //设置 Annotation HttpMapping
boolean reqpath = false;
for (Object[] ps : paramlist) {
if ("#".equals((String) ps[1])) {
@@ -2168,7 +2222,12 @@ public final class Rest {
mappingMap.put("result", returnGenericNoFutureType == returnType ? returnType.getName() : String.valueOf(returnGenericNoFutureType));
entry.mappingurl = url;
}
if (rcs != null && rcs.length > 0) { // 设置 Annotation
{ //设置 Annotation NonBlocking
av0 = mv.visitAnnotation(nonblockDesc, true);
av0.visit("value", entry.nonBlocking);
av0.visitEnd();
}
if (rcs != null && rcs.length > 0) { // 设置 Annotation RestConvert
av0 = mv.visitAnnotation(restConvertsDesc, true);
AnnotationVisitor av1 = av0.visitArray("value");
//设置 RestConvert
@@ -2197,7 +2256,7 @@ public final class Rest {
av1.visitEnd();
av0.visitEnd();
}
if (rcc != null && rcc.length > 0) { // 设置 Annotation
if (rcc != null && rcc.length > 0) { // 设置 Annotation RestConvertCoder
av0 = mv.visitAnnotation(restConvertCodersDesc, true);
AnnotationVisitor av1 = av0.visitArray("value");
//设置 RestConvertCoder
@@ -3184,8 +3243,13 @@ public final class Rest {
cw2.visit(V11, ACC_SUPER, newDynName + "$" + entry.newActionClassName, null, httpServletName, null);
cw2.visitInnerClass(newDynName + "$" + entry.newActionClassName, newDynName, entry.newActionClassName, ACC_PRIVATE + ACC_STATIC);
{ //设置 Annotation NonBlocking
av0 = cw2.visitAnnotation(nonblockDesc, true);
av0.visit("value", entry.nonBlocking);
av0.visitEnd();
}
{
fv = cw2.visitField(0, "servlet", "L" + newDynName + ";", null, null);
fv = cw2.visitField(0, "_parentServlet", "L" + newDynName + ";", null, null);
fv.visitEnd();
}
{
@@ -3194,7 +3258,10 @@ public final class Rest {
mv.visitMethodInsn(INVOKESPECIAL, httpServletName, "<init>", "()V", false);
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "servlet", "L" + newDynName + ";");
mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "_parentServlet", "L" + newDynName + ";");
mv.visitVarInsn(ALOAD, 0);
mv.visitInsn(entry.nonBlocking ? ICONST_1 : ICONST_0);
mv.visitFieldInsn(PUTFIELD, newDynName + "$" + entry.newActionClassName, "_nonBlocking", "Z");
mv.visitInsn(RETURN);
mv.visitMaxs(2, 2);
mv.visitEnd();
@@ -3211,7 +3278,7 @@ public final class Rest {
{
mv = new MethodDebugVisitor(cw2.visitMethod(ACC_PUBLIC, "execute", "(" + reqDesc + respDesc + ")V", null, new String[]{"java/io/IOException"}));
mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, newDynName + "$" + entry.newActionClassName, "servlet", "L" + newDynName + ";");
mv.visitFieldInsn(GETFIELD, newDynName + "$" + entry.newActionClassName, "_parentServlet", "L" + newDynName + ";");
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
mv.visitMethodInsn(INVOKEVIRTUAL, newDynName, entry.newMethodName, "(" + reqDesc + respDesc + ")V", false);
@@ -3416,8 +3483,8 @@ public final class Rest {
Method restactMethod = newClazz.getDeclaredMethod("_createRestActionEntry");
restactMethod.setAccessible(true);
RedkaleClassLoader.putReflectionMethod(newDynName.replace('/', '.'), restactMethod);
Field tmpentrysfield = HttpServlet.class.getDeclaredField("_actionmap");
tmpentrysfield.setAccessible(true);
Field tmpEntrysField = HttpServlet.class.getDeclaredField("_actionmap");
tmpEntrysField.setAccessible(true);
HashMap<String, HttpServlet.ActionEntry> innerEntryMap = (HashMap) restactMethod.invoke(obj);
for (Map.Entry<String, HttpServlet.ActionEntry> en : innerEntryMap.entrySet()) {
Method m = mappingurlToMethod.get(en.getKey());
@@ -3425,8 +3492,13 @@ public final class Rest {
en.getValue().annotations = HttpServlet.ActionEntry.annotations(m);
}
}
tmpentrysfield.set(obj, innerEntryMap);
RedkaleClassLoader.putReflectionField(HttpServlet.class.getName(), tmpentrysfield);
tmpEntrysField.set(obj, innerEntryMap);
RedkaleClassLoader.putReflectionField(HttpServlet.class.getName(), tmpEntrysField);
Field nonblockField = Servlet.class.getDeclaredField("_nonBlocking");
nonblockField.setAccessible(true);
nonblockField.set(obj, parentNonBlocking == null ? true : parentNonBlocking);
RedkaleClassLoader.putReflectionField(Servlet.class.getName(), nonblockField);
return obj;
} catch (Throwable e) {
throw new RestException(e);
@@ -3557,7 +3629,7 @@ public final class Rest {
return normal ? name : Utility.md5Hex(name);
}
public MappingEntry(final boolean serRpcOnly, int methodIndex, RestMapping mapping, final String defModuleName, Method method) {
public MappingEntry(final boolean serRpcOnly, int methodIndex, Boolean typeNonBlocking, RestMapping mapping, final String defModuleName, Method method) {
if (mapping == null) {
mapping = DEFAULT__MAPPING;
}
@@ -3588,6 +3660,22 @@ public final class Rest {
this.existsPound = pound;
this.newMethodName = formatMappingName(this.name.replace('/', '$').replace('.', '_').replace('-', '_'));
this.newActionClassName = "_Dyn_" + this.newMethodName + "_ActionHttpServlet";
NonBlocking non = method.getAnnotation(NonBlocking.class);
Boolean nonFlag = non == null ? typeNonBlocking : (Boolean) non.value(); //显注在方法优先级大于类
if (nonFlag == null) {
if (CompletionStage.class.isAssignableFrom(method.getReturnType())) {
nonFlag = true;
} else {
for (Parameter mp : method.getParameters()) {
if (CompletionHandler.class.isAssignableFrom(mp.getType())) {
nonFlag = true;
break;
}
}
}
}
this.nonBlocking = nonFlag == null ? false : nonFlag;
}
public final int methodIdx; // _paramtypes 的下标从0开始
@@ -3608,6 +3696,8 @@ public final class Rest {
public final String[] methods;
public final boolean nonBlocking;
public final boolean rpconly;
public final boolean auth;

View File

@@ -148,6 +148,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override
final void preInit(Application application, HttpContext context, AnyValue conf) {
this._nonBlocking = true;
if (this.textConvert == null) {
this.textConvert = jsonConvert;
}
@@ -218,6 +219,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
@Override //在IOThread中执行
@NonBlocking
public final void execute(final HttpRequest request, final HttpResponse response) throws IOException {
final boolean debug = logger.isLoggable(Level.FINEST);
if (!request.isWebSocket()) {

View File

@@ -9,7 +9,6 @@ import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.logging.*;
import org.redkale.annotation.ConstructorParameters;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
import org.redkale.convert.bson.*;

View File

@@ -57,8 +57,7 @@ public class SncpClientRequest extends ClientRequest {
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"
+ "header=" + header
+ ", seqid =" + seqid
+ "header=" + header + ", seqid =" + seqid
+ ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]"
+ "}";
}

View File

@@ -24,6 +24,10 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
private final byte[] pongBytes = Sncp.getPongBytes();
protected SncpDispatcherServlet() {
super();
}
@Override
public void addServlet(SncpServlet servlet, Object attachment, AnyValue conf, Uint128... mappings) {
sncplock.lock();

View File

@@ -12,7 +12,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.*;
import org.redkale.annotation.Resource;
import org.redkale.annotation.*;
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
@@ -164,6 +164,8 @@ public final class SncpDynServlet extends SncpServlet {
public Creator<SncpAsyncHandler> handlerCreator;
protected boolean nonBlocking;
@Resource
protected BsonConvert convert;
@@ -619,9 +621,14 @@ public final class SncpDynServlet extends SncpServlet {
}
}
}
NonBlocking non = method.getAnnotation(NonBlocking.class);
if (non == null) {
non = service.getClass().getAnnotation(NonBlocking.class);
}
try {
SncpServletAction instance = (SncpServletAction) newClazz.getDeclaredConstructor().newInstance();
instance.method = method;
instance.nonBlocking = non == null ? false : non.value();
java.lang.reflect.Type[] types = new java.lang.reflect.Type[originalParamTypes.length + 1];
types[0] = originalReturnType;
System.arraycopy(originalParamTypes, 0, types, 1, originalParamTypes.length);

View File

@@ -42,11 +42,6 @@ public class SncpHeader {
public SncpHeader() {
}
public SncpHeader(InetSocketAddress clientSncpAddress) {
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
}
public SncpHeader(InetSocketAddress clientSncpAddress, Uint128 serviceid, Uint128 actionid) {
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
@@ -145,64 +140,32 @@ public class SncpHeader {
return seqid;
}
public void setSeqid(Long seqid) {
this.seqid = seqid;
}
public Uint128 getServiceid() {
return serviceid;
}
public void setServiceid(Uint128 serviceid) {
this.serviceid = serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public void setServiceVersion(int serviceVersion) {
this.serviceVersion = serviceVersion;
}
public Uint128 getActionid() {
return actionid;
}
public void setActionid(Uint128 actionid) {
this.actionid = actionid;
}
public byte[] getAddrBytes() {
return addrBytes;
}
public void setAddrBytes(byte[] addrBytes) {
this.addrBytes = addrBytes;
}
public int getAddrPort() {
return addrPort;
}
public void setAddrPort(int addrPort) {
this.addrPort = addrPort;
}
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
public int getRetcode() {
return retcode;
}
public void setRetcode(int retcode) {
this.retcode = retcode;
}
}

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net.sncp;
import java.util.concurrent.ExecutorService;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
@@ -66,6 +67,26 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
return super.recycle();
}
@Override
protected ExecutorService getWorkExecutor() {
return super.getWorkExecutor();
}
@Override
protected void updateNonBlocking(boolean nonBlocking) {
super.updateNonBlocking(nonBlocking);
}
@Override
protected boolean inNonBlocking() {
return super.inNonBlocking();
}
@Override
protected void finishError(Throwable t) {
finish(RETCODE_THROWEXCEPTION, null);
}
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final ByteArray buffer = new ByteArray(HEADER_SIZE);

View File

@@ -30,6 +30,7 @@ public abstract class SncpServlet extends Servlet<SncpContext, SncpRequest, Sncp
this.serviceName = serviceResourceName;
this.serviceType = serviceResourceType;
this.service = service;
this._nonBlocking = true;
}
public Service getService() {