diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index 2a5768c..2793e44 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -14,6 +14,9 @@ public abstract class AbstractConsumer implements IConsumer { public final Map eventMap = new HashMap<>(); + public void preInit() { + } + public void addEventType(EventType... eventType) { for (EventType type : eventType) { String[] topics = type.topic.split(","); diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index c34aaa1..32cdd25 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -1,9 +1,16 @@ package com.zdemo; +import org.redkale.util.TypeToken; + import java.util.Collection; import java.util.logging.Logger; public interface IConsumer { + TypeToken TYPE_TOKEN_STRING = new TypeToken() { + }; + TypeToken TYPE_TOKEN_INT = new TypeToken() { + }; + Logger logger = Logger.getLogger(IConsumer.class.getSimpleName()); Collection getSubscribes(); diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index 8e197c7..ea08e2d 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -53,7 +53,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume } @Override - public void init(AnyValue config) { + public final void init(AnyValue config) { + preInit(); try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { props = new Properties(); props.load(fis); diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index e7112b8..223b400 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -11,7 +11,8 @@ public class MyConsumer extends KafakConsumer { return "group-test"; //消费组名称 } - { + @Override + public void preInit() { addEventType( EventType.of("a1", new TypeToken() { }, r -> {