diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index 3b655c2..e6a8227 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -13,7 +13,6 @@ import org.redkale.util.AnyValue; import javax.annotation.Resource; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; import java.util.Properties; @@ -48,7 +47,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume queue.put(eventType); } } catch (InterruptedException e) { - e.printStackTrace(); + logger.log(Level.WARNING, "", e); } } @@ -69,7 +68,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(asList("_")); while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(10_000)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1_000)); records.forEach(record -> { String topic = record.topic(); long offset = record.offset(); @@ -77,8 +76,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume try { accept(topic, value); } catch (Exception e) { - logger.warning(String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value)); - e.printStackTrace(); + logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e); } }); @@ -94,10 +92,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume } }, "thread-consumer-[" + getGroupid() + "]").start(); - } catch (FileNotFoundException e) { - e.printStackTrace(); } catch (IOException e) { - e.printStackTrace(); + logger.log(Level.WARNING, "", e); } } diff --git a/src/com/zdemo/kafak/KafakProducer.java b/src/com/zdemo/kafak/KafakProducer.java index 420a668..ff04553 100644 --- a/src/com/zdemo/kafak/KafakProducer.java +++ b/src/com/zdemo/kafak/KafakProducer.java @@ -12,9 +12,9 @@ import org.redkale.util.AnyValue; import javax.annotation.Resource; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.Properties; +import java.util.logging.Level; /** * 生产 @@ -31,26 +31,23 @@ public class KafakProducer implements IProducer, Service { @Override public void init(AnyValue config) { File file = new File(APP_HOME, "conf/kafak.properties"); - if (!file.exists()) { - logger.warning(String.format("------\n%s (系统找不到指定的文件。)\n未初始化kafak 生产者,kafak发布消息不可用\n------", file.getPath())); - return; - } - try (FileInputStream fis = new FileInputStream(file)) { Properties props = new Properties(); props.load(fis); producer = new KafkaProducer(props); - } catch (FileNotFoundException e) { - e.printStackTrace(); } catch (IOException e) { - e.printStackTrace(); + logger.log(Level.WARNING, "未初始化kafak 生产者,kafak发布消息不可用", e); } } @Override public void send(T... t) { for (T x : t) { - producer.send(new ProducerRecord(x.topic, JsonConvert.root().convertTo(x.value))); + String v = JsonConvert.root().convertTo(x.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + producer.send(new ProducerRecord(x.topic, v)); } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index 08e52a6..35f5679 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -11,6 +11,7 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.logging.Level; public abstract class RedisConsumer extends AbstractConsumer implements IConsumer, Service { @@ -59,13 +60,12 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume try { accept(topic, value); } catch (Exception e) { - logger.warning("topic[" + topic + "] event accept error :" + value); - e.printStackTrace(); + logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e); } } } } catch (Exception e) { - e.printStackTrace(); + logger.log(Level.WARNING, "Redis Consumer 初始化失败!", e); } }).start(); } diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java index a565df1..f8db3ec 100644 --- a/src/com/zdemo/redis/RedisProducer.java +++ b/src/com/zdemo/redis/RedisProducer.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.logging.Level; public class RedisProducer implements IProducer, Service { @@ -34,7 +35,7 @@ public class RedisProducer implements IProducer, Service { oswPub.write("AUTH " + password + "\r\n"); oswPub.flush(); } catch (IOException e) { - e.printStackTrace(); + logger.log(Level.WARNING, "", e); } } @@ -45,7 +46,7 @@ public class RedisProducer implements IProducer, Service { oswPub.write("PUBLISH " + x.topic + " '" + JsonConvert.root().convertTo(x.value) + "' \r\n"); oswPub.flush(); } catch (IOException e) { - e.printStackTrace(); + logger.log(Level.WARNING, "", e); } } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index 3513d52..324fdd1 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -22,32 +22,8 @@ public class AppTest { public void runConsumer() { try { //启动并开启消费监听 - MyConsumer consumer = Application.singleton(MyConsumer.class); - - //新增订阅主题 a1 - consumer.addEventType(EventType.of("a1", new TypeToken() { - }, r -> { - System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(r)); - })); - - Thread.sleep(5_000); - - //新增订阅主题 b1、c1 - consumer.addEventType( - // 订阅主题 b1 - EventType.of("b1", new TypeToken>() { - }, r -> { - System.out.println("我收到了消息 主题B 事件:" + JsonConvert.root().convertTo(r)); - }), - - // 订阅主题 c1 - EventType.of("c1", new TypeToken>() { - }, r -> { - System.out.println("我收到了消息 主题C 事件:" + JsonConvert.root().convertTo(r)); - }) - ); - - Thread.sleep(60_000); + Application.singleton(MyConsumer.class); + Thread.sleep(60_000 * 60); } catch (Exception e) { e.printStackTrace(); }