From 44bb9ac086b148fd6df425fa331dc1d6cfb8fcda Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 13 Aug 2024 15:27:54 +0800 Subject: [PATCH] =?UTF-8?q?MessageAsmMethodBoost=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/boot/Application.java | 16 ++-- .../java/org/redkale/boot/ModuleEngine.java | 12 ++- .../java/org/redkale/boot/NodeServer.java | 8 +- .../cached/spi/CachedAsmMethodBoost.java | 2 +- .../cached/spi/CachedModuleEngine.java | 23 +++++ .../locked/spi/LockedModuleEngine.java | 25 ++++++ .../redkale/mq/spi/MessageAsmMethodBoost.java | 35 +++++--- .../redkale/mq/spi/MessageModuleEngine.java | 84 +++++++++++++++++++ .../scheduled/spi/ScheduledModuleEngine.java | 7 +- .../java/org/redkale/test/mq/TestBean.java | 46 ++++++++++ .../redkale/test/mq/TestMessageConsumer.java | 33 ++++++++ .../org/redkale/test/mq/TestMessageFacde.java | 23 +++++ .../redkale/test/mq/TestMessageService.java | 54 ++++++++++++ .../test/mq/_DynLocalTestMessageService.java | 76 +++++++++++++++++ 14 files changed, 414 insertions(+), 30 deletions(-) create mode 100644 src/test/java/org/redkale/test/mq/TestBean.java create mode 100644 src/test/java/org/redkale/test/mq/TestMessageConsumer.java create mode 100644 src/test/java/org/redkale/test/mq/TestMessageFacde.java create mode 100644 src/test/java/org/redkale/test/mq/TestMessageService.java create mode 100644 src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java diff --git a/src/main/java/org/redkale/boot/Application.java b/src/main/java/org/redkale/boot/Application.java index e084e46c4..85dd52a72 100644 --- a/src/main/java/org/redkale/boot/Application.java +++ b/src/main/java/org/redkale/boot/Application.java @@ -1216,30 +1216,30 @@ public final class Application { } /** 执行Service.init方法前被调用 */ - void onServicePreInit(Service service) { + void onServicePreInit(NodeServer server, Service service) { for (ModuleEngine item : moduleEngines) { - item.onServicePreInit(service); + item.onServicePreInit(server, service); } } /** 执行Service.init方法后被调用 */ - void onServicePostInit(Service service) { + void onServicePostInit(NodeServer server, Service service) { for (ModuleEngine item : moduleEngines) { - item.onServicePostInit(service); + item.onServicePostInit(server, service); } } /** 执行Service.destroy方法前被调用 */ - void onServicePreDestroy(Service service) { + void onServicePreDestroy(NodeServer server, Service service) { for (ModuleEngine item : moduleEngines) { - item.onServicePreDestroy(service); + item.onServicePreDestroy(server, service); } } /** 执行Service.destroy方法后被调用 */ - void onServicePostDestroy(Service service) { + void onServicePostDestroy(NodeServer server, Service service) { for (ModuleEngine item : moduleEngines) { - item.onServicePostDestroy(service); + item.onServicePostDestroy(server, service); } } diff --git a/src/main/java/org/redkale/boot/ModuleEngine.java b/src/main/java/org/redkale/boot/ModuleEngine.java index 69fdd20f2..bf5747083 100644 --- a/src/main/java/org/redkale/boot/ModuleEngine.java +++ b/src/main/java/org/redkale/boot/ModuleEngine.java @@ -136,36 +136,40 @@ public abstract class ModuleEngine { /** * 执行Service.init方法前被调用 * + * @param server NodeServer * @param service Service */ - public void onServicePreInit(Service service) { + public void onServicePreInit(NodeServer server, Service service) { // do nothing } /** * 执行Service.init方法后被调用 * + * @param server NodeServer * @param service Service */ - public void onServicePostInit(Service service) { + public void onServicePostInit(NodeServer server, Service service) { // do nothing } /** * 执行Service.destroy方法前被调用 * + * @param server NodeServer * @param service Service */ - public void onServicePreDestroy(Service service) { + public void onServicePreDestroy(NodeServer server, Service service) { // do nothing } /** * 执行Service.destroy方法后被调用 * + * @param server NodeServer * @param service Service */ - public void onServicePostDestroy(Service service) { + public void onServicePostDestroy(NodeServer server, Service service) { // do nothing } diff --git a/src/main/java/org/redkale/boot/NodeServer.java b/src/main/java/org/redkale/boot/NodeServer.java index 860a89adf..d2ad718c2 100644 --- a/src/main/java/org/redkale/boot/NodeServer.java +++ b/src/main/java/org/redkale/boot/NodeServer.java @@ -403,9 +403,9 @@ public abstract class NodeServer { } else { localServices.stream().forEach(y -> { long s = System.currentTimeMillis(); - application.onServicePreInit(y); + application.onServicePreInit(this, y); y.init(Sncp.getResourceConf(y)); - application.onServicePostInit(y); + application.onServicePostInit(this, y); long e = System.currentTimeMillis() - s; if (slist != null) { String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); @@ -707,12 +707,12 @@ public abstract class NodeServer { if (finest) { logger.finest(Sncp.getResourceType(y) + " is destroying"); } - application.onServicePreDestroy(y); + application.onServicePreDestroy(this, y); y.destroy(Sncp.getResourceConf(y)); if (finest) { logger.finest(Sncp.getResourceType(y) + " was destroyed"); } - application.onServicePostDestroy(y); + application.onServicePostDestroy(this, y); long e = System.currentTimeMillis() - s; if (e > 2 && sb != null) { sb.append(Sncp.toSimpleString(y, maxNameLength, maxTypeLength)) diff --git a/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java b/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java index d2a6b3555..8f269269c 100644 --- a/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java +++ b/src/main/java/org/redkale/cached/spi/CachedAsmMethodBoost.java @@ -44,7 +44,7 @@ import org.redkale.util.TypeToken; */ public class CachedAsmMethodBoost extends AsmMethodBoost { - private static final java.lang.reflect.Type FUTURE_VOID = new TypeToken>() {}.getType(); + static final java.lang.reflect.Type FUTURE_VOID = new TypeToken>() {}.getType(); private static final List> FILTER_ANN = List.of(Cached.class, DynForCached.class); diff --git a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java index af222d968..117097f67 100644 --- a/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java +++ b/src/main/java/org/redkale/cached/spi/CachedModuleEngine.java @@ -3,6 +3,7 @@ */ package org.redkale.cached.spi; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -10,10 +11,14 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import org.redkale.annotation.Component; import org.redkale.asm.AsmMethodBoost; import org.redkale.boot.Application; import org.redkale.boot.ModuleEngine; +import org.redkale.boot.NodeServer; +import org.redkale.cached.Cached; import org.redkale.cached.CachedManager; +import org.redkale.net.sncp.Sncp; import org.redkale.service.Service; import org.redkale.util.AnyValue; import org.redkale.util.InstanceProvider; @@ -89,6 +94,24 @@ public class CachedModuleEngine extends ModuleEngine { this.resourceFactory.register(new CachedKeyGeneratorLoader(this)); } + /** + * 执行Service.init方法后被调用 + * + * @param service Service + */ + @Override + public void onServicePostInit(NodeServer server, Service service) { + if (Sncp.isSncpDyn(service)) { + return; // 跳过动态生成的Service + } + for (Method method : service.getClass().getDeclaredMethods()) { + if (method.getAnnotation(Cached.class) != null) { + throw new RedkaleException("@" + Cached.class.getSimpleName() + " cannot on final or @" + + Component.class.getSimpleName() + " class, but on " + method); + } + } + } + /** * 进入Application.shutdown方法被调用 */ diff --git a/src/main/java/org/redkale/locked/spi/LockedModuleEngine.java b/src/main/java/org/redkale/locked/spi/LockedModuleEngine.java index d9c6c6a87..2432f6460 100644 --- a/src/main/java/org/redkale/locked/spi/LockedModuleEngine.java +++ b/src/main/java/org/redkale/locked/spi/LockedModuleEngine.java @@ -3,18 +3,24 @@ */ package org.redkale.locked.spi; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.ServiceLoader; +import org.redkale.annotation.Component; import org.redkale.asm.AsmMethodBoost; import org.redkale.boot.Application; import org.redkale.boot.ModuleEngine; +import org.redkale.boot.NodeServer; +import org.redkale.locked.Locked; import org.redkale.locked.LockedManager; +import org.redkale.net.sncp.Sncp; import org.redkale.service.Service; import org.redkale.util.AnyValue; import org.redkale.util.InstanceProvider; import org.redkale.util.RedkaleClassLoader; +import org.redkale.util.RedkaleException; /** @author zhangjx */ public class LockedModuleEngine extends ModuleEngine { @@ -73,6 +79,25 @@ public class LockedModuleEngine extends ModuleEngine { this.resourceFactory.register("", LockedManager.class, this.lockManager); } + /** + * 执行Service.init方法后被调用 + * + * @param server NodeServer + * @param service Service + */ + @Override + public void onServicePostInit(NodeServer server, Service service) { + if (Sncp.isSncpDyn(service)) { + return; // 跳过动态生成的Service + } + for (Method method : service.getClass().getDeclaredMethods()) { + if (method.getAnnotation(Locked.class) != null) { + throw new RedkaleException("@" + Locked.class.getSimpleName() + " cannot on final or @" + + Component.class.getSimpleName() + " class, but on " + method); + } + } + } + /** 进入Application.shutdown方法被调用 */ @Override public void onAppPreShutdown() { diff --git a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java index 083e29230..d5ce4ed04 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java +++ b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java @@ -69,7 +69,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { private Map methodBeans; - private Map consumerBytes; + Map consumerBytes; public MessageAsmMethodBoost(boolean remote, Class serviceType, MessageModuleEngine messageEngine) { super(remote, serviceType); @@ -143,7 +143,15 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { if (Modifier.isProtected(method.getModifiers())) { createMessageMethod(cw, method, serviceImplClass, filterAnns, newMethod); } - createInnerConsumer(cw, method, paramKind, TypeToken.typeToClass(messageType), messaged, newDynName, newMethod); + createInnerConsumer( + cw, + serviceImplClass, + method, + paramKind, + TypeToken.typeToClass(messageType), + messaged, + newDynName, + newMethod); return newMethod; } @@ -166,17 +174,19 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { } // paramKind: 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; - private void createInnerConsumer( + protected void createInnerConsumer( ClassWriter pcw, + Class serviceImplClass, Method method, int paramKind, Class msgType, Messaged messaged, String newDynName, AsmNewMethod newMethod) { - final String newDynDesc = "L" + newDynName + ";"; + final String newDynDesc = + pcw == null ? org.redkale.asm.Type.getDescriptor(serviceImplClass) : ("L" + newDynName + ";"); final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); - final String innerFullName = newDynName + "$" + innerClassName; + final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName; final String msgTypeName = TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/'); final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType)); @@ -195,9 +205,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { String methodSignature = methodBean.getSignature().replace(messageConextDesc, ""); genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值 } - - pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); - + if (pcw != null) { // 不一定是关联类 + pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); + } MethodVisitor mv; ClassWriter cw = new ClassWriter(COMPUTE_FRAMES); // @@ -218,7 +228,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { av.visit("value", false); av.visitEnd(); } - cw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); + if (pcw != null) { // 不一定是关联类 + cw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); + } { FieldVisitor fv = cw.visitField(ACC_PRIVATE, "service", newDynDesc, null, null); fv.visitEnd(); @@ -276,8 +288,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { Asms.visitPrimitiveVirtual(mv, msgType); mv.visitVarInsn(ALOAD, 1); } - mv.visitMethodInsn( - INVOKEVIRTUAL, newDynName, methodName, org.redkale.asm.Type.getMethodDescriptor(method), false); + String methodDesc = org.redkale.asm.Type.getMethodDescriptor(method); + String owner = pcw == null ? serviceImplClass.getName().replace('.', '/') : newDynName; + mv.visitMethodInsn(INVOKEVIRTUAL, owner, methodName, methodDesc, false); if (method.getReturnType() != void.class) { mv.visitInsn(POP); } diff --git a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java index 3de46f6b0..360cf5a73 100644 --- a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java +++ b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java @@ -4,6 +4,8 @@ package org.redkale.mq.spi; import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashSet; @@ -19,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import org.redkale.annotation.AutoLoad; +import org.redkale.annotation.Component; import org.redkale.asm.AsmMethodBoost; import org.redkale.boot.Application; import org.redkale.boot.ClassFilter; @@ -29,16 +32,22 @@ import org.redkale.inject.ResourceAnnotationLoader; import org.redkale.inject.ResourceEvent; import org.redkale.inject.ResourceFactory; import org.redkale.inject.ResourceTypeLoader; +import org.redkale.mq.MessageConext; import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageManager; import org.redkale.mq.MessageProducer; +import org.redkale.mq.Messaged; import org.redkale.mq.ResourceConsumer; import org.redkale.mq.ResourceProducer; import org.redkale.net.http.RestException; +import org.redkale.net.sncp.Sncp; +import org.redkale.service.Service; import org.redkale.util.AnyValue; import org.redkale.util.AnyValueWriter; import org.redkale.util.RedkaleClassLoader; +import org.redkale.util.RedkaleClassLoader.DynBytesClassLoader; import org.redkale.util.RedkaleException; +import org.redkale.util.TypeToken; import org.redkale.util.Utility; /** @author zhangjx */ @@ -396,6 +405,81 @@ public class MessageModuleEngine extends ModuleEngine { } } + /** + * 执行Service.init方法后被调用 + * + * @param server NodeServer + * @param service Service + */ + @Override + public void onServicePostInit(NodeServer server, Service service) { + if (Sncp.isSncpDyn(service)) { + return; // 跳过动态生成的Service + } + MessageAsmMethodBoost boost = null; + for (Method method : service.getClass().getDeclaredMethods()) { + Messaged messaged = method.getAnnotation(Messaged.class); + if (messaged == null) { + continue; + } + if (Modifier.isStatic(method.getModifiers())) { + throw new RedkaleException( + "@" + Messaged.class.getSimpleName() + " cannot on static method, but on " + method); + } + if (!Modifier.isPublic(method.getModifiers())) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on public method in @" + + Component.class.getSimpleName() + " class, but on " + method); + } + int paramCount = method.getParameterCount(); + if (paramCount != 1 && paramCount != 2) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + + " must on one or two parameter method, but on " + method); + } + int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; + Type messageType; + Type[] paramTypes = method.getGenericParameterTypes(); + if (paramCount == 1) { + messageType = paramTypes[0]; + paramKind = 1; + } else { + if (paramTypes[0] == MessageConext.class) { + messageType = paramTypes[1]; + paramKind = 2; + } else if (paramTypes[1] == MessageConext.class) { + messageType = paramTypes[0]; + paramKind = 3; + } else { + throw new RedkaleException( + "@" + Messaged.class.getSimpleName() + " on two-parameter method must contains " + + MessageConext.class.getSimpleName() + " parameter type, but on " + method); + } + } + if (boost == null) { + boost = new MessageAsmMethodBoost(false, service.getClass(), this); + String newDynName = "org/redkaledyn/service/local/_DynMessageService__" + + service.getClass().getName().replace('.', '_').replace('$', '_'); + Class msgType = TypeToken.typeToClass(messageType); + boost.createInnerConsumer( + null, service.getClass(), method, paramKind, msgType, messaged, newDynName, null); + } + } + if (boost != null && Utility.isNotEmpty(boost.consumerBytes)) { + DynBytesClassLoader classLoader = DynBytesClassLoader.create(null); + boost.consumerBytes.forEach((innerFullName, bytes) -> { + try { + String clzName = innerFullName.replace('/', '.'); + Class clazz = (Class) classLoader.loadClass(clzName, bytes); + RedkaleClassLoader.putDynClass(clzName, bytes, clazz); + RedkaleClassLoader.putReflectionPublicConstructors(clazz, clzName); + MessageConsumer consumer = (MessageConsumer) clazz.getConstructors()[0].newInstance(service); + addMessageConsumer(consumer); + } catch (Exception e) { + throw new RedkaleException(e); + } + }); + } + } + /** 服务全部启动后被调用 */ @Override public void onServersPostStart() { diff --git a/src/main/java/org/redkale/scheduled/spi/ScheduledModuleEngine.java b/src/main/java/org/redkale/scheduled/spi/ScheduledModuleEngine.java index ff8570daf..58601a152 100644 --- a/src/main/java/org/redkale/scheduled/spi/ScheduledModuleEngine.java +++ b/src/main/java/org/redkale/scheduled/spi/ScheduledModuleEngine.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.ServiceLoader; import org.redkale.boot.Application; import org.redkale.boot.ModuleEngine; +import org.redkale.boot.NodeServer; import org.redkale.scheduled.ScheduledManager; import org.redkale.service.Service; import org.redkale.util.AnyValue; @@ -65,20 +66,22 @@ public class ScheduledModuleEngine extends ModuleEngine { /** * 执行Service.init方法后被调用 * + * @param server NodeServer * @param service Service */ @Override - public void onServicePostInit(Service service) { + public void onServicePostInit(NodeServer server, Service service) { this.scheduledManager.schedule(service); } /** * 执行Service.destroy方法后被调用 * + * @param server NodeServer * @param service Service */ @Override - public void onServicePreDestroy(Service service) { + public void onServicePreDestroy(NodeServer server, Service service) { this.scheduledManager.unschedule(service); } diff --git a/src/test/java/org/redkale/test/mq/TestBean.java b/src/test/java/org/redkale/test/mq/TestBean.java new file mode 100644 index 000000000..5b35a43fa --- /dev/null +++ b/src/test/java/org/redkale/test/mq/TestBean.java @@ -0,0 +1,46 @@ +/* + +*/ + +package org.redkale.test.mq; + +import org.redkale.convert.json.JsonConvert; + +/** + * + * @author zhangjx + */ +public class TestBean { + + private int userid; + + private String message; + + public TestBean() {} + + public TestBean(int userid, String message) { + this.userid = userid; + this.message = message; + } + + public int getUserid() { + return userid; + } + + public void setUserid(int userid) { + this.userid = userid; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } +} diff --git a/src/test/java/org/redkale/test/mq/TestMessageConsumer.java b/src/test/java/org/redkale/test/mq/TestMessageConsumer.java new file mode 100644 index 000000000..aafc355d5 --- /dev/null +++ b/src/test/java/org/redkale/test/mq/TestMessageConsumer.java @@ -0,0 +1,33 @@ +/* + +*/ + +package org.redkale.test.mq; + +import org.redkale.mq.MessageConext; +import org.redkale.mq.MessageConsumer; +import org.redkale.mq.ResourceConsumer; +import org.redkale.util.AnyValue; + +/** + * + * @author zhangjx + */ +@ResourceConsumer(mq = "mymq", topics = "test_bean_topic") +public class TestMessageConsumer implements MessageConsumer { + + @Override + public void init(AnyValue config) { + System.out.println("执行 TestMessageConsumer.init"); + } + + @Override + public void onMessage(MessageConext context, TestBean message) { + System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); + } + + @Override + public void destroy(AnyValue config) { + System.out.println("执行 TestMessageConsumer.destroy"); + } +} diff --git a/src/test/java/org/redkale/test/mq/TestMessageFacde.java b/src/test/java/org/redkale/test/mq/TestMessageFacde.java new file mode 100644 index 000000000..ba71473a5 --- /dev/null +++ b/src/test/java/org/redkale/test/mq/TestMessageFacde.java @@ -0,0 +1,23 @@ +/* + +*/ + +package org.redkale.test.mq; + +import org.redkale.annotation.Component; +import org.redkale.mq.Messaged; +import org.redkale.service.AbstractService; + +/** + * + * @author zhangjx + */ +@Component +public class TestMessageFacde extends AbstractService { + + @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_5") + public int runMessage5(TestBean message) { + System.out.println("TestMessageFacde 消费消息5, message: " + message); + return 0; + } +} diff --git a/src/test/java/org/redkale/test/mq/TestMessageService.java b/src/test/java/org/redkale/test/mq/TestMessageService.java new file mode 100644 index 000000000..1d9e88089 --- /dev/null +++ b/src/test/java/org/redkale/test/mq/TestMessageService.java @@ -0,0 +1,54 @@ +/* + +*/ + +package org.redkale.test.mq; + +import org.redkale.mq.MessageConext; +import org.redkale.mq.MessageProducer; +import org.redkale.mq.Messaged; +import org.redkale.mq.ResourceProducer; +import org.redkale.service.AbstractService; +import org.redkale.util.AnyValue; + +/** + * + * @author zhangjx + */ +public class TestMessageService extends AbstractService { + + @ResourceProducer(mq = "mymq") + private MessageProducer producer; + + @Override + public void init(AnyValue config) { + sendMessage(); + } + + public void sendMessage() { + TestBean bean = new TestBean(12345, "this is a message"); + System.out.println("生产消息: " + bean); + producer.sendMessage("test_bean_topic", bean).whenComplete((v, t) -> { + if (t != null) { + t.printStackTrace(); + } + System.out.println("消息发送结果: " + v); + }); + } + + @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_2") + protected void runMessage2(MessageConext context, TestBean message) { + System.out.println("TestMessageService 消费消息2, context: " + context + ", message: " + message); + } + + @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_3") + protected void runMessage3(TestBean message) { + System.out.println("TestMessageService 消费消息3, message: " + message); + } + + @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_4") + protected int runMessage4(TestBean message) { + System.out.println("TestMessageService 消费消息4, message: " + message); + return 0; + } +} diff --git a/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java b/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java new file mode 100644 index 000000000..76d54b241 --- /dev/null +++ b/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java @@ -0,0 +1,76 @@ +/* + +*/ + +package org.redkale.test.mq; + +import org.redkale.annotation.AutoLoad; +import org.redkale.convert.ConvertType; +import org.redkale.mq.MessageConext; +import org.redkale.mq.MessageConsumer; +import org.redkale.mq.ResourceConsumer; + +@AutoLoad(false) +public class _DynLocalTestMessageService extends TestMessageService { + public _DynLocalTestMessageService() { + super(); + System.out.println("哈哈哈哈哈"); + } + + @AutoLoad(false) + @ResourceConsumer( + topics = {"test_bean_topic"}, + required = true, + group = "group_4", + convertType = ConvertType.JSON, + mq = "mymq") + public static class DynMessageConsumerx1 implements MessageConsumer { + private _DynLocalTestMessageService service; + + public DynMessageConsumerx1(_DynLocalTestMessageService service) { + this.service = service; + } + + public void onMessage(MessageConext context, TestBean message) { + this.service.runMessage4(message); + } + } + + @AutoLoad(false) + @ResourceConsumer( + topics = {"test_bean_topic"}, + required = true, + group = "group_3", + convertType = ConvertType.JSON, + mq = "mymq") + public static class DynMessageConsumerx2 implements MessageConsumer { + private _DynLocalTestMessageService service; + + public DynMessageConsumerx2(_DynLocalTestMessageService service) { + this.service = service; + } + + public void onMessage(MessageConext context, TestBean message) { + this.service.runMessage3(message); + } + } + + @AutoLoad(false) + @ResourceConsumer( + topics = {"test_bean_topic"}, + required = true, + group = "group_2", + convertType = ConvertType.JSON, + mq = "mymq") + public static class DynMessageConsumerx3 implements MessageConsumer { + private _DynLocalTestMessageService service; + + public DynMessageConsumerx3(_DynLocalTestMessageService service) { + this.service = service; + } + + public void onMessage(MessageConext context, TestBean message) { + this.service.runMessage2(context, message); + } + } +}