diff --git a/src/main/java/org/redkale/asm/AsmMethodBoost.java b/src/main/java/org/redkale/asm/AsmMethodBoost.java index d627f41d7..d3325ebb4 100644 --- a/src/main/java/org/redkale/asm/AsmMethodBoost.java +++ b/src/main/java/org/redkale/asm/AsmMethodBoost.java @@ -120,10 +120,11 @@ public abstract class AsmMethodBoost { /** * 实例对象进行操作,通常用于给动态的字段赋值 * + * @param classLoader ClassLoader * @param resourceFactory ResourceFactory * @param service 实例对象 */ - public abstract void doInstance(ResourceFactory resourceFactory, T service); + public abstract void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, T service); protected AsmMethodBean getMethodBean(Method method) { Map methodBeans = AsmMethodBoost.getMethodBeans(serviceType); @@ -324,10 +325,10 @@ public abstract class AsmMethodBoost { } @Override - public void doInstance(ResourceFactory resourceFactory, T service) { + public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, T service) { for (AsmMethodBoost item : items) { if (item != null) { - item.doInstance(resourceFactory, service); + item.doInstance(classLoader, resourceFactory, service); } } } diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index 905171a12..fbfdaa863 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -709,11 +709,8 @@ public final class Application { new ClassFilter(this.getClassLoader(), ResourceAnnotationLoader.class); ClassFilter resTypeFilter = new ClassFilter(this.getClassLoader(), ResourceTypeLoader.class); - try { - loadClassByFilters(resConfigFilter, resAnnFilter, resTypeFilter); - } catch (IOException e) { - throw new RedkaleException(e); - } + + loadClassByFilters(resConfigFilter, resAnnFilter, resTypeFilter); { // Configuration StringBuilder sb = new StringBuilder(); resConfigFilter.getFilterEntrys().forEach(en -> { @@ -1689,12 +1686,20 @@ public final class Application { return moduleEngines; } - public void loadClassByFilters(final ClassFilter... filters) throws IOException { - ClassFilter.Loader.load(getHome(), getClassLoader(), filters); + public void loadClassByFilters(final ClassFilter... filters) { + try { + ClassFilter.Loader.load(getHome(), getClassLoader(), filters); + } catch (IOException e) { + throw new RedkaleException(e); + } } - public void loadServerClassFilters(final ClassFilter... filters) throws IOException { - ClassFilter.Loader.load(getHome(), getServerClassLoader(), filters); + public void loadServerClassFilters(final ClassFilter... filters) { + try { + ClassFilter.Loader.load(getHome(), getServerClassLoader(), filters); + } catch (IOException e) { + throw new RedkaleException(e); + } } public DataSource loadDataSource(final String sourceName, boolean autoMemory) { diff --git a/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java b/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java index e3ad958fa..f9b9b296b 100644 --- a/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java +++ b/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java @@ -213,7 +213,7 @@ public class CachedAsmMethodBoost extends AsmMethodBoost { } @Override - public void doInstance(ResourceFactory resourceFactory, Object service) { + public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, Object service) { Class clazz = service.getClass(); if (actionMap == null) { // 为null表示没有调用过doMethod, 动态类在编译是已经生成好了 actionMap = new LinkedHashMap<>(); diff --git a/src/main/java/org/redkale/locked/spi/LockedAsmMethodBoost.java b/src/main/java/org/redkale/locked/spi/LockedAsmMethodBoost.java index dfe4e9ebd..4656e2a56 100644 --- a/src/main/java/org/redkale/locked/spi/LockedAsmMethodBoost.java +++ b/src/main/java/org/redkale/locked/spi/LockedAsmMethodBoost.java @@ -93,7 +93,7 @@ public class LockedAsmMethodBoost extends AsmMethodBoost { } @Override - public void doInstance(ResourceFactory resourceFactory, Object service) { + public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, Object service) { // do nothing } } diff --git a/src/main/java/org/redkale/mq/Messaged.java b/src/main/java/org/redkale/mq/Messaged.java index b3e633213..5c1a20fb7 100644 --- a/src/main/java/org/redkale/mq/Messaged.java +++ b/src/main/java/org/redkale/mq/Messaged.java @@ -52,12 +52,39 @@ import org.redkale.service.LoadMode; @Retention(RUNTIME) public @interface Messaged { + /** + * {@link org.redkale.mq.spi.MessageAgent}对象对应名称 + * + * @return MQ名称 + */ String mq() default ""; + /** + * MQ客户端分组名称 + * + * @return 组名称 + */ String group() default ""; + /** + * 是否必须要加载,为ture时若mq()值对应{@link org.redkale.mq.spi.MessageAgent}对象不存在的情况下会抛异常 + * + * @return 是否必须要加载 + */ + boolean required() default true; + + /** + * 监听的topic + * + * @return topic + */ String[] topics(); + /** + * 消息序列化类型 + * + * @return 序列化类型 + */ ConvertType convertType() default ConvertType.JSON; /** diff --git a/src/main/java/org/redkale/mq/ResourceConsumer.java b/src/main/java/org/redkale/mq/ResourceConsumer.java index ceaead0d4..a8899e156 100644 --- a/src/main/java/org/redkale/mq/ResourceConsumer.java +++ b/src/main/java/org/redkale/mq/ResourceConsumer.java @@ -38,6 +38,13 @@ public @interface ResourceConsumer { */ String group() default ""; + /** + * 是否必须要加载,为ture时若mq()值对应{@link org.redkale.mq.spi.MessageAgent}对象不存在的情况下会抛异常 + * + * @return 是否必须要加载 + */ + boolean required() default true; + /** * 监听的topic * diff --git a/src/main/java/org/redkale/mq/spi/MessageAgent.java b/src/main/java/org/redkale/mq/spi/MessageAgent.java index 3ace4b48a..21c8319e5 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAgent.java +++ b/src/main/java/org/redkale/mq/spi/MessageAgent.java @@ -261,7 +261,7 @@ public abstract class MessageAgent implements MessageManager { ResourceConsumer res = consumer.getClass().getAnnotation(ResourceConsumer.class); String group = environment.getPropertyValue(res.group()); if (Utility.isBlank(group)) { - group = consumer.getClass().getName(); + group = consumer.getClass().getName().replace('$', '.'); } Map map = maps.computeIfAbsent(group, g -> new HashMap<>()); List topics = new ArrayList<>(); diff --git a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java index f18755d83..465d83842 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java +++ b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java @@ -8,6 +8,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -66,7 +67,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { private RedkaleClassLoader.DynBytesClassLoader newLoader; - private List> consumers; + private Map consumerBytes; public MessageAsmMethodBoost(boolean remote, Class serviceType, MessageModuleEngine messageEngine) { super(remote, serviceType); @@ -132,10 +133,6 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { ConvertFactory factory = ConvertFactory.findConvert(messaged.convertType()).getFactory(); factory.loadDecoder(messageType); - if (newLoader == null) { - newLoader = new RedkaleClassLoader.DynBytesClassLoader( - classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader); - } createInnerConsumer( cw, method, paramKind, TypeToken.typeToClass(messageType), messaged, newDynName, newMethodName); return newMethodName; @@ -153,6 +150,8 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { final String newDynDesc = "L" + newDynName + ";"; final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); final String innerFullName = newDynName + "$" + innerClassName; + final String msgTypeName = + TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/'); final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType)); final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/'); final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class); @@ -165,7 +164,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { } AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method); String genericMsgTypeDesc = msgTypeDesc; - if (!msgType.isPrimitive()) { + if (!msgType.isPrimitive() && Utility.isNotEmpty(methodBean.getSignature())) { String methodSignature = methodBean.getSignature().replace(messageConextDesc, ""); genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值 } @@ -300,7 +299,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 2); - mv.visitTypeInsn(CHECKCAST, "java/lang/String"); + mv.visitTypeInsn(CHECKCAST, msgTypeName); mv.visitMethodInsn( INVOKEVIRTUAL, innerFullName, "onMessage", "(" + messageConextDesc + msgTypeDesc + ")V", false); mv.visitInsn(RETURN); @@ -310,22 +309,39 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { cw.visitEnd(); byte[] bytes = cw.toByteArray(); - Class cz = newLoader.loadClass((innerFullName).replace('/', '.'), bytes); - if (consumers == null) { - consumers = new ArrayList<>(); + if (consumerBytes == null) { + consumerBytes = new LinkedHashMap<>(); } - consumers.add(cz); - RedkaleClassLoader.putDynClass((innerFullName).replace('/', '.'), bytes, cz); + consumerBytes.put(innerFullName.replace('/', '.'), bytes); } @Override - public void doInstance(ResourceFactory resourceFactory, Object service) { + public void doAfterMethods(ClassLoader classLoader, ClassWriter cw, String newDynName, String fieldPrefix) { + if (Utility.isNotEmpty(consumerBytes)) { + AnnotationVisitor av = cw.visitAnnotation(org.redkale.asm.Type.getDescriptor(DynForMessage.class), true); + av.visit("value", org.redkale.asm.Type.getType("L" + newDynName.replace('.', '/') + ";")); + av.visitEnd(); + } + } + + @Override + public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, Object service) { DynForMessage[] dyns = service.getClass().getAnnotationsByType(DynForMessage.class); if (Utility.isEmpty(dyns)) { return; } try { - if (Utility.isNotEmpty(consumers)) { + if (Utility.isNotEmpty(consumerBytes)) { + if (newLoader == null) { + newLoader = new RedkaleClassLoader.DynBytesClassLoader( + classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader); + } + List> consumers = new ArrayList<>(); + consumerBytes.forEach((clzName, bytes) -> { + Class clazz = (Class) newLoader.loadClass(clzName, bytes); + RedkaleClassLoader.putDynClass(clzName, bytes, clazz); + consumers.add(clazz); + }); for (Class clazz : consumers) { MessageConsumer consumer = (MessageConsumer) clazz.getConstructors()[0].newInstance(service); messageEngine.addMessageConsumer(consumer); diff --git a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java index f42b9bc3e..e32e026c7 100644 --- a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java +++ b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java @@ -54,6 +54,8 @@ public class MessageModuleEngine extends ModuleEngine { // @since 2.1.0 private MessageAgent[] messageAgents; + private List> allMessageConsumerEntrys; + public MessageModuleEngine(Application application) { super(application); } @@ -70,11 +72,16 @@ public class MessageModuleEngine extends ModuleEngine { return new MessageAsmMethodBoost(remote, serviceClass, this); } + // 在doInstance方法里被调用 void addMessageConsumer(MessageConsumer consumer) { - String agentName = environment.getPropertyValue( + String mqName = environment.getPropertyValue( consumer.getClass().getAnnotation(ResourceConsumer.class).mq()); + if (findMessageAgent(mqName) == null) { + throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + mqName + ") on " + + consumer.getClass().getName()); + } agentConsumers - .computeIfAbsent(agentName, v -> new CopyOnWriteArrayList<>()) + .computeIfAbsent(mqName, v -> new CopyOnWriteArrayList<>()) .add(consumer); } @@ -251,6 +258,22 @@ public class MessageModuleEngine extends ModuleEngine { this.resourceFactory.register(agentName, MessageAgent.class, agent); } logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); + // 加载MessageConsumer + s = System.currentTimeMillis(); + final RedkaleClassLoader cl = application.getServerClassLoader(); + ClassFilter allFilter = new ClassFilter(cl, ResourceConsumer.class, MessageConsumer.class); + application.loadServerClassFilters(allFilter); + List> allEntrys = new ArrayList(allFilter.getFilterEntrys()); + for (ClassFilter.FilterEntry en : allEntrys) { + Class clazz = en.getType(); + ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class); + if (res != null && res.required() && findMessageAgent(res.mq()) == null) { + throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + res.mq() + + ") on " + clazz.getName()); + } + } + this.allMessageConsumerEntrys = allEntrys; + logger.info("MessageAgent load MessageConsumer in " + (System.currentTimeMillis() - s) + " ms"); } /** @@ -419,8 +442,8 @@ public class MessageModuleEngine extends ModuleEngine { agentConsumers.getOrDefault(agent.getName(), new CopyOnWriteArrayList<>()); AnyValue consumerConf = agent.getConfig().getAnyValue("consumer"); if (consumerConf != null) { // 加载 MessageConsumer - ClassFilter filter = new ClassFilter( - application.getServerClassLoader(), ResourceConsumer.class, MessageConsumer.class, null, null); + final RedkaleClassLoader cl = application.getServerClassLoader(); + ClassFilter filter = new ClassFilter(cl, ResourceConsumer.class, MessageConsumer.class); if (consumerConf.getBoolValue("autoload", true)) { String includes = consumerConf.getValue("includes", ""); String excludes = consumerConf.getValue("excludes", ""); @@ -431,13 +454,11 @@ public class MessageModuleEngine extends ModuleEngine { } try { - application.loadServerClassFilters(filter); - List> entrys = - new ArrayList(filter.getFilterEntrys()); - for (ClassFilter.FilterEntry en : entrys) { + for (ClassFilter.FilterEntry en : allMessageConsumerEntrys) { Class clazz = en.getType(); ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class); - if (!Objects.equals(agent.getName(), environment.getPropertyValue(res.mq()))) { + if (!filter.accept(clazz.getName()) + || !Objects.equals(agent.getName(), environment.getPropertyValue(res.mq()))) { continue; } RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, clazz.getName()); @@ -510,4 +531,16 @@ public class MessageModuleEngine extends ModuleEngine { } return null; } + + public MessageAgent findMessageAgent(String mqName) { + if (this.messageAgents != null) { + String name = environment.getPropertyValue(mqName); + for (MessageAgent agent : this.messageAgents) { + if (Objects.equals(agent.getName(), name)) { + return agent; + } + } + } + return null; + } } diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index 83b1d6a95..5aae29717 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -827,7 +827,8 @@ public abstract class Sncp { c.set(service, agent == null ? null : agent.getName()); } if (methodBoost != null) { - methodBoost.doInstance(resourceFactory, service); + // 必须用servcie的ClassLoader, 因为service是动态ClassLoader会与doMethod里的动态ClassLoader不一致 + methodBoost.doInstance(service.getClass().getClassLoader(), resourceFactory, service); } return service; } catch (RuntimeException rex) { @@ -982,7 +983,8 @@ public abstract class Sncp { c.set(service, info); } if (methodBoost != null) { - methodBoost.doInstance(resourceFactory, service); + // 必须用servcie的ClassLoader, 因为service是动态ClassLoader会与doMethod里的动态ClassLoader不一致 + methodBoost.doInstance(service.getClass().getClassLoader(), resourceFactory, service); } return service; } catch (Throwable ex) { @@ -1264,7 +1266,8 @@ public abstract class Sncp { RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c); } if (methodBoost != null) { - methodBoost.doInstance(resourceFactory, service); + // 必须用servcie的ClassLoader, 因为service是动态ClassLoader会与doMethod里的动态ClassLoader不一致 + methodBoost.doInstance(service.getClass().getClassLoader(), resourceFactory, service); } return service; } catch (Exception ex) {