MessageEvent

This commit is contained in:
redkale
2024-08-18 01:44:09 +08:00
parent 9137aed8db
commit 74823fe2fe
13 changed files with 185 additions and 222 deletions

View File

@@ -1,61 +0,0 @@
/*
*
*/
package org.redkale.mq;
import java.util.Objects;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
* MessageConsumer回调的上下文
*
* <p>详情见: https://redkale.org
*
* @see org.redkale.mq.MessageConsumer
* @author zhangjx
* @since 2.8.0
*/
public class MessageConext {
@ConvertColumn(index = 1)
protected String topic;
@ConvertColumn(index = 2)
protected Integer partition;
public MessageConext(String topic, Integer partition) {
this.topic = topic;
this.partition = partition;
}
public String getTopic() {
return topic;
}
public Integer getPartition() {
return partition;
}
@Override
public int hashCode() {
return Objects.hash(this.topic, this.partition);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final MessageConext other = (MessageConext) obj;
return Objects.equals(this.topic, other.topic) && Objects.equals(this.partition, other.partition);
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -22,8 +22,10 @@ import org.redkale.util.AnyValue;
* } * }
* *
* &#64;Override * &#64;Override
* public void onMessage(MessageConext context, TestBean message) { * public void onMessage(MessageEvent&lt;TestBean&gt;[] events) {
* System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); * for(MessageEvent&lt;TestBean&gt; event : events) {
* System.out.println("TestMessageConsumer消费消息, message: " + event.getMessage());
* }
* } * }
* *
* &#64;Override * &#64;Override
@@ -36,7 +38,7 @@ import org.redkale.util.AnyValue;
* *
* <p>详情见: https://redkale.org * <p>详情见: https://redkale.org
* *
* @see org.redkale.mq.MessageConext * @see org.redkale.mq.MessageEvent
* @see org.redkale.mq.ResourceConsumer * @see org.redkale.mq.ResourceConsumer
* @see org.redkale.mq.Messaged * @see org.redkale.mq.Messaged
* @author zhangjx * @author zhangjx
@@ -50,7 +52,7 @@ public interface MessageConsumer<T> {
default void init(AnyValue config) {} default void init(AnyValue config) {}
public void onMessage(MessageConext context, T message); public void onMessage(MessageEvent<T>[] events);
default void destroy(AnyValue config) {} default void destroy(AnyValue config) {}
} }

View File

