新增:主题退订

This commit is contained in:
lxy 2020-12-08 19:31:15 +08:00
parent 10472078b0
commit a37f1b9564
4 changed files with 42 additions and 31 deletions

View File

@ -18,4 +18,10 @@ public interface IConsumer<T extends Event> {
void addEventType(EventType... eventType); void addEventType(EventType... eventType);
void accept(String topic, String record); void accept(String topic, String record);
/**
* 取消订阅
* @param topic
*/
void unsubscribe(String topic);
} }

View File

@ -35,18 +35,14 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
public abstract String getGroupid(); public abstract String getGroupid();
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override @Override
public void addEventType(EventType... eventTypes) { public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes); super.addEventType(eventTypes);
try {
for (EventType eventType : eventTypes) { // 增加变更标记
queue.put(eventType); queue.add(() -> logger.info("KafakConsumer starting..."));
}
} catch (InterruptedException e) {
logger.log(Level.WARNING, "", e);
}
} }
@Override @Override
@ -58,8 +54,6 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
props = new Properties(); props = new Properties();
props.load(fis); props.load(fis);
if (logger.isLoggable(Level.INFO)) logger.info(getGroupid() + " consumer started!");
new Thread(() -> { new Thread(() -> {
try { try {
props.put("group.id", getGroupid()); props.put("group.id", getGroupid());
@ -78,9 +72,12 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
} }
}); });
// 动态新增订阅 if (!queue.isEmpty()) {
while (!queue.isEmpty()) { Runnable runnable;
queue.clear(); while ((runnable = queue.poll()) != null) {
runnable.run();
}
consumer.unsubscribe(); consumer.unsubscribe();
consumer.subscribe(getTopics()); consumer.subscribe(getTopics());
} }
@ -95,5 +92,8 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
} }
} }
@Override
public void unsubscribe(String topic) {
queue.add(() -> eventMap.remove(topic)); // 加入延时执行队列下一次订阅变更检查周期执行
}
} }

View File

@ -22,18 +22,14 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum
public abstract String getGroupid(); public abstract String getGroupid();
private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
@Override @Override
public void addEventType(EventType... eventTypes) { public void addEventType(EventType... eventTypes) {
super.addEventType(eventTypes); super.addEventType(eventTypes);
try {
for (EventType eventType : eventTypes) { // 增加变更标记
queue.put(eventType); queue.add(() -> logger.info("PulsarConsumer add new topic!"));
}
} catch (InterruptedException e) {
logger.log(Level.WARNING, "", e);
}
} }
@Override @Override
@ -41,22 +37,21 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum
if (!preInit()) { if (!preInit()) {
return; return;
} }
queue.add(() -> logger.info("PulsarConsumer starting ..."));
new Thread(() -> { new Thread(() -> {
try { try {
client = PulsarClient.builder() client = PulsarClient.builder()
.serviceUrl(serviceurl) .serviceUrl(serviceurl)
.build(); .build();
consumer = client.newConsumer()
.topics(new ArrayList<>(getTopics()))
.subscriptionName(getGroupid())
.subscriptionType(SubscriptionType.Shared)
.subscribe();
while (true) { while (true) {
// 动态新增订阅 // 动态新增订阅
while (!queue.isEmpty()) { if (!queue.isEmpty()) {
queue.clear(); Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
}
consumer.unsubscribe(); consumer.unsubscribe();
consumer = client.newConsumer() consumer = client.newConsumer()
.topics(new ArrayList<>(getTopics())) .topics(new ArrayList<>(getTopics()))
@ -90,7 +85,7 @@ public abstract class PulsarConsumer extends AbstractConsumer implements IConsum
} }
@Override @Override
public void destroy(AnyValue config) { public void unsubscribe(String topic) {
} }
} }

View File

@ -101,4 +101,14 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume
} }
} }
} }
@Override
public void unsubscribe(String topic) {
try {
writer.write("UNSUBSCRIBE " + topic + "\r\n");
writer.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "", e);
}
}
} }