From b3ea041b6735526a24038b2d7957177628b85431 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 6 Dec 2023 21:03:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96timeout?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncIOGroup.java | 8 +-- src/main/java/org/redkale/net/Transport.java | 2 +- .../org/redkale/net/TransportFactory.java | 2 +- .../java/org/redkale/net/client/Client.java | 3 +- .../org/redkale/net/client/ClientFuture.java | 6 ++- .../java/org/redkale/net/http/HttpServer.java | 1 + .../org/redkale/net/http/WebSocketEngine.java | 1 + .../redkale/scheduling/ScheduledFactory.java | 3 +- .../java/org/redkale/source/EntityCache.java | 1 + src/main/java/org/redkale/util/Utility.java | 49 +++++++++++++++---- 10 files changed, 57 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 5d566f89b..68a18958d 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -11,6 +11,7 @@ import java.nio.channels.*; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.Supplier; import org.redkale.annotation.ResourceType; import org.redkale.util.*; @@ -237,7 +238,8 @@ public class AsyncIOGroup extends AsyncGroup { return CompletableFuture.failedFuture(e); } int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; - final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); + final Supplier timeoutMsg = () -> address + " connect timeout"; + final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), timeoutMsg, seconds, TimeUnit.SECONDS); conn.connect(address, null, new CompletionHandler() { @Override public void completed(Void result, Void attachment) { @@ -263,7 +265,7 @@ public class AsyncIOGroup extends AsyncGroup { future.completeExceptionally(exc); } }); - return Utility.orTimeout(future, 30, TimeUnit.SECONDS); + return Utility.orTimeout(future, timeoutMsg, 30, TimeUnit.SECONDS); } //创建一个AsyncConnection对象,只给测试代码使用 @@ -314,7 +316,7 @@ public class AsyncIOGroup extends AsyncGroup { return CompletableFuture.failedFuture(e); } int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; - final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); + final CompletableFuture future = Utility.orTimeout(new CompletableFuture(), () -> address + " connect timeout", seconds, TimeUnit.SECONDS); conn.connect(address, null, new CompletionHandler() { @Override public void completed(Void result, Void attachment) { diff --git a/src/main/java/org/redkale/net/Transport.java b/src/main/java/org/redkale/net/Transport.java index 5682ad9f0..2e9b2d6d8 100644 --- a/src/main/java/org/redkale/net/Transport.java +++ b/src/main/java/org/redkale/net/Transport.java @@ -242,7 +242,7 @@ public final class Transport { } } if (semaphore != null && !semaphore.tryAcquire()) { - final CompletableFuture future = Utility.orTimeout(new CompletableFuture<>(), 10, TimeUnit.SECONDS); + final CompletableFuture future = Utility.orTimeout(new CompletableFuture<>(), null, 10, TimeUnit.SECONDS); future.whenComplete((r, t) -> node.pollQueue.remove(future)); if (node.pollQueue.offer(future)) { return future; diff --git a/src/main/java/org/redkale/net/TransportFactory.java b/src/main/java/org/redkale/net/TransportFactory.java index 19b130b0a..940920c03 100644 --- a/src/main/java/org/redkale/net/TransportFactory.java +++ b/src/main/java/org/redkale/net/TransportFactory.java @@ -306,7 +306,7 @@ public class TransportFactory { if (node.disabletime < 1) { continue; //可用 } - CompletableFuture future = Utility.orTimeout(asyncGroup.createTCPClient(node.address), 2, TimeUnit.SECONDS); + CompletableFuture future = Utility.orTimeout(asyncGroup.createTCPClient(node.address), null, 2, TimeUnit.SECONDS); future.whenComplete((r, t) -> { node.disabletime = t == null ? 0 : System.currentTimeMillis(); if (r != null) { diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 9997d525d..700ce7620 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -130,6 +130,7 @@ public abstract class Client, R extends ClientR t.setDaemon(true); return t; }); + this.timeoutScheduler.setRemoveOnCancelPolicy(true); int pingSeconds = pingIntervalSeconds(); if (pingRequestSupplier != null && pingSeconds > 0 && this.timeoutFuture == null) { this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> { @@ -384,7 +385,7 @@ public abstract class Client, R extends ClientR }); } else { int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; - CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); + CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), () -> addr + " connect timeout", seconds, TimeUnit.SECONDS); waitQueue.offer(rs); return rs; } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index b9182433f..f2ccf410c 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -43,7 +43,7 @@ public class ClientFuture extends CompletableFuture< } void cancelTimeout() { - if (timeout != null) { + if (timeout != null && !timeout.isDone()) { timeout.cancel(true); } } @@ -88,7 +88,9 @@ public class ClientFuture extends CompletableFuture< } workThread.runWork(() -> { Traces.currentTraceid(traceid); - completeExceptionally(ex); + if (!isDone()) { + completeExceptionally(ex); + } Traces.removeTraceid(); }); } diff --git a/src/main/java/org/redkale/net/http/HttpServer.java b/src/main/java/org/redkale/net/http/HttpServer.java index ea1052c91..a798a69c1 100644 --- a/src/main/java/org/redkale/net/http/HttpServer.java +++ b/src/main/java/org/redkale/net/http/HttpServer.java @@ -484,6 +484,7 @@ public class HttpServer extends Server dateRef = new ObjectRef<>(); final DateFormat gmtDateFormat = new SimpleDateFormat("EEE, d MMM y HH:mm:ss z", Locale.ENGLISH); gmtDateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); diff --git a/src/main/java/org/redkale/net/http/WebSocketEngine.java b/src/main/java/org/redkale/net/http/WebSocketEngine.java index 54b525e1e..93541f508 100644 --- a/src/main/java/org/redkale/net/http/WebSocketEngine.java +++ b/src/main/java/org/redkale/net/http/WebSocketEngine.java @@ -121,6 +121,7 @@ public class WebSocketEngine { t.setDaemon(true); return t; }); + this.scheduler.setRemoveOnCancelPolicy(true); long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5; final int intervalms = liveInterval * 1000; scheduler.scheduleWithFixedDelay(() -> { diff --git a/src/main/java/org/redkale/scheduling/ScheduledFactory.java b/src/main/java/org/redkale/scheduling/ScheduledFactory.java index e3e89ebf3..bef09acf9 100644 --- a/src/main/java/org/redkale/scheduling/ScheduledFactory.java +++ b/src/main/java/org/redkale/scheduling/ScheduledFactory.java @@ -56,7 +56,8 @@ public class ScheduledFactory { protected ScheduledFactory(UnaryOperator propertyFunc) { this.propertyFunc = propertyFunc; - this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s")); + this.scheduler = new ScheduledThreadPoolExecutor(Utility.cpus(), Utility.newThreadFactory("Scheduled-Task-Thread-%s")); + this.scheduler.setRemoveOnCancelPolicy(true); } public static ScheduledFactory create(UnaryOperator propertyFunc) { diff --git a/src/main/java/org/redkale/source/EntityCache.java b/src/main/java/org/redkale/source/EntityCache.java index 00be59571..3c63171c7 100644 --- a/src/main/java/org/redkale/source/EntityCache.java +++ b/src/main/java/org/redkale/source/EntityCache.java @@ -163,6 +163,7 @@ public final class EntityCache { t.setDaemon(true); return t; }); + this.scheduler.setRemoveOnCancelPolicy(true); this.scheduler.scheduleAtFixedRate(() -> { try { ConcurrentHashMap newmap2 = new ConcurrentHashMap(); diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index a25934975..5035914a7 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -123,10 +123,20 @@ public final class Utility { private static HttpClient httpClient; + private static final ScheduledThreadPoolExecutor delayer; + //private static final javax.net.ssl.SSLContext DEFAULTSSL_CONTEXT; //private static final javax.net.ssl.HostnameVerifier defaultVerifier = (s, ss) -> true; static { System.setProperty("jdk.httpclient.allowRestrictedHeaders", "host"); + + (delayer = new ScheduledThreadPoolExecutor(1, r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("RedkaleFutureDelayScheduler"); + return t; + })).setRemoveOnCancelPolicy(true); + Unsafe unsafe0 = null; Function strCharFunction0 = null; Function sbCharFunction0 = null; @@ -401,15 +411,21 @@ public final class Utility { } public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize) { - return new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(null)); + ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(null)); + scheduler.setRemoveOnCancelPolicy(true); + return scheduler; } public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize, String name) { - return new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name)); + ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name)); + scheduler.setRemoveOnCancelPolicy(true); + return scheduler; } public static ScheduledThreadPoolExecutor newScheduledExecutor(int corePoolSize, String name, RejectedExecutionHandler handler) { - return new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name), handler); + ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(corePoolSize, newThreadFactory(name), handler); + scheduler.setRemoveOnCancelPolicy(true); + return scheduler; } public static Consumer> signalShutdownConsumer() { @@ -645,16 +661,29 @@ public final class Utility { } } - public static CompletableFuture orTimeout(CompletableFuture future, Duration timeout) { - return future.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS); + public static CompletableFuture orTimeout(CompletableFuture future, Supplier errMsgFunc, Duration timeout) { + return orTimeout(future, errMsgFunc, timeout.toMillis(), TimeUnit.MILLISECONDS); + } + + public static CompletableFuture orTimeout(CompletableFuture future, Supplier errMsgFunc, long timeout, TimeUnit unit) { + if (future == null) { + return future; + } + final ScheduledFuture sf = delayer.schedule(() -> { + if (!future.isDone()) { + String msg = errMsgFunc == null ? null : errMsgFunc.get(); + future.completeExceptionally(msg == null ? new TimeoutException(msg) : new TimeoutException()); + } + }, timeout, unit); + return future.whenComplete((v, t) -> { + if (t == null && !sf.isDone()) { + sf.cancel(false); + } + }); } public static CompletableFuture completeOnTimeout(CompletableFuture future, T value, Duration timeout) { - return future.completeOnTimeout(value, timeout.toMillis(), TimeUnit.MILLISECONDS); - } - - public static CompletableFuture orTimeout(CompletableFuture future, long timeout, TimeUnit unit) { - return future.orTimeout(timeout, unit); + return completeOnTimeout(future, value, timeout.toMillis(), TimeUnit.MILLISECONDS); } public static CompletableFuture completeOnTimeout(CompletableFuture future, T value, long timeout, TimeUnit unit) {