增加Messaged.regexTopic

This commit is contained in:
redkale
2024-08-22 20:00:16 +08:00
parent 6a72fecbb1
commit cce667e019
6 changed files with 118 additions and 25 deletions

View File

@@ -73,11 +73,18 @@ public @interface Messaged {
boolean required() default true;
/**
* 监听的topic
* 监听的topic, 当{@link #regexTopic() }值不为空时忽略此值
*
* @return topic
*/
String[] topics();
String[] topics() default {};
/**
* 监听的topic 与 {@link #topics() }的值必须二选一,优先级高
*
* @return topic正则表达式
*/
String regexTopic() default "";
/**
* 消息序列化类型

View File

@@ -71,11 +71,18 @@ public @interface ResourceConsumer {
boolean required() default true;
/**
* 监听的topic
* 监听的topic, 当{@link #regexTopic() }值不为空时忽略此值
*
* @return topic
*/
String[] topics();
String[] topics() default {};
/**
* 监听的topic 与 {@link #topics() }的值必须二选一,优先级高
*
* @return topic正则表达式
*/
String regexTopic() default "";
/**
* 消息序列化类型

View File

@@ -0,0 +1,25 @@
/*
*/
package org.redkale.mq.spi;
import java.lang.annotation.Documented;
import static java.lang.annotation.ElementType.TYPE;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Target;
/**
* 只标记给动态生成的MessageConsumer子类上
*
* @author zhangjx
*/
@Inherited
@Documented
@Target({TYPE})
@Retention(RUNTIME)
public @interface DynForConsumer {
String group();
}

View File

@@ -81,7 +81,10 @@ public abstract class MessageAgent implements MessageManager {
protected final CopyOnWriteArrayList<MessageConsumer> messageConsumerList = new CopyOnWriteArrayList<>();
// key: group, sub-key: topic
protected final Map<String, Map<String, MessageConsumerWrapper>> messageConsumerMap = new HashMap<>();
protected final Map<String, Map<String, MessageConsumerWrapper>> messageTopicConsumerMap = new HashMap<>();
// key: group, sub-key: regexTopic
protected final Map<String, Map<String, MessageConsumerWrapper>> messageRegexConsumerMap = new HashMap<>();
// -------------------------- HttpRpcClient、SncpMessageClient --------------------------
// cluster和mq同名组件时HttpRpcClient优先使用MQ默认不优先走MQ。
@@ -211,7 +214,8 @@ public abstract class MessageAgent implements MessageManager {
consumer.destroy(config);
}
this.messageConsumerList.clear();
this.messageConsumerMap.clear();
this.messageTopicConsumerMap.clear();
this.messageRegexConsumerMap.clear();
// -------------- MessageClient --------------
if (this.messageClientProducer != null) {
this.messageClientProducer.stop();
@@ -248,23 +252,31 @@ public abstract class MessageAgent implements MessageManager {
final StringBuilder sb = new StringBuilder();
clientConsumerLock.lock();
try {
Map<String, Map<String, MessageConsumerWrapper>> maps = new HashMap<>();
Map<String, Map<String, MessageConsumerWrapper>> topicMaps = new HashMap<>();
Map<String, Map<String, MessageConsumerWrapper>> regexMaps = new HashMap<>();
AtomicInteger groupMax = new AtomicInteger();
AtomicInteger typeMax = new AtomicInteger();
AtomicInteger topicMax = new AtomicInteger();
Map<String, String> views = new LinkedHashMap<>();
Map<String, String[]> views = new LinkedHashMap<>();
for (MessageConsumer consumer : consumers) {
ResourceConsumer res = consumer.getClass().getAnnotation(ResourceConsumer.class);
String group = environment.getPropertyValue(res.group());
final String typeName =
Sncp.getResourceType((Class) consumer.getClass()).getName();
if (Utility.isBlank(group)) {
group = consumer.getClass().getName().replace('$', '.');
DynForConsumer dc = consumer.getClass().getAnnotation(DynForConsumer.class);
group = dc == null ? typeName.replace('$', '.') : dc.group();
}
Map<String, MessageConsumerWrapper> map = maps.computeIfAbsent(group, g -> new HashMap<>());
boolean regex = Utility.isNotEmpty(res.regexTopic());
Map<String, MessageConsumerWrapper> map =
(regex ? regexMaps : topicMaps).computeIfAbsent(group, g -> new HashMap<>());
List<String> topics = new ArrayList<>();
for (String t : res.topics()) {
String topic = environment.getPropertyValue(t);
if (!topic.trim().isEmpty()) {
String[] ts = regex ? new String[] {res.regexTopic()} : res.topics();
for (String t : ts) {
String topic = environment.getPropertyValue(t).trim();
if (!topic.isEmpty()) {
topics.add(topic);
if (map.containsKey(topic.trim())) {
if (map.containsKey(topic)) {
throw new RedkaleException(MessageConsumer.class.getSimpleName()
+ " consume topic (" + topic + ") repeat with "
+ map.get(topic).getClass().getName() + " and "
@@ -279,29 +291,39 @@ public abstract class MessageAgent implements MessageManager {
+ ")");
}
}
map.put(topic.trim(), new MessageConsumerWrapper(this, consumer, res.convertType()));
MessageConsumerWrapper wrapper =
new MessageConsumerWrapper(this, consumer, regex ? topic : null, res.convertType());
map.put(topic, wrapper);
}
}
String typestr = consumer.getClass().getName();
String typestr = typeName;
String topicstr = JsonConvert.root().convertTo(topics.size() == 1 ? topics.get(0) : topics);
if (group.length() > groupMax.get()) {
groupMax.set(group.length());
}
if (typestr.length() > typeMax.get()) {
typeMax.set(typestr.length());
}
if (topicstr.length() > topicMax.get()) {
topicMax.set(topicstr.length());
}
views.put(typestr, topicstr);
views.put(typestr, new String[]{group, topicstr});
}
views.forEach((typestr, topicstr) -> {
views.forEach((typestr, strs) -> {
String groupstr = strs[0];
String topicstr = strs[1];
sb.append(MessageConsumer.class.getSimpleName())
.append(" (type=")
.append(alignString(typestr, typeMax.get()))
.append(", group=")
.append(alignString(groupstr, groupMax.get()))
.append(", topics=")
.append(alignString(topicstr, topicMax.get()))
.append(") startuped\r\n");
});
messageConsumerList.addAll(consumers);
messageConsumerMap.putAll(maps);
messageTopicConsumerMap.putAll(topicMaps);
messageRegexConsumerMap.putAll(regexMaps);
} finally {
clientConsumerLock.unlock();
}
@@ -511,13 +533,17 @@ public abstract class MessageAgent implements MessageManager {
private final Type messageType;
public MessageConsumerWrapper(MessageAgent messageAgent, MessageConsumer<T> consumer, ConvertType convertType) {
private final String regexTopic;
public MessageConsumerWrapper(
MessageAgent messageAgent, MessageConsumer<T> consumer, String regexTopic, ConvertType convertType) {
Objects.requireNonNull(messageAgent);
Objects.requireNonNull(consumer);
Objects.requireNonNull(convertType);
this.messageAgent = messageAgent;
this.convertType = convertType;
this.consumer = consumer;
this.regexTopic = regexTopic;
this.convert = ConvertFactory.findConvert(convertType);
this.messageType = parseMessageType(consumer.getClass());
}
@@ -564,6 +590,10 @@ public abstract class MessageAgent implements MessageManager {
});
}
public String getRegexTopic() {
return regexTopic;
}
public MessageConsumer getConsumer() {
return consumer;
}

View File

@@ -14,6 +14,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Nonnull;
import org.redkale.asm.AnnotationVisitor;
import org.redkale.asm.AsmMethodBean;
import org.redkale.asm.AsmMethodBoost;
@@ -97,6 +98,14 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
if (messaged == null) {
return newMethod;
}
if (Utility.isEmpty(messaged.regexTopic()) && Utility.isEmpty(messaged.topics())) {
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " regexTopic and topics both empty on " + method);
}
if (Utility.isNotEmpty(messaged.regexTopic()) && Utility.isNotEmpty(messaged.topics())) {
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " regexTopic and topics both not empty on " + method);
}
if (!LoadMode.matches(remote, messaged.mode())) {
return newMethod;
}
@@ -157,8 +166,8 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
protected void createInnerConsumer(
ClassWriter pcw,
Class serviceImplClass,
Method method,
@Nonnull Class serviceImplClass,
@Nonnull Method method,
Type messageType,
Messaged messaged,
String newDynName,
@@ -168,7 +177,6 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet();
final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName;
final Class msgTypeClass = TypeToken.typeToClass(messageType);
final String msgTypeName = msgTypeClass.getName().replace('.', '/');
final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(msgTypeClass);
final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/');
final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class);
@@ -180,10 +188,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
methodBeans = AsmMethodBoost.getMethodBeans(serviceType);
}
AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method);
String methodSignature = null;
String genericMsgTypeDesc = msgTypeDesc;
if (Utility.isNotEmpty(methodBean.getSignature())) {
methodSignature = methodBean.getSignature();
String methodSignature = methodBean.getSignature();
methodSignature = methodSignature.substring(0, methodSignature.lastIndexOf(')') + 1) + "V";
int start = methodSignature.indexOf('<') + 1;
genericMsgTypeDesc = methodSignature.substring(start, methodSignature.lastIndexOf('>')); // 获取<>中的值
@@ -206,6 +213,15 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
Asms.visitAnnotation(av, ResourceConsumer.class, messaged);
av.visitEnd();
}
{ // 设置DynForConsumer
AnnotationVisitor av = cw.visitAnnotation(org.redkale.asm.Type.getDescriptor(DynForConsumer.class), true);
String group = messaged.group();
if (Utility.isBlank(group)) {
group = serviceImplClass.getName().replace('$', '.');
}
av.visit("group", group);
av.visitEnd();
}
{ // 必须设置成@AutoLoad(false) 否则预编译打包后会被自动加载
AnnotationVisitor av = cw.visitAnnotation(org.redkale.asm.Type.getDescriptor(AutoLoad.class), true);
av.visit("value", false);

View File

@@ -286,6 +286,14 @@ public class MessageModuleEngine extends ModuleEngine {
throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + res.mq()
+ ") on " + clazz.getName());
}
if (res != null && Utility.isEmpty(res.regexTopic()) && Utility.isEmpty(res.topics())) {
throw new RedkaleException("@" + ResourceConsumer.class.getSimpleName()
+ " regexTopic and topics both empty on " + clazz.getName());
}
if (res != null && Utility.isNotEmpty(res.regexTopic()) && Utility.isNotEmpty(res.topics())) {
throw new RedkaleException("@" + ResourceConsumer.class.getSimpleName()
+ " regexTopic and topics both not empty on " + clazz.getName());
}
}
this.allMessageConsumerEntrys = allEntrys;
logger.info("MessageAgent load MessageConsumer in " + (System.currentTimeMillis() - s) + " ms");