优化timeout

This commit is contained in:
redkale
2023-12-06 21:03:43 +08:00
parent 54f9b68b3d
commit b3ea041b67
10 changed files with 57 additions and 19 deletions

View File

@@ -11,6 +11,7 @@ import java.nio.channels.*;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.Supplier;
import org.redkale.annotation.ResourceType; import org.redkale.annotation.ResourceType;
import org.redkale.util.*; import org.redkale.util.*;
@@ -237,7 +238,8 @@ public class AsyncIOGroup extends AsyncGroup {
return CompletableFuture.failedFuture(e); return CompletableFuture.failedFuture(e);
} }
int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6;
final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); final Supplier<String> timeoutMsg = () -> address + " connect timeout";
final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), timeoutMsg, seconds, TimeUnit.SECONDS);
conn.connect(address, null, new CompletionHandler<Void, Void>() { conn.connect(address, null, new CompletionHandler<Void, Void>() {
@Override @Override
public void completed(Void result, Void attachment) { public void completed(Void result, Void attachment) {
@@ -263,7 +265,7 @@ public class AsyncIOGroup extends AsyncGroup {
future.completeExceptionally(exc); future.completeExceptionally(exc);
} }
}); });
return Utility.orTimeout(future, 30, TimeUnit.SECONDS); return Utility.orTimeout(future, timeoutMsg, 30, TimeUnit.SECONDS);
} }
//创建一个AsyncConnection对象只给测试代码使用 //创建一个AsyncConnection对象只给测试代码使用
@@ -314,7 +316,7 @@ public class AsyncIOGroup extends AsyncGroup {
return CompletableFuture.failedFuture(e); return CompletableFuture.failedFuture(e);
} }
int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6;
final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), () -> address + " connect timeout", seconds, TimeUnit.SECONDS);
conn.connect(address, null, new CompletionHandler<Void, Void>() { conn.connect(address, null, new CompletionHandler<Void, Void>() {
@Override @Override
public void completed(Void result, Void attachment) { public void completed(Void result, Void attachment) {

View File

@@ -242,7 +242,7 @@ public final class Transport {
} }
} }
if (semaphore != null && !semaphore.tryAcquire()) { if (semaphore != null && !semaphore.tryAcquire()) {
final CompletableFuture<AsyncConnection> future = Utility.orTimeout(new CompletableFuture<>(), 10, TimeUnit.SECONDS); final CompletableFuture<AsyncConnection> future = Utility.orTimeout(new CompletableFuture<>(), null, 10, TimeUnit.SECONDS);
future.whenComplete((r, t) -> node.pollQueue.remove(future)); future.whenComplete((r, t) -> node.pollQueue.remove(future));
if (node.pollQueue.offer(future)) { if (node.pollQueue.offer(future)) {
return future; return future;

View File

@@ -306,7 +306,7 @@ public class TransportFactory {
if (node.disabletime < 1) { if (node.disabletime < 1) {
continue; //可用 continue; //可用
} }
CompletableFuture<AsyncConnection> future = Utility.orTimeout(asyncGroup.createTCPClient(node.address), 2, TimeUnit.SECONDS); CompletableFuture<AsyncConnection> future = Utility.orTimeout(asyncGroup.createTCPClient(node.address), null, 2, TimeUnit.SECONDS);
future.whenComplete((r, t) -> { future.whenComplete((r, t) -> {
node.disabletime = t == null ? 0 : System.currentTimeMillis(); node.disabletime = t == null ? 0 : System.currentTimeMillis();
if (r != null) { if (r != null) {

View File

@@ -130,6 +130,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
this.timeoutScheduler.setRemoveOnCancelPolicy(true);
int pingSeconds = pingIntervalSeconds(); int pingSeconds = pingIntervalSeconds();
if (pingRequestSupplier != null && pingSeconds > 0 && this.timeoutFuture == null) { if (pingRequestSupplier != null && pingSeconds > 0 && this.timeoutFuture == null) {
this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> { this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> {
@@ -384,7 +385,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}); });
} else { } else {
int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6;
CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), () -> addr + " connect timeout", seconds, TimeUnit.SECONDS);
waitQueue.offer(rs); waitQueue.offer(rs);
return rs; return rs;
} }

View File

@@ -43,7 +43,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
} }
void cancelTimeout() { void cancelTimeout() {
if (timeout != null) { if (timeout != null && !timeout.isDone()) {
timeout.cancel(true); timeout.cancel(true);
} }
} }
@@ -88,7 +88,9 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
} }
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.currentTraceid(traceid); Traces.currentTraceid(traceid);
completeExceptionally(ex); if (!isDone()) {
completeExceptionally(ex);
}
Traces.removeTraceid(); Traces.removeTraceid();
}); });
} }

View File

