MessageConsumer

This commit is contained in:
redkale
2023-10-05 22:51:12 +08:00
parent da1de47927
commit c7e06ed519
2 changed files with 4 additions and 2 deletions

View File

@@ -564,7 +564,9 @@ public abstract class MessageAgent implements Resourcable {
for (byte[] bs : messages) {
msgs[++index] = (T) convert.convertFrom(messageType, bs);
}
consumer.onMessage(context, msgs);
for (T msg : msgs) {
consumer.onMessage(context, msg);
}
} catch (Throwable t) {
messageAgent.getLogger().log(Level.SEVERE, MessageConsumer.class.getSimpleName() + " execute error, topic: " + context.getTopic()
+ ", messages: " + messages.stream().map(v -> new String(v, StandardCharsets.UTF_8)).collect(Collectors.toList()));

View File

@@ -25,7 +25,7 @@ public interface MessageConsumer<T> {
default void init(AnyValue config) {
}
public void onMessage(MessageConext context, T[] messages);
public void onMessage(MessageConext context, T messages);
default void destroy(AnyValue config) {
}