修改:定时调度执行异常和 任务堆叠导致丢失任务bug

This commit is contained in:
lxy 2021-01-25 12:50:53 +08:00
parent f4ed1b05d0
commit 0cd83f1a19
3 changed files with 69 additions and 17 deletions

View File

@ -42,9 +42,6 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
@Override @Override
public void addEventType(EventType... eventTypes) { public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes); super.addEventType(eventTypes);
// 增加变更标记
queue.add(() -> logger.info("KafakConsumer add new topic!"));
} }
@Override @Override

View File

@ -15,7 +15,9 @@ import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -32,20 +34,30 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
private int port = 1216; private int port = 1216;
private ReentrantLock lock = new ReentrantLock(); private ReentrantLock lock = new ReentrantLock();
private Socket client; private Socket client;
private OutputStream writer; private OutputStream writer;
private BufferedReader reader; private BufferedReader reader;
private final LinkedBlockingQueue<Timer> timerQueue = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Event<String>> topicQueue = new LinkedBlockingQueue<>();
private BiConsumer<Runnable, Integer> threadBuilder = (r, n) -> {
for (int i = 0; i < n; i++) {
new Thread(() -> r.run()).start();
}
};
@Override @Override
public void init(AnyValue config) { public void init(AnyValue config) {
if (!preInit()) { if (!preInit()) {
return; return;
} }
boolean flag = initSocket(); if (!initSocket()) {
return;
}
// 消息 事件接收
new Thread(() -> { new Thread(() -> {
while (flag) { while (true) {
try { try {
String readLine = reader.readLine(); String readLine = reader.readLine();
if (readLine == null) { // 连接中断 处理 if (readLine == null) { // 连接中断 处理
@ -71,11 +83,8 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
reader.readLine(); //$n len(value) reader.readLine(); //$n len(value)
String value = reader.readLine(); // value String value = reader.readLine(); // value
try {
accept(topic, value); topicQueue.put(Event.of(topic, value));
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + topic + "] event accept error :" + value, e);
}
} }
// timer 消息 // timer 消息
@ -88,10 +97,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
reader.readLine(); //$n len(key) reader.readLine(); //$n len(key)
String topic = reader.readLine(); // name String topic = reader.readLine(); // name
Timer timer = timerMap.get(topic); timerQueue.put(timerMap.get(topic));
if (timer != null) {
timer.runnable.run();
}
} }
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.WARNING, "reconnection ", e.getMessage()); logger.log(Level.WARNING, "reconnection ", e.getMessage());
@ -104,9 +110,49 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
} }
} }
} }
} catch (InterruptedException e) {
e.printStackTrace();
} }
} }
}).start(); }).start();
// 定时调度事件
threadBuilder.accept(() -> {
while (true) {
Timer timer = null;
try {
if ((timer = timerQueue.take()) == null) {
return;
}
long start = System.currentTimeMillis();
timer.runnable.run();
long end = System.currentTimeMillis();
logger.finest(String.format("timer [%s] : elapsed time %s ms", timer.name, end - start));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "timer [" + timer.name + "]", e);
}
}
}, 2);
threadBuilder.accept(() -> {
while (true) {
Event<String> event = null;
try {
if ((event = topicQueue.take()) == null) {
continue;
}
accept(event.topic, event.value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
logger.log(Level.WARNING, "topic[" + event.topic + "] event accept error :" + event.value, e);
}
}
}, 1);
} }
// --------------------- // ---------------------
@ -226,7 +272,7 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
public <V> void publish(String topic, V v) { public <V> void publish(String topic, V v) {
send("publish", topic, toStr(v)); send("publish", topic, toStr(v));
} }
public <V> void broadcast(String topic, V v) { public <V> void broadcast(String topic, V v) {
send("broadcast", topic, toStr(v)); send("broadcast", topic, toStr(v));
} }

View File

@ -28,6 +28,15 @@ public class AppTest {
consumer.timer("a", () -> { consumer.timer("a", () -> {
System.out.println(Utility.now() + " timer a 执行了"); System.out.println(Utility.now() + " timer a 执行了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.timer("b", () -> {
System.out.println(Utility.now() + " ----------------- timer b 执行了");
}); });