This commit is contained in:
lxy 2020-08-18 14:21:54 +08:00
parent 1b54d52cf0
commit c8746b712e
2 changed files with 21 additions and 1 deletions

View File

@ -1,10 +1,24 @@
package com.zdemo; package com.zdemo;
/**
* 发布订阅 事件
*
* @param <V>
*/
public class Event<V> { public class Event<V> {
private String topic; private String topic;
private String key; private String key;
private V value; private V value;
public Event() {
}
public Event(String topic, String key, V value) {
this.topic = topic;
this.key = key;
this.value = value;
}
public String getTopic() { public String getTopic() {
return topic; return topic;
} }

View File

@ -30,7 +30,13 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
@Override @Override
public void init(AnyValue config) { public void init(AnyValue config) {
try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { File file = new File(APP_HOME, "conf/kafak.properties");
if (!file.exists()) {
logger.warning(String.format("------\n%s (系统找不到指定的文件。)\n未初始化kafak 生产者kafak发布消息不可用\n------", file.getPath()));
return;
}
try (FileInputStream fis = new FileInputStream(file)) {
Properties props = new Properties(); Properties props = new Properties();
props.load(fis); props.load(fis);
producer = new KafkaProducer(props); producer = new KafkaProducer(props);