diff --git a/src/main/java/org/redkale/mq/MessageConext.java b/src/main/java/org/redkale/mq/MessageConext.java deleted file mode 100644 index f3f8c9890..000000000 --- a/src/main/java/org/redkale/mq/MessageConext.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - */ -package org.redkale.mq; - -import java.util.Objects; -import org.redkale.convert.ConvertColumn; -import org.redkale.convert.json.JsonConvert; - -/** - * MessageConsumer回调的上下文 - * - *

详情见: https://redkale.org - * - * @see org.redkale.mq.MessageConsumer - * @author zhangjx - * @since 2.8.0 - */ -public class MessageConext { - - @ConvertColumn(index = 1) - protected String topic; - - @ConvertColumn(index = 2) - protected Integer partition; - - public MessageConext(String topic, Integer partition) { - this.topic = topic; - this.partition = partition; - } - - public String getTopic() { - return topic; - } - - public Integer getPartition() { - return partition; - } - - @Override - public int hashCode() { - return Objects.hash(this.topic, this.partition); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - final MessageConext other = (MessageConext) obj; - return Objects.equals(this.topic, other.topic) && Objects.equals(this.partition, other.partition); - } - - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } -} diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index 2968dea01..fe50834de 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -22,8 +22,10 @@ import org.redkale.util.AnyValue; * } * * @Override - * public void onMessage(MessageConext context, TestBean message) { - * System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); + * public void onMessage(MessageEvent<TestBean>[] events) { + * for(MessageEvent<TestBean> event : events) { + * System.out.println("TestMessageConsumer消费消息, message: " + event.getMessage()); + * } * } * * @Override @@ -36,7 +38,7 @@ import org.redkale.util.AnyValue; * *

详情见: https://redkale.org * - * @see org.redkale.mq.MessageConext + * @see org.redkale.mq.MessageEvent * @see org.redkale.mq.ResourceConsumer * @see org.redkale.mq.Messaged * @author zhangjx @@ -50,7 +52,7 @@ public interface MessageConsumer { default void init(AnyValue config) {} - public void onMessage(MessageConext context, T message); + public void onMessage(MessageEvent[] events); default void destroy(AnyValue config) {} } diff --git a/src/main/java/org/redkale/mq/MessageEvent.java b/src/main/java/org/redkale/mq/MessageEvent.java new file mode 100644 index 000000000..0b3d51239 --- /dev/null +++ b/src/main/java/org/redkale/mq/MessageEvent.java @@ -0,0 +1,81 @@ +/* + +*/ + +package org.redkale.mq; + +import org.redkale.convert.ConvertColumn; +import org.redkale.convert.json.JsonConvert; + +/** + * MessageConsumer的消息实体类 + * + *

详情见: https://redkale.org + * + * @see org.redkale.mq.MessageConsumer + * @author zhangjx + * @since 2.8.0 + * @param T + */ +public final class MessageEvent { + + @ConvertColumn(index = 1) + protected String topic; + + @ConvertColumn(index = 2) + protected Integer partition; + + @ConvertColumn(index = 3) + protected String traceid; + + @ConvertColumn(index = 4) + protected T message; + + public MessageEvent() { + // + } + + public MessageEvent(String topic, Integer partition, String traceid, T message) { + this.topic = topic; + this.partition = partition; + this.traceid = traceid; + this.message = message; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Integer getPartition() { + return partition; + } + + public void setPartition(Integer partition) { + this.partition = partition; + } + + public String getTraceid() { + return traceid; + } + + public void setTraceid(String traceid) { + this.traceid = traceid; + } + + public T getMessage() { + return message; + } + + public void setMessage(T message) { + this.message = message; + } + + @Override + public String toString() { + return JsonConvert.root().convertTo(this); + } +} diff --git a/src/main/java/org/redkale/mq/Messaged.java b/src/main/java/org/redkale/mq/Messaged.java index 63a013415..195987684 100644 --- a/src/main/java/org/redkale/mq/Messaged.java +++ b/src/main/java/org/redkale/mq/Messaged.java @@ -15,25 +15,25 @@ import org.redkale.service.LoadMode; * MQ资源注解, 只能标记在Service类方法上, 方法会被框架动态生成{@link org.redkale.mq.MessageConsumer}对象供内部调用
* 1、方法必须是protected/public
* 2、方法不能是final/static
- * 3、方法的参数只能是1个或者2个, 1个参数视为Message数据类型,2个参数则另一个必须是{@link org.redkale.mq.MessageConext}
+ * 3、方法的参数只能是1个且为MessageEvent[]
* *

