优化traceid

This commit is contained in:
redkale
2023-10-21 00:18:22 +08:00
parent 48a4c60737
commit da81509157
15 changed files with 163 additions and 21 deletions

View File

@@ -13,6 +13,8 @@ import org.redkale.boot.Application;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.util.Traces; import org.redkale.util.Traces;
import org.redkale.util.Utility; import org.redkale.util.Utility;
import static org.redkale.util.Utility.isEmpty;
import static org.redkale.util.Utility.isNotEmpty;
/** /**
* 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例
@@ -74,8 +76,10 @@ public class HttpClusterRpcClient extends HttpRpcClient {
} }
private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) {
if (Utility.isEmpty(req.getTraceid())) { if (isEmpty(req.getTraceid())) {
req.setTraceid(Traces.currentTraceid()); req.setTraceid(Traces.currentTraceid());
} else {
Traces.computeIfAbsent(req.getTraceid());
} }
String module = req.getRequestURI(); String module = req.getRequestURI();
module = module.substring(1); //去掉/ module = module.substring(1); //去掉/
@@ -87,7 +91,10 @@ public class HttpClusterRpcClient extends HttpRpcClient {
logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname);
} }
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
if (addrs == null || addrs.isEmpty()) { if (isNotEmpty(req.getTraceid())) {
Traces.computeIfAbsent(req.getTraceid());
}
if (isEmpty(addrs)) {
if (logger.isLoggable(Level.WARNING)) { if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty"); logger.log(Level.WARNING, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + " failed, module=" + localModule + ", resname=" + resname + ", address is empty");
} }
@@ -98,7 +105,7 @@ public class HttpClusterRpcClient extends HttpRpcClient {
if (req.isRpc()) { if (req.isRpc()) {
clientHeaders.put(Rest.REST_HEADER_RPC, "true"); clientHeaders.put(Rest.REST_HEADER_RPC, "true");
} }
if (Utility.isNotEmpty(req.getTraceid())) { if (isNotEmpty(req.getTraceid())) {
clientHeaders.put(Rest.REST_HEADER_TRACEID, req.getTraceid()); clientHeaders.put(Rest.REST_HEADER_TRACEID, req.getTraceid());
} }
if (req.isFrombody()) { if (req.isFrombody()) {
@@ -171,6 +178,9 @@ public class HttpClusterRpcClient extends HttpRpcClient {
} }
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray())
.thenApply((java.net.http.HttpResponse<byte[]> resp) -> { .thenApply((java.net.http.HttpResponse<byte[]> resp) -> {
if (isNotEmpty(req.getTraceid())) {
Traces.computeIfAbsent(req.getTraceid());
}
final int rs = resp.statusCode(); final int rs = resp.statusCode();
if (rs != 200) { if (rs != 200) {
return new HttpResult<byte[]>().status(rs); return new HttpResult<byte[]>().status(rs);

View File

@@ -18,7 +18,8 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.util.RedkaleException; import org.redkale.util.RedkaleException;
import org.redkale.util.Traces; import org.redkale.util.Traces;
import org.redkale.util.Utility; import static org.redkale.util.Utility.isEmpty;
import static org.redkale.util.Utility.isNotEmpty;
/** /**
* 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例 * 没有配置MQ且也没有ClusterAgent的情况下实现的默认HttpMessageClient实例
@@ -101,7 +102,7 @@ public class HttpLocalRpcClient extends HttpRpcClient {
@Override @Override
public <T> CompletableFuture<T> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) { public <T> CompletableFuture<T> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) {
if (Utility.isEmpty(request.getTraceid())) { if (isEmpty(request.getTraceid())) {
request.setTraceid(Traces.currentTraceid()); request.setTraceid(Traces.currentTraceid());
} }
String topic = generateHttpReqTopic(request, request.getPath()); String topic = generateHttpReqTopic(request, request.getPath());
@@ -126,7 +127,7 @@ public class HttpLocalRpcClient extends HttpRpcClient {
@Override @Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) { public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
if (Utility.isEmpty(request.getTraceid())) { if (isEmpty(request.getTraceid())) {
request.setTraceid(Traces.currentTraceid()); request.setTraceid(Traces.currentTraceid());
} }
HttpServlet servlet = findHttpServlet(topic); HttpServlet servlet = findHttpServlet(topic);
@@ -145,6 +146,9 @@ public class HttpLocalRpcClient extends HttpRpcClient {
future.completeExceptionally(e); future.completeExceptionally(e);
} }
return future.thenApply(rs -> { return future.thenApply(rs -> {
if (isNotEmpty(request.getTraceid())) {
Traces.computeIfAbsent(request.getTraceid());
}
if (rs == null) { if (rs == null) {
return new HttpResult(); return new HttpResult();
} }

View File

@@ -80,7 +80,7 @@ public class WorkThread extends Thread implements Executor {
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
if (workExecutor == null) { if (workExecutor == null) {
command.run(); Utility.execute(command);
} else { } else {
workExecutor.execute(command); workExecutor.execute(command);
} }
@@ -89,7 +89,7 @@ public class WorkThread extends Thread implements Executor {
public void execute(Runnable... commands) { public void execute(Runnable... commands) {
if (workExecutor == null) { if (workExecutor == null) {
for (Runnable command : commands) { for (Runnable command : commands) {
command.run(); Utility.execute(command);
} }
} else { } else {
for (Runnable command : commands) { for (Runnable command : commands) {
@@ -104,7 +104,7 @@ public class WorkThread extends Thread implements Executor {
} }
if (workExecutor == null) { if (workExecutor == null) {
for (Runnable command : commands) { for (Runnable command : commands) {
command.run(); Utility.execute(command);
} }
} else { } else {
for (Runnable command : commands) { for (Runnable command : commands) {
@@ -116,7 +116,7 @@ public class WorkThread extends Thread implements Executor {
//与execute的区别在于子类AsyncIOThread中execute会被重载确保在IO线程中执行 //与execute的区别在于子类AsyncIOThread中execute会被重载确保在IO线程中执行
public final void runWork(Runnable command) { public final void runWork(Runnable command) {
if (workExecutor == null) { if (workExecutor == null) {
command.run(); Utility.execute(command);
} else { } else {
workExecutor.execute(command); workExecutor.execute(command);
} }

View File

@@ -219,6 +219,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<P> sendAsync(R request) { public final CompletableFuture<P> sendAsync(R request) {
request.traceid = Traces.computeIfAbsent(request.traceid);
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
} }
@@ -226,6 +227,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) { public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
request.traceid = Traces.computeIfAbsent(request.traceid);
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
} }
@@ -233,6 +235,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) { public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) {
request.traceid = Traces.computeIfAbsent(request.traceid);
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
} }
@@ -240,6 +243,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) { public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
request.traceid = Traces.computeIfAbsent(request.traceid);
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
} }
@@ -255,6 +259,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<List<P>> sendAsync(R[] requests) { public final CompletableFuture<List<P>> sendAsync(R[] requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
for (R request : requests) { for (R request : requests) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
@@ -264,6 +269,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final <T> CompletableFuture<List<T>> sendAsync(R[] requests, Function<P, T> respTransfer) { public final <T> CompletableFuture<List<T>> sendAsync(R[] requests, Function<P, T> respTransfer) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
for (R request : requests) { for (R request : requests) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
@@ -273,6 +279,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R[] requests) { public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R[] requests) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
for (R request : requests) { for (R request : requests) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
@@ -282,6 +289,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
public final <T> CompletableFuture<List<T>> sendAsync(SocketAddress addr, R[] requests, Function<P, T> respTransfer) { public final <T> CompletableFuture<List<T>> sendAsync(SocketAddress addr, R[] requests, Function<P, T> respTransfer) {
requests[0].traceid = Traces.computeIfAbsent(requests[0].traceid);
for (R request : requests) { for (R request : requests) {
if (request.workThread == null) { if (request.workThread == null) {
request.workThread = WorkThread.currentWorkThread(); request.workThread = WorkThread.currentWorkThread();
@@ -307,6 +315,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
private CompletableFuture<C> connect(final boolean pool) { private CompletableFuture<C> connect(final boolean pool) {
final String traceid = Traces.currentTraceid();
final int size = this.connArray.length; final int size = this.connArray.length;
WorkThread workThread = WorkThread.currentWorkThread(); WorkThread workThread = WorkThread.currentWorkThread();
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
@@ -314,12 +323,14 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (pool && cc != null && cc.isOpen()) { if (pool && cc != null && cc.isOpen()) {
return CompletableFuture.completedFuture(cc); return CompletableFuture.completedFuture(cc);
} }
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex]; final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) {
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
virtualReq.traceid = traceid;
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else { } else {
future = future.thenApply(conn -> { future = future.thenApply(conn -> {
@@ -331,6 +342,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
future = future.thenCompose(authenticate); future = future.thenCompose(authenticate);
} }
return future.thenApply(c -> { return future.thenApply(c -> {
Traces.computeIfAbsent(traceid);
c.setAuthenticated(true); c.setAuthenticated(true);
if (pool) { if (pool) {
this.connArray[connIndex] = c; this.connArray[connIndex] = c;
@@ -339,7 +351,10 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (!f.isDone()) { if (!f.isDone()) {
if (workThread != null) { if (workThread != null) {
CompletableFuture<C> fs = f; CompletableFuture<C> fs = f;
workThread.execute(() -> fs.complete(c)); workThread.execute(() -> {
Traces.computeIfAbsent(traceid);
fs.complete(c);
});
} else { } else {
f.complete(c); f.complete(c);
} }
@@ -353,7 +368,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
} }
}); });
} else { } else {
CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS); CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), readTimeoutSeconds, TimeUnit.SECONDS);
waitQueue.offer(rs); waitQueue.offer(rs);
return rs; return rs;
} }
@@ -371,6 +386,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
//指定地址获取连接 //指定地址获取连接
private CompletableFuture<C> connect(final boolean pool, final SocketAddress addr) { private CompletableFuture<C> connect(final boolean pool, final SocketAddress addr) {
final String traceid = Traces.currentTraceid();
if (addr == null) { if (addr == null) {
return connect(); return connect();
} }
@@ -386,6 +402,7 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
.thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
virtualReq.traceid = traceid;
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else { } else {
future = future.thenApply(conn -> { future = future.thenApply(conn -> {
@@ -401,11 +418,15 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
if (pool) { if (pool) {
entry.connection = c; entry.connection = c;
CompletableFuture<C> f; CompletableFuture<C> f;
Traces.computeIfAbsent(traceid);
while ((f = waitQueue.poll()) != null) { while ((f = waitQueue.poll()) != null) {
if (!f.isDone()) { if (!f.isDone()) {
if (workThread != null) { if (workThread != null) {
CompletableFuture<C> fs = f; CompletableFuture<C> fs = f;
workThread.execute(() -> fs.complete(c)); workThread.execute(() -> {
Traces.computeIfAbsent(traceid);
fs.complete(c);
});
} else { } else {
f.complete(c); f.complete(c);
} }

View File

@@ -68,7 +68,11 @@ public class ClientAddress implements java.io.Serializable {
this.addresses = createAddressArray(this.weights); this.addresses = createAddressArray(this.weights);
addrs = this.addresses; addrs = this.addresses;
} }
addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)]; if (addrs.length == 1) {
addr = addrs[0];
} else {
addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)];
}
} }
return addr; return addr;
} }

View File

@@ -14,6 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.*; import java.util.function.*;
import java.util.logging.Level;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.*; import org.redkale.util.*;
@@ -133,6 +134,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} else { } else {
sendRequestInLocking(request, respFuture); sendRequestInLocking(request, respFuture);
} }
client.logger.log(Level.INFO, channel + ", " + request.getTraceid()+ " 发送完请求: " + request);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@@ -319,6 +321,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
public void dispose(Throwable exc) { public void dispose(Throwable exc) {
channel.offerWriteBuffer(writeBuffer); channel.offerWriteBuffer(writeBuffer);
channel.dispose(); channel.dispose();
System.out.println(Thread.currentThread().getName() + ": " + channel + ", 被关闭了");
Throwable e = exc == null ? new ClosedChannelException() : exc; Throwable e = exc == null ? new ClosedChannelException() : exc;
CompletableFuture f; CompletableFuture f;
respWaitingCounter.reset(); respWaitingCounter.reset();

View File

@@ -9,6 +9,8 @@ import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.redkale.annotation.Nonnull; import org.redkale.annotation.Nonnull;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.util.Traces;
import org.redkale.util.Utility;
/** /**
* *
@@ -72,6 +74,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
} }
private void runTimeout() { private void runTimeout() {
String traceid = request != null ? request.getTraceid() : null;
conn.removeRespFuture(request.getRequestid(), this); conn.removeRespFuture(request.getRequestid(), this);
TimeoutException ex = new TimeoutException("client-request: " + request); TimeoutException ex = new TimeoutException("client-request: " + request);
WorkThread workThread = null; WorkThread workThread = null;
@@ -82,7 +85,13 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
if (workThread == null || workThread.getWorkExecutor() == null) { if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = conn.getChannel().getReadIOThread(); workThread = conn.getChannel().getReadIOThread();
} }
workThread.runWork(() -> completeExceptionally(ex)); workThread.runWork(() -> {
if (Utility.isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
completeExceptionally(ex);
Traces.removeTraceid();
});
} }
@Override @Override

View File

@@ -585,8 +585,9 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/ */
public void finishFuture(final Convert convert, Type valueType, CompletionStage future) { public void finishFuture(final Convert convert, Type valueType, CompletionStage future) {
future.whenComplete((v, e) -> { future.whenComplete((v, e) -> {
Traces.computeIfAbsent(request.getTraceid());
if (e != null) { if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", e instanceof TimeoutException ? e.getClass() : e); context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e);
if (e instanceof TimeoutException) { if (e instanceof TimeoutException) {
finish504(); finish504();
} else { } else {
@@ -595,6 +596,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return; return;
} }
finish(convert, valueType, v); finish(convert, valueType, v);
Traces.removeTraceid();
}); });
} }
@@ -617,8 +619,9 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
*/ */
public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) { public void finishJsonFuture(final Convert convert, Type valueType, CompletionStage future) {
future.whenComplete((v, e) -> { future.whenComplete((v, e) -> {
Traces.computeIfAbsent(request.getTraceid());
if (e != null) { if (e != null) {
context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", e instanceof TimeoutException ? e.getClass() : e); context.getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request + ", result is CompletionStage", (Throwable) e);
if (e instanceof TimeoutException) { if (e instanceof TimeoutException) {
finish504(); finish504();
} else { } else {
@@ -627,6 +630,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
return; return;
} }
finishJson(convert, valueType, v); finishJson(convert, valueType, v);
Traces.removeTraceid();
}); });
} }

View File

@@ -17,6 +17,7 @@ import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*; import org.redkale.net.*;
import static org.redkale.net.http.HttpRequest.parseHeaderName; import static org.redkale.net.http.HttpRequest.parseHeaderName;
import org.redkale.util.*; import org.redkale.util.*;
import static org.redkale.util.Utility.isNotEmpty;
/** /**
* 简单的HttpClient实现, 存在以下情况不能使用此类: <br> * 简单的HttpClient实现, 存在以下情况不能使用此类: <br>
@@ -201,10 +202,14 @@ public class HttpSimpleClient {
} }
public <T> CompletableFuture<HttpResult<T>> sendAsync(String method, String url, Map<String, String> headers, byte[] body, Convert convert, Type valueType) { public <T> CompletableFuture<HttpResult<T>> sendAsync(String method, String url, Map<String, String> headers, byte[] body, Convert convert, Type valueType) {
final String traceid = Traces.currentTraceid();
final URI uri = URI.create(url); final URI uri = URI.create(url);
final String host = uri.getHost(); final String host = uri.getHost();
final int port = uri.getPort() > 0 ? uri.getPort() : (url.startsWith("https:") ? 443 : 80); final int port = uri.getPort() > 0 ? uri.getPort() : (url.startsWith("https:") ? 443 : 80);
return createConnection(host, port).thenCompose(conn -> { return createConnection(host, port).thenCompose(conn -> {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
final ByteArray array = new ByteArray(); final ByteArray array = new ByteArray();
int urlpos = url.indexOf("/", url.indexOf("//") + 3); int urlpos = url.indexOf("/", url.indexOf("//") + 3);
array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n" array.put((method.toUpperCase() + " " + (urlpos > 0 ? url.substring(urlpos) : "/") + " HTTP/1.1\r\n"
@@ -229,11 +234,14 @@ public class HttpSimpleClient {
conn.write(array, new CompletionHandler<Integer, Void>() { conn.write(array, new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
conn.read(new ClientReadCompletionHandler(conn, array.clear(), convert, valueType, future)); conn.read(new ClientReadCompletionHandler(conn, traceid, array.clear(), convert, valueType, future));
} }
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
conn.dispose(); conn.dispose();
future.completeExceptionally(exc); future.completeExceptionally(exc);
} }
@@ -299,6 +307,8 @@ public class HttpSimpleClient {
protected final ByteArray array; protected final ByteArray array;
protected final String traceid;
protected final CompletableFuture<HttpResult<T>> future; protected final CompletableFuture<HttpResult<T>> future;
protected Convert convert; protected Convert convert;
@@ -311,8 +321,9 @@ public class HttpSimpleClient {
protected int contentLength = -1; protected int contentLength = -1;
public ClientReadCompletionHandler(HttpConnection conn, ByteArray array, Convert convert, Type valueType, CompletableFuture<HttpResult<T>> future) { public ClientReadCompletionHandler(HttpConnection conn, String traceid, ByteArray array, Convert convert, Type valueType, CompletableFuture<HttpResult<T>> future) {
this.conn = conn; this.conn = conn;
this.traceid = traceid;
this.array = array; this.array = array;
this.convert = convert; this.convert = convert;
this.valueType = valueType; this.valueType = valueType;
@@ -321,6 +332,9 @@ public class HttpSimpleClient {
@Override @Override
public void completed(Integer count, ByteBuffer buffer) { public void completed(Integer count, ByteBuffer buffer) {
if (isNotEmpty(traceid)) {
Traces.computeIfAbsent(traceid);
}
buffer.flip(); buffer.flip();
if (this.readState == READ_STATE_ROUTE) { if (this.readState == READ_STATE_ROUTE) {
if (this.responseResult == null) { if (this.responseResult == null) {

View File

@@ -286,6 +286,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
Traces.computeIfAbsent(request.getTraceid());
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer); webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, byteArrayPool, restMessageConsumer);
webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool); webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket, byteArrayPool);
@@ -299,6 +300,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return; return;
} }
userFuture.whenComplete((userid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
Traces.computeIfAbsent(request.getTraceid());
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) { if (debug || ex2 != null) {
logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
@@ -310,6 +312,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._userid = userid; webSocket._userid = userid;
if (single && !anyuser) { if (single && !anyuser) {
webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> { webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> {
Traces.computeIfAbsent(request.getTraceid());
if (rs) { if (rs) {
CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect(); CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect();
Consumer<Boolean> task = (oldkilled) -> { Consumer<Boolean> task = (oldkilled) -> {
@@ -353,6 +356,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); //CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> { cf.whenComplete((Integer v, Throwable t) -> {
Traces.computeIfAbsent(request.getTraceid());
if (userid == null || t != null) { if (userid == null || t != null) {
if (t != null) { if (t != null) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
@@ -373,6 +377,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); //CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> { cf.whenComplete((Integer v, Throwable t) -> {
Traces.computeIfAbsent(request.getTraceid());
if (sessionid == null || t != null) { if (sessionid == null || t != null) {
if (t != null) { if (t != null) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
@@ -396,10 +401,14 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}; };
WorkThread workThread = WorkThread.currentWorkThread(); WorkThread workThread = WorkThread.currentWorkThread();
sessionFuture.whenComplete((sessionid, ex) -> { sessionFuture.whenComplete((sessionid, ex) -> {
Traces.computeIfAbsent(request.getTraceid());
if (workThread == null || workThread == Thread.currentThread()) { if (workThread == null || workThread == Thread.currentThread()) {
sessionConsumer.accept(sessionid, ex); sessionConsumer.accept(sessionid, ex);
} else { } else {
workThread.runWork(() -> sessionConsumer.accept(sessionid, ex)); workThread.runWork(() -> {
Traces.computeIfAbsent(request.getTraceid());
sessionConsumer.accept(sessionid, ex);
});
} }
}); });
} }

View File

@@ -12,6 +12,7 @@ import org.redkale.asm.AsmDepends;
import org.redkale.convert.bson.BsonWriter; import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response; import org.redkale.net.Response;
import org.redkale.util.ByteArray; import org.redkale.util.ByteArray;
import org.redkale.util.Traces;
/** /**
* *
@@ -145,11 +146,13 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
finishVoid(); finishVoid();
} else if (future instanceof CompletionStage) { } else if (future instanceof CompletionStage) {
((CompletionStage) future).whenComplete((v, t) -> { ((CompletionStage) future).whenComplete((v, t) -> {
Traces.computeIfAbsent(request.getTraceid());
if (t != null) { if (t != null) {
finishError((Throwable) t); finishError((Throwable) t);
} else { } else {
finish(futureResultType, v); finish(futureResultType, v);
} }
Traces.removeTraceid();
}); });
} else { } else {
try { try {

View File

@@ -90,11 +90,13 @@ public class SncpServlet extends Servlet<SncpContext, SncpRequest, SncpResponse>
response.updateNonBlocking(false); response.updateNonBlocking(false);
response.getWorkExecutor().execute(() -> { response.getWorkExecutor().execute(() -> {
try { try {
Traces.computeIfAbsent(request.getTraceid());
action.execute(request, response); action.execute(request, response);
} catch (Throwable t) { } catch (Throwable t) {
response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t); response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t);
response.finishError(t); response.finishError(t);
} }
Traces.removeTraceid();
}); });
} }
} else { } else {

View File

@@ -7,8 +7,10 @@
//import java.util.function.Function; //import java.util.function.Function;
// //
///** ///**
// * 虚拟线程池
// * // *
// * @author zhangjx // * @author zhangjx
// * @since 2.8.0
// */ // */
//public class AnonymousVirtualExecutor implements Function<String, ExecutorService> { //public class AnonymousVirtualExecutor implements Function<String, ExecutorService> {
// //

View File

@@ -0,0 +1,21 @@
///*
// *
// */
//package org.redkale.util;
//
//import java.util.function.Consumer;
//
///**
// * 虚拟线程运行
// *
// * @author zhangjx
// * @since 2.8.0
// */
//public class AnonymousVirtualRunner implements Consumer<Runnable> {
//
// @Override
// public void accept(Runnable t) {
// Thread.startVirtualThread(t);
// }
//
//}

View File

@@ -81,6 +81,11 @@ public final class Utility {
private static final Function<Supplier, ThreadLocal> virtualThreadLocalFunction; private static final Function<Supplier, ThreadLocal> virtualThreadLocalFunction;
//org.redkale.util.AnonymousVirtualRunner
private static final String consumerVirtualRunnerBinary = "cafebabe0000004000260a000200030700040c000500060100106a6176612f6c616e672f4f626a6563740100063c696e69743e0100032829560a0008000907000a0c000b000c0100106a6176612f6c616e672f54687265616401001273746172745669727475616c546872656164010028284c6a6176612f6c616e672f52756e6e61626c653b294c6a6176612f6c616e672f5468726561643b07000e0100126a6176612f6c616e672f52756e6e61626c650a001000110700120c001300140100276f72672f7265646b616c652f7574696c2f416e6f6e796d6f75735669727475616c52756e6e6572010006616363657074010017284c6a6176612f6c616e672f52756e6e61626c653b295607001601001b6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d6572010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c5661726961626c655461626c65010004746869730100294c6f72672f7265646b616c652f7574696c2f416e6f6e796d6f75735669727475616c52756e6e65723b010001740100144c6a6176612f6c616e672f52756e6e61626c653b0100104d6574686f64506172616d65746572730100236f72672e6e65746265616e732e536f757263654c6576656c416e6e6f746174696f6e730100144c6a6176612f6c616e672f4f766572726964653b010015284c6a6176612f6c616e672f4f626a6563743b29560100095369676e61747572650100454c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f52756e6e61626c653b3e3b01000a536f7572636546696c6501001b416e6f6e796d6f75735669727475616c52756e6e65722e6a6176610021001000020001001500000003000100050006000100170000002f00010001000000052ab70001b10000000200180000000600010000000e00190000000c000100000005001a001b0000000100130014000300170000003e00010002000000062bb8000757b10000000200180000000a00020000001200050013001900000016000200000006001a001b000000000006001c001d0001001e0000000501001c0000001f00000006000100200000104100130021000300170000003300020002000000092a2bc0000db6000fb10000000200180000000600010000000e00190000000c000100000009001a001b0000001e0000000501001c1000001f00000006000100200000000200220000000200230024000000020025";
private static final Consumer<Runnable> virtualRunnerConsumer;
//org.redkale.util.SignalShutDown //org.redkale.util.SignalShutDown
private static final String consumerSignalShutdownBinary = "cafebabe00000037006b0a0019003a090018003b07003c08003d08003e08003f0800400800410800420800430800440800450700460a000d00470a000d004807004907004a0a000d004b0a000d004c12000000500b001600510700520a0018005307005407005507005601001073687574646f776e436f6e73756d657201001d4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723b0100095369676e61747572650100314c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b0100063c696e69743e010003282956010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c5661726961626c655461626c65010004746869730100214c6f72672f7265646b616c652f7574696c2f5369676e616c53687574446f776e3b010006616363657074010020284c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723b29560100037369670100124c6a6176612f6c616e672f537472696e673b010004736967730100135b4c6a6176612f6c616e672f537472696e673b010008636f6e73756d65720100164c6f63616c5661726961626c65547970655461626c6501000d537461636b4d61705461626c6507002b0100104d6574686f64506172616d6574657273010034284c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b295601000668616e646c65010014284c73756e2f6d6973632f5369676e616c3b29560100114c73756e2f6d6973632f5369676e616c3b010006736967737472010015284c6a6176612f6c616e672f4f626a6563743b295601007a4c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b3e3b4c73756e2f6d6973632f5369676e616c48616e646c65723b01000a536f7572636546696c650100135369676e616c53687574446f776e2e6a6176610c001f00200c001b001c0100106a6176612f6c616e672f537472696e670100034855500100045445524d010003494e54010004515549540100044b494c4c01000454535450010004555352310100045553523201000453544f5001000f73756e2f6d6973632f5369676e616c0c001f00570c003200580100136a6176612f6c616e672f457863657074696f6e0100136a6176612f6c616e672f5468726f7761626c650c0059005a0c005b005c010010426f6f7473747261704d6574686f64730f06005d08005e0c005f00600c0026003601001b6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65720c0026002701001f6f72672f7265646b616c652f7574696c2f5369676e616c53687574446f776e0100106a6176612f6c616e672f4f626a65637401001673756e2f6d6973632f5369676e616c48616e646c6572010015284c6a6176612f6c616e672f537472696e673b2956010043284c73756e2f6d6973632f5369676e616c3b4c73756e2f6d6973632f5369676e616c48616e646c65723b294c73756e2f6d6973632f5369676e616c48616e646c65723b0100076765744e616d6501001428294c6a6176612f6c616e672f537472696e673b0100096765744e756d6265720100032829490a00610062010005012c012c010100176d616b65436f6e63617457697468436f6e7374616e7473010038284c73756e2f6d6973632f5369676e616c3b4c6a6176612f6c616e672f537472696e673b49294c6a6176612f6c616e672f537472696e673b0700630c005f00670100246a6176612f6c616e672f696e766f6b652f537472696e67436f6e636174466163746f72790700690100064c6f6f6b757001000c496e6e6572436c6173736573010098284c6a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c6573244c6f6f6b75703b4c6a6176612f6c616e672f537472696e673b4c6a6176612f6c616e672f696e766f6b652f4d6574686f64547970653b4c6a6176612f6c616e672f537472696e673b5b4c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f696e766f6b652f43616c6c536974653b07006a0100256a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c6573244c6f6f6b757001001e6a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c657300210018001900020016001a00010002001b001c0001001d00000002001e00040001001f0020000100210000002f00010001000000052ab70001b10000000200220000000600010000000c00230000000c000100000005002400250000000100260027000300210000014a000400080000006f2a2bb500021009bd0003590312045359041205535905120653590612075359071208535908120953591006120a53591007120b53591008120c534d2c4e2dbe360403360515051504a200222d1505323a06bb000d591906b7000e2ab8000f57a700053a07840501a7ffdda700044db100020051005f006200100005006a006d0011000400220000002a000a0000001200050014003b001500510017005f00190062001800640015006a001c006d001b006e001d00230000002a000400510013002800290006003b002f002a002b00020000006f0024002500000000006f002c001c0001002d0000000c00010000006f002c001e0001002e000000470006ff0044000607001807001607002f07002f01010000ff001d000707001807001607002f07002f01010700030001070010fa0001ff000500020700180700160000420700110000300000000501002c0000001d0000000200310021003200330002002100000060000300030000001a2b2bb600122bb60013ba001400004d2ab400022cb900150200b10000000200220000000e000300000021000f00220019002300230000002000030000001a0024002500000000001a002800340001000f000b0035002900020030000000050100280000104100260036000200210000003300020002000000092a2bc00016b60017b10000000200220000000600010000000c00230000000c00010000000900240025000000300000000501002c10000004001d000000020037003800000002003900660000000a00010064006800650019004d000000080001004e0001004f"; private static final String consumerSignalShutdownBinary = "cafebabe00000037006b0a0019003a090018003b07003c08003d08003e08003f0800400800410800420800430800440800450700460a000d00470a000d004807004907004a0a000d004b0a000d004c12000000500b001600510700520a0018005307005407005507005601001073687574646f776e436f6e73756d657201001d4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723b0100095369676e61747572650100314c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b0100063c696e69743e010003282956010004436f646501000f4c696e654e756d6265725461626c650100124c6f63616c5661726961626c655461626c65010004746869730100214c6f72672f7265646b616c652f7574696c2f5369676e616c53687574446f776e3b010006616363657074010020284c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723b29560100037369670100124c6a6176612f6c616e672f537472696e673b010004736967730100135b4c6a6176612f6c616e672f537472696e673b010008636f6e73756d65720100164c6f63616c5661726961626c65547970655461626c6501000d537461636b4d61705461626c6507002b0100104d6574686f64506172616d6574657273010034284c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b295601000668616e646c65010014284c73756e2f6d6973632f5369676e616c3b29560100114c73756e2f6d6973632f5369676e616c3b010006736967737472010015284c6a6176612f6c616e672f4f626a6563743b295601007a4c6a6176612f6c616e672f4f626a6563743b4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65723c4c6a6176612f6c616e672f537472696e673b3e3b3e3b4c73756e2f6d6973632f5369676e616c48616e646c65723b01000a536f7572636546696c650100135369676e616c53687574446f776e2e6a6176610c001f00200c001b001c0100106a6176612f6c616e672f537472696e670100034855500100045445524d010003494e54010004515549540100044b494c4c01000454535450010004555352310100045553523201000453544f5001000f73756e2f6d6973632f5369676e616c0c001f00570c003200580100136a6176612f6c616e672f457863657074696f6e0100136a6176612f6c616e672f5468726f7761626c650c0059005a0c005b005c010010426f6f7473747261704d6574686f64730f06005d08005e0c005f00600c0026003601001b6a6176612f7574696c2f66756e6374696f6e2f436f6e73756d65720c0026002701001f6f72672f7265646b616c652f7574696c2f5369676e616c53687574446f776e0100106a6176612f6c616e672f4f626a65637401001673756e2f6d6973632f5369676e616c48616e646c6572010015284c6a6176612f6c616e672f537472696e673b2956010043284c73756e2f6d6973632f5369676e616c3b4c73756e2f6d6973632f5369676e616c48616e646c65723b294c73756e2f6d6973632f5369676e616c48616e646c65723b0100076765744e616d6501001428294c6a6176612f6c616e672f537472696e673b0100096765744e756d6265720100032829490a00610062010005012c012c010100176d616b65436f6e63617457697468436f6e7374616e7473010038284c73756e2f6d6973632f5369676e616c3b4c6a6176612f6c616e672f537472696e673b49294c6a6176612f6c616e672f537472696e673b0700630c005f00670100246a6176612f6c616e672f696e766f6b652f537472696e67436f6e636174466163746f72790700690100064c6f6f6b757001000c496e6e6572436c6173736573010098284c6a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c6573244c6f6f6b75703b4c6a6176612f6c616e672f537472696e673b4c6a6176612f6c616e672f696e766f6b652f4d6574686f64547970653b4c6a6176612f6c616e672f537472696e673b5b4c6a6176612f6c616e672f4f626a6563743b294c6a6176612f6c616e672f696e766f6b652f43616c6c536974653b07006a0100256a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c6573244c6f6f6b757001001e6a6176612f6c616e672f696e766f6b652f4d6574686f6448616e646c657300210018001900020016001a00010002001b001c0001001d00000002001e00040001001f0020000100210000002f00010001000000052ab70001b10000000200220000000600010000000c00230000000c000100000005002400250000000100260027000300210000014a000400080000006f2a2bb500021009bd0003590312045359041205535905120653590612075359071208535908120953591006120a53591007120b53591008120c534d2c4e2dbe360403360515051504a200222d1505323a06bb000d591906b7000e2ab8000f57a700053a07840501a7ffdda700044db100020051005f006200100005006a006d0011000400220000002a000a0000001200050014003b001500510017005f00190062001800640015006a001c006d001b006e001d00230000002a000400510013002800290006003b002f002a002b00020000006f0024002500000000006f002c001c0001002d0000000c00010000006f002c001e0001002e000000470006ff0044000607001807001607002f07002f01010000ff001d000707001807001607002f07002f01010700030001070010fa0001ff000500020700180700160000420700110000300000000501002c0000001d0000000200310021003200330002002100000060000300030000001a2b2bb600122bb60013ba001400004d2ab400022cb900150200b10000000200220000000e000300000021000f00220019002300230000002000030000001a0024002500000000001a002800340001000f000b0035002900020030000000050100280000104100260036000200210000003300020002000000092a2bc00016b60017b10000000200220000000600010000000c00230000000c00010000000900240025000000300000000501002c10000004001d000000020037003800000002003900660000000a00010064006800650019004d000000080001004e0001004f";
@@ -125,6 +130,7 @@ public final class Utility {
Consumer<Consumer<String>> signalShutdownConsumer0 = null; Consumer<Consumer<String>> signalShutdownConsumer0 = null;
Function<String, ExecutorService> virtualExecutorFunction0 = null; Function<String, ExecutorService> virtualExecutorFunction0 = null;
Function<Supplier, ThreadLocal> virtualThreadLocalFunction0 = null; Function<Supplier, ThreadLocal> virtualThreadLocalFunction0 = null;
Consumer<Runnable> virtualRunnerConsumer0 = null;
if (!nativeImageEnv) { //not native-image if (!nativeImageEnv) { //not native-image
try { try {
@@ -172,6 +178,27 @@ public final class Utility {
} }
} }
} }
{ //virtualRunnerConsumer
Class<Consumer<Runnable>> virtualClazz1 = null;
try {
virtualClazz1 = (Class) loader.loadClass("org.redkale.util.AnonymousVirtualRunner");
} catch (Throwable t) {
}
if (virtualClazz1 == null) {
byte[] classBytes = hexToBin(consumerVirtualRunnerBinary);
try {
virtualClazz1 = (Class<Consumer<Runnable>>) new ClassLoader(loader) {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
}.loadClass("org.redkale.util.AnonymousVirtualRunner", classBytes);
RedkaleClassLoader.putDynClass(virtualClazz1.getName(), classBytes, virtualClazz1);
RedkaleClassLoader.putReflectionDeclaredConstructors(virtualClazz1, virtualClazz1.getName());
virtualRunnerConsumer0 = virtualClazz1.getConstructor().newInstance();
} catch (Throwable t) {
}
}
}
{ //unsafe { //unsafe
Field f = String.class.getDeclaredField("value"); Field f = String.class.getDeclaredField("value");
final Class unsafeClass = loader.loadClass("sun.misc.Unsafe"); final Class unsafeClass = loader.loadClass("sun.misc.Unsafe");
@@ -241,6 +268,7 @@ public final class Utility {
signalShutdownConsumer = signalShutdownConsumer0; signalShutdownConsumer = signalShutdownConsumer0;
virtualExecutorFunction = virtualExecutorFunction0; virtualExecutorFunction = virtualExecutorFunction0;
virtualThreadLocalFunction = virtualThreadLocalFunction0; virtualThreadLocalFunction = virtualThreadLocalFunction0;
virtualRunnerConsumer = virtualRunnerConsumer0;
// try { // try {
// DEFAULTSSL_CONTEXT = javax.net.ssl.SSLContext.getInstance("SSL"); // DEFAULTSSL_CONTEXT = javax.net.ssl.SSLContext.getInstance("SSL");
@@ -294,6 +322,14 @@ public final class Utility {
return futureArrayFunc; return futureArrayFunc;
} }
public static void execute(Runnable task) {
if (virtualRunnerConsumer != null) {
virtualRunnerConsumer.accept(task);
} else {
task.run();
}
}
public static void sleep(long millis) { public static void sleep(long millis) {
try { try {
Thread.sleep(millis); Thread.sleep(millis);