@@ -484,6 +484,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
this.dateScheduler.setRemoveOnCancelPolicy(true);
final ObjectRef<byte[]> dateRef = new ObjectRef<>(); final ObjectRef<byte[]> dateRef = new ObjectRef<>();
final DateFormat gmtDateFormat = new SimpleDateFormat("EEE, d MMM y HH:mm:ss z", Locale.ENGLISH); final DateFormat gmtDateFormat = new SimpleDateFormat("EEE, d MMM y HH:mm:ss z", Locale.ENGLISH);
gmtDateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); gmtDateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));

View File

@@ -121,6 +121,7 @@ public class WebSocketEngine {
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
this.scheduler.setRemoveOnCancelPolicy(true);
long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5; long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5;
final int intervalms = liveInterval * 1000; final int intervalms = liveInterval * 1000;
scheduler.scheduleWithFixedDelay(() -> { scheduler.scheduleWithFixedDelay(() -> {

View File

@@ -57,6 +57,7 @@ public class ScheduledFactory {
protected ScheduledFactory(UnaryOperator<String> propertyFunc) { protected ScheduledFactory(UnaryOperator<String> propertyFunc) {
this.propertyFunc = propertyFunc; this.propertyFunc = propertyFunc;
this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s")); this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s"));
this.scheduler.setRemoveOnCancelPolicy(true);
} }
public static ScheduledFactory create(UnaryOperator<String> propertyFunc) { public static ScheduledFactory create(UnaryOperator<String> propertyFunc) {

View File

@@ -163,6 +163,7 @@ public final class EntityCache<T> {
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
this.scheduler.setRemoveOnCancelPolicy(true);
this.scheduler.scheduleAtFixedRate(() -> { this.scheduler.scheduleAtFixedRate(() -> {
try { try {
ConcurrentHashMap newmap2 = new ConcurrentHashMap(); ConcurrentHashMap newmap2 = new ConcurrentHashMap();

View File

@@ -123,10 +123,20 @@ public final class Utility {
private static HttpClient httpClient; private static HttpClient httpClient;
private static final ScheduledThreadPoolExecutor delayer;
//private static final javax.net.ssl.SSLContext DEFAULTSSL_CONTEXT; //private static final javax.net.ssl.SSLContext DEFAULTSSL_CONTEXT;
//private static final javax.net.ssl.HostnameVerifier defaultVerifier = (s, ss) -> true; //private static final javax.net.ssl.HostnameVerifier defaultVerifier = (s, ss) -> true;
static { static {
System.setProperty("jdk.httpclient.allowRestrictedHeaders", "host"); System.setProperty("jdk.httpclient.allowRestrictedHeaders", "host");
(delayer = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("RedkaleFutureDelayScheduler");
return t;
})).setRemoveOnCancelPolicy(true);
Unsafe unsafe0 = null; Unsafe unsafe0 = null;
Function<Object, Object> strCharFunction0 = null; Function<Object, Object> strCharFunction0 = null;
Function<Object, Object> sbCharFunction0 = null; Function<Object, Object> sbCharFunction0 = null;
@@ -401,15 +411,21 @@ public final class Utility {
} }
public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize) { public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(null)); ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(null));
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
} }
public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize, String name) { public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize, String name) {
return new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name)); ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name));
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
} }
public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize, String name, RejectedExecutionHandler handler) { public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize, String name, RejectedExecutionHandler handler) {
return new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name), handler); ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name), handler);
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
} }
public static Consumer<Consumer<String>> signalShutdownConsumer() { public static Consumer<Consumer<String>> signalShutdownConsumer() {
@@ -645,16 +661,29 @@ public final class Utility {
} }
} }
public static <T> CompletableFuture<T> orTimeout(CompletableFuture future, Duration timeout) { public static <T> CompletableFuture<T> orTimeout(CompletableFuture future, Supplier<String> errMsgFunc, Duration timeout) {
return future.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS); return orTimeout(future, errMsgFunc, timeout.toMillis(), TimeUnit.MILLISECONDS);
}
public static <T> CompletableFuture<T> orTimeout(CompletableFuture future, Supplier<String> errMsgFunc, long timeout, TimeUnit unit) {
if (future == null) {
return future;
}
final ScheduledFuture<?> sf = delayer.schedule(() -> {
if (!future.isDone()) {
String msg = errMsgFunc == null ? null : errMsgFunc.get();
future.completeExceptionally(msg == null ? new TimeoutException(msg) : new TimeoutException());
}
}, timeout, unit);
return future.whenComplete((v, t) -> {
if (t == null && !sf.isDone()) {
sf.cancel(false);
}
});
} }
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture future, T value, Duration timeout) { public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture future, T value, Duration timeout) {
return future.completeOnTimeout(value, timeout.toMillis(), TimeUnit.MILLISECONDS); return completeOnTimeout(future, value, timeout.toMillis(), TimeUnit.MILLISECONDS);
}
public static <T> CompletableFuture<T> orTimeout(CompletableFuture future, long timeout, TimeUnit unit) {
return future.orTimeout(timeout, unit);
} }
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture future, T value, long timeout, TimeUnit unit) { public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture future, T value, long timeout, TimeUnit unit) {