*
  * public class MyMessageService extends AbstractService {
  *
  *    @Messaged(mq="defaultmq", topics={"test-topic"})
- *    protected void onMessage(Record msg) {
- *        //打印msg
+ *    protected void onMessage(MessageEvent<Record>[] events) {
+ *        //打印events
  *    }
  *
  *    @Messaged(topics={"test-topic2"})
- *    protected void onMessage2(MessageConext context, Record msg) {
- *        //打印msg
+ *    protected void onMessage2(MessageEvent<Record>[] events) {
+ *        //打印events
  *    }
  *
  *    @Messaged(topics={"test-topic3"})
- *    public void onMessage3(Record msg, MessageConext context) {
- *        //打印msg
+ *    public void onMessage3(MessageEvent<Record>[] events) {
+ *        //打印events
  *    }
  * }
  * 
@@ -42,7 +42,7 @@ import org.redkale.service.LoadMode; *

详情见: https://redkale.org * * @see org.redkale.mq.ResourceConsumer - * @see org.redkale.mq.MessageConext + * @see org.redkale.mq.MessageEvent * @author zhangjx * @since 2.8.0 */ diff --git a/src/main/java/org/redkale/mq/ResourceConsumer.java b/src/main/java/org/redkale/mq/ResourceConsumer.java index 06c9d0648..af71ad9f8 100644 --- a/src/main/java/org/redkale/mq/ResourceConsumer.java +++ b/src/main/java/org/redkale/mq/ResourceConsumer.java @@ -23,8 +23,10 @@ import org.redkale.convert.ConvertType; * } * * @Override - * public void onMessage(MessageConext context, TestBean message) { - * System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); + * public void onMessage(MessageEvent<TestBean>[] events) { + * for(MessageEvent<TestBean> event : events) { + * System.out.println("TestMessageConsumer消费消息, message: " + event.getMessage()); + * } * } * * @Override diff --git a/src/main/java/org/redkale/mq/spi/MessageAgent.java b/src/main/java/org/redkale/mq/spi/MessageAgent.java index 5341ed696..6726b2f0d 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAgent.java +++ b/src/main/java/org/redkale/mq/spi/MessageAgent.java @@ -24,8 +24,8 @@ import org.redkale.convert.ConvertFactory; import org.redkale.convert.ConvertType; import org.redkale.convert.json.JsonConvert; import org.redkale.inject.ResourceEvent; -import org.redkale.mq.MessageConext; import org.redkale.mq.MessageConsumer; +import org.redkale.mq.MessageEvent; import org.redkale.mq.MessageManager; import org.redkale.mq.MessageProducer; import org.redkale.mq.ResourceConsumer; @@ -227,10 +227,6 @@ public abstract class MessageAgent implements MessageManager { } } - public MessageConext createMessageConext(String topic, Integer partition) { - return new MessageConext(topic, partition); - } - public MessageProducer loadMessageProducer(ResourceProducer ann) { MessageProducer baseProducer = this.messageBaseProducer; if (this.messageBaseProducer == null) { @@ -548,17 +544,21 @@ public abstract class MessageAgent implements MessageManager { consumer.init(config); } - public Future onMessage(MessageConext context, String traceid, byte[] message) { + public Future onMessage(List> events) { Convert c = this.convert; MessageConsumer m = this.consumer; return messageAgent.submit(() -> { - Traces.computeIfAbsent(traceid); - T msg = null; + if (events.size() == 1) { + Traces.computeIfAbsent(events.get(0).getTraceid()); + } try { - msg = (T) c.convertFrom(messageType, message); - m.onMessage(context, msg); + for (MessageEvent event : events) { + event.setMessage((T) c.convertFrom(messageType, (byte[]) event.getMessage())); + } + m.onMessage(events.toArray(new MessageEvent[events.size()])); } catch (Throwable t) { - messageAgent.getLogger().log(Level.SEVERE, "MessageConsumer.onMessage error, message: " + msg, t); + String msg = JsonConvert.root().convertTo(events); + messageAgent.getLogger().log(Level.SEVERE, "MessageConsumer.onMessage error, events: " + msg, t); } Traces.removeTraceid(); }); diff --git a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java index d5ce4ed04..30b464d8a 100644 --- a/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java +++ b/src/main/java/org/redkale/mq/spi/MessageAsmMethodBoost.java @@ -4,8 +4,10 @@ package org.redkale.mq.spi; import java.lang.annotation.Annotation; +import java.lang.reflect.GenericArrayType; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.LinkedHashMap; import java.util.List; @@ -23,16 +25,13 @@ import org.redkale.asm.FieldVisitor; import org.redkale.asm.Label; import org.redkale.asm.MethodVisitor; import org.redkale.asm.Opcodes; -import static org.redkale.asm.Opcodes.ACC_BRIDGE; import static org.redkale.asm.Opcodes.ACC_PRIVATE; import static org.redkale.asm.Opcodes.ACC_PUBLIC; import static org.redkale.asm.Opcodes.ACC_STATIC; import static org.redkale.asm.Opcodes.ACC_SUPER; -import static org.redkale.asm.Opcodes.ACC_SYNTHETIC; import static org.redkale.asm.Opcodes.ALOAD; import static org.redkale.asm.Opcodes.ASTORE; import static org.redkale.asm.Opcodes.ATHROW; -import static org.redkale.asm.Opcodes.CHECKCAST; import static org.redkale.asm.Opcodes.DUP; import static org.redkale.asm.Opcodes.GETFIELD; import static org.redkale.asm.Opcodes.GOTO; @@ -46,8 +45,8 @@ import static org.redkale.asm.Opcodes.V11; import org.redkale.convert.Convert; import org.redkale.convert.ConvertFactory; import org.redkale.inject.ResourceFactory; -import org.redkale.mq.MessageConext; import org.redkale.mq.MessageConsumer; +import org.redkale.mq.MessageEvent; import org.redkale.mq.Messaged; import org.redkale.mq.ResourceConsumer; import org.redkale.mq.spi.DynForMessaged.DynForMessageds; @@ -113,45 +112,17 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { throw new RedkaleException( "@" + Messaged.class.getSimpleName() + " must on protected or public method, but on " + method); } - - int paramCount = method.getParameterCount(); - if (paramCount != 1 && paramCount != 2) { - throw new RedkaleException( - "@" + Messaged.class.getSimpleName() + " must on one or two parameter method, but on " + method); - } - int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; - Type messageType; - Type[] paramTypes = method.getGenericParameterTypes(); - if (paramCount == 1) { - messageType = paramTypes[0]; - paramKind = 1; - } else { - if (paramTypes[0] == MessageConext.class) { - messageType = paramTypes[1]; - paramKind = 2; - } else if (paramTypes[1] == MessageConext.class) { - messageType = paramTypes[0]; - paramKind = 3; - } else { - throw new RedkaleException( - "@" + Messaged.class.getSimpleName() + " on two-parameter method must contains " - + MessageConext.class.getSimpleName() + " parameter type, but on " + method); - } + if (method.getParameterCount() != 1 || method.getParameterTypes()[0] != MessageEvent[].class) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + + " must on one parameter(type: MessageEvent[]) method, but on " + method); } + Type messageType = getMethodMessageType(method); Convert convert = ConvertFactory.findConvert(messaged.convertType()); convert.getFactory().loadDecoder(messageType); if (Modifier.isProtected(method.getModifiers())) { createMessageMethod(cw, method, serviceImplClass, filterAnns, newMethod); } - createInnerConsumer( - cw, - serviceImplClass, - method, - paramKind, - TypeToken.typeToClass(messageType), - messaged, - newDynName, - newMethod); + createInnerConsumer(cw, serviceImplClass, method, messageType, messaged, newDynName, newMethod); return newMethod; } @@ -173,13 +144,22 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { mv.visitEnd(); } - // paramKind: 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; + protected static Type getMethodMessageType(Method method) { + Type paramType = method.getGenericParameterTypes()[0]; + if (!(paramType instanceof GenericArrayType)) { + throw new RedkaleException("@" + Messaged.class.getSimpleName() + + " must on one generic type parameter method, but on " + method); + } + GenericArrayType arrayType = (GenericArrayType) paramType; + Type omponentType = arrayType.getGenericComponentType(); + return ((ParameterizedType) omponentType).getActualTypeArguments()[0]; + } + protected void createInnerConsumer( ClassWriter pcw, Class serviceImplClass, Method method, - int paramKind, - Class msgType, + Type messageType, Messaged messaged, String newDynName, AsmNewMethod newMethod) { @@ -187,12 +167,12 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { pcw == null ? org.redkale.asm.Type.getDescriptor(serviceImplClass) : ("L" + newDynName + ";"); final String innerClassName = "Dyn" + MessageConsumer.class.getSimpleName() + index.incrementAndGet(); final String innerFullName = newDynName + (pcw == null ? "" : "$") + innerClassName; - final String msgTypeName = - TypeToken.primitiveToWrapper(msgType).getName().replace('.', '/'); - final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(TypeToken.primitiveToWrapper(msgType)); + final Class msgTypeClass = TypeToken.typeToClass(messageType); + final String msgTypeName = msgTypeClass.getName().replace('.', '/'); + final String msgTypeDesc = org.redkale.asm.Type.getDescriptor(msgTypeClass); final String messageConsumerName = MessageConsumer.class.getName().replace('.', '/'); final String messageConsumerDesc = org.redkale.asm.Type.getDescriptor(MessageConsumer.class); - final String messageConextDesc = org.redkale.asm.Type.getDescriptor(MessageConext.class); + final String messageEventsDesc = org.redkale.asm.Type.getDescriptor(MessageEvent[].class); final boolean throwFlag = Utility.contains(method.getExceptionTypes(), e -> !RuntimeException.class.isAssignableFrom(e)); @@ -200,10 +180,13 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { methodBeans = AsmMethodBoost.getMethodBeans(serviceType); } AsmMethodBean methodBean = AsmMethodBean.get(methodBeans, method); + String methodSignature = null; String genericMsgTypeDesc = msgTypeDesc; - if (!msgType.isPrimitive() && Utility.isNotEmpty(methodBean.getSignature())) { - String methodSignature = methodBean.getSignature().replace(messageConextDesc, ""); - genericMsgTypeDesc = methodSignature.substring(1, methodSignature.lastIndexOf(')')); // 获取()中的值 + if (Utility.isNotEmpty(methodBean.getSignature())) { + methodSignature = methodBean.getSignature(); + methodSignature = methodSignature.substring(0, methodSignature.lastIndexOf(')') + 1) + "V"; + int start = methodSignature.indexOf('<') + 1; + genericMsgTypeDesc = methodSignature.substring(start, methodSignature.lastIndexOf('>')); // 获取<>中的值 } if (pcw != null) { // 不一定是关联类 pcw.visitInnerClass(innerFullName, newDynName, innerClassName, ACC_PUBLIC + ACC_STATIC); @@ -262,10 +245,10 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { mv = cw.visitMethod( ACC_PUBLIC, "onMessage", - "(" + messageConextDesc + msgTypeDesc + ")V", + "(" + messageEventsDesc + ")V", msgTypeDesc.equals(genericMsgTypeDesc) ? null - : ("(" + messageConextDesc + genericMsgTypeDesc + ")V"), + : ("(" + messageEventsDesc.replace(";", ("<" + genericMsgTypeDesc + ">;")) + ")V"), null); Label l0 = new Label(); Label l1 = new Label(); @@ -276,18 +259,7 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { } mv.visitVarInsn(ALOAD, 0); mv.visitFieldInsn(GETFIELD, innerFullName, "service", newDynDesc); - if (paramKind == 1) { // 1: 单个MessageType; - mv.visitVarInsn(ALOAD, 2); - Asms.visitPrimitiveVirtual(mv, msgType); - } else if (paramKind == 2) { // 2: MessageConext & MessageType; - mv.visitVarInsn(ALOAD, 1); - mv.visitVarInsn(ALOAD, 2); - Asms.visitPrimitiveVirtual(mv, msgType); - } else { // 3: MessageType & MessageConext; - mv.visitVarInsn(ALOAD, 2); - Asms.visitPrimitiveVirtual(mv, msgType); - mv.visitVarInsn(ALOAD, 1); - } + mv.visitVarInsn(ALOAD, 1); String methodDesc = org.redkale.asm.Type.getMethodDescriptor(method); String owner = pcw == null ? serviceImplClass.getName().replace('.', '/') : newDynName; mv.visitMethodInsn(INVOKEVIRTUAL, owner, methodName, methodDesc, false); @@ -317,40 +289,13 @@ public class MessageAsmMethodBoost extends AsmMethodBoost { Label l5 = new Label(); mv.visitLabel(l5); mv.visitLocalVariable("this", "L" + innerFullName + ";", null, l0, l5, 0); - mv.visitLocalVariable("context", messageConextDesc, null, l0, l5, 1); - mv.visitLocalVariable( - "message", - msgTypeDesc, - msgTypeDesc.equals(genericMsgTypeDesc) ? null : genericMsgTypeDesc, - l0, - l5, - 2); + mv.visitLocalVariable("events", messageEventsDesc, null, l0, l5, 1); if (throwFlag) { mv.visitLocalVariable("e", "Ljava/lang/Throwable;", null, l4, l3, 3); } mv.visitMaxs(4, 4); mv.visitEnd(); } - { - mv = cw.visitMethod( - ACC_PUBLIC + ACC_BRIDGE + ACC_SYNTHETIC, - "onMessage", - "(" + messageConextDesc + "Ljava/lang/Object;)V", - null, - null); - mv.visitCode(); - Label l0 = new Label(); - mv.visitLabel(l0); - mv.visitVarInsn(ALOAD, 0); - mv.visitVarInsn(ALOAD, 1); - mv.visitVarInsn(ALOAD, 2); - mv.visitTypeInsn(CHECKCAST, msgTypeName); - mv.visitMethodInsn( - INVOKEVIRTUAL, innerFullName, "onMessage", "(" + messageConextDesc + msgTypeDesc + ")V", false); - mv.visitInsn(RETURN); - mv.visitMaxs(3, 3); - mv.visitEnd(); - } cw.visitEnd(); byte[] bytes = cw.toByteArray(); diff --git a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java index 360cf5a73..081ca2243 100644 --- a/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java +++ b/src/main/java/org/redkale/mq/spi/MessageModuleEngine.java @@ -27,13 +27,15 @@ import org.redkale.boot.Application; import org.redkale.boot.ClassFilter; import org.redkale.boot.ModuleEngine; import org.redkale.boot.NodeServer; +import org.redkale.convert.Convert; +import org.redkale.convert.ConvertFactory; import org.redkale.convert.json.JsonConvert; import org.redkale.inject.ResourceAnnotationLoader; import org.redkale.inject.ResourceEvent; import org.redkale.inject.ResourceFactory; import org.redkale.inject.ResourceTypeLoader; -import org.redkale.mq.MessageConext; import org.redkale.mq.MessageConsumer; +import org.redkale.mq.MessageEvent; import org.redkale.mq.MessageManager; import org.redkale.mq.MessageProducer; import org.redkale.mq.Messaged; @@ -47,7 +49,6 @@ import org.redkale.util.AnyValueWriter; import org.redkale.util.RedkaleClassLoader; import org.redkale.util.RedkaleClassLoader.DynBytesClassLoader; import org.redkale.util.RedkaleException; -import org.redkale.util.TypeToken; import org.redkale.util.Utility; /** @author zhangjx */ @@ -430,37 +431,19 @@ public class MessageModuleEngine extends ModuleEngine { throw new RedkaleException("@" + Messaged.class.getSimpleName() + " must on public method in @" + Component.class.getSimpleName() + " class, but on " + method); } - int paramCount = method.getParameterCount(); - if (paramCount != 1 && paramCount != 2) { + if (method.getParameterCount() != 1 || method.getParameterTypes()[0] != MessageEvent[].class) { throw new RedkaleException("@" + Messaged.class.getSimpleName() - + " must on one or two parameter method, but on " + method); - } - int paramKind = 1; // 1:单个MessageType; 2: MessageConext & MessageType; 3: MessageType & MessageConext; - Type messageType; - Type[] paramTypes = method.getGenericParameterTypes(); - if (paramCount == 1) { - messageType = paramTypes[0]; - paramKind = 1; - } else { - if (paramTypes[0] == MessageConext.class) { - messageType = paramTypes[1]; - paramKind = 2; - } else if (paramTypes[1] == MessageConext.class) { - messageType = paramTypes[0]; - paramKind = 3; - } else { - throw new RedkaleException( - "@" + Messaged.class.getSimpleName() + " on two-parameter method must contains " - + MessageConext.class.getSimpleName() + " parameter type, but on " + method); - } + + " must on one parameter(type: MessageEvent[]) method, but on " + method); } + + Type messageType = MessageAsmMethodBoost.getMethodMessageType(method); + Convert convert = ConvertFactory.findConvert(messaged.convertType()); + convert.getFactory().loadDecoder(messageType); if (boost == null) { boost = new MessageAsmMethodBoost(false, service.getClass(), this); String newDynName = "org/redkaledyn/service/local/_DynMessageService__" + service.getClass().getName().replace('.', '_').replace('$', '_'); - Class msgType = TypeToken.typeToClass(messageType); - boost.createInnerConsumer( - null, service.getClass(), method, paramKind, msgType, messaged, newDynName, null); + boost.createInnerConsumer(null, service.getClass(), method, messageType, messaged, newDynName, null); } } if (boost != null && Utility.isNotEmpty(boost.consumerBytes)) { diff --git a/src/main/java/org/redkale/net/http/WebConnection.java b/src/main/java/org/redkale/net/http/WebConnection.java index 8950434a9..88e46d0e5 100644 --- a/src/main/java/org/redkale/net/http/WebConnection.java +++ b/src/main/java/org/redkale/net/http/WebConnection.java @@ -14,7 +14,7 @@ import org.redkale.net.client.ClientConnection; * @see org.redkale.net.http.WebCodec * @see org.redkale.net.http.WebRequest * @see org.redkale.net.http.WebResult - * + * * @author zhangjx * @since 2.8.0 */ diff --git a/src/test/java/org/redkale/test/mq/TestMessageConsumer.java b/src/test/java/org/redkale/test/mq/TestMessageConsumer.java index aafc355d5..7ca19bc76 100644 --- a/src/test/java/org/redkale/test/mq/TestMessageConsumer.java +++ b/src/test/java/org/redkale/test/mq/TestMessageConsumer.java @@ -4,8 +4,8 @@ package org.redkale.test.mq; -import org.redkale.mq.MessageConext; import org.redkale.mq.MessageConsumer; +import org.redkale.mq.MessageEvent; import org.redkale.mq.ResourceConsumer; import org.redkale.util.AnyValue; @@ -22,8 +22,10 @@ public class TestMessageConsumer implements MessageConsumer { } @Override - public void onMessage(MessageConext context, TestBean message) { - System.out.println("TestMessageConsumer消费消息, context: " + context + ", message: " + message); + public void onMessage(MessageEvent[] events) { + for (MessageEvent event : events) { + System.out.println("TestMessageConsumer消费消息, message: " + event.getMessage()); + } } @Override diff --git a/src/test/java/org/redkale/test/mq/TestMessageFacade.java b/src/test/java/org/redkale/test/mq/TestMessageFacade.java index 42ffc83ba..5742426ca 100644 --- a/src/test/java/org/redkale/test/mq/TestMessageFacade.java +++ b/src/test/java/org/redkale/test/mq/TestMessageFacade.java @@ -5,6 +5,7 @@ package org.redkale.test.mq; import org.redkale.annotation.Component; +import org.redkale.mq.MessageEvent; import org.redkale.mq.Messaged; import org.redkale.service.AbstractService; @@ -16,8 +17,10 @@ import org.redkale.service.AbstractService; public class TestMessageFacade extends AbstractService { @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_5") - public int runMessage5(TestBean message) { - System.out.println("TestMessageFacde 消费消息5, message: " + message); + public int runMessage5(MessageEvent[] events) { + for (MessageEvent event : events) { + System.out.println("TestMessageFacde 消费消息5, message: " + event.getMessage()); + } return 0; } } diff --git a/src/test/java/org/redkale/test/mq/TestMessageService.java b/src/test/java/org/redkale/test/mq/TestMessageService.java index 195985514..657a044be 100644 --- a/src/test/java/org/redkale/test/mq/TestMessageService.java +++ b/src/test/java/org/redkale/test/mq/TestMessageService.java @@ -4,7 +4,7 @@ package org.redkale.test.mq; -import org.redkale.mq.MessageConext; +import org.redkale.mq.MessageEvent; import org.redkale.mq.MessageProducer; import org.redkale.mq.Messaged; import org.redkale.mq.ResourceProducer; @@ -37,18 +37,24 @@ public class TestMessageService extends AbstractService { } @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_2") - protected void runMessage2(MessageConext context, TestBean message) { - System.out.println("TestMessageService 消费消息2, context: " + context + ", message: " + message); + protected void runMessage2(MessageEvent[] events) { + for (MessageEvent event : events) { + System.out.println("TestMessageService 消费消息2, message: " + event.getMessage()); + } } @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_3") - protected void runMessage3(TestBean message) { - System.out.println("TestMessageService 消费消息3, message: " + message); + protected void runMessage3(MessageEvent[] events) { + for (MessageEvent event : events) { + System.out.println("TestMessageService 消费消息3, message: " + event.getMessage()); + } } @Messaged(mq = "mymq", topics = "test_bean_topic", group = "group_4") - protected int runMessage4(TestBean message) { - System.out.println("TestMessageService 消费消息4, message: " + message); + protected int runMessage4(MessageEvent[] events) { + for (MessageEvent event : events) { + System.out.println("TestMessageService 消费消息4, message: " + event.getMessage()); + } return 0; } } diff --git a/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java b/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java index 76d54b241..16d84d4f6 100644 --- a/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java +++ b/src/test/java/org/redkale/test/mq/_DynLocalTestMessageService.java @@ -6,8 +6,8 @@ package org.redkale.test.mq; import org.redkale.annotation.AutoLoad; import org.redkale.convert.ConvertType; -import org.redkale.mq.MessageConext; import org.redkale.mq.MessageConsumer; +import org.redkale.mq.MessageEvent; import org.redkale.mq.ResourceConsumer; @AutoLoad(false) @@ -31,8 +31,8 @@ public class _DynLocalTestMessageService extends TestMessageService { this.service = service; } - public void onMessage(MessageConext context, TestBean message) { - this.service.runMessage4(message); + public void onMessage(MessageEvent[] events) { + this.service.runMessage4(events); } } @@ -50,8 +50,8 @@ public class _DynLocalTestMessageService extends TestMessageService { this.service = service; } - public void onMessage(MessageConext context, TestBean message) { - this.service.runMessage3(message); + public void onMessage(MessageEvent[] events) { + this.service.runMessage3(events); } } @@ -69,8 +69,8 @@ public class _DynLocalTestMessageService extends TestMessageService { this.service = service; } - public void onMessage(MessageConext context, TestBean message) { - this.service.runMessage2(context, message); + public void onMessage(MessageEvent[] events) { + this.service.runMessage2(events); } } }