This commit is contained in:
Redkale
2020-06-09 21:31:08 +08:00
parent a58c13cd9f
commit bd4343c5d8

View File

@@ -44,10 +44,6 @@ public abstract class MessageAgent {
protected SncpMessageClient sncpMessageClient;
//protected MessageConsumer sncpRespConsumer;
//protected SncpRespProcessor sncpRespProcessor;
//sncpRespConsumer启动耗时 小于0表示未启动
//protected long sncpRespStartms = -1;
//本地Service消息接收处理器 key:topic
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
@@ -57,24 +53,8 @@ public abstract class MessageAgent {
this.sncpMessageClient = new SncpMessageClient(this);
}
//ServiceLoader时判断配置是否符合当前实现类
public abstract boolean match(AnyValue config);
// public final CompletableFuture<MessageRecord> createSncpRespFuture2(AtomicLong counter, MessageRecord message) {
// return this.sncpRespProcessor.createFuture2(message.getSeqid(), counter);
// }
//
// public final synchronized void startSncpRespConsumer() {
// if (this.sncpRespStartms >= 0) return;
// long s = System.currentTimeMillis();
// if (this.sncpRespConsumer != null) {
// this.sncpRespConsumer.startup().join();
// }
// this.sncpRespStartms = System.currentTimeMillis() - s;
// }
public CompletableFuture<Map<String, Long>> start() {
final LinkedHashMap<String, Long> map = new LinkedHashMap<>();
//if (this.sncpRespStartms >= 0) map.put(this.sncpRespConsumer.topic, this.sncpRespStartms);
final List<CompletableFuture> futures = new ArrayList<>();
this.messageNodes.values().forEach(node -> {
long s = System.currentTimeMillis();
@@ -94,7 +74,6 @@ public abstract class MessageAgent {
//Application.shutdown 在所有server.shutdown执行后执行
public void destroy(AnyValue config) {
//if (this.sncpRespConsumer != null) this.sncpRespConsumer.shutdown().join();
this.httpMessageClient.close().join();
this.sncpMessageClient.close().join();
if (this.producer != null) this.producer.shutdown().join();
@@ -156,26 +135,12 @@ public abstract class MessageAgent {
//查询所有topic
public abstract List<String> queryTopic();
//ServiceLoader时判断配置是否符合当前实现类
public abstract boolean match(AnyValue config);
//创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String topic, MessageProcessor processor);
// public final synchronized void putSncpResp(NodeSncpServer ns) {
// if (this.sncpRespConsumer != null) return;
// this.sncpRespProcessor = new SncpRespProcessor(this.logger, this);
// this.sncpRespConsumer = createConsumer(generateSncpRespTopic(), sncpRespProcessor);
// }
//
// public CompletableFuture<MessageRecord> sendRemoteSncp(AtomicLong counter, MessageRecord message) {
// if (this.sncpRespConsumer == null) {
// CompletableFuture future = new CompletableFuture();
// future.completeExceptionally(new RuntimeException("Not open sncp consumer"));
// return future;
// }
// message.setFormat(ConvertType.BSON);
// message.setResptopic(generateSncpRespTopic());
// getProducer().apply(message);
// return this.sncpRespProcessor.createFuture(message.getSeqid(), counter);
// }
public final synchronized void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
String topic = generateHttpReqTopic(service);
if (messageNodes.containsKey(topic)) throw new RuntimeException("topic(" + topic + ") is repeat");