新增:消费对象初始化是否 开启事件监听控制

This commit is contained in:
lxy 2020-09-19 13:49:50 +08:00
parent 4a7217f85f
commit 4b3849b66e
4 changed files with 9 additions and 4 deletions

View File

@ -14,7 +14,8 @@ public abstract class AbstractConsumer implements IConsumer {
public final Map<String, EventType> eventMap = new HashMap<>(); public final Map<String, EventType> eventMap = new HashMap<>();
public void preInit() { public boolean preInit() {
return true;
} }
public void addEventType(EventType... eventType) { public void addEventType(EventType... eventType) {

View File

@ -6,7 +6,7 @@ import java.util.Collection;
import java.util.logging.Logger; import java.util.logging.Logger;
public interface IConsumer<T extends Event> { public interface IConsumer<T extends Event> {
TypeToken TYPE_TOKEN_STRING = new TypeToken<String>() { TypeToken<String> TYPE_TOKEN_STRING = new TypeToken<String>() {
}; };
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() { TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
}; };

View File

@ -54,7 +54,9 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
@Override @Override
public final void init(AnyValue config) { public final void init(AnyValue config) {
preInit(); if (!preInit()) {
return;
}
try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
props = new Properties(); props = new Properties();
props.load(fis); props.load(fis);

View File

@ -12,7 +12,7 @@ public class MyConsumer extends KafakConsumer {
} }
@Override @Override
public void preInit() { public boolean preInit() {
addEventType( addEventType(
EventType.of("a1", new TypeToken<Float>() { EventType.of("a1", new TypeToken<Float>() {
}, r -> { }, r -> {
@ -23,5 +23,7 @@ public class MyConsumer extends KafakConsumer {
System.out.println("我收到了消息 主题bx 事件:" + str); System.out.println("我收到了消息 主题bx 事件:" + str);
}) })
); );
return true;
} }
} }