MessageAsmMethodBoost优化

This commit is contained in:
redkale
2024-08-13 15:27:54 +08:00
parent cb9b57a04c
commit 44bb9ac086
14 changed files with 414 additions and 30 deletions

View File

@@ -1216,30 +1216,30 @@ public final class Application {
} }
/** 执行Service.init方法前被调用 */ /** 执行Service.init方法前被调用 */
void onServicePreInit(Service service) { void onServicePreInit(NodeServer server, Service service) {
for (ModuleEngine item : moduleEngines) { for (ModuleEngine item : moduleEngines) {
item.onServicePreInit(service); item.onServicePreInit(server, service);
} }
} }
/** 执行Service.init方法后被调用 */ /** 执行Service.init方法后被调用 */
void onServicePostInit(Service service) { void onServicePostInit(NodeServer server, Service service) {
for (ModuleEngine item : moduleEngines) { for (ModuleEngine item : moduleEngines) {
item.onServicePostInit(service); item.onServicePostInit(server, service);
} }
} }
/** 执行Service.destroy方法前被调用 */ /** 执行Service.destroy方法前被调用 */
void onServicePreDestroy(Service service) { void onServicePreDestroy(NodeServer server, Service service) {
for (ModuleEngine item : moduleEngines) { for (ModuleEngine item : moduleEngines) {
item.onServicePreDestroy(service); item.onServicePreDestroy(server, service);
} }
} }
/** 执行Service.destroy方法后被调用 */ /** 执行Service.destroy方法后被调用 */
void onServicePostDestroy(Service service) { void onServicePostDestroy(NodeServer server, Service service) {
for (ModuleEngine item : moduleEngines) { for (ModuleEngine item : moduleEngines) {
item.onServicePostDestroy(service); item.onServicePostDestroy(server, service);
} }
} }

View File

