diff --git a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index 457ec124c..07c17b354 100644 --- a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -13,6 +13,8 @@ import org.redkale.boot.Application; import org.redkale.net.http.*; import org.redkale.util.Traces; import org.redkale.util.Utility; +import static org.redkale.util.Utility.isEmpty; +import static org.redkale.util.Utility.isNotEmpty; /** * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 @@ -74,8 +76,10 @@ public class HttpClusterRpcClient extends HttpRpcClient { } private CompletableFuture> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { - if (Utility.isEmpty(req.getTraceid())) { + if (isEmpty(req.getTraceid())) { req.setTraceid(Traces.currentTraceid()); + } else { + Traces.computeIfAbsent(req.getTraceid()); } String module = req.getRequestURI(); module = module.substring(1); //去掉/ @@ -87,7 +91,10 @@ public class HttpClusterRpcClient extends HttpRpcClient { logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); } return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { - if (addrs == null || addrs.isEmpty()) { + if (isNotEmpty(req.getTraceid())) { + Traces.computeIfAbsent(req.getTraceid()); + } + if (isEmpty(addrs)) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty"); } @@ -98,7 +105,7 @@ public class HttpClusterRpcClient extends HttpRpcClient { if (req.isRpc()) { clientHeaders.put(Rest.REST_HEADER_RPC, "true"); } - if (Utility.isNotEmpty(req.getTraceid())) { + if (isNotEmpty(req.getTraceid())) { clientHeaders.put(Rest.REST_HEADER_TRACEID, req.getTraceid()); } if (req.isFrombody()) { @@ -171,6 +178,9 @@ public class HttpClusterRpcClient extends HttpRpcClient { } return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) .thenApply((java.net.http.HttpResponse resp) -> { + if (isNotEmpty(req.getTraceid())) { + Traces.computeIfAbsent(req.getTraceid()); + } final int rs = resp.statusCode(); if (rs != 200) { return new HttpResult().status(rs); diff --git a/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java index 46eb69219..4c634286d 100644 --- a/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java @@ -18,7 +18,8 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; import org.redkale.util.RedkaleException; import org.redkale.util.Traces; -import org.redkale.util.Utility; +import static org.redkale.util.Utility.isEmpty; +import static org.redkale.util.Utility.isNotEmpty; /** * 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例 @@ -101,7 +102,7 @@ public class HttpLocalRpcClient extends HttpRpcClient { @Override public CompletableFuture sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) { - if (Utility.isEmpty(request.getTraceid())) { + if (isEmpty(request.getTraceid())) { request.setTraceid(Traces.currentTraceid()); } String topic = generateHttpReqTopic(request, request.getPath()); @@ -126,7 +127,7 @@ public class HttpLocalRpcClient extends HttpRpcClient { @Override public CompletableFuture> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { - if (Utility.isEmpty(request.getTraceid())) { + if (isEmpty(request.getTraceid())) { request.setTraceid(Traces.currentTraceid()); } HttpServlet servlet = findHttpServlet(topic); @@ -145,6 +146,9 @@ public class HttpLocalRpcClient extends HttpRpcClient { future.completeExceptionally(e); } return future.thenApply(rs -> { + if (isNotEmpty(request.getTraceid())) { + Traces.computeIfAbsent(request.getTraceid()); + } if (rs == null) { return new HttpResult(); } diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 6713de812..64cc7abd8 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -80,7 +80,7 @@ public class WorkThread extends Thread implements Executor { @Override public void execute(Runnable command) { if (workExecutor == null) { - command.run(); + Utility.execute(command); } else { workExecutor.execute(command); } @@ -89,7 +89,7 @@ public class WorkThread extends Thread implements Executor { public void execute(Runnable... commands) { if (workExecutor == null) { for (Runnable command : commands) { - command.run(); + Utility.execute(command); } } else { for (Runnable command : commands) { @@ -104,7 +104,7 @@ public class WorkThread extends Thread implements Executor { } if (workExecutor == null) { for (Runnable command : commands) { - command.run(); + Utility.execute(command); } } else { for (Runnable command : commands) { @@ -116,7 +116,7 @@ public class WorkThread extends Thread implements Executor { //与execute的区别在于子类AsyncIOThread中execute会被重载,确保在IO线程中执行 public final void runWork(Runnable command) { if (workExecutor == null) { - command.run(); + Utility.execute(command); } else { workExecutor.execute(command); } diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 7f9b01920..b3db2b951 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -219,6 +219,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture

sendAsync(R request) { + request.traceid = Traces.computeIfAbsent(request.traceid); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -226,6 +227,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture sendAsync(R request, Function respTransfer) { + request.traceid = Traces.computeIfAbsent(request.traceid); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -233,6 +235,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture

sendAsync(SocketAddress addr, R request) { + request.traceid = Traces.computeIfAbsent(request.traceid); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -240,6 +243,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture sendAsync(SocketAddress addr, R request, Function respTransfer) { + request.traceid = Traces.computeIfAbsent(request.traceid); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -255,6 +259,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(R[] requests) { + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -264,6 +269,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(R[] requests, Function respTransfer) { + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -273,6 +279,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests) { + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -282,6 +289,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests, Function respTransfer) { + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -307,6 +315,7 @@ public abstract class Client, R extends ClientR } private CompletableFuture connect(final boolean pool) { + final String traceid = Traces.currentTraceid(); final int size = this.connArray.length; WorkThread workThread = WorkThread.currentWorkThread(); final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; @@ -314,12 +323,14 @@ public abstract class Client, R extends ClientR if (pool && cc != null && cc.isOpen()) { return CompletableFuture.completedFuture(cc); } + final Queue> waitQueue = this.connAcquireWaitings[connIndex]; if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { + virtualReq.traceid = traceid; future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); } else { future = future.thenApply(conn -> { @@ -331,6 +342,7 @@ public abstract class Client, R extends ClientR future = future.thenCompose(authenticate); } return future.thenApply(c -> { + Traces.computeIfAbsent(traceid); c.setAuthenticated(true); if (pool) { this.connArray[connIndex] = c; @@ -339,7 +351,10 @@ public abstract class Client, R extends ClientR if (!f.isDone()) { if (workThread != null) { CompletableFuture fs = f; - workThread.execute(() -> fs.complete(c)); + workThread.execute(() -> { + Traces.computeIfAbsent(traceid); + fs.complete(c); + }); } else { f.complete(c); } @@ -353,7 +368,7 @@ public abstract class Client, R extends ClientR } }); } else { - CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS); + CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), readTimeoutSeconds, TimeUnit.SECONDS); waitQueue.offer(rs); return rs; } @@ -371,6 +386,7 @@ public abstract class Client, R extends ClientR //指定地址获取连接 private CompletableFuture connect(final boolean pool, final SocketAddress addr) { + final String traceid = Traces.currentTraceid(); if (addr == null) { return connect(); } @@ -386,6 +402,7 @@ public abstract class Client, R extends ClientR .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { + virtualReq.traceid = traceid; future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); } else { future = future.thenApply(conn -> { @@ -401,11 +418,15 @@ public abstract class Client, R extends ClientR if (pool) { entry.connection = c; CompletableFuture f; + Traces.computeIfAbsent(traceid); while ((f = waitQueue.poll()) != null) { if (!f.isDone()) { if (workThread != null) { CompletableFuture fs = f; - workThread.execute(() -> fs.complete(c)); + workThread.execute(() -> { + Traces.computeIfAbsent(traceid); + fs.complete(c); + }); } else { f.complete(c); } diff --git a/src/main/java/org/redkale/net/client/ClientAddress.java b/src/main/java/org/redkale/net/client/ClientAddress.java index 27815f103..ee502c655 100644 --- a/src/main/java/org/redkale/net/client/ClientAddress.java +++ b/src/main/java/org/redkale/net/client/ClientAddress.java @@ -68,7 +68,11 @@ public class ClientAddress implements java.io.Serializable { this.addresses = createAddressArray(this.weights); addrs = this.addresses; } - addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)]; + if (addrs.length == 1) { + addr = addrs[0]; + } else { + addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)]; + } } return addr; } diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 187ca470d..0c6b0b254 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -14,6 +14,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.ReentrantLock; import java.util.function.*; +import java.util.logging.Level; import org.redkale.annotation.*; import org.redkale.net.*; import org.redkale.util.*; @@ -133,6 +134,7 @@ public abstract class ClientConnection implements Co } else { sendRequestInLocking(request, respFuture); } + client.logger.log(Level.INFO, channel + ", " + request.getTraceid()+ " 发送完请求: " + request); } finally { writeLock.unlock(); } @@ -319,6 +321,7 @@ public abstract class ClientConnection implements Co public void dispose(Throwable exc) { channel.offerWriteBuffer(writeBuffer); channel.dispose(); + System.out.println(Thread.currentThread().getName() + ": " + channel + ", 被关闭了"); Throwable e = exc == null ? new ClosedChannelException() : exc; CompletableFuture f; respWaitingCounter.reset(); diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 3761cbe27..fc00ee538 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -9,6 +9,8 @@ import java.util.Objects; import java.util.concurrent.*; import org.redkale.annotation.Nonnull; import org.redkale.net.*; +import org.redkale.util.Traces; +import org.redkale.util.Utility; /** * @@ -72,6 +74,7 @@ public class ClientFuture extends CompletableFuture< } private void runTimeout() { + String traceid = request != null ? request.getTraceid() : null; conn.removeRespFuture(request.getRequestid(), this); TimeoutException ex = new TimeoutException("client-request: " + request); WorkThread workThread = null; @@ -82,7 +85,13 @@ public class ClientFuture extends CompletableFuture< if (workThread == null || workThread.getWorkExecutor() == null) { workThread = conn.getChannel().getReadIOThread(); } - workThread.runWork(() -> completeExceptionally(ex)); + workThread.runWork(() -> { + if (Utility.isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } + completeExceptionally(ex); + Traces.removeTraceid(); + }); } @Override diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 79b64f901..6b2daad77 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -585,8 +585,9 @@ public class HttpResponse extends Response { */ public void finishFuture(final Convert convert, Type valueType, CompletionStage future) { future.whenComplete((v, e) -> { + Traces.computeIfAbsent(request.getTraceid()); if (e != null) { - context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", e instanceof TimeoutException ? e.getClass() : e); + context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e); if (e instanceof TimeoutException) { finish504(); } else { @@ -595,6 +596,7 @@ public class HttpResponse extends Response { return; } finish(convert, valueType, v); + Traces.removeTraceid(); }); } @@ -617,8 +619,9 @@ public class HttpResponse extends Response { */ public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) { future.whenComplete((v, e) -> { + Traces.computeIfAbsent(request.getTraceid()); if (e != null) { - context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", e instanceof TimeoutException ? e.getClass() : e); + context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e); if (e instanceof TimeoutException) { finish504(); } else { @@ -627,6 +630,7 @@ public class HttpResponse extends Response { return; } finishJson(convert, valueType, v); + Traces.removeTraceid(); }); } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java index 5bb5d2ebf..9b2a6b9bc 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -17,6 +17,7 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import static org.redkale.net.http.HttpRequest.parseHeaderName; import org.redkale.util.*; +import static org.redkale.util.Utility.isNotEmpty; /** * 简单的HttpClient实现, 存在以下情况不能使用此类:
@@ -201,10 +202,14 @@ public class HttpSimpleClient { } public CompletableFuture> sendAsync(String method, String url, Map headers, byte[] body, Convert convert, Type valueType) { + final String traceid = Traces.currentTraceid(); final URI uri = URI.create(url); final String host = uri.getHost(); final int port = uri.getPort() > 0 ? uri.getPort() : (url.startsWith("https:") ? 443 : 80); return createConnection(host, port).thenCompose(conn -> { + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } final ByteArray array = new ByteArray(); int urlpos = url.indexOf("/", url.indexOf("//") + 3); array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n" @@ -229,11 +234,14 @@ public class HttpSimpleClient { conn.write(array, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - conn.read(new ClientReadCompletionHandler(conn, array.clear(), convert, valueType, future)); + conn.read(new ClientReadCompletionHandler(conn, traceid, array.clear(), convert, valueType, future)); } @Override public void failed(Throwable exc, Void attachment) { + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } conn.dispose(); future.completeExceptionally(exc); } @@ -299,6 +307,8 @@ public class HttpSimpleClient { protected final ByteArray array; + protected final String traceid; + protected final CompletableFuture> future; protected Convert convert; @@ -311,8 +321,9 @@ public class HttpSimpleClient { protected int contentLength = -1; - public ClientReadCompletionHandler(HttpConnection conn, ByteArray array, Convert convert, Type valueType, CompletableFuture> future) { + public ClientReadCompletionHandler(HttpConnection conn, String traceid, ByteArray array, Convert convert, Type valueType, CompletableFuture> future) { this.conn = conn; + this.traceid = traceid; this.array = array; this.convert = convert; this.valueType = valueType; @@ -321,6 +332,9 @@ public class HttpSimpleClient { @Override public void completed(Integer count, ByteBuffer buffer) { + if (isNotEmpty(traceid)) { + Traces.computeIfAbsent(traceid); + } buffer.flip(); if (this.readState == READ_STATE_ROUTE) { if (this.responseResult == null) { diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index d410d589b..cb8e0b71e 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -286,6 +286,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { + Traces.computeIfAbsent(request.getTraceid()); webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer); webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool); @@ -299,6 +300,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } userFuture.whenComplete((userid, ex2) -> { + Traces.computeIfAbsent(request.getTraceid()); if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { if (debug || ex2 != null) { logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); @@ -310,6 +312,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._userid = userid; if (single && !anyuser) { webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> { + Traces.computeIfAbsent(request.getTraceid()); if (rs) { CompletableFuture rcFuture = webSocket.onSingleRepeatConnect(); Consumer task = (oldkilled) -> { @@ -353,6 +356,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { + Traces.computeIfAbsent(request.getTraceid()); if (userid == null || t != null) { if (t != null) { logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); @@ -373,6 +377,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { + Traces.computeIfAbsent(request.getTraceid()); if (sessionid == null || t != null) { if (t != null) { logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); @@ -396,10 +401,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }; WorkThread workThread = WorkThread.currentWorkThread(); sessionFuture.whenComplete((sessionid, ex) -> { + Traces.computeIfAbsent(request.getTraceid()); if (workThread == null || workThread == Thread.currentThread()) { sessionConsumer.accept(sessionid, ex); } else { - workThread.runWork(() -> sessionConsumer.accept(sessionid, ex)); + workThread.runWork(() -> { + Traces.computeIfAbsent(request.getTraceid()); + sessionConsumer.accept(sessionid, ex); + }); } }); } diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index 995de7d88..4193654ea 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -12,6 +12,7 @@ import org.redkale.asm.AsmDepends; import org.redkale.convert.bson.BsonWriter; import org.redkale.net.Response; import org.redkale.util.ByteArray; +import org.redkale.util.Traces; /** * @@ -145,11 +146,13 @@ public class SncpResponse extends Response { finishVoid(); } else if (future instanceof CompletionStage) { ((CompletionStage) future).whenComplete((v, t) -> { + Traces.computeIfAbsent(request.getTraceid()); if (t != null) { finishError((Throwable) t); } else { finish(futureResultType, v); } + Traces.removeTraceid(); }); } else { try { diff --git a/src/main/java/org/redkale/net/sncp/SncpServlet.java b/src/main/java/org/redkale/net/sncp/SncpServlet.java index aaf449b47..7fc4a2a21 100644 --- a/src/main/java/org/redkale/net/sncp/SncpServlet.java +++ b/src/main/java/org/redkale/net/sncp/SncpServlet.java @@ -90,11 +90,13 @@ public class SncpServlet extends Servlet response.updateNonBlocking(false); response.getWorkExecutor().execute(() -> { try { + Traces.computeIfAbsent(request.getTraceid()); action.execute(request, response); } catch (Throwable t) { response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t); response.finishError(t); } + Traces.removeTraceid(); }); } } else { diff --git a/src/main/java/org/redkale/util/AnonymousVirtualExecutor.java b/src/main/java/org/redkale/util/AnonymousVirtualExecutor.java index 26bab0c4e..d1d14781b 100644 --- a/src/main/java/org/redkale/util/AnonymousVirtualExecutor.java +++ b/src/main/java/org/redkale/util/AnonymousVirtualExecutor.java @@ -7,8 +7,10 @@ //import java.util.function.Function; // ///** -// * +// * 虚拟线程池 +// * // * @author zhangjx +// * @since 2.8.0 // */ //public class AnonymousVirtualExecutor implements Function { // diff --git a/src/main/java/org/redkale/util/AnonymousVirtualRunner.java b/src/main/java/org/redkale/util/AnonymousVirtualRunner.java new file mode 100644 index 000000000..8e8c211b6 --- /dev/null +++ b/src/main/java/org/redkale/util/AnonymousVirtualRunner.java @@ -0,0 +1,21 @@ +///* +// * +// */ +//package org.redkale.util; +// +//import java.util.function.Consumer; +// +///** +// * 虚拟线程运行 +// * +// * @author zhangjx +// * @since 2.8.0 +// */ +//public class AnonymousVirtualRunner implements Consumer { +// +// @Override +// public void accept(Runnable t) { +// Thread.startVirtualThread(t); +// } +// +//} diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index 6fdd0d187..48686bbc1 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -81,6 +81,11 @@ public final class Utility { private static final Function virtualThreadLocalFunction; + //org.redkale.util.AnonymousVirtualRunner + private static final String consumerVirtualRunnerBinary = "cafebabe0000004000260a000200030700040c000500060100106a6176612f6c616e672f4f626a6563740100063c696e69743e0100032829560a0008000907000a0c000b000c0100106a6176612f6c616e672f54687265616401001273746172745669727475616c546872656164010028284c6a6176612f6c616e672f52756e6e61626c653b294c6a6176612f6c616e672f5468726561643b07000e0100126a6176612f6c616e672f52756e6e61626c650a001000110700120c001300140100276f72672f7265646b616c652f7574696c2f416e6f6e796d6f75735669727475616c52756e6e6572010006616363657074010017284c6a6176612f6c616e672f52756e6e61626c653b295607001601001b6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d6572010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c5661726961626c655461626c65010004746869730100294c6f72672f7265646b616c652f7574696c2f416e6f6e796d6f75735669727475616c52756e6e65723b010001740100144c6a6176612f6c616e672f52756e6e61626c653b0100104d6574686f64506172616d65746572730100236f72672e6e65746265616e732e536f757263654c6576656c416e6e6f746174696f6e730100144c6a6176612f6c616e672f4f766572726964653b010015284c6a6176612f6c616e672f4f626a6563743b29560100095369676e61747572650100454c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f52756e6e61626c653b3e3b01000a536f7572636546696c6501001b416e6f6e796d6f75735669727475616c52756e6e65722e6a6176610021001000020001001500000003000100050006000100170000002f00010001000000052ab70001b10000000200180000000600010000000e00190000000c000100000005001a001b0000000100130014000300170000003e00010002000000062bb8000757b10000000200180000000a00020000001200050013001900000016000200000006001a001b000000000006001c001d0001001e0000000501001c0000001f00000006000100200000104100130021000300170000003300020002000000092a2bc0000db6000fb10000000200180000000600010000000e00190000000c000100000009001a001b0000001e0000000501001c1000001f00000006000100200000000200220000000200230024000000020025"; + + private static final Consumer virtualRunnerConsumer; + //org.redkale.util.SignalShutDown private static final String consumerSignalShutdownBinary = "cafebabe00000037006b0a0019003a090018003b07003c08003d08003e08003f0800400800410800420800430800440800450700460a000d00470a000d004807004907004a0a000d004b0a000d004c12000000500b001600510700520a0018005307005407005507005601001073687574646f776e436f6e73756d657201001d4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723b0100095369676e61747572650100314c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b0100063c696e69743e010003282956010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c5661726961626c655461626c65010004746869730100214c6f72672f7265646b616c652f7574696c2f5369676e616c53687574446f776e3b010006616363657074010020284c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723b29560100037369670100124c6a6176612f6c616e672f537472696e673b010004736967730100135b4c6a6176612f6c616e672f537472696e673b010008636f6e73756d65720100164c6f63616c5661726961626c65547970655461626c6501000d537461636b4d61705461626c6507002b0100104d6574686f64506172616d6574657273010034284c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b295601000668616e646c65010014284c73756e2f6d6973632f5369676e616c3b29560100114c73756e2f6d6973632f5369676e616c3b010006736967737472010015284c6a6176612f6c616e672f4f626a6563743b295601007a4c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b3e3b4c73756e2f6d6973632f5369676e616c48616e646c65723b01000a536f7572636546696c650100135369676e616c53687574446f776e2e6a6176610c001f00200c001b001c0100106a6176612f6c616e672f537472696e670100034855500100045445524d010003494e54010004515549540100044b494c4c01000454535450010004555352310100045553523201000453544f5001000f73756e2f6d6973632f5369676e616c0c001f00570c003200580100136a6176612f6c616e672f457863657074696f6e0100136a6176612f6c616e672f5468726f7761626c650c0059005a0c005b005c010010426f6f7473747261704d6574686f64730f06005d08005e0c005f00600c0026003601001b6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65720c0026002701001f6f72672f7265646b616c652f7574696c2f5369676e616c53687574446f776e0100106a6176612f6c616e672f4f626a65637401001673756e2f6d6973632f5369676e616c48616e646c6572010015284c6a6176612f6c616e672f537472696e673b2956010043284c73756e2f6d6973632f5369676e616c3b4c73756e2f6d6973632f5369676e616c48616e646c65723b294c73756e2f6d6973632f5369676e616c48616e646c65723b0100076765744e616d6501001428294c6a6176612f6c616e672f537472696e673b0100096765744e756d6265720100032829490a00610062010005012c012c010100176d616b65436f6e63617457697468436f6e7374616e7473010038284c73756e2f6d6973632f5369676e616c3b4c6a6176612f6c616e672f537472696e673b49294c6a6176612f6c616e672f537472696e673b0700630c005f00670100246a6176612f6c616e672f696e766f6b652f537472696e67436f6e636174466163746f72790700690100064c6f6f6b757001000c496e6e6572436c6173736573010098284c6a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c6573244c6f6f6b75703b4c6a6176612f6c616e672f537472696e673b4c6a6176612f6c616e672f696e766f6b652f4d6574686f64547970653b4c6a6176612f6c616e672f537472696e673b5b4c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f696e766f6b652f43616c6c536974653b07006a0100256a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c6573244c6f6f6b757001001e6a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c657300210018001900020016001a00010002001b001c0001001d00000002001e00040001001f0020000100210000002f00010001000000052ab70001b10000000200220000000600010000000c00230000000c000100000005002400250000000100260027000300210000014a000400080000006f2a2bb500021009bd0003590312045359041205535905120653590612075359071208535908120953591006120a53591007120b53591008120c534d2c4e2dbe360403360515051504a200222d1505323a06bb000d591906b7000e2ab8000f57a700053a07840501a7ffdda700044db100020051005f006200100005006a006d0011000400220000002a000a0000001200050014003b001500510017005f00190062001800640015006a001c006d001b006e001d00230000002a000400510013002800290006003b002f002a002b00020000006f0024002500000000006f002c001c0001002d0000000c00010000006f002c001e0001002e000000470006ff0044000607001807001607002f07002f01010000ff001d000707001807001607002f07002f01010700030001070010fa0001ff000500020700180700160000420700110000300000000501002c0000001d0000000200310021003200330002002100000060000300030000001a2b2bb600122bb60013ba001400004d2ab400022cb900150200b10000000200220000000e000300000021000f00220019002300230000002000030000001a0024002500000000001a002800340001000f000b0035002900020030000000050100280000104100260036000200210000003300020002000000092a2bc00016b60017b10000000200220000000600010000000c00230000000c00010000000900240025000000300000000501002c10000004001d000000020037003800000002003900660000000a00010064006800650019004d000000080001004e0001004f"; @@ -125,6 +130,7 @@ public final class Utility { Consumer> signalShutdownConsumer0 = null; Function virtualExecutorFunction0 = null; Function virtualThreadLocalFunction0 = null; + Consumer virtualRunnerConsumer0 = null; if (!nativeImageEnv) { //not native-image try { @@ -172,6 +178,27 @@ public final class Utility { } } } + { //virtualRunnerConsumer + Class> virtualClazz1 = null; + try { + virtualClazz1 = (Class) loader.loadClass("org.redkale.util.AnonymousVirtualRunner"); + } catch (Throwable t) { + } + if (virtualClazz1 == null) { + byte[] classBytes = hexToBin(consumerVirtualRunnerBinary); + try { + virtualClazz1 = (Class>) new ClassLoader(loader) { + public final Class loadClass(String name, byte[] b) { + return defineClass(name, b, 0, b.length); + } + }.loadClass("org.redkale.util.AnonymousVirtualRunner", classBytes); + RedkaleClassLoader.putDynClass(virtualClazz1.getName(), classBytes, virtualClazz1); + RedkaleClassLoader.putReflectionDeclaredConstructors(virtualClazz1, virtualClazz1.getName()); + virtualRunnerConsumer0 = virtualClazz1.getConstructor().newInstance(); + } catch (Throwable t) { + } + } + } { //unsafe Field f = String.class.getDeclaredField("value"); final Class unsafeClass = loader.loadClass("sun.misc.Unsafe"); @@ -241,6 +268,7 @@ public final class Utility { signalShutdownConsumer = signalShutdownConsumer0; virtualExecutorFunction = virtualExecutorFunction0; virtualThreadLocalFunction = virtualThreadLocalFunction0; + virtualRunnerConsumer = virtualRunnerConsumer0; // try { // DEFAULTSSL_CONTEXT = javax.net.ssl.SSLContext.getInstance("SSL"); @@ -294,6 +322,14 @@ public final class Utility { return futureArrayFunc; } + public static void execute(Runnable task) { + if (virtualRunnerConsumer != null) { + virtualRunnerConsumer.accept(task); + } else { + task.run(); + } + } + public static void sleep(long millis) { try { Thread.sleep(millis);