diff --git a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java index 07c17b354..6b13bc055 100644 --- a/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpClusterRpcClient.java @@ -10,6 +10,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.redkale.annotation.Resource; import org.redkale.boot.Application; +import org.redkale.net.WorkThread; import org.redkale.net.http.*; import org.redkale.util.Traces; import org.redkale.util.Utility; @@ -81,6 +82,7 @@ public class HttpClusterRpcClient extends HttpRpcClient { } else { Traces.computeIfAbsent(req.getTraceid()); } + final WorkThread workThread = WorkThread.currentWorkThread(); String module = req.getRequestURI(); module = module.substring(1); //去掉/ module = module.substring(0, module.indexOf('/')); @@ -91,9 +93,7 @@ public class HttpClusterRpcClient extends HttpRpcClient { logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); } return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { - if (isNotEmpty(req.getTraceid())) { - Traces.computeIfAbsent(req.getTraceid()); - } + Traces.currentTraceid(req.getTraceid()); if (isEmpty(addrs)) { if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty"); @@ -150,13 +150,13 @@ public class HttpClusterRpcClient extends HttpRpcClient { if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); } - return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, + return forEachCollectionFuture(workThread, logger.isLoggable(Level.FINEST), userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), clientHeaders, clientBody, addrs.iterator()); }); } - private CompletableFuture> forEachCollectionFuture(boolean finest, Serializable userid, + private CompletableFuture> forEachCollectionFuture(final WorkThread workThread, boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, final Map clientHeaders, byte[] clientBody, Iterator it) { if (!it.hasNext()) { return CompletableFuture.completedFuture(null); @@ -178,9 +178,7 @@ public class HttpClusterRpcClient extends HttpRpcClient { } return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) .thenApply((java.net.http.HttpResponse resp) -> { - if (isNotEmpty(req.getTraceid())) { - Traces.computeIfAbsent(req.getTraceid()); - } + Traces.currentTraceid(req.getTraceid()); final int rs = resp.statusCode(); if (rs != 200) { return new HttpResult().status(rs); diff --git a/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java index 4c634286d..fd6c99d9c 100644 --- a/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java +++ b/src/main/java/org/redkale/cluster/HttpLocalRpcClient.java @@ -19,7 +19,6 @@ import org.redkale.net.http.*; import org.redkale.util.RedkaleException; import org.redkale.util.Traces; import static org.redkale.util.Utility.isEmpty; -import static org.redkale.util.Utility.isNotEmpty; /** * 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例 @@ -146,9 +145,7 @@ public class HttpLocalRpcClient extends HttpRpcClient { future.completeExceptionally(e); } return future.thenApply(rs -> { - if (isNotEmpty(request.getTraceid())) { - Traces.computeIfAbsent(request.getTraceid()); - } + Traces.currentTraceid(request.getTraceid()); if (rs == null) { return new HttpResult(); } diff --git a/src/main/java/org/redkale/mq/MessageRespProcessor.java b/src/main/java/org/redkale/mq/MessageRespProcessor.java index f543c5391..6059a0533 100644 --- a/src/main/java/org/redkale/mq/MessageRespProcessor.java +++ b/src/main/java/org/redkale/mq/MessageRespProcessor.java @@ -44,7 +44,7 @@ public class MessageRespProcessor implements MessageProcessor { logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + "ms, mq.seqid = " + msg.getSeqid() + ")"); } messageClient.getMessageAgent().execute(() -> { - Traces.computeIfAbsent(traceid); + Traces.currentTraceid(traceid); resp.future.complete(msg); long comems = System.currentTimeMillis() - now; if ((deplay > 1000 || comems > 1000) && logger.isLoggable(Level.FINE)) { diff --git a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java index 882ff5c1c..a4c854aa5 100644 --- a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java +++ b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java @@ -117,7 +117,13 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run CompletionHandler handler0 = handler; A attachment0 = attachment; clear(); - handler0.failed(exc, attachment0); + if (handler0 == null) { //可能和超时run方法同时执行 + if (exc != null) { + exc.printStackTrace(); + } + } else { + handler0.failed(exc, attachment0); + } } @Override diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 6d5cd41a3..c0a8be959 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -13,7 +13,6 @@ import java.util.function.*; import java.util.logging.Logger; import org.redkale.net.*; import org.redkale.util.*; -import static org.redkale.util.Utility.isNotEmpty; /** * @@ -31,7 +30,7 @@ public abstract class Client, R extends ClientR public static final int DEFAULT_MAX_PIPELINES = 128; - protected boolean debug; + protected boolean debug = false; protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); @@ -91,7 +90,7 @@ public abstract class Client, R extends ClientR protected Supplier closeRequestSupplier; //创建连接后进行的登录鉴权操作 - protected Function> authenticate; + protected Function>> authenticate; protected Client(String name, AsyncGroup group, ClientAddress address) { this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); @@ -110,18 +109,18 @@ public abstract class Client, R extends ClientR } protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - Function> authenticate) { + Function>> authenticate) { this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate); } protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - Supplier closeRequestSupplier, Function> authenticate) { + Supplier closeRequestSupplier, Function>> authenticate) { this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); } @SuppressWarnings("OverridableMethodCallInConstructor") protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function> authenticate) { + int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function>> authenticate) { if (maxPipelines < 1) { throw new IllegalArgumentException("maxPipelines must bigger 0"); } @@ -220,7 +219,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture

