优化traceid
This commit is contained in:
@@ -72,6 +72,8 @@ public abstract class MessageServlet implements MessageProcessor {
|
|||||||
onError(response, message, ex);
|
onError(response, message, ex);
|
||||||
}
|
}
|
||||||
logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
|
logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
|
||||||
|
} finally {
|
||||||
|
Traces.removeTraceid();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -123,6 +123,7 @@ public class Context {
|
|||||||
response.context.logger.log(Level.WARNING, "Execute servlet occur exception. request = " + request, t);
|
response.context.logger.log(Level.WARNING, "Execute servlet occur exception. request = " + request, t);
|
||||||
response.finishError(t);
|
response.finishError(t);
|
||||||
}
|
}
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -141,21 +141,25 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
readThread.runWork(() -> {
|
readThread.runWork(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.complete(rs);
|
respFuture.complete(rs);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
|
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
|
||||||
if (workThread.inIO()) {
|
if (workThread.inIO()) {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.complete(rs);
|
respFuture.complete(rs);
|
||||||
|
Traces.removeTraceid();
|
||||||
} else {
|
} else {
|
||||||
workThread.execute(() -> {
|
workThread.execute(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.complete(rs);
|
respFuture.complete(rs);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
workThread.runWork(() -> {
|
workThread.runWork(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.complete(rs);
|
respFuture.complete(rs);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else { //异常
|
} else { //异常
|
||||||
@@ -163,21 +167,25 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
readThread.runWork(() -> {
|
readThread.runWork(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(exc);
|
respFuture.completeExceptionally(exc);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
|
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
|
||||||
if (workThread.inIO()) {
|
if (workThread.inIO()) {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(exc);
|
respFuture.completeExceptionally(exc);
|
||||||
|
Traces.removeTraceid();
|
||||||
} else {
|
} else {
|
||||||
workThread.execute(() -> {
|
workThread.execute(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(exc);
|
respFuture.completeExceptionally(exc);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
workThread.runWork(() -> {
|
workThread.runWork(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(exc);
|
respFuture.completeExceptionally(exc);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -186,21 +194,25 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
|
|||||||
readThread.runWork(() -> {
|
readThread.runWork(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(t);
|
respFuture.completeExceptionally(t);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
|
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
|
||||||
if (workThread.inIO()) {
|
if (workThread.inIO()) {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(t);
|
respFuture.completeExceptionally(t);
|
||||||
|
Traces.removeTraceid();
|
||||||
} else {
|
} else {
|
||||||
workThread.execute(() -> {
|
workThread.execute(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(t);
|
respFuture.completeExceptionally(t);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
workThread.runWork(() -> {
|
workThread.runWork(() -> {
|
||||||
Traces.computeIfAbsent(request.traceid);
|
Traces.computeIfAbsent(request.traceid);
|
||||||
respFuture.completeExceptionally(t);
|
respFuture.completeExceptionally(t);
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
|
connection.client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
|
|||||||
response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t);
|
response.getContext().getLogger().log(Level.WARNING, "Servlet occur exception. request = " + request, t);
|
||||||
response.finishError(t);
|
response.finishError(t);
|
||||||
}
|
}
|
||||||
|
Traces.removeTraceid();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
package org.redkale.util;
|
package org.redkale.util;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -20,15 +21,12 @@ public class Traces {
|
|||||||
|
|
||||||
private static final String PROCESS_ID = UUID.randomUUID().toString().replaceAll("-", "");
|
private static final String PROCESS_ID = UUID.randomUUID().toString().replaceAll("-", "");
|
||||||
|
|
||||||
private static final ThreadLocal<IdSequence> THREAD_SEQUENCE = ThreadLocal.withInitial(IdSequence::new);
|
private static final AtomicLong sequence = new AtomicLong(System.currentTimeMillis());
|
||||||
|
|
||||||
|
private static final Supplier<String> tidSupplier = () -> PROCESS_ID + sequence.incrementAndGet();
|
||||||
|
|
||||||
private static final ThreadLocal<String> localTrace = new ThreadLocal<>();
|
private static final ThreadLocal<String> localTrace = new ThreadLocal<>();
|
||||||
|
|
||||||
//借用Skywalking的GlobalIdGenerator生成ID的规则
|
|
||||||
private static final Supplier<String> tidSupplier = () -> PROCESS_ID
|
|
||||||
+ "." + Thread.currentThread().getId()
|
|
||||||
+ "." + THREAD_SEQUENCE.get().nextSeq();
|
|
||||||
|
|
||||||
public static boolean enable() {
|
public static boolean enable() {
|
||||||
return enable;
|
return enable;
|
||||||
}
|
}
|
||||||
@@ -41,6 +39,12 @@ public class Traces {
|
|||||||
return enable ? localTrace.get() : null;
|
return enable ? localTrace.get() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void removeTraceid() {
|
||||||
|
if (enable) {
|
||||||
|
localTrace.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static String computeIfAbsent(String requestTraceid) {
|
public static String computeIfAbsent(String requestTraceid) {
|
||||||
if (enable) {
|
if (enable) {
|
||||||
String rs = requestTraceid;
|
String rs = requestTraceid;
|
||||||
@@ -52,42 +56,4 @@ public class Traces {
|
|||||||
}
|
}
|
||||||
return requestTraceid;
|
return requestTraceid;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class IdSequence {
|
|
||||||
|
|
||||||
private long lastTimestamp;
|
|
||||||
|
|
||||||
private short threadSeq;
|
|
||||||
|
|
||||||
private long lastShiftTimestamp;
|
|
||||||
|
|
||||||
private int lastShiftValue;
|
|
||||||
|
|
||||||
public IdSequence() {
|
|
||||||
this.lastTimestamp = System.currentTimeMillis();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long nextSeq() {
|
|
||||||
long rs = timestamp() * 10000;
|
|
||||||
if (threadSeq == 10000) {
|
|
||||||
threadSeq = 0;
|
|
||||||
}
|
|
||||||
return rs + threadSeq++;
|
|
||||||
}
|
|
||||||
|
|
||||||
private long timestamp() {
|
|
||||||
long currentTimeMillis = System.currentTimeMillis();
|
|
||||||
if (currentTimeMillis < lastTimestamp) {
|
|
||||||
// Just for considering time-shift-back by Ops or OS. @hanahmily 's suggestion.
|
|
||||||
if (lastShiftTimestamp != currentTimeMillis) {
|
|
||||||
lastShiftValue++;
|
|
||||||
lastShiftTimestamp = currentTimeMillis;
|
|
||||||
}
|
|
||||||
return lastShiftValue;
|
|
||||||
} else {
|
|
||||||
lastTimestamp = currentTimeMillis;
|
|
||||||
return lastTimestamp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user