@@ -0,0 +1,81 @@
/*
*/
package org.redkale.mq;
import org.redkale.convert.ConvertColumn;
import org.redkale.convert.json.JsonConvert;
/**
* MessageConsumer的消息实体类
*
* <p>详情见: https://redkale.org
*
* @see org.redkale.mq.MessageConsumer
* @author zhangjx
* @since 2.8.0
* @param <T> T
*/
public final class MessageEvent<T> {
@ConvertColumn(index = 1)
protected String topic;
@ConvertColumn(index = 2)
protected Integer partition;
@ConvertColumn(index = 3)
protected String traceid;
@ConvertColumn(index = 4)
protected T message;
public MessageEvent() {
//
}
public MessageEvent(String topic, Integer partition, String traceid, T message) {
this.topic = topic;
this.partition = partition;
this.traceid = traceid;
this.message = message;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getPartition() {
return partition;
}
public void setPartition(Integer partition) {
this.partition = partition;
}
public String getTraceid() {
return traceid;
}
public void setTraceid(String traceid) {
this.traceid = traceid;
}
public T getMessage() {
return message;
}
public void setMessage(T message) {
this.message = message;
}
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
}

View File

@@ -15,25 +15,25 @@ import org.redkale.service.LoadMode;
* MQ资源注解, 只能标记在Service类方法上, 方法会被框架动态生成{@link org.redkale.mq.MessageConsumer}对象供内部调用 <br> * MQ资源注解, 只能标记在Service类方法上, 方法会被框架动态生成{@link org.redkale.mq.MessageConsumer}对象供内部调用 <br>
* 1、方法必须是protected/public <br> * 1、方法必须是protected/public <br>
* 2、方法不能是final/static <br> * 2、方法不能是final/static <br>
* 3、方法的参数只能是1个或者2个 1个参数视为Message数据类型2个参数则另一个必须是{@link org.redkale.mq.MessageConext} <br> * 3、方法的参数只能是1个且为MessageEvent[] <br>
* *
* <blockquote> * <blockquote>
* <pre> * <pre>
* public class MyMessageService extends AbstractService { * public class MyMessageService extends AbstractService {
* *
* &#64;Messaged(mq="defaultmq", topics={"test-topic"}) * &#64;Messaged(mq="defaultmq", topics={"test-topic"})
* protected void onMessage(Record msg) { * protected void onMessage(MessageEvent&lt;Record&gt;[] events) {
* //打印msg * //打印events
* } * }
* *
* &#64;Messaged(topics={"test-topic2"}) * &#64;Messaged(topics={"test-topic2"})
* protected void onMessage2(MessageConext context, Record msg) { * protected void onMessage2(MessageEvent&lt;Record&gt;[] events) {
* //打印msg * //打印events
* } * }
* *
* &#64;Messaged(topics={"test-topic3"}) * &#64;Messaged(topics={"test-topic3"})
* public void onMessage3(Record msg, MessageConext context) { * public void onMessage3(MessageEvent&lt;Record&gt;[] events) {
* //打印msg * //打印events
* } * }
* } * }
* </pre> * </pre>
@@ -42,7 +42,7 @@ import org.redkale.service.LoadMode;
* <p>详情见: https://redkale.org * <p>详情见: https://redkale.org
* *
* @see org.redkale.mq.ResourceConsumer * @see org.redkale.mq.ResourceConsumer
* @see org.redkale.mq.MessageConext * @see org.redkale.mq.MessageEvent
* @author zhangjx * @author zhangjx
* @since 2.8.0 * @since 2.8.0
*/ */

View File

@@ -23,8 +23,10 @@ import org.redkale.convert.ConvertType;
* } * }
* *
* &#64;Override * &#64;Override
* public void onMessage(MessageConext context, TestBean message) { * public void onMessage(MessageEvent&lt;TestBean&gt;[] events) {
* System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); * for(MessageEvent&lt;TestBean&gt; event : events) {
* System.out.println("TestMessageConsumer消费消息, message: " + event.getMessage());
* }
* } * }
* *
* &#64;Override * &#64;Override

View File

