MessageAgent优化

This commit is contained in:
redkale
2024-08-12 20:25:55 +08:00
parent 45548f2de9
commit cebd463d58
3 changed files with 11 additions and 13 deletions

View File

@@ -171,7 +171,7 @@
<executions> <executions>
<execution> <execution>
<goals> <goals>
<goal>check</goal> <goal>apply</goal>
</goals> </goals>
<phase>compile</phase> <phase>compile</phase>
</execution> </execution>

View File

@@ -7,7 +7,6 @@ package org.redkale.mq.spi;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -554,17 +553,12 @@ public abstract class MessageAgent implements MessageManager {
MessageConsumer m = this.consumer; MessageConsumer m = this.consumer;
return messageAgent.submit(() -> { return messageAgent.submit(() -> {
Traces.computeIfAbsent(traceid); Traces.computeIfAbsent(traceid);
T msg = null;
try { try {
m.onMessage(context, (T) c.convertFrom(messageType, message)); msg = (T) c.convertFrom(messageType, message);
m.onMessage(context, msg);
} catch (Throwable t) { } catch (Throwable t) {
messageAgent messageAgent.getLogger().log(Level.SEVERE, "MessageConsumer.onMessage error, message: " + msg, t);
.getLogger()
.log(
Level.SEVERE,
m.getClass().getSimpleName()
+ " onMessage error, topic: " + context.getTopic()
+ ", messages: " + new String(message, StandardCharsets.UTF_8),
t);
} }
Traces.removeTraceid(); Traces.removeTraceid();
}); });

View File

@@ -340,8 +340,12 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
try { try {
if (Utility.isNotEmpty(consumerBytes)) { if (Utility.isNotEmpty(consumerBytes)) {
if (newLoader == null) { if (newLoader == null) {
newLoader = new RedkaleClassLoader.DynBytesClassLoader( if (classLoader instanceof RedkaleClassLoader.DynBytesClassLoader) {
classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader); newLoader = (RedkaleClassLoader.DynBytesClassLoader) classLoader;
} else {
newLoader = new RedkaleClassLoader.DynBytesClassLoader(
classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader);
}
} }
List<Class<? extends MessageConsumer>> consumers = new ArrayList<>(); List<Class<? extends MessageConsumer>> consumers = new ArrayList<>();
consumerBytes.forEach((clzName, bytes) -> { consumerBytes.forEach((clzName, bytes) -> {