This commit is contained in:
Redkale
2020-11-11 18:00:37 +08:00
parent 51e4d44fc1
commit 2f6eb0908e
2 changed files with 7 additions and 1 deletions

View File

@@ -80,7 +80,7 @@ public class HttpMessageProcessor implements MessageProcessor {
private void execute(final MessageRecord message, final Runnable callback) {
HttpMessageRequest request = null;
try {
if (finest) logger.log(Level.FINEST, "HttpMessageProcessor.process message: " + message);
if (finest) logger.log(Level.FINEST, "HttpMessageProcessor.process(mq.delay=" + (System.currentTimeMillis() - message.createtime) + "ms) message: " + message);
if (multiconsumer) message.setResptopic(null); //不容许有响应
HttpContext context = server.getHttpServer().getContext();
request = new HttpMessageRequest(context, message);

View File

@@ -30,8 +30,11 @@ public abstract class MessageClient {
protected String respConsumerid;
protected boolean finest;
protected MessageClient(MessageAgent messageAgent) {
this.messageAgent = messageAgent;
this.finest = messageAgent == null ? false : messageAgent.logger.isLoggable(Level.FINEST);
}
protected CompletableFuture<Void> close() {
@@ -47,6 +50,7 @@ public abstract class MessageClient {
if (this.respConsumerid == null) this.respConsumerid = "consumer-" + this.respTopic;
if (this.respConsumer == null) {
MessageProcessor processor = (msg, callback) -> {
long now = System.currentTimeMillis();
MessageRespFutureNode node = respNodes.remove(msg.getSeqid());
if (node == null) {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error not found msgnode");
@@ -55,6 +59,8 @@ public abstract class MessageClient {
AtomicLong ncer = node.getCounter();
if (ncer != null) ncer.decrementAndGet();
node.future.complete(msg);
if (finest) messageAgent.logger.log(Level.FINEST, "MessageRespFutureNode.process(mq.delay=" + (now - msg.createtime) + "ms) message: " + message);
};
MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor);
one.startup().join();