@@ -24,8 +24,8 @@ import org.redkale.convert.ConvertFactory;
import org.redkale.convert.ConvertType; import org.redkale.convert.ConvertType;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.inject.ResourceEvent; import org.redkale.inject.ResourceEvent;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.MessageManager; import org.redkale.mq.MessageManager;
import org.redkale.mq.MessageProducer; import org.redkale.mq.MessageProducer;
import org.redkale.mq.ResourceConsumer; import org.redkale.mq.ResourceConsumer;
@@ -227,10 +227,6 @@ public abstract class MessageAgent implements MessageManager {
} }
} }
public MessageConext createMessageConext(String topic, Integer partition) {
return new MessageConext(topic, partition);
}
public MessageProducer loadMessageProducer(ResourceProducer ann) { public MessageProducer loadMessageProducer(ResourceProducer ann) {
MessageProducer baseProducer = this.messageBaseProducer; MessageProducer baseProducer = this.messageBaseProducer;
if (this.messageBaseProducer == null) { if (this.messageBaseProducer == null) {
@@ -548,17 +544,21 @@ public abstract class MessageAgent implements MessageManager {
consumer.init(config); consumer.init(config);
} }
public Future onMessage(MessageConext context, String traceid, byte[] message) { public Future onMessage(List<MessageEvent<byte[]>> events) {
Convert c = this.convert; Convert c = this.convert;
MessageConsumer m = this.consumer; MessageConsumer m = this.consumer;
return messageAgent.submit(() -> { return messageAgent.submit(() -> {
Traces.computeIfAbsent(traceid); if (events.size() == 1) {
T msg = null; Traces.computeIfAbsent(events.get(0).getTraceid());
}
try { try {
msg = (T) c.convertFrom(messageType, message); for (MessageEvent event : events) {
m.onMessage(context, msg); event.setMessage((T) c.convertFrom(messageType, (byte[]) event.getMessage()));
}
m.onMessage(events.toArray(new MessageEvent[events.size()]));
} catch (Throwable t) { } catch (Throwable t) {
messageAgent.getLogger().log(Level.SEVERE, "MessageConsumer.onMessage error, message: " + msg, t); String msg = JsonConvert.root().convertTo(events);
messageAgent.getLogger().log(Level.SEVERE, "MessageConsumer.onMessage error, events: " + msg, t);
} }
Traces.removeTraceid(); Traces.removeTraceid();
}); });

View File

@@ -4,8 +4,10 @@
package org.redkale.mq.spi; package org.redkale.mq.spi;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@@ -23,16 +25,13 @@ import org.redkale.asm.FieldVisitor;
import org.redkale.asm.Label; import org.redkale.asm.Label;
import org.redkale.asm.MethodVisitor; import org.redkale.asm.MethodVisitor;
import org.redkale.asm.Opcodes; import org.redkale.asm.Opcodes;
import static org.redkale.asm.Opcodes.ACC_BRIDGE;
import static org.redkale.asm.Opcodes.ACC_PRIVATE; import static org.redkale.asm.Opcodes.ACC_PRIVATE;
import static org.redkale.asm.Opcodes.ACC_PUBLIC; import static org.redkale.asm.Opcodes.ACC_PUBLIC;
import static org.redkale.asm.Opcodes.ACC_STATIC; import static org.redkale.asm.Opcodes.ACC_STATIC;
import static org.redkale.asm.Opcodes.ACC_SUPER; import static org.redkale.asm.Opcodes.ACC_SUPER;
import static org.redkale.asm.Opcodes.ACC_SYNTHETIC;
import static org.redkale.asm.Opcodes.ALOAD; import static org.redkale.asm.Opcodes.ALOAD;
import static org.redkale.asm.Opcodes.ASTORE; import static org.redkale.asm.Opcodes.ASTORE;
import static org.redkale.asm.Opcodes.ATHROW; import static org.redkale.asm.Opcodes.ATHROW;
import static org.redkale.asm.Opcodes.CHECKCAST;
import static org.redkale.asm.Opcodes.DUP; import static org.redkale.asm.Opcodes.DUP;
import static org.redkale.asm.Opcodes.GETFIELD; import static org.redkale.asm.Opcodes.GETFIELD;
import static org.redkale.asm.Opcodes.GOTO; import static org.redkale.asm.Opcodes.GOTO;
@@ -46,8 +45,8 @@ import static org.redkale.asm.Opcodes.V11;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.ConvertFactory; import org.redkale.convert.ConvertFactory;
import org.redkale.inject.ResourceFactory; import org.redkale.inject.ResourceFactory;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.Messaged; import org.redkale.mq.Messaged;
import org.redkale.mq.ResourceConsumer; import org.redkale.mq.ResourceConsumer;
import org.redkale.mq.spi.DynForMessaged.DynForMessageds; import org.redkale.mq.spi.DynForMessaged.DynForMessageds;
@@ -113,45 +112,17 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
throw new RedkaleException( throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " must on protected or public method, but on " + method); "@" + Messaged.class.getSimpleName() + " must on protected or public method, but on " + method);
} }
if (method.getParameterCount() != 1 || method.getParameterTypes()[0] != MessageEvent[].class) {
int paramCount = method.getParameterCount(); throw new RedkaleException("@" + Messaged.class.getSimpleName()
if (paramCount != 1 && paramCount != 2) { + " must on one parameter(type: MessageEvent[]) method, but on " + method);
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " must on one or two parameter method, but on " + method);
}
int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext;
Type messageType;
Type[] paramTypes = method.getGenericParameterTypes();
if (paramCount == 1) {
messageType = paramTypes[0];
paramKind = 1;
} else {
if (paramTypes[0] == MessageConext.class) {
messageType = paramTypes[1];
paramKind = 2;
} else if (paramTypes[1] == MessageConext.class) {
messageType = paramTypes[0];
paramKind = 3;
} else {
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " on two-parameter method must contains "
+ MessageConext.class.getSimpleName() + " parameter type, but on " + method);
}
} }
Type messageType = getMethodMessageType(method);
Convert convert = ConvertFactory.findConvert(messaged.convertType()); Convert convert = ConvertFactory.findConvert(messaged.convertType());
convert.getFactory().loadDecoder(messageType); convert.getFactory().loadDecoder(messageType);
if (Modifier.isProtected(method.getModifiers())) { if (Modifier.isProtected(method.getModifiers())) {
createMessageMethod(cw, method, serviceImplClass, filterAnns, newMethod); createMessageMethod(cw, method, serviceImplClass, filterAnns, newMethod);
} }
createInnerConsumer( createInnerConsumer(cw, serviceImplClass, method, messageType, messaged, newDynName, newMethod);
cw,
serviceImplClass,
method,
paramKind,
TypeToken.typeToClass(messageType),
messaged,
newDynName,
newMethod);
return newMethod; return newMethod;
} }
@@ -173,13 +144,22 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
mv.visitEnd(); mv.visitEnd();
} }
// paramKind: 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; protected static Type getMethodMessageType(Method method) {
Type paramType = method.getGenericParameterTypes()[0];
if (!(paramType instanceof GenericArrayType)) {
throw new RedkaleException("@" + Messaged.class.getSimpleName()
+ " must on one generic type parameter method, but on " + method);
}
GenericArrayType arrayType = (GenericArrayType) paramType;
Type omponentType = arrayType.getGenericComponentType();
return ((ParameterizedType) omponentType).getActualTypeArguments()[0];
}
protected void createInnerConsumer( protected void createInnerConsumer(
ClassWriter pcw, ClassWriter pcw,
Class serviceImplClass, Class serviceImplClass,
Method method, Method method,
int paramKind, Type messageType,
Class msgType,
Messaged messaged, Messaged messaged,
String newDynName, String newDynName,
AsmNewMethod newMethod) { AsmNewMethod newMethod) {
@@ -187,12 +167,12 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
pcw == null ? org.redkale.asm.Type.getDescriptor(serviceImplClass) : ("L" + newDynName + ";"); pcw == null ? org.redkale.asm.Type.getDescriptor(serviceImplClass) : ("L" + newDynName + ";");
final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet();
final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName; final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName;
final String msgTypeName = final Class msgTypeClass = TypeToken.typeToClass(messageType);
TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/'); final String msgTypeName = msgTypeClass.getName().replace('.', '/');
final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType)); final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(msgTypeClass);
final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/'); final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/');
final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class); final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class);
final String messageConextDesc = org.redkale.asm.Type.getDescriptor(MessageConext.class); final String messageEventsDesc = org.redkale.asm.Type.getDescriptor(MessageEvent[].class);
final boolean throwFlag = final boolean throwFlag =
Utility.contains(method.getExceptionTypes(), e -> !RuntimeException.class.isAssignableFrom(e)); Utility.contains(method.getExceptionTypes(), e -> !RuntimeException.class.isAssignableFrom(e));
@@ -200,10 +180,13 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
methodBeans = AsmMethodBoost.getMethodBeans(serviceType); methodBeans = AsmMethodBoost.getMethodBeans(serviceType);
} }
AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method); AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method);
String methodSignature = null;
String genericMsgTypeDesc = msgTypeDesc; String genericMsgTypeDesc = msgTypeDesc;
if (!msgType.isPrimitive() && Utility.isNotEmpty(methodBean.getSignature())) { if (Utility.isNotEmpty(methodBean.getSignature())) {
String methodSignature = methodBean.getSignature().replace(messageConextDesc, ""); methodSignature = methodBean.getSignature();
genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值 methodSignature = methodSignature.substring(0, methodSignature.lastIndexOf(')') + 1) + "V";
int start = methodSignature.indexOf('<') + 1;
genericMsgTypeDesc = methodSignature.substring(start, methodSignature.lastIndexOf('>')); // 获取<>中的值
} }
if (pcw != null) { // 不一定是关联类 if (pcw != null) { // 不一定是关联类
pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC);
@@ -262,10 +245,10 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
mv = cw.visitMethod( mv = cw.visitMethod(
ACC_PUBLIC, ACC_PUBLIC,
"onMessage", "onMessage",
"(" + messageConextDesc + msgTypeDesc + ")V", "(" + messageEventsDesc + ")V",
msgTypeDesc.equals(genericMsgTypeDesc) msgTypeDesc.equals(genericMsgTypeDesc)
? null ? null
: ("(" + messageConextDesc + genericMsgTypeDesc + ")V"), : ("(" + messageEventsDesc.replace(";", ("<" + genericMsgTypeDesc + ">;")) + ")V"),
null); null);
Label l0 = new Label(); Label l0 = new Label();
Label l1 = new Label(); Label l1 = new Label();
@@ -276,18 +259,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
} }
mv.visitVarInsn(ALOAD, 0); mv.visitVarInsn(ALOAD, 0);
mv.visitFieldInsn(GETFIELD, innerFullName, "service", newDynDesc); mv.visitFieldInsn(GETFIELD, innerFullName, "service", newDynDesc);
if (paramKind == 1) { // 1: 单个MessageType; mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
Asms.visitPrimitiveVirtual(mv, msgType);
} else if (paramKind == 2) { // 2: MessageConext & MessageType;
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
Asms.visitPrimitiveVirtual(mv, msgType);
} else { // 3: MessageType & MessageConext;
mv.visitVarInsn(ALOAD, 2);
Asms.visitPrimitiveVirtual(mv, msgType);
mv.visitVarInsn(ALOAD, 1);
}
String methodDesc = org.redkale.asm.Type.getMethodDescriptor(method); String methodDesc = org.redkale.asm.Type.getMethodDescriptor(method);
String owner = pcw == null ? serviceImplClass.getName().replace('.', '/') : newDynName; String owner = pcw == null ? serviceImplClass.getName().replace('.', '/') : newDynName;
mv.visitMethodInsn(INVOKEVIRTUAL, owner, methodName, methodDesc, false); mv.visitMethodInsn(INVOKEVIRTUAL, owner, methodName, methodDesc, false);
@@ -317,40 +289,13 @@ public class MessageAsmMethodBoost extends AsmMethodBoost {
Label l5 = new Label(); Label l5 = new Label();
mv.visitLabel(l5); mv.visitLabel(l5);
mv.visitLocalVariable("this", "L" + innerFullName + ";", null, l0, l5, 0); mv.visitLocalVariable("this", "L" + innerFullName + ";", null, l0, l5, 0);
mv.visitLocalVariable("context", messageConextDesc, null, l0, l5, 1); mv.visitLocalVariable("events", messageEventsDesc, null, l0, l5, 1);
mv.visitLocalVariable(
"message",
msgTypeDesc,
msgTypeDesc.equals(genericMsgTypeDesc) ? null : genericMsgTypeDesc,
l0,
l5,
2);
if (throwFlag) { if (throwFlag) {
mv.visitLocalVariable("e", "Ljava/lang/Throwable;", null, l4, l3, 3); mv.visitLocalVariable("e", "Ljava/lang/Throwable;", null, l4, l3, 3);
} }
mv.visitMaxs(4, 4); mv.visitMaxs(4, 4);
mv.visitEnd(); mv.visitEnd();
} }
{
mv = cw.visitMethod(
ACC_PUBLIC + ACC_BRIDGE + ACC_SYNTHETIC,
"onMessage",
"(" + messageConextDesc + "Ljava/lang/Object;)V",
null,
null);
mv.visitCode();
Label l0 = new Label();
mv.visitLabel(l0);
mv.visitVarInsn(ALOAD, 0);
mv.visitVarInsn(ALOAD, 1);
mv.visitVarInsn(ALOAD, 2);
mv.visitTypeInsn(CHECKCAST, msgTypeName);
mv.visitMethodInsn(
INVOKEVIRTUAL, innerFullName, "onMessage", "(" + messageConextDesc + msgTypeDesc + ")V", false);
mv.visitInsn(RETURN);
mv.visitMaxs(3, 3);
mv.visitEnd();
}
cw.visitEnd(); cw.visitEnd();
byte[] bytes = cw.toByteArray(); byte[] bytes = cw.toByteArray();

