重构:代码组合逻辑

This commit is contained in:
lxy 2021-02-03 11:01:50 +08:00
parent bf410c13f3
commit 71a2043da8
7 changed files with 106 additions and 153 deletions

View File

@ -1,10 +1,12 @@
package com.zdemo; package com.zdemo;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.TypeToken;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Consumer;
/** /**
* @author Liang * @author Liang
@ -12,29 +14,15 @@ import java.util.Set;
*/ */
public abstract class AbstractConsumer implements IConsumer { public abstract class AbstractConsumer implements IConsumer {
public final Map<String, EventType> eventMap = new HashMap<>(); private Map<String, EventType> eventMap = new HashMap<>();
public abstract String getGroupid(); protected abstract String getGroupid();
public boolean preInit() { protected boolean preInit() {
return true; return true;
} }
@Deprecated protected final Set<String> getTopics() {
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
}
}
}
@Override
public final Set<String> getTopics() {
if (!eventMap.isEmpty()) { if (!eventMap.isEmpty()) {
return eventMap.keySet(); return eventMap.keySet();
} }
@ -42,8 +30,7 @@ public abstract class AbstractConsumer implements IConsumer {
return Set.of("-"); return Set.of("-");
} }
@Override protected void accept(String topic, String value) {
public final void accept(String topic, String value) {
EventType eventType = eventMap.get(topic); EventType eventType = eventMap.get(topic);
Object data = null; Object data = null;
@ -56,5 +43,31 @@ public abstract class AbstractConsumer implements IConsumer {
eventType.accept(data); eventType.accept(data);
} }
protected final void removeEventType(String topic) {
eventMap.remove(topic);
}
/**
* 不同组件的订阅事件 发送
*
* @param topic
*/
protected abstract void subscribe(String topic);
public void subscribe(String topic, Consumer<String> consumer) {
subscribe(topic, TYPE_TOKEN_STRING, consumer);
}
@Override
public <T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer) {
if (topic.contains(",")) {
for (String x : topic.split(",")) {
subscribe(x, typeToken, consumer);
}
}
eventMap.put(topic, EventType.of(topic, typeToken, consumer));
subscribe(topic);
}
} }

View File

@ -2,7 +2,6 @@ package com.zdemo;
import org.redkale.util.TypeToken; import org.redkale.util.TypeToken;
import java.util.Collection;
import java.util.function.Consumer; import java.util.function.Consumer;
public interface IConsumer { public interface IConsumer {
@ -11,12 +10,6 @@ public interface IConsumer {
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() { TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
}; };
Collection<String> getTopics();
void addEventType(EventType... eventType);
void accept(String topic, String record);
/** /**
* 取消订阅 * 取消订阅
* *
@ -24,7 +17,21 @@ public interface IConsumer {
*/ */
void unsubscribe(String topic); void unsubscribe(String topic);
/**
* 订阅 接收数据类型 String
*
* @param topic
* @param consumer
*/
void subscribe(String topic, Consumer<String> consumer); void subscribe(String topic, Consumer<String> consumer);
/**
* 订阅接收类型为 <T>
*
* @param topic
* @param typeToken
* @param consumer
* @param <T>
*/
<T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer); <T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer);
} }

View File

@ -1,7 +1,6 @@
package com.zdemo.kafak; package com.zdemo.kafak;
import com.zdemo.AbstractConsumer; import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer; import com.zdemo.IConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -9,7 +8,6 @@ import org.apache.kafka.common.errors.WakeupException;
import org.redkale.net.http.RestService; import org.redkale.net.http.RestService;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.TypeToken;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File; import java.io.File;
@ -18,7 +16,6 @@ import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -33,24 +30,17 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
@Resource(name = "APP_HOME") @Resource(name = "APP_HOME")
protected File APP_HOME; protected File APP_HOME;
protected Properties props;
public abstract String getGroupid(); public abstract String getGroupid();
private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override
public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes);
}
@Override @Override
public final void init(AnyValue config) { public final void init(AnyValue config) {
if (!preInit()) { if (!preInit()) {
return; return;
} }
try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
props = new Properties(); Properties props = new Properties();
props.load(fis); props.load(fis);
new Thread(() -> { new Thread(() -> {
@ -93,16 +83,13 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
@Override @Override
public void unsubscribe(String topic) { public void unsubscribe(String topic) {
queue.add(() -> eventMap.remove(topic)); // 加入延时执行队列下一次订阅变更检查周期执行 queue.add(() -> super.removeEventType(topic)); // 加入延时执行队列下一次订阅变更检查周期执行
} }
@Override @Override
public void subscribe(String topic, Consumer<String> consumer) { protected void subscribe(String topic) {
addEventType(EventType.of(topic, consumer)); queue.add(() -> {
} // just set flag, nothing to do
});
@Override
public <T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer) {
addEventType(EventType.of(topic, typeToken, consumer));
} }
} }

View File

@ -1,6 +1,5 @@
package com.zdemo.kafak; package com.zdemo.kafak;
import com.zdemo.Event;
import com.zdemo.IProducer; import com.zdemo.IProducer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
@ -40,16 +39,6 @@ public class KafakProducer implements IProducer, Service {
} }
} }
/*@Deprecated
@Override
public <T extends Event> void send(T t) {
String v = JsonConvert.root().convertTo(t.value);
if (v.startsWith("\"") && v.endsWith("\"")) {
v = v.substring(1, v.length() - 1);
}
producer.send(new ProducerRecord(t.topic, v));
}*/
@Override @Override
public <V> void publish(String topic, V v) { public <V> void publish(String topic, V v) {
producer.send(new ProducerRecord(topic, toStr(v))); producer.send(new ProducerRecord(topic, toStr(v)));

View File

@ -1,11 +1,9 @@
package com.zdemo.redis; package com.zdemo.redis;
import com.zdemo.AbstractConsumer; import com.zdemo.AbstractConsumer;
import com.zdemo.EventType;
import com.zdemo.IConsumer; import com.zdemo.IConsumer;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.TypeToken;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -14,7 +12,6 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.function.Consumer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -87,31 +84,10 @@ public class RedisConsumer extends AbstractConsumer implements IConsumer, Servic
} }
@Override @Override
public String getGroupid() { protected String getGroupid() {
return null; return null;
} }
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
//新增订阅
try {
writer.write("SUBSCRIBE " + topic + "\r\n");
writer.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
}
}
@Override @Override
public void unsubscribe(String topic) { public void unsubscribe(String topic) {
try { try {
@ -120,15 +96,17 @@ public class RedisConsumer extends AbstractConsumer implements IConsumer, Servic
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.WARNING, "", e); logger.log(Level.WARNING, "", e);
} }
super.removeEventType(topic);
} }
@Override @Override
public void subscribe(String topic, Consumer<String> consumer) { protected void subscribe(String topic) {
addEventType(EventType.of(topic, consumer)); //新增订阅
} try {
writer.write("SUBSCRIBE " + topic + "\r\n");
@Override writer.flush();
public <T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer) { } catch (IOException e) {
addEventType(EventType.of(topic, typeToken, consumer)); logger.log(Level.WARNING, "", e);
}
} }
} }

