From cce667e019d0d6c7d192047dc6927454571581f7 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 22 Aug 2024 20:00:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Messaged.regexTopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/mq/Messaged.java | 11 +++- .../java/org/redkale/mq/ResourceConsumer.java | 11 +++- .../org/redkale/mq/spi/DynForConsumer.java | 25 ++++++++ .../java/org/redkale/mq/spi/MessageAgent.java | 62 ++++++++++++++----- .../redkale/mq/spi/MessageAsmMethodBoost.java | 26 ++++++-- .../redkale/mq/spi/MessageModuleEngine.java | 8 +++ 6 files changed, 118 insertions(+), 25 deletions(-) create mode 100644 src/main/java/org/redkale/mq/spi/DynForConsumer.java diff --git a/src/main/java/org/redkale/mq/Messaged.java b/src/main/java/org/redkale/mq/Messaged.java index 195987684..eae1bcadd 100644 --- a/src/main/java/org/redkale/mq/Messaged.java +++ b/src/main/java/org/redkale/mq/Messaged.java @@ -73,11 +73,18 @@ public @interface Messaged { boolean required() default true; /** - * 监听的topic + * 监听的topic, 当{@link #regexTopic() }值不为空时忽略此值 * * @return topic */ - String[] topics(); + String[] topics() default {}; + + /** + * 监听的topic, 与 {@link #topics() }的值必须二选一,优先级高 + * + * @return topic正则表达式 + */ + String regexTopic() default ""; /** * 消息序列化类型 diff --git a/src/main/java/org/redkale/mq/ResourceConsumer.java b/src/main/java/org/redkale/mq/ResourceConsumer.java index af71ad9f8..b710b7ebd 100644 --- a/src/main/java/org/redkale/mq/ResourceConsumer.java +++ b/src/main/java/org/redkale/mq/ResourceConsumer.java @@ -71,11 +71,18 @@ public @interface ResourceConsumer { boolean required() default true; /** - * 监听的topic + * 监听的topic, 当{@link #regexTopic() }值不为空时忽略此值 * * @return topic */ - String[] topics(); + String[] topics() default {}; + + /** + * 监听的topic, 与 {@link #topics() }的值必须二选一,优先级高 + * + * @return topic正则表达式 + */ + String regexTopic() default ""; /** * 消息序列化类型 diff --git a/src/main/java/org/redkale/mq/spi/DynForConsumer.java b/src/main/java/org/redkale/mq/spi/DynForConsumer.java new file mode 100644 index 000000000..f2ae7e720 --- /dev/null +++ b/src/main/java/org/redkale/mq/spi/DynForConsumer.java @@ -0,0 +1,25 @@ +/* + +*/ + +package org.redkale.mq.spi; + +import java.lang.annotation.Documented; +import static java.lang.annotation.ElementType.TYPE; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import java.lang.annotation.Target; + +/** + * 只标记给动态生成的MessageConsumer子类上 + * + * @author zhangjx + */ +@Inherited +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +public @interface DynForConsumer { + String group(); +} diff --git a/src/main/java/org/redkale/mq/spi/MessageAgent.java b/src/main/java/org/redkale/mq/spi/MessageAgent.java index 6726b2f0d..1f13be361 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAgent.java +++ b/src/main/java/org/redkale/mq/spi/MessageAgent.java @@ -81,7 +81,10 @@ public abstract class MessageAgent implements MessageManager { protected final CopyOnWriteArrayList messageConsumerList = new CopyOnWriteArrayList<>(); // key: group, sub-key: topic - protected final Map> messageConsumerMap = new HashMap<>(); + protected final Map> messageTopicConsumerMap = new HashMap<>(); + + // key: group, sub-key: regexTopic + protected final Map> messageRegexConsumerMap = new HashMap<>(); // -------------------------- HttpRpcClient、SncpMessageClient -------------------------- // cluster和mq同名组件时,HttpRpcClient优先使用MQ,默认不优先走MQ。 @@ -211,7 +214,8 @@ public abstract class MessageAgent implements MessageManager { consumer.destroy(config); } this.messageConsumerList.clear(); - this.messageConsumerMap.clear(); + this.messageTopicConsumerMap.clear(); + this.messageRegexConsumerMap.clear(); // -------------- MessageClient -------------- if (this.messageClientProducer != null) { this.messageClientProducer.stop(); @@ -248,23 +252,31 @@ public abstract class MessageAgent implements MessageManager { final StringBuilder sb = new StringBuilder(); clientConsumerLock.lock(); try { - Map> maps = new HashMap<>(); + Map> topicMaps = new HashMap<>(); + Map> regexMaps = new HashMap<>(); + AtomicInteger groupMax = new AtomicInteger(); AtomicInteger typeMax = new AtomicInteger(); AtomicInteger topicMax = new AtomicInteger(); - Map views = new LinkedHashMap<>(); + Map views = new LinkedHashMap<>(); for (MessageConsumer consumer : consumers) { ResourceConsumer res = consumer.getClass().getAnnotation(ResourceConsumer.class); String group = environment.getPropertyValue(res.group()); + final String typeName = + Sncp.getResourceType((Class) consumer.getClass()).getName(); if (Utility.isBlank(group)) { - group = consumer.getClass().getName().replace('$', '.'); + DynForConsumer dc = consumer.getClass().getAnnotation(DynForConsumer.class); + group = dc == null ? typeName.replace('$', '.') : dc.group(); } - Map map = maps.computeIfAbsent(group, g -> new HashMap<>()); + boolean regex = Utility.isNotEmpty(res.regexTopic()); + Map map = + (regex ? regexMaps : topicMaps).computeIfAbsent(group, g -> new HashMap<>()); List topics = new ArrayList<>(); - for (String t : res.topics()) { - String topic = environment.getPropertyValue(t); - if (!topic.trim().isEmpty()) { + String[] ts = regex ? new String[] {res.regexTopic()} : res.topics(); + for (String t : ts) { + String topic = environment.getPropertyValue(t).trim(); + if (!topic.isEmpty()) { topics.add(topic); - if (map.containsKey(topic.trim())) { + if (map.containsKey(topic)) { throw new RedkaleException(MessageConsumer.class.getSimpleName() + " consume topic (" + topic + ") repeat with " + map.get(topic).getClass().getName() + " and " @@ -279,29 +291,39 @@ public abstract class MessageAgent implements MessageManager { + ")"); } } - map.put(topic.trim(), new MessageConsumerWrapper(this, consumer, res.convertType())); + MessageConsumerWrapper wrapper = + new MessageConsumerWrapper(this, consumer, regex ? topic : null, res.convertType()); + map.put(topic, wrapper); } } - String typestr = consumer.getClass().getName(); + String typestr = typeName; String topicstr = JsonConvert.root().convertTo(topics.size() == 1 ? topics.get(0) : topics); + if (group.length() > groupMax.get()) { + groupMax.set(group.length()); + } if (typestr.length() > typeMax.get()) { typeMax.set(typestr.length()); } if (topicstr.length() > topicMax.get()) { topicMax.set(topicstr.length()); } - views.put(typestr, topicstr); + views.put(typestr, new String[]{group, topicstr}); } - views.forEach((typestr, topicstr) -> { + views.forEach((typestr, strs) -> { + String groupstr = strs[0]; + String topicstr = strs[1]; sb.append(MessageConsumer.class.getSimpleName()) .append(" (type=") .append(alignString(typestr, typeMax.get())) + .append(", group=") + .append(alignString(groupstr, groupMax.get())) .append(", topics=") .append(alignString(topicstr, topicMax.get())) .append(") startuped\r\n"); }); messageConsumerList.addAll(consumers); - messageConsumerMap.putAll(maps); + messageTopicConsumerMap.putAll(topicMaps); + messageRegexConsumerMap.putAll(regexMaps); } finally { clientConsumerLock.unlock(); } @@ -511,13 +533,17 @@ public abstract class MessageAgent implements MessageManager { private final Type messageType; - public MessageConsumerWrapper(MessageAgent messageAgent, MessageConsumer consumer, ConvertType convertType) { + private final String regexTopic; + + public MessageConsumerWrapper( + MessageAgent messageAgent, MessageConsumer consumer, String regexTopic, ConvertType convertType) { Objects.requireNonNull(messageAgent); Objects.requireNonNull(consumer); Objects.requireNonNull(convertType); this.messageAgent = messageAgent; this.convertType = convertType; this.consumer = consumer; + this.regexTopic = regexTopic; this.convert = ConvertFactory.findConvert(convertType); this.messageType = parseMessageType(consumer.getClass()); } @@ -564,6 +590,10 @@ public abstract class MessageAgent implements MessageManager { }); } + public String getRegexTopic() { + return regexTopic; + } + public MessageConsumer getConsumer() { return consumer; } diff --git a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java index 30b464d8a..0dc1294fc 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java +++ b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.redkale.annotation.AutoLoad; +import org.redkale.annotation.Nonnull; import org.redkale.asm.AnnotationVisitor; import org.redkale.asm.AsmMethodBean; import org.redkale.asm.AsmMethodBoost; @@ -97,6 +98,14 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { if (messaged == null) { return newMethod; } + if (Utility.isEmpty(messaged.regexTopic()) && Utility.isEmpty(messaged.topics())) { + throw new RedkaleException( + "@" + Messaged.class.getSimpleName() + " regexTopic and topics both empty on " + method); + } + if (Utility.isNotEmpty(messaged.regexTopic()) && Utility.isNotEmpty(messaged.topics())) { + throw new RedkaleException( + "@" + Messaged.class.getSimpleName() + " regexTopic and topics both not empty on " + method); + } if (!LoadMode.matches(remote, messaged.mode())) { return newMethod; } @@ -157,8 +166,8 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { protected void createInnerConsumer( ClassWriter pcw, - Class serviceImplClass, - Method method, + @Nonnull Class serviceImplClass, + @Nonnull Method method, Type messageType, Messaged messaged, String newDynName, @@ -168,7 +177,6 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName; final Class msgTypeClass = TypeToken.typeToClass(messageType); - final String msgTypeName = msgTypeClass.getName().replace('.', '/'); final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(msgTypeClass); final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/'); final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class); @@ -180,10 +188,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { methodBeans = AsmMethodBoost.getMethodBeans(serviceType); } AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method); - String methodSignature = null; String genericMsgTypeDesc = msgTypeDesc; if (Utility.isNotEmpty(methodBean.getSignature())) { - methodSignature = methodBean.getSignature(); + String methodSignature = methodBean.getSignature(); methodSignature = methodSignature.substring(0, methodSignature.lastIndexOf(')') + 1) + "V"; int start = methodSignature.indexOf('<') + 1; genericMsgTypeDesc = methodSignature.substring(start, methodSignature.lastIndexOf('>')); // 获取<>中的值 @@ -206,6 +213,15 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { Asms.visitAnnotation(av, ResourceConsumer.class, messaged); av.visitEnd(); } + { // 设置DynForConsumer + AnnotationVisitor av = cw.visitAnnotation(org.redkale.asm.Type.getDescriptor(DynForConsumer.class), true); + String group = messaged.group(); + if (Utility.isBlank(group)) { + group = serviceImplClass.getName().replace('$', '.'); + } + av.visit("group", group); + av.visitEnd(); + } { // 必须设置成@AutoLoad(false), 否则预编译打包后会被自动加载 AnnotationVisitor av = cw.visitAnnotation(org.redkale.asm.Type.getDescriptor(AutoLoad.class), true); av.visit("value", false); diff --git a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java index 081ca2243..85c94b9eb 100644 --- a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java +++ b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java @@ -286,6 +286,14 @@ public class MessageModuleEngine extends ModuleEngine { throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + res.mq() + ") on " + clazz.getName()); } + if (res != null && Utility.isEmpty(res.regexTopic()) && Utility.isEmpty(res.topics())) { + throw new RedkaleException("@" + ResourceConsumer.class.getSimpleName() + + " regexTopic and topics both empty on " + clazz.getName()); + } + if (res != null && Utility.isNotEmpty(res.regexTopic()) && Utility.isNotEmpty(res.topics())) { + throw new RedkaleException("@" + ResourceConsumer.class.getSimpleName() + + " regexTopic and topics both not empty on " + clazz.getName()); + } } this.allMessageConsumerEntrys = allEntrys; logger.info("MessageAgent load MessageConsumer in " + (System.currentTimeMillis() - s) + " ms");