View File

@@ -27,13 +27,15 @@ import org.redkale.boot.Application;
import org.redkale.boot.ClassFilter; import org.redkale.boot.ClassFilter;
import org.redkale.boot.ModuleEngine; import org.redkale.boot.ModuleEngine;
import org.redkale.boot.NodeServer; import org.redkale.boot.NodeServer;
import org.redkale.convert.Convert;
import org.redkale.convert.ConvertFactory;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.inject.ResourceAnnotationLoader; import org.redkale.inject.ResourceAnnotationLoader;
import org.redkale.inject.ResourceEvent; import org.redkale.inject.ResourceEvent;
import org.redkale.inject.ResourceFactory; import org.redkale.inject.ResourceFactory;
import org.redkale.inject.ResourceTypeLoader; import org.redkale.inject.ResourceTypeLoader;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.MessageManager; import org.redkale.mq.MessageManager;
import org.redkale.mq.MessageProducer; import org.redkale.mq.MessageProducer;
import org.redkale.mq.Messaged; import org.redkale.mq.Messaged;
@@ -47,7 +49,6 @@ import org.redkale.util.AnyValueWriter;
import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleClassLoader;
import org.redkale.util.RedkaleClassLoader.DynBytesClassLoader; import org.redkale.util.RedkaleClassLoader.DynBytesClassLoader;
import org.redkale.util.RedkaleException; import org.redkale.util.RedkaleException;
import org.redkale.util.TypeToken;
import org.redkale.util.Utility; import org.redkale.util.Utility;
/** @author zhangjx */ /** @author zhangjx */
@@ -430,37 +431,19 @@ public class MessageModuleEngine extends ModuleEngine {
throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on public method in @" throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on public method in @"
+ Component.class.getSimpleName() + " class, but on " + method); + Component.class.getSimpleName() + " class, but on " + method);
} }
int paramCount = method.getParameterCount(); if (method.getParameterCount() != 1 || method.getParameterTypes()[0] != MessageEvent[].class) {
if (paramCount != 1 && paramCount != 2) {
throw new RedkaleException("@" + Messaged.class.getSimpleName() throw new RedkaleException("@" + Messaged.class.getSimpleName()
+ " must on one or two parameter method, but on " + method); + " must on one parameter(type: MessageEvent[]) method, but on " + method);
}
int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext;
Type messageType;
Type[] paramTypes = method.getGenericParameterTypes();
if (paramCount == 1) {
messageType = paramTypes[0];
paramKind = 1;
} else {
if (paramTypes[0] == MessageConext.class) {
messageType = paramTypes[1];
paramKind = 2;
} else if (paramTypes[1] == MessageConext.class) {
messageType = paramTypes[0];
paramKind = 3;
} else {
throw new RedkaleException(
"@" + Messaged.class.getSimpleName() + " on two-parameter method must contains "
+ MessageConext.class.getSimpleName() + " parameter type, but on " + method);
}
} }
Type messageType = MessageAsmMethodBoost.getMethodMessageType(method);
Convert convert = ConvertFactory.findConvert(messaged.convertType());
convert.getFactory().loadDecoder(messageType);
if (boost == null) { if (boost == null) {
boost = new MessageAsmMethodBoost(false, service.getClass(), this); boost = new MessageAsmMethodBoost(false, service.getClass(), this);
String newDynName = "org/redkaledyn/service/local/_DynMessageService__" String newDynName = "org/redkaledyn/service/local/_DynMessageService__"
+ service.getClass().getName().replace('.', '_').replace('$', '_'); + service.getClass().getName().replace('.', '_').replace('$', '_');
Class msgType = TypeToken.typeToClass(messageType); boost.createInnerConsumer(null, service.getClass(), method, messageType, messaged, newDynName, null);
boost.createInnerConsumer(
null, service.getClass(), method, paramKind, msgType, messaged, newDynName, null);
} }
} }
if (boost != null && Utility.isNotEmpty(boost.consumerBytes)) { if (boost != null && Utility.isNotEmpty(boost.consumerBytes)) {

View File

@@ -14,7 +14,7 @@ import org.redkale.net.client.ClientConnection;
* @see org.redkale.net.http.WebCodec * @see org.redkale.net.http.WebCodec
* @see org.redkale.net.http.WebRequest * @see org.redkale.net.http.WebRequest
* @see org.redkale.net.http.WebResult * @see org.redkale.net.http.WebResult
* *
* @author zhangjx * @author zhangjx
* @since 2.8.0 * @since 2.8.0
*/ */

View File

@@ -4,8 +4,8 @@
package org.redkale.test.mq; package org.redkale.test.mq;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.ResourceConsumer; import org.redkale.mq.ResourceConsumer;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
@@ -22,8 +22,10 @@ public class TestMessageConsumer implements MessageConsumer<TestBean> {
} }
@Override @Override
public void onMessage(MessageConext context, TestBean message) { public void onMessage(MessageEvent<TestBean>[] events) {
System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); for (MessageEvent<TestBean> event : events) {
System.out.println("TestMessageConsumer消费消息, message: " + event.getMessage());
}
} }
@Override @Override

