优化Traces

This commit is contained in:
Redkale
2022-12-18 21:25:52 +08:00
parent d57b4715d0
commit 4fa449990f
8 changed files with 78 additions and 52 deletions

View File

@@ -103,7 +103,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
}
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, future);
Traces.loadTraceid();
Traces.computeCurrTraceid(request.getTraceid());
try {
servlet.execute(req, resp);
} catch (Exception e) {
@@ -122,7 +122,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
CompletableFuture future = new CompletableFuture();
HttpResponse resp = new HttpMessageLocalResponse(req, future);
Traces.loadTraceid();
Traces.computeCurrTraceid(request.getTraceid());
try {
servlet.execute(req, resp);
} catch (Exception e) {
@@ -149,7 +149,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
}
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, null);
Traces.loadTraceid();
Traces.computeCurrTraceid(request.getTraceid());
try {
servlet.execute(req, resp);
} catch (Exception e) {
@@ -162,7 +162,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
HttpDispatcherServlet ps = dispatcherServlet();
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
HttpResponse resp = new HttpMessageLocalResponse(req, null);
Traces.loadTraceid();
Traces.computeCurrTraceid(request.getTraceid());
ps.filterServletsByMmcTopic(topic).forEach(s -> {
try {
s.execute(req, resp);

View File

@@ -121,11 +121,11 @@ public abstract class MessageClient {
protected abstract MessageProducers getProducer();
public MessageRecord createMessageRecord(String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) {
@@ -133,7 +133,7 @@ public abstract class MessageClient {
}
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.loadTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
}
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) {
@@ -141,7 +141,7 @@ public abstract class MessageClient {
}
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) {
@@ -149,27 +149,27 @@ public abstract class MessageClient {
}
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.loadTraceid(), convert.convertToBytes(bean));
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, Traces.currTraceid(), convert.convertToBytes(bean));
}
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.loadTraceid(), content);
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currTraceid(), content);
}
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.loadTraceid(), content);
return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.loadTraceid(), content);
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currTraceid(), content);
}
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) {
@@ -177,7 +177,7 @@ public abstract class MessageClient {
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.loadTraceid(), content);
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currTraceid(), content);
}
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) {

View File

@@ -125,7 +125,7 @@ public class Context {
workHashExecutor.execute(request.getHashid(), () -> {
try {
long cha = System.currentTimeMillis() - request.getCreateTime();
Traces.currTraceid(request.getTraceid());
Traces.computeCurrTraceid(request.getTraceid());
servlet.execute(request, response);
if (cha > 1000 && response.context.logger.isLoggable(Level.WARNING)) {
response.context.logger.log(Level.WARNING, "hash execute servlet delays=" + cha + "ms, request=" + request);
@@ -140,7 +140,7 @@ public class Context {
} else if (workExecutor != null) {
workExecutor.execute(() -> {
try {
Traces.currTraceid(request.getTraceid());
Traces.computeCurrTraceid(request.getTraceid());
servlet.execute(request, response);
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);
@@ -149,7 +149,7 @@ public class Context {
});
} else {
try {
Traces.currTraceid(request.getTraceid());
Traces.computeCurrTraceid(request.getTraceid());
servlet.execute(request, response);
} catch (Throwable t) {
response.context.logger.log(Level.WARNING, "execute servlet abort, force to close channel ", t);

View File

@@ -37,11 +37,11 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
protected Application application;
private final Object lock1 = new Object();
private final Object servletLock = new Object();
private Set<S> servlets = new HashSet<>();
private final Object lock2 = new Object();
private final Object mappingLock = new Object();
private Map<K, S> mappings = new HashMap<>();
@@ -50,7 +50,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
protected Filter<C, R, P> headFilter;
protected void putServlet(S servlet) {
synchronized (lock1) {
synchronized (servletLock) {
Set<S> newservlets = new HashSet<>(servlets);
newservlets.add(servlet);
this.servlets = newservlets;
@@ -58,7 +58,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
protected void removeServlet(S servlet) {
synchronized (lock1) {
synchronized (servletLock) {
Set<S> newservlets = new HashSet<>(servlets);
newservlets.remove(servlet);
this.servlets = newservlets;
@@ -67,7 +67,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
public boolean containsServlet(Class<? extends S> servletClass) {
synchronized (lock1) {
synchronized (servletLock) {
for (S servlet : new HashSet<>(servlets)) {
if (servlet.getClass().equals(servletClass)) return true;
}
@@ -76,7 +76,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
public boolean containsServlet(String servletClassName) {
synchronized (lock1) {
synchronized (servletLock) {
for (S servlet : new HashSet<>(servlets)) {
if (servlet.getClass().getName().equals(servletClassName)) return true;
}
@@ -85,7 +85,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
protected void putMapping(K key, S servlet) {
synchronized (lock2) {
synchronized (mappingLock) {
Map<K, S> newmappings = new HashMap<>(mappings);
newmappings.put(key, servlet);
this.mappings = newmappings;
@@ -93,7 +93,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
protected void removeMapping(K key) {
synchronized (lock2) {
synchronized (mappingLock) {
if (mappings.containsKey(key)) {
Map<K, S> newmappings = new HashMap<>(mappings);
S s = newmappings.remove(key);
@@ -104,7 +104,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
}
protected void removeMapping(S servlet) {
synchronized (lock2) {
synchronized (mappingLock) {
List<K> keys = new ArrayList<>();
Map<K, S> newmappings = new HashMap<>(mappings);
for (Map.Entry<K, S> en : newmappings.entrySet()) {
@@ -221,7 +221,7 @@ public abstract class DispatcherServlet<K extends Serializable, C extends Contex
public final void prepare(final R request, final P response) {
try {
Traces.loadTraceid();
Traces.computeCurrTraceid(request.getTraceid());
request.prepare();
response.filter = this.headFilter;
response.servlet = this;

View File

@@ -23,9 +23,9 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
private final Context context;
private Supplier<Response> responseSupplier;
private final Supplier<Response> responseSupplier;
private Consumer<Response> responseConsumer;
private final Consumer<Response> responseConsumer;
private Response resp;

View File

@@ -223,7 +223,7 @@ public final class Rest {
}
/**
* 判断HttpServlet是否为Rest动态生成的,且simple
* 判断HttpServlet是否为Rest动态生成的,且simple, 不需要读取http-header的方法视为simple=true
*
* @param servlet 检测的HttpServlet
*
@@ -277,11 +277,19 @@ public final class Rest {
}
public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType, MessageAgent messageAgent) {
if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet");
if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet");
if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet");
if (webSocketType == null) {
throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet");
}
if (Modifier.isAbstract(webSocketType.getModifiers())) {
throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet");
}
if (Modifier.isFinal(webSocketType.getModifiers())) {
throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet");
}
final RestWebSocket rws = webSocketType.getAnnotation(RestWebSocket.class);
if (rws == null || rws.ignore()) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") have not @RestWebSocket or @RestWebSocket.ignore=true on createRestWebSocketServlet");
if (rws == null || rws.ignore()) {
throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") have not @RestWebSocket or @RestWebSocket.ignore=true on createRestWebSocketServlet");
}
boolean valid = false;
for (Constructor c : webSocketType.getDeclaredConstructors()) {
if (c.getParameterCount() == 0 && (Modifier.isPublic(c.getModifiers()) || Modifier.isProtected(c.getModifiers()))) {
@@ -291,8 +299,12 @@ public final class Rest {
}
if (!valid) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") must have public or protected Constructor on createRestWebSocketServlet");
final String rwsname = ResourceFactory.formatResourceName(rws.name());
if (!checkName(rws.catalog())) throw new RuntimeException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(rwsname)) throw new RuntimeException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".name, only 0-9 a-z A-Z _ cannot begin 0-9");
if (!checkName(rws.catalog())) {
throw new RuntimeException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".catalog, only 0-9 a-z A-Z _ cannot begin 0-9");
}
if (!checkName(rwsname)) {
throw new RuntimeException(webSocketType.getName() + " have illegal " + RestWebSocket.class.getSimpleName() + ".name, only 0-9 a-z A-Z _ cannot begin 0-9");
}
//----------------------------------------------------------------------------------------
final Set<Field> resourcesFieldSet = new LinkedHashSet<>();
@@ -303,8 +315,12 @@ public final class Rest {
for (Field field : clzz.getDeclaredFields()) {
if (field.getAnnotation(Resource.class) == null) continue;
if (resourcesFieldNameSet.contains(field.getName())) continue;
if (Modifier.isStatic(field.getModifiers())) throw new RuntimeException(field + " cannot static on createRestWebSocketServlet");
if (Modifier.isFinal(field.getModifiers())) throw new RuntimeException(field + " cannot final on createRestWebSocketServlet");
if (Modifier.isStatic(field.getModifiers())) {
throw new RuntimeException(field + " cannot static on createRestWebSocketServlet");
}
if (Modifier.isFinal(field.getModifiers())) {
throw new RuntimeException(field + " cannot final on createRestWebSocketServlet");
}
if (!Modifier.isPublic(field.getModifiers()) && !Modifier.isProtected(field.getModifiers())) {
throw new RuntimeException(field + " must be public or protected on createRestWebSocketServlet");
}

View File

@@ -2,6 +2,7 @@
*/
package org.redkale.util;
import java.util.UUID;
import java.util.function.Supplier;
/**
@@ -19,26 +20,35 @@ public class Traces {
private static final ThreadLocal<String> localTrace = new ThreadLocal<>();
private static final Supplier<String> tidSupplier = () -> Utility.uuid();
private static final Supplier<String> tidSupplier = () -> UUID.randomUUID().toString().replace("-", "");
public static boolean enable() {
return enable;
}
public static String onceTraceid() {
public static String createTraceid() {
return enable ? tidSupplier.get() : null;
}
public static String loadTraceid() {
if (!enable) return null;
String traceid = localTrace.get();
if (traceid == null) {
traceid = tidSupplier.get();
localTrace.set(traceid);
public static void computeCurrTraceid(String requestTraceid) {
if (enable) {
if (requestTraceid == null) {
localTrace.set(tidSupplier.get());
} else {
localTrace.set(requestTraceid);
}
}
return traceid;
}
// public static String loadTraceid() {
// if (!enable) return null;
// String traceid = localTrace.get();
// if (traceid == null) {
// traceid = tidSupplier.get();
// localTrace.set(traceid);
// }
// return traceid;
// }
public static void currTraceid(String traceid) {
if (enable) {
localTrace.set(traceid);

View File

@@ -113,7 +113,7 @@ public final class Utility {
Class<Unsafe> unsafeClazz1 = null;
try {
unsafeClazz1 = (Class) loader.loadClass("org.re" + "dkale.util.AnonymousUnsafe");
unsafeClazz1 = (Class) loader.loadClass("org.redkale.util.AnonymousUnsafe");
} catch (Throwable t) {
}
if (unsafeClazz1 == null) {
@@ -122,7 +122,7 @@ public final class Utility {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
}.loadClass("org.re" + "dkale.util.AnonymousUnsafe", classBytes);
}.loadClass("org.redkale.util.AnonymousUnsafe", classBytes);
RedkaleClassLoader.putDynClass(unsafeClazz1.getName(), classBytes, unsafeClazz1);
}
RedkaleClassLoader.putReflectionDeclaredConstructors(unsafeClazz1, unsafeClazz1.getName(), Object.class);
@@ -142,7 +142,7 @@ public final class Utility {
{ //signalShutdown
Class<Consumer<Consumer<String>>> shutdownClazz1 = null;
try {
shutdownClazz1 = (Class) loader.loadClass("org.re" + "dkale.util.SignalShutDown");
shutdownClazz1 = (Class) loader.loadClass("org.redkale.util.SignalShutDown");
} catch (Throwable t) {
}
if (shutdownClazz1 == null) {
@@ -151,7 +151,7 @@ public final class Utility {
public final Class<?> loadClass(String name, byte[] b) {
return defineClass(name, b, 0, b.length);
}
}.loadClass("org.re" + "dkale.util.SignalShutDown", classBytes);
}.loadClass("org.redkale.util.SignalShutDown", classBytes);
RedkaleClassLoader.putDynClass(shutdownClazz1.getName(), classBytes, shutdownClazz1);
RedkaleClassLoader.putReflectionDeclaredConstructors(shutdownClazz1, shutdownClazz1.getName());
signalShutdownConsumer0 = shutdownClazz1.getConstructor().newInstance();