修改:日志输出
This commit is contained in:
@@ -13,7 +13,6 @@ import org.redkale.util.AnyValue;
|
||||
import javax.annotation.Resource;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Properties;
|
||||
@@ -48,7 +47,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
|
||||
queue.put(eventType);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
logger.log(Level.WARNING, "", e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,7 +68,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
consumer.subscribe(asList("_"));
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
|
||||
records.forEach(record -> {
|
||||
String topic = record.topic();
|
||||
long offset = record.offset();
|
||||
@@ -77,8 +76,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
|
||||
try {
|
||||
accept(topic, value);
|
||||
} catch (Exception e) {
|
||||
logger.warning(String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value));
|
||||
e.printStackTrace();
|
||||
logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -94,10 +92,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
|
||||
}
|
||||
|
||||
}, "thread-consumer-[" + getGroupid() + "]").start();
|
||||
} catch (FileNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
logger.log(Level.WARNING, "", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -12,9 +12,9 @@ import org.redkale.util.AnyValue;
|
||||
import javax.annotation.Resource;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
import java.util.logging.Level;
|
||||
|
||||
/**
|
||||
* 生产
|
||||
@@ -31,26 +31,23 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
|
||||
@Override
|
||||
public void init(AnyValue config) {
|
||||
File file = new File(APP_HOME, "conf/kafak.properties");
|
||||
if (!file.exists()) {
|
||||
logger.warning(String.format("------\n%s (系统找不到指定的文件。)\n未初始化kafak 生产者,kafak发布消息不可用\n------", file.getPath()));
|
||||
return;
|
||||
}
|
||||
|
||||
try (FileInputStream fis = new FileInputStream(file)) {
|
||||
Properties props = new Properties();
|
||||
props.load(fis);
|
||||
producer = new KafkaProducer(props);
|
||||
} catch (FileNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
logger.log(Level.WARNING, "未初始化kafak 生产者,kafak发布消息不可用", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(T... t) {
|
||||
for (T x : t) {
|
||||
producer.send(new ProducerRecord(x.topic, JsonConvert.root().convertTo(x.value)));
|
||||
String v = JsonConvert.root().convertTo(x.value);
|
||||
if (v.startsWith("\"") && v.endsWith("\"")) {
|
||||
v = v.substring(1, v.length() - 1);
|
||||
}
|
||||
producer.send(new ProducerRecord(x.topic, v));
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user