View File

@ -1,6 +1,5 @@
package com.zdemo.redis; package com.zdemo.redis;
import com.zdemo.Event;
import com.zdemo.IProducer; import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service; import org.redkale.service.Service;

View File

@ -1,10 +1,12 @@
package com.zdemo.zhub; package com.zdemo.zhub;
import com.zdemo.*; import com.zdemo.AbstractConsumer;
import com.zdemo.Event;
import com.zdemo.IConsumer;
import com.zdemo.IProducer;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.util.TypeToken;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -18,13 +20,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service { public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
Logger logger = Logger.getLogger(IProducer.class.getSimpleName()); Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
@Resource(name = "property.zhub.host") @Resource(name = "property.zhub.host")
private String host = "127.0.0.1"; private String host = "127.0.0.1";
@ -134,7 +135,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
logger.log(Level.WARNING, "timer [" + timer.name + "]", e); logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
} }
} }
}, 2); }, 1);
threadBuilder.accept(() -> { threadBuilder.accept(() -> {
while (true) { while (true) {
@ -184,7 +185,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
return JsonConvert.root().convertTo(v); return JsonConvert.root().convertTo(v);
} }
public boolean initSocket() { protected boolean initSocket() {
try { try {
client = new Socket(); client = new Socket();
client.connect(new InetSocketAddress(host, port)); client.connect(new InetSocketAddress(host, port));
@ -213,62 +214,12 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
return true; return true;
} }
@Deprecated
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {
String[] topics = type.topic.split(",");
for (String topic : topics) {
if (topic.isEmpty()) {
continue;
}
eventMap.put(topic, type);
//新增订阅
send("subscribe " + topic);
}
}
}
@Override @Override
public void unsubscribe(String topic) { public void unsubscribe(String topic) {
send("unsubscribe " + topic); send("unsubscribe " + topic);
super.removeEventType(topic);
} }
// timer
private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
public void timer(String name, Runnable run) {
timerMap.put(name, new Timer(name, run));
send("timer", name);
}
public void reloadTimer() {
send("cmd", "reload-timer-config");
}
class Timer {
String name;
//String expr;
Runnable runnable;
//boolean single;
public String getName() {
return name;
}
public Runnable getRunnable() {
return runnable;
}
public Timer(String name, Runnable runnable) {
this.name = name;
this.runnable = runnable;
}
}
public <V> void publish(String topic, V v) { public <V> void publish(String topic, V v) {
send("publish", topic, toStr(v)); send("publish", topic, toStr(v));
} }
@ -316,12 +267,41 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
} }
@Override @Override
public void subscribe(String topic, Consumer<String> consumer) { protected void subscribe(String topic) {
addEventType(EventType.of(topic, consumer)); send("subscribe " + topic); //新增订阅
} }
@Override
public <T> void subscribe(String topic, TypeToken<T> typeToken, Consumer<T> consumer) { // ================================================== timer ==================================================
addEventType(EventType.of(topic, typeToken, consumer)); private ConcurrentHashMap<String, Timer> timerMap = new ConcurrentHashMap();
class Timer {
String name;
//String expr;
Runnable runnable;
//boolean single;
public String getName() {
return name;
}
public Runnable getRunnable() {
return runnable;
}
public Timer(String name, Runnable runnable) {
this.name = name;
this.runnable = runnable;
}
}
public void timer(String name, Runnable run) {
timerMap.put(name, new Timer(name, run));
send("timer", name);
}
public void reloadTimer() {
send("cmd", "reload-timer-config");
} }
} }