View File

@@ -5,6 +5,7 @@
package org.redkale.test.mq; package org.redkale.test.mq;
import org.redkale.annotation.Component; import org.redkale.annotation.Component;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.Messaged; import org.redkale.mq.Messaged;
import org.redkale.service.AbstractService; import org.redkale.service.AbstractService;
@@ -16,8 +17,10 @@ import org.redkale.service.AbstractService;
public class TestMessageFacade extends AbstractService { public class TestMessageFacade extends AbstractService {
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_5") @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_5")
public int runMessage5(TestBean message) { public int runMessage5(MessageEvent<TestBean>[] events) {
System.out.println("TestMessageFacde 消费消息5, message: " + message); for (MessageEvent<TestBean> event : events) {
System.out.println("TestMessageFacde 消费消息5, message: " + event.getMessage());
}
return 0; return 0;
} }
} }

View File

@@ -4,7 +4,7 @@
package org.redkale.test.mq; package org.redkale.test.mq;
import org.redkale.mq.MessageConext; import org.redkale.mq.MessageEvent;
import org.redkale.mq.MessageProducer; import org.redkale.mq.MessageProducer;
import org.redkale.mq.Messaged; import org.redkale.mq.Messaged;
import org.redkale.mq.ResourceProducer; import org.redkale.mq.ResourceProducer;
@@ -37,18 +37,24 @@ public class TestMessageService extends AbstractService {
} }
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_2") @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_2")
protected void runMessage2(MessageConext context, TestBean message) { protected void runMessage2(MessageEvent<TestBean>[] events) {
System.out.println("TestMessageService 消费消息2, context: " + context + ", message: " + message); for (MessageEvent<TestBean> event : events) {
System.out.println("TestMessageService 消费消息2, message: " + event.getMessage());
}
} }
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_3") @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_3")
protected void runMessage3(TestBean message) { protected void runMessage3(MessageEvent<TestBean>[] events) {
System.out.println("TestMessageService 消费消息3, message: " + message); for (MessageEvent<TestBean> event : events) {
System.out.println("TestMessageService 消费消息3, message: " + event.getMessage());
}
} }
@Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_4") @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_4")
protected int runMessage4(TestBean message) { protected int runMessage4(MessageEvent<TestBean>[] events) {
System.out.println("TestMessageService 消费消息4, message: " + message); for (MessageEvent<TestBean> event : events) {
System.out.println("TestMessageService 消费消息4, message: " + event.getMessage());
}
return 0; return 0;
} }
} }

