修改:接收消息 String类型 反序列化失败
This commit is contained in:
parent
c8746b712e
commit
17b28a9124
@ -1,5 +1,6 @@
|
|||||||
package com.zdemo;
|
package com.zdemo;
|
||||||
|
|
||||||
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.util.TypeToken;
|
import org.redkale.util.TypeToken;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -13,4 +14,18 @@ public interface IConsumer<T extends Event> {
|
|||||||
TypeToken<T> getTypeToken();
|
TypeToken<T> getTypeToken();
|
||||||
|
|
||||||
void accept(T t);
|
void accept(T t);
|
||||||
|
|
||||||
|
default void accept(String value) {
|
||||||
|
System.out.println(value);
|
||||||
|
if ("com.zdemo.Event<java.lang.String>".equals(getTypeToken().getType().toString())) {
|
||||||
|
String _value = value.split("\"value\":")[1];
|
||||||
|
_value = _value.substring(0, _value.length() - 1);
|
||||||
|
Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value.replace(_value, "’‘"));
|
||||||
|
t.setValue(_value);
|
||||||
|
accept((T) t);
|
||||||
|
} else {
|
||||||
|
Event t = JsonConvert.root().convertFrom(getTypeToken().getType(), value);
|
||||||
|
accept((T) t);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ import com.zdemo.IConsumer;
|
|||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
|
||||||
import org.redkale.net.http.RestService;
|
import org.redkale.net.http.RestService;
|
||||||
import org.redkale.service.Service;
|
import org.redkale.service.Service;
|
||||||
import org.redkale.util.AnyValue;
|
import org.redkale.util.AnyValue;
|
||||||
@ -43,14 +42,13 @@ public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Se
|
|||||||
consumer.subscribe(getSubscribes());
|
consumer.subscribe(getSubscribes());
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||||
for (ConsumerRecord<String, String> record : records) {
|
for (ConsumerRecord<String, String> record : records) {
|
||||||
|
String value = record.value();
|
||||||
try {
|
try {
|
||||||
logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()));
|
accept(value);
|
||||||
T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value());
|
|
||||||
accept(t);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
logger.warning("event accept error :" + value);
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@ package com.zdemo.redis;
|
|||||||
|
|
||||||
import com.zdemo.Event;
|
import com.zdemo.Event;
|
||||||
import com.zdemo.IConsumer;
|
import com.zdemo.IConsumer;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
|
||||||
import org.redkale.service.Service;
|
import org.redkale.service.Service;
|
||||||
import org.redkale.util.AnyValue;
|
import org.redkale.util.AnyValue;
|
||||||
|
|
||||||
@ -61,10 +60,9 @@ public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Se
|
|||||||
br.readLine(); //$n len(value)
|
br.readLine(); //$n len(value)
|
||||||
String value = br.readLine(); // value
|
String value = br.readLine(); // value
|
||||||
try {
|
try {
|
||||||
T t = JsonConvert.root().convertFrom(getTypeToken().getType(), value);
|
accept(value);
|
||||||
accept(t);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warning("event fmt error :" + value);
|
logger.warning("event accept error :" + value);
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,8 @@ import com.zdemo.redis.RedisProducer;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 消息发布订阅测试
|
* 消息发布订阅测试
|
||||||
*/
|
*/
|
||||||
@ -31,10 +33,10 @@ public class AppTest {
|
|||||||
try {
|
try {
|
||||||
RedisProducer producer = Application.singleton(RedisProducer.class);
|
RedisProducer producer = Application.singleton(RedisProducer.class);
|
||||||
|
|
||||||
Event<Integer> event = new Event<>();
|
Event<Map> event = new Event<>();
|
||||||
event.setTopic("c");
|
event.setTopic("c");
|
||||||
event.setKey("abx");
|
event.setKey("abx");
|
||||||
event.setValue(2314);
|
event.setValue(Map.of("A", "a"));
|
||||||
|
|
||||||
producer.send(event);
|
producer.send(event);
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@ import org.redkale.util.TypeToken;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class MyConsumer extends RedisConsumer<Event<Integer>> {
|
public class MyConsumer extends RedisConsumer<Event<String>> {
|
||||||
|
|
||||||
public String getGroupid() {
|
public String getGroupid() {
|
||||||
return "group-test"; //quest、user、im、live
|
return "group-test"; //quest、user、im、live
|
||||||
@ -20,13 +20,13 @@ public class MyConsumer extends RedisConsumer<Event<Integer>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TypeToken<Event<Integer>> getTypeToken() {
|
public TypeToken<Event<String>> getTypeToken() {
|
||||||
return new TypeToken<Event<Integer>>() {
|
return new TypeToken<Event<String>>() {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void accept(Event<Integer> event) {
|
public void accept(Event<String> event) {
|
||||||
switch (event.getTopic()) {
|
switch (event.getTopic()) {
|
||||||
case "a" -> System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(event));
|
case "a" -> System.out.println("我收到了消息 主题A 事件:" + JsonConvert.root().convertTo(event));
|
||||||
case "b" -> System.out.println("我收到了消息 主题B 事件:" + JsonConvert.root().convertTo(event));
|
case "b" -> System.out.println("我收到了消息 主题B 事件:" + JsonConvert.root().convertTo(event));
|
||||||
|
Loading…
Reference in New Issue
Block a user