This commit is contained in:
Redkale
2020-09-03 14:59:35 +08:00
parent 156af0e2a4
commit 1c684a4e32
2 changed files with 13 additions and 12 deletions

View File

@@ -55,7 +55,7 @@ public abstract class MessageAgent {
protected ThreadHashExecutor workExecutor;
//本地Service消息接收处理器 key:consumer
//本地Service消息接收处理器 key:respConsumer
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
public void init(AnyValue config) {
@@ -104,9 +104,9 @@ public abstract class MessageAgent {
protected List<MessageConsumer> getAllMessageConsumer() {
List<MessageConsumer> consumers = new ArrayList<>();
MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.consumer;
MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer;
if (one != null) consumers.add(one);
one = this.sncpMessageClient == null ? null : this.sncpMessageClient.consumer;
one = this.sncpMessageClient == null ? null : this.sncpMessageClient.respConsumer;
if (one != null) consumers.add(one);
consumers.addAll(messageNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList()));
return consumers;
@@ -237,7 +237,7 @@ public abstract class MessageAgent {
return "sncp.req." + Sncp.getResourceType(service).getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resname.isEmpty() ? "" : ("-" + resname));
}
//格式: consumer-sncp.req.user 不提供外部使用
//格式: respConsumer-sncp.req.user 不提供外部使用
protected final String generateSncpConsumerid(String topic, Service service) {
return "consumer-" + topic;
}
@@ -271,7 +271,7 @@ public abstract class MessageAgent {
return new String[]{"http.req." + module + (resname.isEmpty() ? "" : ("-" + resname))};
}
//格式: consumer-http.req.user
//格式: respConsumer-http.req.user
protected String generateHttpConsumerid(String[] topics, Service service) {
String resname = Sncp.getResourceName(service);
String key = Rest.getRestModule(service).toLowerCase();

View File

@@ -25,7 +25,7 @@ public abstract class MessageClient {
protected final MessageAgent messageAgent;
protected MessageConsumer consumer;
protected MessageConsumer respConsumer;
protected String respTopic;
@@ -38,29 +38,30 @@ public abstract class MessageClient {
}
protected CompletableFuture<Void> close() {
if (this.consumer == null) return CompletableFuture.completedFuture(null);
return this.consumer.shutdown();
if (this.respConsumer == null) return CompletableFuture.completedFuture(null);
return this.respConsumer.shutdown();
}
protected CompletableFuture<MessageRecord> sendMessage(MessageRecord message, boolean needresp, AtomicLong counter) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
try {
if (this.consumer == null) {
if (this.respConsumer == null) {
synchronized (this) {
if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic;
if (this.consumer == null) {
if (this.respConsumer == null) {
MessageProcessor processor = (msg, callback) -> {
MessageRespFutureNode node = respNodes.remove(msg.getSeqid());
if (node == null) {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error not found msgnode");
return;
}
if (node.getCounter() != null) node.getCounter().decrementAndGet();
AtomicLong ncer = node.getCounter();
if (ncer != null) ncer.decrementAndGet();
node.future.complete(msg);
};
MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor);
one.startup().join();
this.consumer = one;
this.respConsumer = one;
}
}
}