View File

@@ -6,8 +6,8 @@ package org.redkale.test.mq;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
import org.redkale.convert.ConvertType; import org.redkale.convert.ConvertType;
import org.redkale.mq.MessageConext;
import org.redkale.mq.MessageConsumer; import org.redkale.mq.MessageConsumer;
import org.redkale.mq.MessageEvent;
import org.redkale.mq.ResourceConsumer; import org.redkale.mq.ResourceConsumer;
@AutoLoad(false) @AutoLoad(false)
@@ -31,8 +31,8 @@ public class _DynLocalTestMessageService extends TestMessageService {
this.service = service; this.service = service;
} }
public void onMessage(MessageConext context, TestBean message) { public void onMessage(MessageEvent<TestBean>[] events) {
this.service.runMessage4(message); this.service.runMessage4(events);
} }
} }
@@ -50,8 +50,8 @@ public class _DynLocalTestMessageService extends TestMessageService {
this.service = service; this.service = service;
} }
public void onMessage(MessageConext context, TestBean message) { public void onMessage(MessageEvent<TestBean>[] events) {
this.service.runMessage3(message); this.service.runMessage3(events);
} }
} }
@@ -69,8 +69,8 @@ public class _DynLocalTestMessageService extends TestMessageService {
this.service = service; this.service = service;
} }
public void onMessage(MessageConext context, TestBean message) { public void onMessage(MessageEvent<TestBean>[] events) {
this.service.runMessage2(context, message); this.service.runMessage2(events);
} }
} }
} }