MessageAgent优化

This commit is contained in:
redkale
2024-08-12 17:15:33 +08:00
parent f8fbabaef9
commit 6ad3ba192b
10 changed files with 133 additions and 41 deletions

View File

@@ -120,10 +120,11 @@ public abstract class AsmMethodBoost<T> {
/** /**
* 实例对象进行操作,通常用于给动态的字段赋值 * 实例对象进行操作,通常用于给动态的字段赋值
* *
* @param classLoader ClassLoader
* @param resourceFactory ResourceFactory * @param resourceFactory ResourceFactory
* @param service 实例对象 * @param service 实例对象
*/ */
public abstract void doInstance(ResourceFactory resourceFactory, T service); public abstract void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, T service);
protected AsmMethodBean getMethodBean(Method method) { protected AsmMethodBean getMethodBean(Method method) {
Map<String, AsmMethodBean> methodBeans = AsmMethodBoost.getMethodBeans(serviceType); Map<String, AsmMethodBean> methodBeans = AsmMethodBoost.getMethodBeans(serviceType);
@@ -324,10 +325,10 @@ public abstract class AsmMethodBoost<T> {
} }
@Override @Override
public void doInstance(ResourceFactory resourceFactory, T service) { public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, T service) {
for (AsmMethodBoost item : items) { for (AsmMethodBoost item : items) {
if (item != null) { if (item != null) {
item.doInstance(resourceFactory, service); item.doInstance(classLoader, resourceFactory, service);
} }
} }
} }

View File

