diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index 8e298d52d..05ecef472 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -4,6 +4,7 @@ package org.redkale.mq; import org.redkale.annotation.Component; +import org.redkale.asm.AsmDepends; import org.redkale.service.Local; import org.redkale.util.AnyValue; @@ -20,6 +21,7 @@ import org.redkale.util.AnyValue; */ @Local @Component +@AsmDepends public interface MessageConsumer { default void init(AnyValue config) { diff --git a/src/main/java/org/redkale/mq/ResourceConsumer.java b/src/main/java/org/redkale/mq/ResourceConsumer.java index 03e7ff119..a063fc80f 100644 --- a/src/main/java/org/redkale/mq/ResourceConsumer.java +++ b/src/main/java/org/redkale/mq/ResourceConsumer.java @@ -6,6 +6,7 @@ package org.redkale.mq; import java.lang.annotation.*; import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.RetentionPolicy.RUNTIME; +import org.redkale.asm.AsmDepends; import org.redkale.convert.ConvertType; /** @@ -18,6 +19,7 @@ import org.redkale.convert.ConvertType; * * @since 2.8.0 */ +@AsmDepends @Documented @Target({TYPE}) @Retention(RUNTIME) diff --git a/src/main/java/org/redkale/mq/spi/DynForMessage.java b/src/main/java/org/redkale/mq/spi/DynForMessage.java new file mode 100644 index 000000000..cbf0ee5ce --- /dev/null +++ b/src/main/java/org/redkale/mq/spi/DynForMessage.java @@ -0,0 +1,34 @@ +/* + * + */ +package org.redkale.mq.spi; + +import java.lang.annotation.Documented; +import static java.lang.annotation.ElementType.TYPE; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.Target; +import org.redkale.mq.MessageConsumer; + +/** + * 只标准在类上面,因动态方法不作变动,只增加内部类 + * + * @author zhangjx + */ +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +@Repeatable(DynForMessage.DynForMessages.class) +public @interface DynForMessage { + + Class value(); + + @Documented + @Target({TYPE}) + @Retention(RUNTIME) + @interface DynForMessages { + + DynForMessage[] value(); + } +} diff --git a/src/main/java/org/redkale/mq/spi/HttpRpcMessageClient.java b/src/main/java/org/redkale/mq/spi/HttpRpcMessageClient.java index 81a93f7ed..5e32c29a4 100644 --- a/src/main/java/org/redkale/mq/spi/HttpRpcMessageClient.java +++ b/src/main/java/org/redkale/mq/spi/HttpRpcMessageClient.java @@ -3,9 +3,6 @@ */ package org.redkale.mq.spi; -import org.redkale.mq.spi.HttpResultCoder; -import org.redkale.mq.spi.MessageCoder; -import org.redkale.mq.spi.HttpSimpleRequestCoder; import java.io.Serializable; import java.util.concurrent.CompletableFuture; import org.redkale.cluster.HttpRpcClient; diff --git a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java new file mode 100644 index 000000000..67e59027b --- /dev/null +++ b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java @@ -0,0 +1,82 @@ +/* + * + */ +package org.redkale.mq.spi; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.List; +import org.redkale.asm.AsmMethodBoost; +import org.redkale.asm.ClassWriter; +import org.redkale.convert.ConvertFactory; +import org.redkale.inject.ResourceFactory; +import org.redkale.mq.MessageConext; +import org.redkale.mq.Messaged; +import org.redkale.util.RedkaleException; + +/** + * + * @author zhangjx + */ +public class MessageAsmMethodBoost extends AsmMethodBoost { + + private static final List> FILTER_ANN = List.of(Messaged.class); + + public MessageAsmMethodBoost(Class serviceType) { + super(serviceType); + } + + @Override + public List> filterMethodAnnotations(Method method) { + return FILTER_ANN; + } + + @Override + public String doMethod(ClassWriter cw, String newDynName, String fieldPrefix, List filterAnns, Method method, String newMethodName) { + if (serviceType.getAnnotation(DynForMessage.class) != null) { + return newMethodName; + } + Messaged messaged = method.getAnnotation(Messaged.class); + if (messaged == null) { + return newMethodName; + } + if (Modifier.isFinal(method.getModifiers()) || Modifier.isStatic(method.getModifiers())) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + " cannot on final or static method, but on " + method); + } + if (!Modifier.isProtected(method.getModifiers()) && !Modifier.isPublic(method.getModifiers())) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on protected or public method, but on " + method); + } + int paramCount = method.getParameterCount(); + if (paramCount != 1 && paramCount != 2) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on one or two parameter method, but on " + method); + } + int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; + Type messageType; + Type[] paramTypes = method.getGenericParameterTypes(); + if (paramCount == 1) { + messageType = paramTypes[0]; + paramKind = 1; + } else { + if (paramTypes[0] == MessageConext.class) { + messageType = paramTypes[1]; + paramKind = 2; + } else if (paramTypes[1] == MessageConext.class) { + messageType = paramTypes[0]; + paramKind = 3; + } else { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + " on two-parameter method must contains " + + MessageConext.class.getSimpleName() + " parameter type, but on " + method); + } + } + ConvertFactory factory = ConvertFactory.findConvert(messaged.convertType()).getFactory(); + factory.loadDecoder(messageType); + return newMethodName; + } + + @Override + public void doInstance(ResourceFactory resourceFactory, Object service) { + } + +} diff --git a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java index 1a8639b48..9a104a2aa 100644 --- a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java +++ b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java @@ -9,10 +9,13 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import org.redkale.boot.Application; import org.redkale.boot.ClassFilter; @@ -45,6 +48,9 @@ public class MessageModuleEngine extends ModuleEngine { //@since 2.8.0 private Properties messageProperties = new Properties(); + // + private final Map> agentConsumers = new ConcurrentHashMap<>(); + //MQ管理接口 //@since 2.1.0 private MessageAgent[] messageAgents; @@ -352,7 +358,7 @@ public class MessageModuleEngine extends ModuleEngine { }, Object.class); for (MessageAgent agent : this.messageAgents) { names.add(agent.getName()); - List consumers = new ArrayList<>(); + List consumers = agentConsumers.getOrDefault(agent.getName(), new CopyOnWriteArrayList<>()); AnyValue consumerConf = agent.getConfig().getAnyValue("consumer"); if (consumerConf != null) { //加载 MessageConsumer ClassFilter filter = new ClassFilter(application.getServerClassLoader(), ResourceConsumer.class, MessageConsumer.class, null, null); diff --git a/src/main/java/org/redkale/mq/spi/MessageRecord.java b/src/main/java/org/redkale/mq/spi/MessageRecord.java index bd4a18798..f625f88b8 100644 --- a/src/main/java/org/redkale/mq/spi/MessageRecord.java +++ b/src/main/java/org/redkale/mq/spi/MessageRecord.java @@ -5,9 +5,6 @@ */ package org.redkale.mq.spi; -import org.redkale.mq.spi.HttpResultCoder; -import org.redkale.mq.spi.MessageCoder; -import org.redkale.mq.spi.HttpSimpleRequestCoder; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Arrays;