新增:主题发布标准方法 publish 替换原 send 方法
This commit is contained in:
parent
48db458f5e
commit
3e91d785e3
@ -20,6 +20,7 @@ public abstract class AbstractConsumer implements IConsumer {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void addEventType(EventType... eventType) {
|
||||
for (EventType type : eventType) {
|
||||
String[] topics = type.topic.split(",");
|
||||
|
@ -2,9 +2,11 @@ package com.zdemo;
|
||||
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public interface IProducer<T extends Event> {
|
||||
public interface IProducer {
|
||||
Logger logger = Logger.getLogger(IProducer.class.getSimpleName());
|
||||
|
||||
void send(T t);
|
||||
@Deprecated
|
||||
<T extends Event> void send(T t);
|
||||
|
||||
<V> void publish(String topic, V v);
|
||||
}
|
||||
|
@ -19,10 +19,10 @@ import java.util.logging.Level;
|
||||
/**
|
||||
* 生产
|
||||
*
|
||||
* @param <T>
|
||||
* @param
|
||||
*/
|
||||
@RestService
|
||||
public class KafakProducer<T extends Event> implements IProducer<T>, Service {
|
||||
public class KafakProducer implements IProducer, Service {
|
||||
private KafkaProducer<String, String> producer;
|
||||
|
||||
@Resource(name = "APP_HOME")
|
||||
@ -40,8 +40,9 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public void send(T t) {
|
||||
public <T extends Event> void send(T t) {
|
||||
String v = JsonConvert.root().convertTo(t.value);
|
||||
if (v.startsWith("\"") && v.endsWith("\"")) {
|
||||
v = v.substring(1, v.length() - 1);
|
||||
@ -49,8 +50,20 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
|
||||
producer.send(new ProducerRecord(t.topic, v));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> void publish(String topic, V v) {
|
||||
producer.send(new ProducerRecord(topic, toStr(v)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy(AnyValue config) {
|
||||
producer.close();
|
||||
}
|
||||
|
||||
private <V> String toStr(V v) {
|
||||
if (v instanceof String) {
|
||||
return (String) v;
|
||||
}
|
||||
return JsonConvert.root().convertTo(v);
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.logging.Level;
|
||||
|
||||
public class RedisProducer<T extends Event> implements IProducer<T>, Service {
|
||||
public class RedisProducer implements IProducer, Service {
|
||||
|
||||
@Resource(name = "property.redis.host")
|
||||
private String host = "127.0.0.1";
|
||||
@ -39,8 +39,9 @@ public class RedisProducer<T extends Event> implements IProducer<T>, Service {
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public void send(T t) {
|
||||
public <T extends Event> void send(T t) {
|
||||
try {
|
||||
String v = JsonConvert.root().convertTo(t.value);
|
||||
if (v.startsWith("\"") && v.endsWith("\"")) {
|
||||
@ -53,4 +54,22 @@ public class RedisProducer<T extends Event> implements IProducer<T>, Service {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> void publish(String topic, V v) {
|
||||
try {
|
||||
osw.write("PUBLISH " + topic + " '" + toStr(v) + "' \r\n");
|
||||
osw.flush();
|
||||
} catch (IOException e) {
|
||||
logger.log(Level.WARNING, "", e);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private <V> String toStr(V v) {
|
||||
if (v instanceof String) {
|
||||
return (String) v;
|
||||
}
|
||||
return JsonConvert.root().convertTo(v);
|
||||
}
|
||||
}
|
||||
|
@ -231,6 +231,15 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public <V> void publish(String topic, V v) {
|
||||
send("publish", topic, toStr(v));
|
||||
}
|
||||
|
||||
public <V> void broadcast(String topic, V v) {
|
||||
send("broadcast", topic, toStr(v));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(String topic, Consumer<String> consumer) {
|
||||
addEventType(EventType.of(topic, consumer));
|
||||
|
@ -1,7 +1,5 @@
|
||||
package com.zdemo.test;
|
||||
|
||||
import com.zdemo.Event;
|
||||
import com.zdemo.EventType;
|
||||
import com.zdemo.IProducer;
|
||||
import org.junit.Test;
|
||||
import org.redkale.boot.Application;
|
||||
@ -24,11 +22,9 @@ public class AppTest {
|
||||
//启动并开启消费监听
|
||||
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
||||
|
||||
consumer.addEventType(
|
||||
EventType.of("a-1", str -> {
|
||||
System.out.println("我收到了消息 a 事件:" + str);
|
||||
})
|
||||
);
|
||||
consumer.subscribe("a-1", str -> {
|
||||
System.out.println("我收到了消息 a 事件:" + str);
|
||||
});
|
||||
|
||||
consumer.timer("a", () -> {
|
||||
System.out.println(Utility.now() + " timer a 执行了");
|
||||
@ -44,10 +40,9 @@ public class AppTest {
|
||||
@Test
|
||||
public void runProducer() {
|
||||
try {
|
||||
IProducer producer = Application.singleton(MyConsumer.class);
|
||||
MyConsumer producer = Application.singleton(MyConsumer.class);
|
||||
for (int i = 0; i < 10_0000; i++) {
|
||||
producer.send(Event.of("a-1", i + ""));
|
||||
producer.send(Event.of("a-1", i));
|
||||
producer.publish("a-1", i);
|
||||
}
|
||||
|
||||
try {
|
||||
@ -117,7 +112,7 @@ public class AppTest {
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
||||
producer.send(Event.of("x", "x"));
|
||||
producer.publish("x", "x");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
Loading…
Reference in New Issue
Block a user