优化client

This commit is contained in:
redkale
2023-10-21 14:27:23 +08:00
parent cb3cb00e2b
commit 9dd3128a65
13 changed files with 219 additions and 101 deletions

View File

@@ -10,6 +10,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.annotation.Resource;
import org.redkale.boot.Application;
import org.redkale.net.WorkThread;
import org.redkale.net.http.*;
import org.redkale.util.Traces;
import org.redkale.util.Utility;
@@ -81,6 +82,7 @@ public class HttpClusterRpcClient extends HttpRpcClient {
} else {
Traces.computeIfAbsent(req.getTraceid());
}
final WorkThread workThread = WorkThread.currentWorkThread();
String module = req.getRequestURI();
module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/'));
@@ -91,9 +93,7 @@ public class HttpClusterRpcClient extends HttpRpcClient {
logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname);
}
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
if (isNotEmpty(req.getTraceid())) {
Traces.computeIfAbsent(req.getTraceid());
}
Traces.currentTraceid(req.getTraceid());
if (isEmpty(addrs)) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty");
@@ -150,13 +150,13 @@ public class HttpClusterRpcClient extends HttpRpcClient {
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture");
}
return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req,
return forEachCollectionFuture(workThread, logger.isLoggable(Level.FINEST), userid, req,
(req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(),
clientHeaders, clientBody, addrs.iterator());
});
}
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid,
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(final WorkThread workThread, boolean finest, Serializable userid,
HttpSimpleRequest req, String requesturi, final Map<String, String> clientHeaders, byte[] clientBody, Iterator<InetSocketAddress> it) {
if (!it.hasNext()) {
return CompletableFuture.completedFuture(null);
@@ -178,9 +178,7 @@ public class HttpClusterRpcClient extends HttpRpcClient {
}
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray())
.thenApply((java.net.http.HttpResponse<byte[]> resp) -> {
if (isNotEmpty(req.getTraceid())) {
Traces.computeIfAbsent(req.getTraceid());
}
Traces.currentTraceid(req.getTraceid());
final int rs = resp.statusCode();
if (rs != 200) {
return new HttpResult<byte[]>().status(rs);

View File

@@ -19,7 +19,6 @@ import org.redkale.net.http.*;
import org.redkale.util.RedkaleException;
import org.redkale.util.Traces;
import static org.redkale.util.Utility.isEmpty;
import static org.redkale.util.Utility.isNotEmpty;
/**
* 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例
@@ -146,9 +145,7 @@ public class HttpLocalRpcClient extends HttpRpcClient {
future.completeExceptionally(e);
}
return future.thenApply(rs -> {
if (isNotEmpty(request.getTraceid())) {
Traces.computeIfAbsent(request.getTraceid());
}
Traces.currentTraceid(request.getTraceid());
if (rs == null) {
return new HttpResult();
}

View File

@@ -44,7 +44,7 @@ public class MessageRespProcessor implements MessageProcessor {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + deplay + "ms, mq.seqid = " + msg.getSeqid() + ")");
}
messageClient.getMessageAgent().execute(() -> {
Traces.computeIfAbsent(traceid);
Traces.currentTraceid(traceid);
resp.future.complete(msg);
long comems = System.currentTimeMillis() - now;
if ((deplay > 1000 || comems > 1000) && logger.isLoggable(Level.FINE)) {

View File

@@ -117,7 +117,13 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
CompletionHandler<Integer, A> handler0 = handler;
A attachment0 = attachment;
clear();
handler0.failed(exc, attachment0);
if (handler0 == null) { //可能和超时run方法同时执行
if (exc != null) {
exc.printStackTrace();
}
} else {
handler0.failed(exc, attachment0);
}
}
@Override

View File

@@ -13,7 +13,6 @@ import java.util.function.*;
import java.util.logging.Logger;
import org.redkale.net.*;
import org.redkale.util.*;
import static org.redkale.util.Utility.isNotEmpty;
/**
*
@@ -31,7 +30,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
public static final int DEFAULT_MAX_PIPELINES = 128;
protected boolean debug;
protected boolean debug = false;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@@ -91,7 +90,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
protected Supplier<R> closeRequestSupplier;
//创建连接后进行的登录鉴权操作
protected Function<C, CompletableFuture<C>> authenticate;
protected Function<String, Function<C, CompletableFuture<C>>> authenticate;
protected Client(String name, AsyncGroup group, ClientAddress address) {
this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
@@ -110,18 +109,18 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
Function<C, CompletableFuture<C>> authenticate) {
Function<String, Function<C, CompletableFuture<C>>> authenticate) {
this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate);
}
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
Supplier<R> closeRequestSupplier, Function<C, CompletableFuture<C>> authenticate) {
Supplier<R> closeRequestSupplier, Function<String, Function<C, CompletableFuture<C>>> authenticate) {
this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate);
}
@SuppressWarnings("OverridableMethodCallInConstructor")
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<C, CompletableFuture<C>> authenticate) {
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<String, Function<C, CompletableFuture<C>>> authenticate) {
if (maxPipelines < 1) {
throw new IllegalArgumentException("maxPipelines must bigger 0");
}
@@ -220,7 +219,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final CompletableFuture<P> sendAsync(R request) {
request.traceid = Traces.computeIfAbsent(request.traceid);
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
}
@@ -228,7 +227,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
request.traceid = Traces.computeIfAbsent(request.traceid);
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
}
@@ -236,7 +235,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) {
request.traceid = Traces.computeIfAbsent(request.traceid);
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
}
@@ -244,7 +243,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
request.traceid = Traces.computeIfAbsent(request.traceid);
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
}
@@ -260,7 +259,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final CompletableFuture<List<P>> sendAsync(R[] requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
@@ -270,7 +269,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final <T> CompletableFuture<List<T>> sendAsync(R[] requests, Function<P, T> respTransfer) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
@@ -280,7 +279,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R[] requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
@@ -290,7 +289,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
}
public final <T> CompletableFuture<List<T>> sendAsync(SocketAddress addr, R[] requests, Function<P, T> respTransfer) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
for (R request : requests) {
if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread();
@@ -299,6 +298,11 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
return connect(addr).thenCompose(conn -> writeChannel(conn, requests, respTransfer));
}
protected CompletableFuture<List<P>> writeChannelBatch(ClientConnection conn, R... requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
return conn.writeChannel(requests);
}
protected CompletableFuture<List<P>> writeChannel(ClientConnection conn, R[] requests) {
return conn.writeChannel(requests);
}
@@ -324,43 +328,38 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (pool && cc != null && cc.isOpen()) {
return CompletableFuture.completedFuture(cc);
}
long s = System.currentTimeMillis();
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
return (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines);
Traces.currentTraceid(traceid);
C rs = (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines);
// if (debug) {
// logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + rs
// + ", " + rs.channel + ", 创建TCP连接耗时: (" + (System.currentTimeMillis() - s) + "ms)");
// }
return rs;
});
R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) {
virtualReq.traceid = traceid;
future = future.thenCompose(conn -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
return conn.writeVirtualRequest(virtualReq).thenApply(v -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
return conn;
});
Traces.currentTraceid(traceid);
return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn);
});
} else {
future = future.thenApply(conn -> {
Traces.currentTraceid(traceid);
conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread因executeRead可能会异步
return conn;
});
}
if (authenticate != null) {
future = future.thenCompose(authenticate);
future = future.thenCompose(authenticate.apply(traceid));
}
return future.thenApply(c -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
Traces.currentTraceid(traceid);
c.setAuthenticated(true);
if (pool) {
this.connArray[connIndex] = c;
@@ -370,9 +369,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (workThread != null) {
CompletableFuture<C> fs = f;
workThread.execute(() -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
Traces.currentTraceid(traceid);
fs.complete(c);
});
} else {
@@ -418,33 +415,41 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
WorkThread workThread = WorkThread.currentWorkThread();
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
if (!pool || entry.connOpenState.compareAndSet(false, true)) {
long s = System.currentTimeMillis();
CompletableFuture<C> future = group.createClient(tcp, addr, readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) {
virtualReq.traceid = traceid;
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
future = future.thenCompose(conn -> {
Traces.currentTraceid(traceid);
return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn);
});
} else {
future = future.thenApply(conn -> {
// if (debug) {
// logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + conn
// + ", 注册读操作: (" + (System.currentTimeMillis() - s) + "ms) " + conn.channel);
// }
conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread因executeRead可能会异步
return conn;
});
}
if (authenticate != null) {
future = future.thenCompose(authenticate);
future = future.thenCompose(authenticate.apply(traceid));
}
return future.thenApply(c -> {
c.setAuthenticated(true);
if (pool) {
entry.connection = c;
CompletableFuture<C> f;
Traces.computeIfAbsent(traceid);
Traces.currentTraceid(traceid);
while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) {
if (workThread != null) {
CompletableFuture<C> fs = f;
workThread.execute(() -> {
Traces.computeIfAbsent(traceid);
Traces.currentTraceid(traceid);
fs.complete(c);
});
} else {

View File

@@ -112,6 +112,9 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
void responseComplete(boolean halfCompleted, ClientFuture<R, P> respFuture, P message, Throwable exc) {
R request = respFuture.request;
if (request != null) {
Traces.currentTraceid(request.getTraceid());
}
AsyncIOThread readThread = connection.channel.getReadIOThread();
final WorkThread workThread = request.workThread;
try {
@@ -130,34 +133,35 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.client.incrRespDoneCounter();
}
respFuture.cancelTimeout();
// if (connection.client.debug) {
// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection + ", 回调处理, req=" + request + ", message=" + message, cause);
// }
// if (connection.client.debug) {
// connection.client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": " + connection
// + ", 回调处理(" + (request != null ? (System.currentTimeMillis() - request.getCreateTime()) : -1) + "ms), req=" + request + ", message=" + message, exc);
// }
connection.preComplete(message, (R) request, exc);
if (exc == null) {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
Traces.removeTraceid();
});
@@ -165,25 +169,25 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} else { //异常
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
Traces.removeTraceid();
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
Traces.removeTraceid();
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
Traces.removeTraceid();
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
Traces.removeTraceid();
});
@@ -192,25 +196,25 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} catch (Throwable t) {
if (workThread == null) {
readThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
Traces.removeTraceid();
});
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
Traces.removeTraceid();
} else {
workThread.execute(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
Traces.removeTraceid();
});
}
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.traceid);
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
Traces.removeTraceid();
});

View File

@@ -118,6 +118,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
//respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<T> writeChannel(R request, Function<P, T> respTransfer) {
// if (client.debug) {
// client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": "
// + this + ", 发送请求: " + request);
// }
request.respTransfer = respTransfer;
ClientFuture respFuture = createClientFuture(request);
int rts = this.channel.getReadTimeoutSeconds();
@@ -163,6 +167,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
//respTransfer只会在ClientCodec的读线程里调用
protected final <T> CompletableFuture<List<T>> writeChannel(R[] requests, Function<P, T> respTransfer) {
// if (client.debug) {
// client.logger.log(Level.FINEST, Utility.nowMillis() + ": " + Thread.currentThread().getName() + ": "
// + this + ", 发送请求: " + Arrays.toString(requests) + ", readTimeoutSeconds: " + this.channel.getReadTimeoutSeconds());
// }
ClientFuture[] respFutures = new ClientFuture[requests.length];
int rts = this.channel.getReadTimeoutSeconds();
for (int i = 0; i < respFutures.length; i++) {

View File

@@ -10,7 +10,6 @@ import java.util.concurrent.*;
import org.redkale.annotation.Nonnull;
import org.redkale.net.*;
import org.redkale.util.Traces;
import org.redkale.util.Utility;
/**
*
@@ -86,9 +85,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
workThread = conn.getChannel().getReadIOThread();
}
workThread.runWork(() -> {
if (Utility.isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
Traces.currentTraceid(traceid);
completeExceptionally(ex);
Traces.removeTraceid();
});

View File

@@ -585,7 +585,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/
public void finishFuture(final Convert convert, Type valueType, CompletionStage future) {
future.whenComplete((v, e) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e);
if (e instanceof TimeoutException) {
@@ -619,7 +619,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/
public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) {
future.whenComplete((v, e) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e);
if (e instanceof TimeoutException) {

View File

@@ -17,7 +17,6 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
import static org.redkale.net.http.HttpRequest.parseHeaderName;
import org.redkale.util.*;
import static org.redkale.util.Utility.isNotEmpty;
/**
* 简单的HttpClient实现, 存在以下情况不能使用此类: <br>
@@ -203,13 +202,12 @@ public class HttpSimpleClient {
public <T> CompletableFuture<HttpResult<T>> sendAsync(String method, String url, Map<String, String> headers, byte[] body, Convert convert, Type valueType) {
final String traceid = Traces.currentTraceid();
final WorkThread workThread = WorkThread.currentWorkThread();
final URI uri = URI.create(url);
final String host = uri.getHost();
final int port = uri.getPort() > 0 ? uri.getPort() : (url.startsWith("https:") ? 443 : 80);
return createConnection(host, port).thenCompose(conn -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
Traces.currentTraceid(traceid);
final ByteArray array = new ByteArray();
int urlpos = url.indexOf("/", url.indexOf("//") + 3);
array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n"
@@ -234,16 +232,24 @@ public class HttpSimpleClient {
conn.write(array, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
conn.read(new ClientReadCompletionHandler(conn, traceid, array.clear(), convert, valueType, future));
conn.read(new ClientReadCompletionHandler(conn, workThread, traceid, array.clear(), convert, valueType, future));
}
@Override
public void failed(Throwable exc, Void attachment) {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
Traces.currentTraceid(traceid);
conn.dispose();
future.completeExceptionally(exc);
if (workThread == null) {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(exc);
});
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(exc);
});
}
}
});
return future;
@@ -309,6 +315,8 @@ public class HttpSimpleClient {
protected final String traceid;
protected final WorkThread workThread;
protected final CompletableFuture<HttpResult<T>> future;
protected Convert convert;
@@ -321,8 +329,9 @@ public class HttpSimpleClient {
protected int contentLength = -1;
public ClientReadCompletionHandler(HttpConnection conn, String traceid, ByteArray array, Convert convert, Type valueType, CompletableFuture<HttpResult<T>> future) {
public ClientReadCompletionHandler(HttpConnection conn, WorkThread workThread, String traceid, ByteArray array, Convert convert, Type valueType, CompletableFuture<HttpResult<T>> future) {
this.conn = conn;
this.workThread = workThread;
this.traceid = traceid;
this.array = array;
this.convert = convert;
@@ -332,9 +341,7 @@ public class HttpSimpleClient {
@Override
public void completed(Integer count, ByteBuffer buffer) {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
Traces.currentTraceid(traceid);
buffer.flip();
if (this.readState == READ_STATE_ROUTE) {
if (this.responseResult == null) {
@@ -388,12 +395,42 @@ public class HttpSimpleClient {
HttpResult result = this.responseResult;
try {
result.result(c.convertFrom(valueType, this.responseResult.getResult()));
this.future.complete((HttpResult<T>) this.responseResult);
if (workThread == null) {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
}
} catch (Exception e) {
this.future.completeExceptionally(e);
if (workThread == null) {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(e);
});
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.completeExceptionally(e);
});
}
}
} else {
this.future.complete((HttpResult<T>) this.responseResult);
if (workThread == null) {
Utility.execute(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
future.complete((HttpResult<T>) this.responseResult);
});
}
}
}

View File

@@ -286,7 +286,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override
public void completed(Integer result, Void attachment) {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer);
webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool);
@@ -300,7 +300,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return;
}
userFuture.whenComplete((userid, ex2) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) {
logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
@@ -312,7 +312,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._userid = userid;
if (single && !anyuser) {
webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (rs) {
CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect();
Consumer<Boolean> task = (oldkilled) -> {
@@ -356,7 +356,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (userid == null || t != null) {
if (t != null) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
@@ -377,7 +377,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (sessionid == null || t != null) {
if (t != null) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
@@ -401,12 +401,12 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
};
WorkThread workThread = WorkThread.currentWorkThread();
sessionFuture.whenComplete((sessionid, ex) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (workThread == null || workThread == Thread.currentThread()) {
sessionConsumer.accept(sessionid, ex);
} else {
workThread.runWork(() -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
sessionConsumer.accept(sessionid, ex);
});
}

View File

@@ -146,7 +146,7 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finishVoid();
} else if (future instanceof CompletionStage) {
((CompletionStage) future).whenComplete((v, t) -> {
Traces.computeIfAbsent(request.getTraceid());
Traces.currentTraceid(request.getTraceid());
if (t != null) {
finishError((Throwable) t);
} else {

View File

@@ -27,33 +27,99 @@ public class Traces {
private static final ThreadLocal<String> localTrace = new ThreadLocal<>();
/**
* 是否开启了trace功能
*
* @return
*/
public static boolean enable() {
return enable;
}
/**
* 创建一个新的traceid
*
* @return
*/
public static String createTraceid() {
return enable ? tidSupplier.get() : null;
}
/**
* 获取当前线程的traceid
*
* @return
*/
public static String currentTraceid() {
return enable ? localTrace.get() : null;
}
/**
* 移除当前线程的traceid
*/
public static void removeTraceid() {
if (enable) {
localTrace.remove();
}
}
public static String computeIfAbsent(String requestTraceid) {
/**
* 设置当前线程的traceid 如果参数为空则清除当前线程traceid
*
* @param traceid
*
* @return
*/
public static void currentTraceid(String traceid) {
if (enable) {
String rs = requestTraceid;
if (traceid != null && !traceid.isEmpty()) {
localTrace.set(traceid);
} else {
localTrace.remove();
}
}
}
/**
* 设置当前线程的traceid 若参数为空则会创建一个新的traceid
*
* @param traceid
*
* @return
*/
public static String computeIfAbsent(String traceid) {
if (enable) {
String rs = traceid;
if (rs == null || rs.isEmpty()) {
rs = tidSupplier.get();
}
localTrace.set(rs);
return rs;
}
return requestTraceid;
return traceid;
}
/**
* 设置当前线程的traceid 若参数1为空则使用参数2若参数2未空则会创建一个新的traceid
*
* @param traceid
* @param traceid2
*
* @return
*/
public static String computeIfAbsent(String traceid, String traceid2) {
if (enable) {
String rs = traceid;
if (rs == null || rs.isEmpty()) {
if (traceid2 == null || traceid2.isEmpty()) {
rs = tidSupplier.get();
} else {
rs = traceid2;
}
}
localTrace.set(rs);
return rs;
}
return traceid;
}
}