DataSource增加nonBlocking配置

This commit is contained in:
redkale
2024-09-07 12:33:57 +08:00
parent ea01415b5f
commit 2c25cc9f27
7 changed files with 45 additions and 15 deletions

View File

@@ -77,6 +77,9 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
protected int writeTimeoutSeconds;
// 是否非阻塞式, 非阻塞模式下不会在runWork里执行结果回调, 默认值: false
protected boolean nonBlocking;
// ------------------ 可选项 ------------------
// PING心跳的请求数据为null且pingInterval<1表示不需要定时ping
protected Supplier<R> pingRequestSupplier;

View File

@@ -149,11 +149,17 @@ public abstract class ClientCodec<R extends ClientRequest, P extends ClientResul
if (exc == null) {
final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message);
// workThread不区分IO线程respFuture.complete中使用CompletableFuture.join会一直阻塞
workThread.runWork(() -> {
if (connection.client.nonBlocking) {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
});
} else {
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
});
}
} else { // 异常
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);

View File

@@ -46,12 +46,17 @@ public class HttpContext extends Context {
// 延迟解析header
protected final boolean lazyHeader;
// pipeline模式下是否相同header
// deprecated
final boolean sameHeader;
// 不带通配符的mapping url的缓存对象
final Map<ByteArray, String>[] uriPathCaches = new Map[100];
public HttpContext(HttpContextConfig config) {
super(config);
this.lazyHeader = config.lazyHeader;
this.sameHeader = config.sameHeader;
this.remoteAddrHeader = config.remoteAddrHeader;
this.remoteAddrHeaders = config.remoteAddrHeaders;
this.localHeader = config.localHeader;
@@ -233,9 +238,12 @@ public class HttpContext extends Context {
}
public static class HttpContextConfig extends ContextConfig {
// 是否延迟解析http-header
public boolean lazyHeader;
public boolean sameHeader;
public String remoteAddrHeader;
// 用逗号隔开的多个header

View File

@@ -39,9 +39,6 @@ import static org.redkale.util.Utility.isNotEmpty;
*/
public class HttpRequest extends Request<HttpContext> {
private static final boolean PIPELINE_SAME_HEADERS =
Boolean.getBoolean("redkale.http.request.pipeline.sameheaders");
protected static final Serializable CURRUSERID_NIL = new Serializable() {};
protected static final int READ_STATE_ROUTE = 1;
@@ -202,7 +199,8 @@ public class HttpRequest extends Request<HttpContext> {
final HttpRpcAuthenticator rpcAuthenticator;
HttpServlet.ActionEntry actionEntry; // 仅供HttpServlet传递Entry使用
// 仅供HttpServlet传递Entry使用
HttpServlet.ActionEntry actionEntry;
public HttpRequest(HttpContext context) {
this(context, new ByteArray());
@@ -1217,7 +1215,7 @@ public class HttpRequest extends Request<HttpContext> {
@Override
protected HttpRequest copyHeader() {
if (!PIPELINE_SAME_HEADERS || !context.lazyHeader) {
if (!context.sameHeader || !context.lazyHeader) {
return null;
}
HttpRequest req = new HttpRequest(context, this.body);

View File

@@ -101,7 +101,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
@Override
protected String startExtLog() {
return context.lazyHeader ? ", lazyHeader: true" : "";
return (context.lazyHeader ? ", lazyHeader: true" : "") + (context.sameHeader ? ", sameHeader: true" : "");
}
public List<HttpServlet> getHttpServlets() {
@@ -399,6 +399,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final List<String[]> defaultSetHeaders = new ArrayList<>();
boolean autoOptions = false;
boolean lazyHeader = false;
boolean sameHeader = false;
int datePeriod = 0;
String plainContentType = null;
String jsonContentType = null;
@@ -409,10 +410,10 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
AnyValue rpcAuthenticatorConfig = null;
if (config != null) {
lazyHeader = config.getBoolValue("lazy", false); // 兼容旧配置
AnyValue reqConf = config.getAnyValue("request");
if (reqConf != null) {
lazyHeader = reqConf.getBoolValue("lazyHeader", lazyHeader);
sameHeader = reqConf.getBoolValue("sameHeader", sameHeader);
rpcAuthenticatorConfig = reqConf.getAnyValue("rpc");
AnyValue raddr = reqConf.getAnyValue("remoteaddr");
remoteAddrHeader = raddr == null ? null : raddr.getValue("value");
@@ -596,6 +597,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
contextConfig.remoteAddrHeaders = null;
}
contextConfig.lazyHeader = lazyHeader;
contextConfig.sameHeader = sameHeader;
contextConfig.localHeader = localHeader;
contextConfig.localParameter = localParameter;
contextConfig.rpcAuthenticatorConfig = rpcAuthenticatorConfig;

View File

@@ -21,6 +21,7 @@ import static org.redkale.boot.Application.*;
import org.redkale.convert.ConvertDisabled;
import org.redkale.inject.ResourceEvent;
import org.redkale.net.AsyncGroup;
import org.redkale.net.WorkThread;
import org.redkale.persistence.Table;
import org.redkale.service.Local;
import static org.redkale.source.DataSources.*;
@@ -93,6 +94,9 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
// 超过多少毫秒视为很慢, 会打印错误级别的日志, 默认值: 3000
protected long slowmsError;
// 是否非阻塞式, 非阻塞模式下不会在runWork里执行结果回调, 默认值: false
protected boolean clientNonBlocking;
// 用于反向LIKE使用
protected String containSQL;
@@ -139,11 +143,6 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
}
protected void afterResourceChange() {
this.autoDDL = "true"
.equals(readConfProps
.getProperty(DATA_SOURCE_TABLE_AUTODDL, "false")
.trim());
this.containSQL =
readConfProps.getProperty(DATA_SOURCE_CONTAIN_SQLTEMPLATE, "LOCATE(#{keystr}, #{column}) > 0");
this.notContainSQL =
@@ -154,7 +153,9 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
this.tablecopySQL = readConfProps.getProperty(
DATA_SOURCE_TABLECOPY_SQLTEMPLATE, "CREATE TABLE IF NOT EXISTS #{newtable} LIKE #{oldtable}");
this.autoDDL = "true".equals(readConfProps.getProperty(DATA_SOURCE_TABLE_AUTODDL, "false"));
this.cacheForbidden = "NONE".equalsIgnoreCase(readConfProps.getProperty(DATA_SOURCE_CACHEMODE));
this.clientNonBlocking = "true".equalsIgnoreCase(readConfProps.getProperty(DATA_SOURCE_NON_BLOCKING, "false"));
this.slowmsWarn = Integer.parseInt(
readConfProps.getProperty(DATA_SOURCE_SLOWMS_WARN, "2000").trim());
this.slowmsError = Integer.parseInt(
@@ -375,6 +376,15 @@ public abstract class AbstractDataSqlSource extends AbstractDataSource
}
}
@Override
protected <T> void complete(WorkThread workThread, CompletableFuture<T> future, T value) {
if (clientNonBlocking) {
future.complete(value);
} else {
super.complete(workThread, future, value);
}
}
protected void updateOneResourceChange(Properties newProps, ResourceEvent[] events) {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@@ -62,7 +62,10 @@ public final class DataSources {
// @since 2.8.0 //超过多少毫秒视为较慢, 会打印警告级别的日志, 默认值: 3000
public static final String DATA_SOURCE_SLOWMS_ERROR = "error-slowms";
// @since 2.8.0 //是否非阻塞式
public static final String DATA_SOURCE_NON_BLOCKING = "non-blocking";
// @since 2.8.0 //sourceExecutor线程数, 默认值: 内核数
public static final String DATA_SOURCE_THREADS = "threads";