This commit is contained in:
lxy 2020-09-19 10:45:49 +08:00
parent e104268952
commit 4a7217f85f
4 changed files with 14 additions and 2 deletions

View File

@ -14,6 +14,9 @@ public abstract class AbstractConsumer implements IConsumer {
public final Map<String, EventType> eventMap = new HashMap<>(); public final Map<String, EventType> eventMap = new HashMap<>();
public void preInit() {
}
public void addEventType(EventType... eventType) { public void addEventType(EventType... eventType) {
for (EventType type : eventType) { for (EventType type : eventType) {
String[] topics = type.topic.split(","); String[] topics = type.topic.split(",");

View File

@ -1,9 +1,16 @@
package com.zdemo; package com.zdemo;
import org.redkale.util.TypeToken;
import java.util.Collection; import java.util.Collection;
import java.util.logging.Logger; import java.util.logging.Logger;
public interface IConsumer<T extends Event> { public interface IConsumer<T extends Event> {
TypeToken TYPE_TOKEN_STRING = new TypeToken<String>() {
};
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
};
Logger logger = Logger.getLogger(IConsumer.class.getSimpleName()); Logger logger = Logger.getLogger(IConsumer.class.getSimpleName());
Collection<String> getSubscribes(); Collection<String> getSubscribes();

View File

@ -53,7 +53,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
} }
@Override @Override
public void init(AnyValue config) { public final void init(AnyValue config) {
preInit();
try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
props = new Properties(); props = new Properties();
props.load(fis); props.load(fis);

View File

@ -11,7 +11,8 @@ public class MyConsumer extends KafakConsumer {
return "group-test"; //消费组名称 return "group-test"; //消费组名称
} }
{ @Override
public void preInit() {
addEventType( addEventType(
EventType.of("a1", new TypeToken<Float>() { EventType.of("a1", new TypeToken<Float>() {
}, r -> { }, r -> {