sendAsync(R request) { - request.traceid = Traces.computeIfAbsent(request.traceid); + request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -228,7 +227,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture sendAsync(R request, Function respTransfer) { - request.traceid = Traces.computeIfAbsent(request.traceid); + request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -236,7 +235,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture

sendAsync(SocketAddress addr, R request) { - request.traceid = Traces.computeIfAbsent(request.traceid); + request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -244,7 +243,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture sendAsync(SocketAddress addr, R request, Function respTransfer) { - request.traceid = Traces.computeIfAbsent(request.traceid); + request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); } @@ -260,7 +259,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(R[] requests) { - requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -270,7 +269,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(R[] requests, Function respTransfer) { - requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -280,7 +279,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests) { - requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -290,7 +289,7 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests, Function respTransfer) { - requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid); + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); for (R request : requests) { if (request.workThread == null) { request.workThread = WorkThread.currentWorkThread(); @@ -299,6 +298,11 @@ public abstract class Client, R extends ClientR return connect(addr).thenCompose(conn -> writeChannel(conn, requests, respTransfer)); } + protected CompletableFuture> writeChannelBatch(ClientConnection conn, R... requests) { + requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); + return conn.writeChannel(requests); + } + protected CompletableFuture> writeChannel(ClientConnection conn, R[] requests) { return conn.writeChannel(requests); } @@ -324,43 +328,38 @@ public abstract class Client, R extends ClientR if (pool && cc != null && cc.isOpen()) { return CompletableFuture.completedFuture(cc); } - + long s = System.currentTimeMillis(); final Queue> waitQueue = this.connAcquireWaitings[connIndex]; if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } - return (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines); + Traces.currentTraceid(traceid); + C rs = (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines); +// if (debug) { +// logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + rs +// + ", " + rs.channel + ", 创建TCP连接耗时: (" + (System.currentTimeMillis() - s) + "ms)"); +// } + return rs; }); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { virtualReq.traceid = traceid; future = future.thenCompose(conn -> { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } - return conn.writeVirtualRequest(virtualReq).thenApply(v -> { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } - return conn; - }); + Traces.currentTraceid(traceid); + return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn); }); } else { future = future.thenApply(conn -> { + Traces.currentTraceid(traceid); conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 return conn; }); } if (authenticate != null) { - future = future.thenCompose(authenticate); + future = future.thenCompose(authenticate.apply(traceid)); } return future.thenApply(c -> { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } + Traces.currentTraceid(traceid); c.setAuthenticated(true); if (pool) { this.connArray[connIndex] = c; @@ -370,9 +369,7 @@ public abstract class Client, R extends ClientR if (workThread != null) { CompletableFuture fs = f; workThread.execute(() -> { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } + Traces.currentTraceid(traceid); fs.complete(c); }); } else { @@ -418,33 +415,41 @@ public abstract class Client, R extends ClientR WorkThread workThread = WorkThread.currentWorkThread(); final Queue> waitQueue = entry.connAcquireWaitings; if (!pool || entry.connOpenState.compareAndSet(false, true)) { + long s = System.currentTimeMillis(); CompletableFuture future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { virtualReq.traceid = traceid; - future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); + future = future.thenCompose(conn -> { + Traces.currentTraceid(traceid); + return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn); + }); } else { future = future.thenApply(conn -> { +// if (debug) { +// logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + conn +// + ", 注册读操作: (" + (System.currentTimeMillis() - s) + "ms) " + conn.channel); +// } conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 return conn; }); } if (authenticate != null) { - future = future.thenCompose(authenticate); + future = future.thenCompose(authenticate.apply(traceid)); } return future.thenApply(c -> { c.setAuthenticated(true); if (pool) { entry.connection = c; CompletableFuture f; - Traces.computeIfAbsent(traceid); + Traces.currentTraceid(traceid); while ((f = waitQueue.poll()) != null) { if (!f.isDone()) { if (workThread != null) { CompletableFuture fs = f; workThread.execute(() -> { - Traces.computeIfAbsent(traceid); + Traces.currentTraceid(traceid); fs.complete(c); }); } else { diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index b444033c0..71ae20d46 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -112,6 +112,9 @@ public abstract class ClientCodec implements Complet void responseComplete(boolean halfCompleted, ClientFuture respFuture, P message, Throwable exc) { R request = respFuture.request; + if (request != null) { + Traces.currentTraceid(request.getTraceid()); + } AsyncIOThread readThread = connection.channel.getReadIOThread(); final WorkThread workThread = request.workThread; try { @@ -130,34 +133,35 @@ public abstract class ClientCodec implements Complet connection.client.incrRespDoneCounter(); } respFuture.cancelTimeout(); -// if (connection.client.debug) { -// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause); -// } +// if (connection.client.debug) { +// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection +// + ", 回调处理(" + (request != null ? (System.currentTimeMillis() - request.getCreateTime()) : -1) + "ms), req=" + request + ", message=" + message, exc); +// } connection.preComplete(message, (R) request, exc); if (exc == null) { final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); if (workThread == null) { readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.complete(rs); Traces.removeTraceid(); }); } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.complete(rs); Traces.removeTraceid(); } else { workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.complete(rs); Traces.removeTraceid(); }); } } else { workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.complete(rs); Traces.removeTraceid(); }); @@ -165,25 +169,25 @@ public abstract class ClientCodec implements Complet } else { //异常 if (workThread == null) { readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); Traces.removeTraceid(); }); } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); Traces.removeTraceid(); } else { workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); Traces.removeTraceid(); }); } } else { workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); Traces.removeTraceid(); }); @@ -192,25 +196,25 @@ public abstract class ClientCodec implements Complet } catch (Throwable t) { if (workThread == null) { readThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t); Traces.removeTraceid(); }); } else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE if (workThread.inIO()) { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t); Traces.removeTraceid(); } else { workThread.execute(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t); Traces.removeTraceid(); }); } } else { workThread.runWork(() -> { - Traces.computeIfAbsent(request.traceid); + Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(t); Traces.removeTraceid(); }); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 187ca470d..d8ee26356 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -118,6 +118,10 @@ public abstract class ClientConnection implements Co //respTransfer只会在ClientCodec的读线程里调用 protected final CompletableFuture writeChannel(R request, Function respTransfer) { +// if (client.debug) { +// client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " +// + this + ", 发送请求: " + request); +// } request.respTransfer = respTransfer; ClientFuture respFuture = createClientFuture(request); int rts = this.channel.getReadTimeoutSeconds(); @@ -163,6 +167,10 @@ public abstract class ClientConnection implements Co //respTransfer只会在ClientCodec的读线程里调用 protected final CompletableFuture> writeChannel(R[] requests, Function respTransfer) { +// if (client.debug) { +// client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " +// + this + ", 发送请求: " + Arrays.toString(requests) + ", readTimeoutSeconds: " + this.channel.getReadTimeoutSeconds()); +// } ClientFuture[] respFutures = new ClientFuture[requests.length]; int rts = this.channel.getReadTimeoutSeconds(); for (int i = 0; i < respFutures.length; i++) { diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index fc00ee538..ab3fb6b13 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -10,7 +10,6 @@ import java.util.concurrent.*; import org.redkale.annotation.Nonnull; import org.redkale.net.*; import org.redkale.util.Traces; -import org.redkale.util.Utility; /** * @@ -86,9 +85,7 @@ public class ClientFuture extends CompletableFuture< workThread = conn.getChannel().getReadIOThread(); } workThread.runWork(() -> { - if (Utility.isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } + Traces.currentTraceid(traceid); completeExceptionally(ex); Traces.removeTraceid(); }); diff --git a/src/main/java/org/redkale/net/http/HttpResponse.java b/src/main/java/org/redkale/net/http/HttpResponse.java index 6b2daad77..11e303534 100644 --- a/src/main/java/org/redkale/net/http/HttpResponse.java +++ b/src/main/java/org/redkale/net/http/HttpResponse.java @@ -585,7 +585,7 @@ public class HttpResponse extends Response { */ public void finishFuture(final Convert convert, Type valueType, CompletionStage future) { future.whenComplete((v, e) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (e != null) { context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e); if (e instanceof TimeoutException) { @@ -619,7 +619,7 @@ public class HttpResponse extends Response { */ public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) { future.whenComplete((v, e) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (e != null) { context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e); if (e instanceof TimeoutException) { diff --git a/src/main/java/org/redkale/net/http/HttpSimpleClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java index 9b2a6b9bc..a6c9ce558 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -17,7 +17,6 @@ import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import static org.redkale.net.http.HttpRequest.parseHeaderName; import org.redkale.util.*; -import static org.redkale.util.Utility.isNotEmpty; /** * 简单的HttpClient实现, 存在以下情况不能使用此类:
@@ -203,13 +202,12 @@ public class HttpSimpleClient { public CompletableFuture> sendAsync(String method, String url, Map headers, byte[] body, Convert convert, Type valueType) { final String traceid = Traces.currentTraceid(); + final WorkThread workThread = WorkThread.currentWorkThread(); final URI uri = URI.create(url); final String host = uri.getHost(); final int port = uri.getPort() > 0 ? uri.getPort() : (url.startsWith("https:") ? 443 : 80); return createConnection(host, port).thenCompose(conn -> { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } + Traces.currentTraceid(traceid); final ByteArray array = new ByteArray(); int urlpos = url.indexOf("/", url.indexOf("//") + 3); array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n" @@ -234,16 +232,24 @@ public class HttpSimpleClient { conn.write(array, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - conn.read(new ClientReadCompletionHandler(conn, traceid, array.clear(), convert, valueType, future)); + conn.read(new ClientReadCompletionHandler(conn, workThread, traceid, array.clear(), convert, valueType, future)); } @Override public void failed(Throwable exc, Void attachment) { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } + Traces.currentTraceid(traceid); conn.dispose(); - future.completeExceptionally(exc); + if (workThread == null) { + Utility.execute(() -> { + Traces.currentTraceid(traceid); + future.completeExceptionally(exc); + }); + } else { + workThread.runWork(() -> { + Traces.currentTraceid(traceid); + future.completeExceptionally(exc); + }); + } } }); return future; @@ -309,6 +315,8 @@ public class HttpSimpleClient { protected final String traceid; + protected final WorkThread workThread; + protected final CompletableFuture> future; protected Convert convert; @@ -321,8 +329,9 @@ public class HttpSimpleClient { protected int contentLength = -1; - public ClientReadCompletionHandler(HttpConnection conn, String traceid, ByteArray array, Convert convert, Type valueType, CompletableFuture> future) { + public ClientReadCompletionHandler(HttpConnection conn, WorkThread workThread, String traceid, ByteArray array, Convert convert, Type valueType, CompletableFuture> future) { this.conn = conn; + this.workThread = workThread; this.traceid = traceid; this.array = array; this.convert = convert; @@ -332,9 +341,7 @@ public class HttpSimpleClient { @Override public void completed(Integer count, ByteBuffer buffer) { - if (isNotEmpty(traceid)) { - Traces.computeIfAbsent(traceid); - } + Traces.currentTraceid(traceid); buffer.flip(); if (this.readState == READ_STATE_ROUTE) { if (this.responseResult == null) { @@ -388,12 +395,42 @@ public class HttpSimpleClient { HttpResult result = this.responseResult; try { result.result(c.convertFrom(valueType, this.responseResult.getResult())); - this.future.complete((HttpResult) this.responseResult); + if (workThread == null) { + Utility.execute(() -> { + Traces.currentTraceid(traceid); + future.complete((HttpResult) this.responseResult); + }); + } else { + workThread.runWork(() -> { + Traces.currentTraceid(traceid); + future.complete((HttpResult) this.responseResult); + }); + } } catch (Exception e) { - this.future.completeExceptionally(e); + if (workThread == null) { + Utility.execute(() -> { + Traces.currentTraceid(traceid); + future.completeExceptionally(e); + }); + } else { + workThread.runWork(() -> { + Traces.currentTraceid(traceid); + future.completeExceptionally(e); + }); + } } } else { - this.future.complete((HttpResult) this.responseResult); + if (workThread == null) { + Utility.execute(() -> { + Traces.currentTraceid(traceid); + future.complete((HttpResult) this.responseResult); + }); + } else { + workThread.runWork(() -> { + Traces.currentTraceid(traceid); + future.complete((HttpResult) this.responseResult); + }); + } } } diff --git a/src/main/java/org/redkale/net/http/WebSocketServlet.java b/src/main/java/org/redkale/net/http/WebSocketServlet.java index cb8e0b71e..a20a6c71f 100644 --- a/src/main/java/org/redkale/net/http/WebSocketServlet.java +++ b/src/main/java/org/redkale/net/http/WebSocketServlet.java @@ -286,7 +286,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public void completed(Integer result, Void attachment) { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer); webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool); @@ -300,7 +300,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl return; } userFuture.whenComplete((userid, ex2) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { if (debug || ex2 != null) { logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); @@ -312,7 +312,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._userid = userid; if (single && !anyuser) { webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (rs) { CompletableFuture rcFuture = webSocket.onSingleRepeatConnect(); Consumer task = (oldkilled) -> { @@ -356,7 +356,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (userid == null || t != null) { if (t != null) { logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); @@ -377,7 +377,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl //CompletableFuture cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); cf.whenComplete((Integer v, Throwable t) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (sessionid == null || t != null) { if (t != null) { logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); @@ -401,12 +401,12 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }; WorkThread workThread = WorkThread.currentWorkThread(); sessionFuture.whenComplete((sessionid, ex) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (workThread == null || workThread == Thread.currentThread()) { sessionConsumer.accept(sessionid, ex); } else { workThread.runWork(() -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); sessionConsumer.accept(sessionid, ex); }); } diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index 4193654ea..853f51e81 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -146,7 +146,7 @@ public class SncpResponse extends Response { finishVoid(); } else if (future instanceof CompletionStage) { ((CompletionStage) future).whenComplete((v, t) -> { - Traces.computeIfAbsent(request.getTraceid()); + Traces.currentTraceid(request.getTraceid()); if (t != null) { finishError((Throwable) t); } else { diff --git a/src/main/java/org/redkale/util/Traces.java b/src/main/java/org/redkale/util/Traces.java index 101ad10e0..a626f273a 100644 --- a/src/main/java/org/redkale/util/Traces.java +++ b/src/main/java/org/redkale/util/Traces.java @@ -27,33 +27,99 @@ public class Traces { private static final ThreadLocal localTrace = new ThreadLocal<>(); + /** + * 是否开启了trace功能 + * + * @return + */ public static boolean enable() { return enable; } + /** + * 创建一个新的traceid + * + * @return + */ public static String createTraceid() { return enable ? tidSupplier.get() : null; } + /** + * 获取当前线程的traceid + * + * @return + */ public static String currentTraceid() { return enable ? localTrace.get() : null; } + /** + * 移除当前线程的traceid + */ public static void removeTraceid() { if (enable) { localTrace.remove(); } } - public static String computeIfAbsent(String requestTraceid) { + /** + * 设置当前线程的traceid, 如果参数为空则清除当前线程traceid + * + * @param traceid + * + * @return + */ + public static void currentTraceid(String traceid) { if (enable) { - String rs = requestTraceid; + if (traceid != null && !traceid.isEmpty()) { + localTrace.set(traceid); + } else { + localTrace.remove(); + } + } + } + + /** + * 设置当前线程的traceid, 若参数为空则会创建一个新的traceid + * + * @param traceid + * + * @return + */ + public static String computeIfAbsent(String traceid) { + if (enable) { + String rs = traceid; if (rs == null || rs.isEmpty()) { rs = tidSupplier.get(); } localTrace.set(rs); return rs; } - return requestTraceid; + return traceid; + } + + /** + * 设置当前线程的traceid, 若参数1为空,则使用参数2,若参数2未空,则会创建一个新的traceid + * + * @param traceid + * @param traceid2 + * + * @return + */ + public static String computeIfAbsent(String traceid, String traceid2) { + if (enable) { + String rs = traceid; + if (rs == null || rs.isEmpty()) { + if (traceid2 == null || traceid2.isEmpty()) { + rs = tidSupplier.get(); + } else { + rs = traceid2; + } + } + localTrace.set(rs); + return rs; + } + return traceid; } }