@@ -709,11 +709,8 @@ public final class Application {
new ClassFilter(this.getClassLoader(), ResourceAnnotationLoader.class); new ClassFilter(this.getClassLoader(), ResourceAnnotationLoader.class);
ClassFilter<ResourceTypeLoader> resTypeFilter = ClassFilter<ResourceTypeLoader> resTypeFilter =
new ClassFilter(this.getClassLoader(), ResourceTypeLoader.class); new ClassFilter(this.getClassLoader(), ResourceTypeLoader.class);
try {
loadClassByFilters(resConfigFilter, resAnnFilter, resTypeFilter); loadClassByFilters(resConfigFilter, resAnnFilter, resTypeFilter);
} catch (IOException e) {
throw new RedkaleException(e);
}
{ // Configuration { // Configuration
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
resConfigFilter.getFilterEntrys().forEach(en -> { resConfigFilter.getFilterEntrys().forEach(en -> {
@@ -1689,12 +1686,20 @@ public final class Application {
return moduleEngines; return moduleEngines;
} }
public void loadClassByFilters(final ClassFilter... filters) throws IOException { public void loadClassByFilters(final ClassFilter... filters) {
try {
ClassFilter.Loader.load(getHome(), getClassLoader(), filters); ClassFilter.Loader.load(getHome(), getClassLoader(), filters);
} catch (IOException e) {
throw new RedkaleException(e);
}
} }
public void loadServerClassFilters(final ClassFilter... filters) throws IOException { public void loadServerClassFilters(final ClassFilter... filters) {
try {
ClassFilter.Loader.load(getHome(), getServerClassLoader(), filters); ClassFilter.Loader.load(getHome(), getServerClassLoader(), filters);
} catch (IOException e) {
throw new RedkaleException(e);
}
} }
public DataSource loadDataSource(final String sourceName, boolean autoMemory) { public DataSource loadDataSource(final String sourceName, boolean autoMemory) {

View File

@@ -213,7 +213,7 @@ public class CachedAsmMethodBoost extends AsmMethodBoost {
} }
@Override @Override
public void doInstance(ResourceFactory resourceFactory, Object service) { public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, Object service) {
Class clazz = service.getClass(); Class clazz = service.getClass();
if (actionMap == null) { // 为null表示没有调用过doMethod 动态类在编译是已经生成好了 if (actionMap == null) { // 为null表示没有调用过doMethod 动态类在编译是已经生成好了
actionMap = new LinkedHashMap<>(); actionMap = new LinkedHashMap<>();

View File

@@ -93,7 +93,7 @@ public class LockedAsmMethodBoost extends AsmMethodBoost {
} }
@Override @Override
public void doInstance(ResourceFactory resourceFactory, Object service) { public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, Object service) {
// do nothing // do nothing
} }
} }

View File

@@ -52,12 +52,39 @@ import org.redkale.service.LoadMode;
@Retention(RUNTIME) @Retention(RUNTIME)
public @interface Messaged { public @interface Messaged {
/**
* {@link org.redkale.mq.spi.MessageAgent}对象对应名称
*
* @return MQ名称
*/
String mq() default ""; String mq() default "";
/**
* MQ客户端分组名称
*
* @return 组名称
*/
String group() default ""; String group() default "";
/**
* 是否必须要加载为ture时若mq()值对应{@link org.redkale.mq.spi.MessageAgent}对象不存在的情况下会抛异常
*
* @return 是否必须要加载
*/
boolean required() default true;
/**
* 监听的topic
*
* @return topic
*/
String[] topics(); String[] topics();
/**
* 消息序列化类型
*
* @return 序列化类型
*/
ConvertType convertType() default ConvertType.JSON; ConvertType convertType() default ConvertType.JSON;
/** /**

View File

@@ -38,6 +38,13 @@ public @interface ResourceConsumer {
*/ */
String group() default ""; String group() default "";
/**
* 是否必须要加载为ture时若mq()值对应{@link org.redkale.mq.spi.MessageAgent}对象不存在的情况下会抛异常
*
* @return 是否必须要加载
*/
boolean required() default true;
/** /**
* 监听的topic * 监听的topic
* *

View File

@@ -261,7 +261,7 @@ public abstract class MessageAgent implements MessageManager {
ResourceConsumer res = consumer.getClass().getAnnotation(ResourceConsumer.class); ResourceConsumer res = consumer.getClass().getAnnotation(ResourceConsumer.class);
String group = environment.getPropertyValue(res.group()); String group = environment.getPropertyValue(res.group());
if (Utility.isBlank(group)) { if (Utility.isBlank(group)) {
group = consumer.getClass().getName(); group = consumer.getClass().getName().replace('$', '.');
} }
Map<String, MessageConsumerWrapper> map = maps.computeIfAbsent(group, g -> new HashMap<>()); Map<String, MessageConsumerWrapper> map = maps.computeIfAbsent(group, g -> new HashMap<>());
List<String> topics = new ArrayList<>(); List<String> topics = new ArrayList<>();

View File

@@ -8,6 +8,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -66,7 +67,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
private RedkaleClassLoader.DynBytesClassLoader newLoader; private RedkaleClassLoader.DynBytesClassLoader newLoader;
private List<Class<? extends MessageConsumer>> consumers; private Map<String, byte[]> consumerBytes;
public MessageAsmMethodBoost(boolean remote, Class serviceType, MessageModuleEngine messageEngine) { public MessageAsmMethodBoost(boolean remote, Class serviceType, MessageModuleEngine messageEngine) {
super(remote, serviceType); super(remote, serviceType);
@@ -132,10 +133,6 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
ConvertFactory factory = ConvertFactory factory =
ConvertFactory.findConvert(messaged.convertType()).getFactory(); ConvertFactory.findConvert(messaged.convertType()).getFactory();
factory.loadDecoder(messageType); factory.loadDecoder(messageType);
if (newLoader == null) {
newLoader = new RedkaleClassLoader.DynBytesClassLoader(
classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader);
}
createInnerConsumer( createInnerConsumer(
cw, method, paramKind, TypeToken.typeToClass(messageType), messaged, newDynName, newMethodName); cw, method, paramKind, TypeToken.typeToClass(messageType), messaged, newDynName, newMethodName);
return newMethodName; return newMethodName;
@@ -153,6 +150,8 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
final String newDynDesc = "L" + newDynName + ";"; final String newDynDesc = "L" + newDynName + ";";
final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet();
final String innerFullName = newDynName + "$" + innerClassName; final String innerFullName = newDynName + "$" + innerClassName;
final String msgTypeName =
TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/');
final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType)); final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType));
final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/'); final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/');
final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class); final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class);
@@ -165,7 +164,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
} }
AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method); AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method);
String genericMsgTypeDesc = msgTypeDesc; String genericMsgTypeDesc = msgTypeDesc;
if (!msgType.isPrimitive()) { if (!msgType.isPrimitive() && Utility.isNotEmpty(methodBean.getSignature())) {
String methodSignature = methodBean.getSignature().replace(messageConextDesc, ""); String methodSignature = methodBean.getSignature().replace(messageConextDesc, "");
genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值 genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值
} }
@@ -300,7 +299,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2); mv.visitVarInsn(ALOAD, 2);
mv.visitTypeInsn(CHECKCAST, "java/lang/String"); mv.visitTypeInsn(CHECKCAST, msgTypeName);
mv.visitMethodInsn( mv.visitMethodInsn(
INVOKEVIRTUAL, innerFullName, "onMessage", "(" + messageConextDesc + msgTypeDesc + ")V", false); INVOKEVIRTUAL, innerFullName, "onMessage", "(" + messageConextDesc + msgTypeDesc + ")V", false);
mv.visitInsn(RETURN); mv.visitInsn(RETURN);
@@ -310,22 +309,39 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
cw.visitEnd(); cw.visitEnd();
byte[] bytes = cw.toByteArray(); byte[] bytes = cw.toByteArray();
Class cz = newLoader.loadClass((innerFullName).replace('/', '.'), bytes); if (consumerBytes == null) {
if (consumers == null) { consumerBytes = new LinkedHashMap<>();
consumers = new ArrayList<>();
} }
consumers.add(cz); consumerBytes.put(innerFullName.replace('/', '.'), bytes);
RedkaleClassLoader.putDynClass((innerFullName).replace('/', '.'), bytes, cz);
} }
@Override @Override
public void doInstance(ResourceFactory resourceFactory, Object service) { public void doAfterMethods(ClassLoader classLoader, ClassWriter cw, String newDynName, String fieldPrefix) {
if (Utility.isNotEmpty(consumerBytes)) {
AnnotationVisitor av = cw.visitAnnotation(org.redkale.asm.Type.getDescriptor(DynForMessage.class), true);
av.visit("value", org.redkale.asm.Type.getType("L" + newDynName.replace('.', '/') + ";"));
av.visitEnd();
}
}
@Override
public void doInstance(ClassLoader classLoader, ResourceFactory resourceFactory, Object service) {
DynForMessage[] dyns = service.getClass().getAnnotationsByType(DynForMessage.class); DynForMessage[] dyns = service.getClass().getAnnotationsByType(DynForMessage.class);
if (Utility.isEmpty(dyns)) { if (Utility.isEmpty(dyns)) {
return; return;
} }
try { try {
if (Utility.isNotEmpty(consumers)) { if (Utility.isNotEmpty(consumerBytes)) {
if (newLoader == null) {
newLoader = new RedkaleClassLoader.DynBytesClassLoader(
classLoader == null ? Thread.currentThread().getContextClassLoader() : classLoader);
}
List<Class<? extends MessageConsumer>> consumers = new ArrayList<>();
consumerBytes.forEach((clzName, bytes) -> {
Class<? extends MessageConsumer> clazz = (Class) newLoader.loadClass(clzName, bytes);
RedkaleClassLoader.putDynClass(clzName, bytes, clazz);
consumers.add(clazz);
});
for (Class<? extends MessageConsumer> clazz : consumers) { for (Class<? extends MessageConsumer> clazz : consumers) {
MessageConsumer consumer = (MessageConsumer) clazz.getConstructors()[0].newInstance(service); MessageConsumer consumer = (MessageConsumer) clazz.getConstructors()[0].newInstance(service);
messageEngine.addMessageConsumer(consumer); messageEngine.addMessageConsumer(consumer);

View File

@@ -54,6 +54,8 @@ public class MessageModuleEngine extends ModuleEngine {
// @since 2.1.0 // @since 2.1.0
private MessageAgent[] messageAgents; private MessageAgent[] messageAgents;
private List<ClassFilter.FilterEntry<? extends MessageConsumer>> allMessageConsumerEntrys;
public MessageModuleEngine(Application application) { public MessageModuleEngine(Application application) {
super(application); super(application);
} }
@@ -70,11 +72,16 @@ public class MessageModuleEngine extends ModuleEngine {
return new MessageAsmMethodBoost(remote, serviceClass, this); return new MessageAsmMethodBoost(remote, serviceClass, this);
} }
// 在doInstance方法里被调用
void addMessageConsumer(MessageConsumer consumer) { void addMessageConsumer(MessageConsumer consumer) {
String agentName = environment.getPropertyValue( String mqName = environment.getPropertyValue(
consumer.getClass().getAnnotation(ResourceConsumer.class).mq()); consumer.getClass().getAnnotation(ResourceConsumer.class).mq());
if (findMessageAgent(mqName) == null) {
throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + mqName + ") on "
+ consumer.getClass().getName());
}
agentConsumers agentConsumers
.computeIfAbsent(agentName, v -> new CopyOnWriteArrayList<>()) .computeIfAbsent(mqName, v -> new CopyOnWriteArrayList<>())
.add(consumer); .add(consumer);
} }
@@ -251,6 +258,22 @@ public class MessageModuleEngine extends ModuleEngine {
this.resourceFactory.register(agentName, MessageAgent.class, agent); this.resourceFactory.register(agentName, MessageAgent.class, agent);
} }
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
// 加载MessageConsumer
s = System.currentTimeMillis();
final RedkaleClassLoader cl = application.getServerClassLoader();
ClassFilter allFilter = new ClassFilter(cl, ResourceConsumer.class, MessageConsumer.class);
application.loadServerClassFilters(allFilter);
List<ClassFilter.FilterEntry<? extends MessageConsumer>> allEntrys = new ArrayList(allFilter.getFilterEntrys());
for (ClassFilter.FilterEntry<? extends MessageConsumer> en : allEntrys) {
Class<? extends MessageConsumer> clazz = en.getType();
ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class);
if (res != null && res.required() && findMessageAgent(res.mq()) == null) {
throw new RedkaleException("Not found " + MessageAgent.class.getSimpleName() + "(name = " + res.mq()
+ ") on " + clazz.getName());
}
}
this.allMessageConsumerEntrys = allEntrys;
logger.info("MessageAgent load MessageConsumer in " + (System.currentTimeMillis() - s) + " ms");
} }
/** /**
@@ -419,8 +442,8 @@ public class MessageModuleEngine extends ModuleEngine {
agentConsumers.getOrDefault(agent.getName(), new CopyOnWriteArrayList<>()); agentConsumers.getOrDefault(agent.getName(), new CopyOnWriteArrayList<>());
AnyValue consumerConf = agent.getConfig().getAnyValue("consumer"); AnyValue consumerConf = agent.getConfig().getAnyValue("consumer");
if (consumerConf != null) { // 加载 MessageConsumer if (consumerConf != null) { // 加载 MessageConsumer
ClassFilter filter = new ClassFilter( final RedkaleClassLoader cl = application.getServerClassLoader();
application.getServerClassLoader(), ResourceConsumer.class, MessageConsumer.class, null, null); ClassFilter filter = new ClassFilter(cl, ResourceConsumer.class, MessageConsumer.class);
if (consumerConf.getBoolValue("autoload", true)) { if (consumerConf.getBoolValue("autoload", true)) {
String includes = consumerConf.getValue("includes", ""); String includes = consumerConf.getValue("includes", "");
String excludes = consumerConf.getValue("excludes", ""); String excludes = consumerConf.getValue("excludes", "");
@@ -431,13 +454,11 @@ public class MessageModuleEngine extends ModuleEngine {
} }
try { try {
application.loadServerClassFilters(filter); for (ClassFilter.FilterEntry<? extends MessageConsumer> en : allMessageConsumerEntrys) {
List<ClassFilter.FilterEntry<? extends MessageConsumer>> entrys =
new ArrayList(filter.getFilterEntrys());
for (ClassFilter.FilterEntry<? extends MessageConsumer> en : entrys) {
Class<? extends MessageConsumer> clazz = en.getType(); Class<? extends MessageConsumer> clazz = en.getType();
ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class); ResourceConsumer res = clazz.getAnnotation(ResourceConsumer.class);
if (!Objects.equals(agent.getName(), environment.getPropertyValue(res.mq()))) { if (!filter.accept(clazz.getName())
|| !Objects.equals(agent.getName(), environment.getPropertyValue(res.mq()))) {
continue; continue;
} }
RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, clazz.getName()); RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, clazz.getName());
@@ -510,4 +531,16 @@ public class MessageModuleEngine extends ModuleEngine {
} }
return null; return null;
} }
public MessageAgent findMessageAgent(String mqName) {
if (this.messageAgents != null) {
String name = environment.getPropertyValue(mqName);
for (MessageAgent agent : this.messageAgents) {
if (Objects.equals(agent.getName(), name)) {
return agent;
}
}
}
return null;
}
} }

View File

@@ -827,7 +827,8 @@ public abstract class Sncp {
c.set(service, agent == null ? null : agent.getName()); c.set(service, agent == null ? null : agent.getName());
} }
if (methodBoost != null) { if (methodBoost != null) {
methodBoost.doInstance(resourceFactory, service); // 必须用servcie的ClassLoader 因为service是动态ClassLoader会与doMethod里的动态ClassLoader不一致
methodBoost.doInstance(service.getClass().getClassLoader(), resourceFactory, service);
} }
return service; return service;
} catch (RuntimeException rex) { } catch (RuntimeException rex) {
@@ -982,7 +983,8 @@ public abstract class Sncp {
c.set(service, info); c.set(service, info);
} }
if (methodBoost != null) { if (methodBoost != null) {
methodBoost.doInstance(resourceFactory, service); // 必须用servcie的ClassLoader 因为service是动态ClassLoader会与doMethod里的动态ClassLoader不一致
methodBoost.doInstance(service.getClass().getClassLoader(), resourceFactory, service);
} }
return service; return service;
} catch (Throwable ex) { } catch (Throwable ex) {
@@ -1264,7 +1266,8 @@ public abstract class Sncp {
RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c); RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c);
} }
if (methodBoost != null) { if (methodBoost != null) {
methodBoost.doInstance(resourceFactory, service); // 必须用servcie的ClassLoader 因为service是动态ClassLoader会与doMethod里的动态ClassLoader不一致
methodBoost.doInstance(service.getClass().getClassLoader(), resourceFactory, service);
} }
return service; return service;
} catch (Exception ex) { } catch (Exception ex) {