diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 72400b6f3..6e5da475c 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -564,7 +564,9 @@ public abstract class MessageAgent implements Resourcable { for (byte[] bs : messages) { msgs[++index] = (T) convert.convertFrom(messageType, bs); } - consumer.onMessage(context, msgs); + for (T msg : msgs) { + consumer.onMessage(context, msg); + } } catch (Throwable t) { messageAgent.getLogger().log(Level.SEVERE, MessageConsumer.class.getSimpleName() + " execute error, topic: " + context.getTopic() + ", messages: " + messages.stream().map(v -> new String(v, StandardCharsets.UTF_8)).collect(Collectors.toList())); diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageConsumer.java index c820a6f68..35b8a48c5 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageConsumer.java @@ -25,7 +25,7 @@ public interface MessageConsumer { default void init(AnyValue config) { } - public void onMessage(MessageConext context, T[] messages); + public void onMessage(MessageConext context, T messages); default void destroy(AnyValue config) { }