Context.getWorkHashExecutor

This commit is contained in:
redkale
2023-02-04 16:10:08 +08:00
parent b9c1fa723c
commit b60e5fef96
6 changed files with 39 additions and 16 deletions

View File

@@ -116,6 +116,7 @@ public class Context {
public void execute(Servlet servlet, Request request, Response response) {
if (workHashExecutor != null) {
response.updateNonBlocking(false);
workHashExecutor.execute(request.getHashid(), () -> {
try {
long cha = System.currentTimeMillis() - request.getCreateTime();
@@ -127,18 +128,19 @@ public class Context {
response.context.logger.log(Level.FINE, "hash execute servlet delay=" + cha + "ms, request=" + request);
}
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.error(t);
response.context.logger.log(Level.WARNING, "Execute servlet occur exception. request = " + request, t);
response.finishError(t);
}
});
} else if (workExecutor != null) {
} else if (workExecutor != null && response.inNonBlocking() && !servlet.isNonBlocking()) {
response.updateNonBlocking(false);
workExecutor.execute(() -> {
try {
Traces.computeCurrTraceid(request.getTraceid());
servlet.execute(request, response);
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.error(t);
response.context.logger.log(Level.WARNING, "Execute servlet occur exception. request = " + request, t);
response.finishError(t);
}
});
} else {
@@ -146,8 +148,8 @@ public class Context {
Traces.computeCurrTraceid(request.getTraceid());
servlet.execute(request, response);
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
response.error(t);
response.context.logger.log(Level.WARNING, "Execute servlet occur exception. request = " + request, t);
response.finishError(t);
}
}

View File

@@ -34,6 +34,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final ExecutorService workExecutor;
protected final ThreadHashExecutor workHashExecutor;
protected final R request;
protected final WorkThread thread;
@@ -122,6 +124,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
this.thread = WorkThread.currWorkThread();
this.writeBuffer = context != null ? ByteBuffer.allocateDirect(context.getBufferCapacity()) : null;
this.workExecutor = context == null || context.workExecutor == null ? ForkJoinPool.commonPool() : context.workExecutor;
this.workHashExecutor = context == null ? null : context.workHashExecutor;
}
protected AsyncConnection removeChannel() {
@@ -162,6 +165,10 @@ public abstract class Response<C extends Context, R extends Request<C>> {
return workExecutor;
}
protected ThreadHashExecutor getWorkHashExecutor() {
return workHashExecutor;
}
protected void updateNonBlocking(boolean nonBlocking) {
this.inNonBlocking = nonBlocking;
}

View File

@@ -241,6 +241,11 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return super.getWorkExecutor();
}
@Override
protected ThreadHashExecutor getWorkHashExecutor() {
return super.getWorkHashExecutor();
}
@Override
protected void updateNonBlocking(boolean nonBlocking) {
super.updateNonBlocking(nonBlocking);

View File

@@ -30,12 +30,12 @@ import org.redkale.util.*;
*/
public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse> {
@Deprecated(since = "2.8.0")
public static final int RET_SERVER_ERROR = 1200_0001;
@Deprecated(since = "2.8.0")
public static final int RET_METHOD_ERROR = 1200_0002;
//@Deprecated(since = "2.8.0")
//public static final int RET_SERVER_ERROR = 1200_0001;
//
//@Deprecated(since = "2.8.0")
//public static final int RET_METHOD_ERROR = 1200_0002;
//
String _actionSimpleMappingUrl; //只给HttpActionServlet使用_actionSimpleMappingUrl不能包含正则表达式比如 /json /createRecord, 不能是 /user/**
String _prefix = ""; //当前HttpServlet的path前缀
@@ -315,12 +315,12 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
}
}
} while ((clz = clz.getSuperclass()) != HttpServlet.class);
//设置整个HttpServlet是否非阻塞式
NonBlocking non = servletClass.getAnnotation(NonBlocking.class);
if (non == null) {
return (preNonBlocking != null && preNonBlocking) && (authNonBlocking != null && authNonBlocking) && (exeNonBlocking != null && exeNonBlocking);
} else {
return non.value();
return non.value() && (preNonBlocking == null || preNonBlocking) && (authNonBlocking == null || authNonBlocking) && (exeNonBlocking == null || exeNonBlocking);
}
}

View File

@@ -9,7 +9,7 @@ import java.util.concurrent.ExecutorService;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.ByteArray;
import org.redkale.util.*;
/**
*
@@ -72,6 +72,11 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
return super.getWorkExecutor();
}
@Override
protected ThreadHashExecutor getWorkHashExecutor() {
return super.getWorkHashExecutor();
}
@Override
protected void updateNonBlocking(boolean nonBlocking) {
super.updateNonBlocking(nonBlocking);

View File

@@ -7,6 +7,7 @@ package org.redkale.util;
import java.io.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.AbstractMap.SimpleEntry;
import java.util.*;
import java.util.concurrent.*;
@@ -622,6 +623,9 @@ public interface Creator<T> {
arrayCacheMap.put(char.class, t -> new char[t]);
arrayCacheMap.put(float.class, t -> new float[t]);
arrayCacheMap.put(double.class, t -> new double[t]);
arrayCacheMap.put(ByteBuffer.class, t -> new ByteBuffer[t]);
arrayCacheMap.put(SocketAddress.class, t -> new SocketAddress[t]);
arrayCacheMap.put(InetSocketAddress.class, t -> new InetSocketAddress[t]);
}
static class SimpleClassVisitor extends ClassVisitor {