@@ -136,36 +136,40 @@ public abstract class ModuleEngine {
/** /**
* 执行Service.init方法前被调用 * 执行Service.init方法前被调用
* *
* @param server NodeServer
* @param service Service * @param service Service
*/ */
public void onServicePreInit(Service service) { public void onServicePreInit(NodeServer server, Service service) {
// do nothing // do nothing
} }
/** /**
* 执行Service.init方法后被调用 * 执行Service.init方法后被调用
* *
* @param server NodeServer
* @param service Service * @param service Service
*/ */
public void onServicePostInit(Service service) { public void onServicePostInit(NodeServer server, Service service) {
// do nothing // do nothing
} }
/** /**
* 执行Service.destroy方法前被调用 * 执行Service.destroy方法前被调用
* *
* @param server NodeServer
* @param service Service * @param service Service
*/ */
public void onServicePreDestroy(Service service) { public void onServicePreDestroy(NodeServer server, Service service) {
// do nothing // do nothing
} }
/** /**
* 执行Service.destroy方法后被调用 * 执行Service.destroy方法后被调用
* *
* @param server NodeServer
* @param service Service * @param service Service
*/ */
public void onServicePostDestroy(Service service) { public void onServicePostDestroy(NodeServer server, Service service) {
// do nothing // do nothing
} }

View File

@@ -403,9 +403,9 @@ public abstract class NodeServer {
} else { } else {
localServices.stream().forEach(y -> { localServices.stream().forEach(y -> {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
application.onServicePreInit(y); application.onServicePreInit(this, y);
y.init(Sncp.getResourceConf(y)); y.init(Sncp.getResourceConf(y));
application.onServicePostInit(y); application.onServicePostInit(this, y);
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
if (slist != null) { if (slist != null) {
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength); String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
@@ -707,12 +707,12 @@ public abstract class NodeServer {
if (finest) { if (finest) {
logger.finest(Sncp.getResourceType(y) + " is destroying"); logger.finest(Sncp.getResourceType(y) + " is destroying");
} }
application.onServicePreDestroy(y); application.onServicePreDestroy(this, y);
y.destroy(Sncp.getResourceConf(y)); y.destroy(Sncp.getResourceConf(y));
if (finest) { if (finest) {
logger.finest(Sncp.getResourceType(y) + " was destroyed"); logger.finest(Sncp.getResourceType(y) + " was destroyed");
} }
application.onServicePostDestroy(y); application.onServicePostDestroy(this, y);
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
if (e > 2 && sb != null) { if (e > 2 && sb != null) {
sb.append(Sncp.toSimpleString(y, maxNameLength, maxTypeLength)) sb.append(Sncp.toSimpleString(y, maxNameLength, maxTypeLength))

View File

@@ -44,7 +44,7 @@ import org.redkale.util.TypeToken;
*/ */
public class CachedAsmMethodBoost extends AsmMethodBoost { public class CachedAsmMethodBoost extends AsmMethodBoost {
private static final java.lang.reflect.Type FUTURE_VOID = new TypeToken<CompletableFuture<Void>>() {}.getType(); static final java.lang.reflect.Type FUTURE_VOID = new TypeToken<CompletableFuture<Void>>() {}.getType();
private static final List<Class<? extends Annotation>> FILTER_ANN = List.of(Cached.class, DynForCached.class); private static final List<Class<? extends Annotation>> FILTER_ANN = List.of(Cached.class, DynForCached.class);

View File

@@ -3,6 +3,7 @@
*/ */
package org.redkale.cached.spi; package org.redkale.cached.spi;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@@ -10,10 +11,14 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.redkale.annotation.Component;
import org.redkale.asm.AsmMethodBoost; import org.redkale.asm.AsmMethodBoost;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import org.redkale.boot.ModuleEngine; import org.redkale.boot.ModuleEngine;
import org.redkale.boot.NodeServer;
import org.redkale.cached.Cached;
import org.redkale.cached.CachedManager; import org.redkale.cached.CachedManager;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.InstanceProvider; import org.redkale.util.InstanceProvider;
@@ -89,6 +94,24 @@ public class CachedModuleEngine extends ModuleEngine {
this.resourceFactory.register(new CachedKeyGeneratorLoader(this)); this.resourceFactory.register(new CachedKeyGeneratorLoader(this));
} }
/**
* 执行Service.init方法后被调用
*
* @param service Service
*/
@Override
public void onServicePostInit(NodeServer server, Service service) {
if (Sncp.isSncpDyn(service)) {
return; // 跳过动态生成的Service
}
for (Method method : service.getClass().getDeclaredMethods()) {
if (method.getAnnotation(Cached.class) != null) {
throw new RedkaleException("@" + Cached.class.getSimpleName() + " cannot on final or @"
+ Component.class.getSimpleName() + " class, but on " + method);
}
}
}
/** /**
* 进入Application.shutdown方法被调用 * 进入Application.shutdown方法被调用
*/ */

View File

@@ -3,18 +3,24 @@
*/ */
package org.redkale.locked.spi; package org.redkale.locked.spi;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import org.redkale.annotation.Component;
import org.redkale.asm.AsmMethodBoost; import org.redkale.asm.AsmMethodBoost;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import org.redkale.boot.ModuleEngine; import org.redkale.boot.ModuleEngine;
import org.redkale.boot.NodeServer;
import org.redkale.locked.Locked;
import org.redkale.locked.LockedManager; import org.redkale.locked.LockedManager;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.InstanceProvider; import org.redkale.util.InstanceProvider;
import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleException;
/** @author zhangjx */ /** @author zhangjx */
public class LockedModuleEngine extends ModuleEngine { public class LockedModuleEngine extends ModuleEngine {
@@ -73,6 +79,25 @@ public class LockedModuleEngine extends ModuleEngine {
this.resourceFactory.register("", LockedManager.class, this.lockManager); this.resourceFactory.register("", LockedManager.class, this.lockManager);
} }
/**
* 执行Service.init方法后被调用
*
* @param server NodeServer
* @param service Service
*/
@Override
public void onServicePostInit(NodeServer server, Service service) {
if (Sncp.isSncpDyn(service)) {
return; // 跳过动态生成的Service
}
for (Method method : service.getClass().getDeclaredMethods()) {
if (method.getAnnotation(Locked.class) != null) {
throw new RedkaleException("@" + Locked.class.getSimpleName() + " cannot on final or @"
+ Component.class.getSimpleName() + " class, but on " + method);
}
}
}
/** 进入Application.shutdown方法被调用 */ /** 进入Application.shutdown方法被调用 */
@Override @Override
public void onAppPreShutdown() { public void onAppPreShutdown() {

View File

@@ -69,7 +69,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
private Map<String, AsmMethodBean> methodBeans; private Map<String, AsmMethodBean> methodBeans;
private Map<String, byte[]> consumerBytes; Map<String, byte[]> consumerBytes;
public MessageAsmMethodBoost(boolean remote, Class serviceType, MessageModuleEngine messageEngine) { public MessageAsmMethodBoost(boolean remote, Class serviceType, MessageModuleEngine messageEngine) {
super(remote, serviceType); super(remote, serviceType);
@@ -143,7 +143,15 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
if (Modifier.isProtected(method.getModifiers())) { if (Modifier.isProtected(method.getModifiers())) {
createMessageMethod(cw, method, serviceImplClass, filterAnns, newMethod); createMessageMethod(cw, method, serviceImplClass, filterAnns, newMethod);
} }
createInnerConsumer(cw, method, paramKind, TypeToken.typeToClass(messageType), messaged, newDynName, newMethod); createInnerConsumer(
cw,
serviceImplClass,
method,
paramKind,
TypeToken.typeToClass(messageType),
messaged,
newDynName,
newMethod);
return newMethod; return newMethod;
} }
@@ -166,17 +174,19 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
} }
// paramKind: 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; // paramKind: 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext;
private void createInnerConsumer( protected void createInnerConsumer(
ClassWriter pcw, ClassWriter pcw,
Class serviceImplClass,
Method method, Method method,
int paramKind, int paramKind,
Class msgType, Class msgType,
Messaged messaged, Messaged messaged,
String newDynName, String newDynName,
AsmNewMethod newMethod) { AsmNewMethod newMethod) {
final String newDynDesc = "L" + newDynName + ";"; final String newDynDesc =
pcw == null ? org.redkale.asm.Type.getDescriptor(serviceImplClass) : ("L" + newDynName + ";");
final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet();
final String innerFullName = newDynName + "$" + innerClassName; final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName;
final String msgTypeName = final String msgTypeName =
TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/'); TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/');
final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType)); final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType));
@@ -195,9 +205,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
String methodSignature = methodBean.getSignature().replace(messageConextDesc, ""); String methodSignature = methodBean.getSignature().replace(messageConextDesc, "");
genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值 genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值
} }
if (pcw != null) { // 不一定是关联类
pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC);
}
MethodVisitor mv; MethodVisitor mv;
ClassWriter cw = new ClassWriter(COMPUTE_FRAMES); ClassWriter cw = new ClassWriter(COMPUTE_FRAMES);
// //
@@ -218,7 +228,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
av.visit("value", false); av.visit("value", false);
av.visitEnd(); av.visitEnd();
} }
cw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); if (pcw != null) { // 不一定是关联类
cw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC);
}
{ {
FieldVisitor fv = cw.visitField(ACC_PRIVATE, "service", newDynDesc, null, null); FieldVisitor fv = cw.visitField(ACC_PRIVATE, "service", newDynDesc, null, null);
fv.visitEnd(); fv.visitEnd();
@@ -276,8 +288,9 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
Asms.visitPrimitiveVirtual(mv, msgType); Asms.visitPrimitiveVirtual(mv, msgType);
mv.visitVarInsn(ALOAD, 1); mv.visitVarInsn(ALOAD, 1);
} }
mv.visitMethodInsn( String methodDesc = org.redkale.asm.Type.getMethodDescriptor(method);
INVOKEVIRTUAL, newDynName, methodName, org.redkale.asm.Type.getMethodDescriptor(method), false); String owner = pcw == null ? serviceImplClass.getName().replace('.', '/') : newDynName;
mv.visitMethodInsn(INVOKEVIRTUAL, owner, methodName, methodDesc, false);
if (method.getReturnType() != void.class) { if (method.getReturnType() != void.class) {
mv.visitInsn(POP); mv.visitInsn(POP);
} }

View File

@@ -4,6 +4,8 @@
package org.redkale.mq.spi; package org.redkale.mq.spi;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@@ -19,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level; import java.util.logging.Level;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Component;
import org.redkale.asm.AsmMethodBoost; import org.redkale.asm.AsmMethodBoost;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import org.redkale.boot.ClassFilter; import org.redkale.boot.ClassFilter;
@@ -29,16 +32,22 @@ import org.redkale.inject.ResourceAnnotationLoader;
import org.redkale.inject.ResourceEvent; import org.redkale.inject.ResourceEvent;
import org.redkale.inject.ResourceFactory; import org.redkale.inject.ResourceFactory;
import org.redkale.inject.ResourceTypeLoader; import org.redkale.inject.ResourceTypeLoader;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageManager; import org.redkale.mq.MessageManager;
import org.redkale.mq.MessageProducer; import org.redkale.mq.MessageProducer;
import org.redkale.mq.Messaged;
import org.redkale.mq.ResourceConsumer; import org.redkale.mq.ResourceConsumer;
import org.redkale.mq.ResourceProducer; import org.redkale.mq.ResourceProducer;
import org.redkale.net.http.RestException; import org.redkale.net.http.RestException;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.AnyValueWriter; import org.redkale.util.AnyValueWriter;
import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleClassLoader.DynBytesClassLoader;
import org.redkale.util.RedkaleException; import org.redkale.util.RedkaleException;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility; import org.redkale.util.Utility;
/** @author zhangjx */ /** @author zhangjx */
@@ -396,6 +405,81 @@ public class MessageModuleEngine extends ModuleEngine {
} }
} }
/**
* 执行Service.init方法后被调用
*
* @param server NodeServer
* @param service Service
*/
@Override
public void onServicePostInit(NodeServer server, Service service) {
if (Sncp.isSncpDyn(service)) {
return; // 跳过动态生成的Service
}
MessageAsmMethodBoost boost = null;
for (Method method : service.getClass().getDeclaredMethods()) {
Messaged messaged = method.getAnnotation(Messaged.class);
if (messaged == null) {
continue;
}
if (Modifier.isStatic(method.getModifiers())) {
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " cannot on static method, but on " + method);
}
if (!Modifier.isPublic(method.getModifiers())) {
throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on public method in @"
+ Component.class.getSimpleName() + " class, but on " + method);
}
int paramCount = method.getParameterCount();
if (paramCount != 1 && paramCount != 2) {
throw new RedkaleException("@" + Messaged.class.getSimpleName()
+ " must on one or two parameter method, but on " + method);
}
int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext;
Type messageType;
Type[] paramTypes = method.getGenericParameterTypes();
if (paramCount == 1) {
messageType = paramTypes[0];
paramKind = 1;
} else {
if (paramTypes[0] == MessageConext.class) {
messageType = paramTypes[1];
paramKind = 2;
} else if (paramTypes[1] == MessageConext.class) {
messageType = paramTypes[0];
paramKind = 3;
} else {
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " on two-parameter method must contains "
+ MessageConext.class.getSimpleName() + " parameter type, but on " + method);
}
}
if (boost == null) {
boost = new MessageAsmMethodBoost(false, service.getClass(), this);
String newDynName = "org/redkaledyn/service/local/_DynMessageService__"
+ service.getClass().getName().replace('.', '_').replace('$', '_');
Class msgType = TypeToken.typeToClass(messageType);
boost.createInnerConsumer(
null, service.getClass(), method, paramKind, msgType, messaged, newDynName, null);
}
}
if (boost != null && Utility.isNotEmpty(boost.consumerBytes)) {
DynBytesClassLoader classLoader = DynBytesClassLoader.create(null);
boost.consumerBytes.forEach((innerFullName, bytes) -> {
try {
String clzName = innerFullName.replace('/', '.');
Class<? extends MessageConsumer> clazz = (Class) classLoader.loadClass(clzName, bytes);
RedkaleClassLoader.putDynClass(clzName, bytes, clazz);
RedkaleClassLoader.putReflectionPublicConstructors(clazz, clzName);
MessageConsumer consumer = (MessageConsumer) clazz.getConstructors()[0].newInstance(service);
addMessageConsumer(consumer);
} catch (Exception e) {
throw new RedkaleException(e);
}
});
}
}
/** 服务全部启动后被调用 */ /** 服务全部启动后被调用 */
@Override @Override
public void onServersPostStart() { public void onServersPostStart() {

View File

@@ -9,6 +9,7 @@ import java.util.List;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import org.redkale.boot.ModuleEngine; import org.redkale.boot.ModuleEngine;
import org.redkale.boot.NodeServer;
import org.redkale.scheduled.ScheduledManager; import org.redkale.scheduled.ScheduledManager;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
@@ -65,20 +66,22 @@ public class ScheduledModuleEngine extends ModuleEngine {
/** /**
* 执行Service.init方法后被调用 * 执行Service.init方法后被调用
* *
* @param server NodeServer
* @param service Service * @param service Service
*/ */
@Override @Override
public void onServicePostInit(Service service) { public void onServicePostInit(NodeServer server, Service service) {
this.scheduledManager.schedule(service); this.scheduledManager.schedule(service);
} }
/** /**
* 执行Service.destroy方法后被调用 * 执行Service.destroy方法后被调用
* *
* @param server NodeServer
* @param service Service * @param service Service
*/ */
@Override @Override
public void onServicePreDestroy(Service service) { public void onServicePreDestroy(NodeServer server, Service service) {
this.scheduledManager.unschedule(service); this.scheduledManager.unschedule(service);
} }

View File

@@ -0,0 +1,46 @@
/*
*/
package org.redkale.test.mq;
import org.redkale.convert.json.JsonConvert;
/**
*
* @author zhangjx
*/
public class TestBean {
private int userid;
private String message;
public TestBean() {}
public TestBean(int userid, String message) {
this.userid = userid;
this.message = message;
}
public int getUserid() {
return userid;
}
public void setUserid(int userid) {
this.userid = userid;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -0,0 +1,33 @@
/*
*/
package org.redkale.test.mq;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer;
import org.redkale.mq.ResourceConsumer;
import org.redkale.util.AnyValue;
/**
*
* @author zhangjx
*/
@ResourceConsumer(mq = "mymq", topics = "test_bean_topic")
public class TestMessageConsumer implements MessageConsumer<TestBean> {
@Override
public void init(AnyValue config) {
System.out.println("执行 TestMessageConsumer.init");
}
@Override
public void onMessage(MessageConext context, TestBean message) {
System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message);
}
@Override
public void destroy(AnyValue config) {
System.out.println("执行 TestMessageConsumer.destroy");
}
}

View File

@@ -0,0 +1,23 @@
/*
*/
package org.redkale.test.mq;
import org.redkale.annotation.Component;
import org.redkale.mq.Messaged;
import org.redkale.service.AbstractService;
/**
*
* @author zhangjx
*/
@Component
public class TestMessageFacde extends AbstractService {
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_5")
public int runMessage5(TestBean message) {
System.out.println("TestMessageFacde 消费消息5, message: " + message);
return 0;
}
}

View File

@@ -0,0 +1,54 @@
/*
*/
package org.redkale.test.mq;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageProducer;
import org.redkale.mq.Messaged;
import org.redkale.mq.ResourceProducer;
import org.redkale.service.AbstractService;
import org.redkale.util.AnyValue;
/**
*
* @author zhangjx
*/
public class TestMessageService extends AbstractService {
@ResourceProducer(mq = "mymq")
private MessageProducer producer;
@Override
public void init(AnyValue config) {
sendMessage();
}
public void sendMessage() {
TestBean bean = new TestBean(12345, "this is a message");
System.out.println("生产消息: " + bean);
producer.sendMessage("test_bean_topic", bean).whenComplete((v, t) -> {
if (t != null) {
t.printStackTrace();
}
System.out.println("消息发送结果: " + v);
});
}
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_2")
protected void runMessage2(MessageConext context, TestBean message) {
System.out.println("TestMessageService 消费消息2, context: " + context + ", message: " + message);
}
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_3")
protected void runMessage3(TestBean message) {
System.out.println("TestMessageService 消费消息3, message: " + message);
}
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_4")
protected int runMessage4(TestBean message) {
System.out.println("TestMessageService 消费消息4, message: " + message);
return 0;
}
}

View File

@@ -0,0 +1,76 @@
/*
*/
package org.redkale.test.mq;
import org.redkale.annotation.AutoLoad;
import org.redkale.convert.ConvertType;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer;
import org.redkale.mq.ResourceConsumer;
@AutoLoad(false)
public class _DynLocalTestMessageService extends TestMessageService {
public _DynLocalTestMessageService() {
super();
System.out.println("哈哈哈哈哈");
}
@AutoLoad(false)
@ResourceConsumer(
topics = {"test_bean_topic"},
required = true,
group = "group_4",
convertType = ConvertType.JSON,
mq = "mymq")
public static class DynMessageConsumerx1 implements MessageConsumer<TestBean> {
private _DynLocalTestMessageService service;
public DynMessageConsumerx1(_DynLocalTestMessageService service) {
this.service = service;
}
public void onMessage(MessageConext context, TestBean message) {
this.service.runMessage4(message);
}
}
@AutoLoad(false)
@ResourceConsumer(
topics = {"test_bean_topic"},
required = true,
group = "group_3",
convertType = ConvertType.JSON,
mq = "mymq")
public static class DynMessageConsumerx2 implements MessageConsumer<TestBean> {
private _DynLocalTestMessageService service;
public DynMessageConsumerx2(_DynLocalTestMessageService service) {
this.service = service;
}
public void onMessage(MessageConext context, TestBean message) {
this.service.runMessage3(message);
}
}
@AutoLoad(false)
@ResourceConsumer(
topics = {"test_bean_topic"},
required = true,
group = "group_2",
convertType = ConvertType.JSON,
mq = "mymq")
public static class DynMessageConsumerx3 implements MessageConsumer<TestBean> {
private _DynLocalTestMessageService service;
public DynMessageConsumerx3(_DynLocalTestMessageService service) {
this.service = service;
}
public void onMessage(MessageConext context, TestBean message) {
this.service.runMessage2(context, message);
}
}
}