diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java index 1ac291a..dfcf47c 100644 --- a/src/com/zdemo/IConsumer.java +++ b/src/com/zdemo/IConsumer.java @@ -1,5 +1,6 @@ package com.zdemo; +import org.redkale.convert.json.JsonConvert; import org.redkale.util.TypeToken; import java.util.Collection; @@ -13,4 +14,18 @@ public interface IConsumer { TypeToken getTypeToken(); void accept(T t); + + default void accept(String value) { + System.out.println(value); + if ("com.zdemo.Event".equals(getTypeToken().getType().toString())) { + String _value = value.split("\"value\":")[1]; + _value = _value.substring(0, _value.length() - 1); + Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value.replace(_value, "’‘")); + t.setValue(_value); + accept((T) t); + } else { + Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value); + accept((T) t); + } + } } diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java index b0a2c2c..a2aec9b 100644 --- a/src/com/zdemo/kafak/KafakConsumer.java +++ b/src/com/zdemo/kafak/KafakConsumer.java @@ -5,7 +5,6 @@ import com.zdemo.IConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.RestService; import org.redkale.service.Service; import org.redkale.util.AnyValue; @@ -43,14 +42,13 @@ public abstract class KafakConsumer implements IConsumer, Se consumer.subscribe(getSubscribes()); while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { + String value = record.value(); try { - logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())); - T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value()); - accept(t); + accept(value); } catch (Exception e) { + logger.warning("event accept error :" + value); e.printStackTrace(); } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java index f70fcc0..0b1f592 100644 --- a/src/com/zdemo/redis/RedisConsumer.java +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -2,7 +2,6 @@ package com.zdemo.redis; import com.zdemo.Event; import com.zdemo.IConsumer; -import org.redkale.convert.json.JsonConvert; import org.redkale.service.Service; import org.redkale.util.AnyValue; @@ -61,10 +60,9 @@ public abstract class RedisConsumer implements IConsumer, Se br.readLine(); //$n len(value) String value = br.readLine(); // value try { - T t = JsonConvert.root().convertFrom(getTypeToken().getType(), value); - accept(t); + accept(value); } catch (Exception e) { - logger.warning("event fmt error :" + value); + logger.warning("event accept error :" + value); e.printStackTrace(); } } diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index a120bdc..e5e1bed 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -5,6 +5,8 @@ import com.zdemo.redis.RedisProducer; import org.junit.Test; import org.redkale.boot.Application; +import java.util.Map; + /** * 消息发布订阅测试 */ @@ -31,10 +33,10 @@ public class AppTest { try { RedisProducer producer = Application.singleton(RedisProducer.class); - Event event = new Event<>(); + Event event = new Event<>(); event.setTopic("c"); event.setKey("abx"); - event.setValue(2314); + event.setValue(Map.of("A", "a")); producer.send(event); diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 769535a..18aef05 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -8,7 +8,7 @@ import org.redkale.util.TypeToken; import java.util.Collection; import java.util.List; -public class MyConsumer extends RedisConsumer> { +public class MyConsumer extends RedisConsumer> { public String getGroupid() { return "group-test"; //quest、user、im、live @@ -20,13 +20,13 @@ public class MyConsumer extends RedisConsumer> { } @Override - public TypeToken> getTypeToken() { - return new TypeToken>() { + public TypeToken> getTypeToken() { + return new TypeToken>() { }; } @Override - public void accept(Event event) { + public void accept(Event event) { switch (event.getTopic()) { case "a" -> System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(event)); case "b" -> System.out.println("我收到了消息 主题B 事件:" + JsonConvert.root().convertTo(event));