From 2847f761bb0cb59ceb78ba67acb2f89eca843447 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Sat, 26 Dec 2020 00:07:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9MessageRecord=E7=9A=84seqid?= =?UTF-8?q?=E7=94=9F=E6=88=90=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/mq/HttpMessageClient.java | 9 +++-- src/org/redkale/mq/HttpMessageProcessor.java | 9 +++-- src/org/redkale/mq/HttpMessageResponse.java | 16 +++++--- src/org/redkale/mq/MessageAgent.java | 4 +- src/org/redkale/mq/MessageClient.java | 40 ++++++++++++++++++++ src/org/redkale/mq/MessageRecord.java | 38 ++----------------- src/org/redkale/mq/SncpMessageProcessor.java | 7 +++- src/org/redkale/mq/SncpMessageResponse.java | 12 ++++-- src/org/redkale/net/sncp/SncpClient.java | 2 +- 9 files changed, 80 insertions(+), 57 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageClient.java b/src/org/redkale/mq/HttpMessageClient.java index e21407ce4..8e633d017 100644 --- a/src/org/redkale/mq/HttpMessageClient.java +++ b/src/org/redkale/mq/HttpMessageClient.java @@ -9,7 +9,7 @@ import java.lang.reflect.Type; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; +import java.util.logging.*; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.*; @@ -165,19 +165,20 @@ public class HttpMessageClient extends MessageClient { } public CompletableFuture> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); + //if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message); return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance())); } public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) { - MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); + MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request)); message.userid(userid).groupid(groupid); sendMessage(message, false, counter); } diff --git a/src/org/redkale/mq/HttpMessageProcessor.java b/src/org/redkale/mq/HttpMessageProcessor.java index c134e88dc..4b634d86f 100644 --- a/src/org/redkale/mq/HttpMessageProcessor.java +++ b/src/org/redkale/mq/HttpMessageProcessor.java @@ -31,6 +31,8 @@ public class HttpMessageProcessor implements MessageProcessor { protected final Logger logger; + protected MessageClient messageClient; + protected final MessageProducers producer; protected final NodeHttpServer server; @@ -55,11 +57,12 @@ public class HttpMessageProcessor implements MessageProcessor { if (cdl != null) cdl.countDown(); }; - public HttpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) { + public HttpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageClient messageClient, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); this.finer = logger.isLoggable(Level.FINER); this.fine = logger.isLoggable(Level.FINE); + this.messageClient = messageClient; this.producer = producer; this.server = server; this.service = service; @@ -98,7 +101,7 @@ public class HttpMessageProcessor implements MessageProcessor { if (multiconsumer) { request.setRequestURI(request.getRequestURI().replaceFirst(this.multimodule, this.restmodule)); } - HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer.getProducer(message)); + HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, messageClient, producer.getProducer(message)); servlet.execute(request, response); long o = System.currentTimeMillis() - now; if ((cha > 1000 || e > 100 || o > 1000) && fine) { @@ -111,7 +114,7 @@ public class HttpMessageProcessor implements MessageProcessor { } catch (Throwable ex) { if (message.getResptopic() != null && !message.getResptopic().isEmpty()) { HttpMessageResponse.finishHttpResult(finest, request == null ? null : request.getRespConvert(), - message, callback, producer.getProducer(message), message.getResptopic(), new HttpResult().status(500)); + message, callback, messageClient, producer.getProducer(message), message.getResptopic(), new HttpResult().status(500)); } logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index e5f6c7abd..de08d95c8 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -26,6 +26,8 @@ import org.redkale.util.ObjectPool; */ public class HttpMessageResponse extends HttpResponse { + protected MessageClient messageClient; + protected MessageRecord message; protected MessageProducer producer; @@ -35,26 +37,28 @@ public class HttpMessageResponse extends HttpResponse { protected Runnable callback; public HttpMessageResponse(HttpContext context, HttpMessageRequest request, Runnable callback, - ObjectPool responsePool, HttpResponseConfig config, MessageProducer producer) { + ObjectPool responsePool, HttpResponseConfig config, MessageClient messageClient, MessageProducer producer) { super(context, request, responsePool, config); this.message = request.message; this.callback = callback; + this.messageClient = messageClient; this.producer = producer; this.finest = producer.logger.isLoggable(Level.FINEST); } - public HttpMessageResponse(HttpContext context, MessageRecord message, Runnable callback, HttpResponseConfig config, MessageProducer producer) { + public HttpMessageResponse(HttpContext context, MessageRecord message, Runnable callback, HttpResponseConfig config, MessageClient messageClient, MessageProducer producer) { super(context, new HttpMessageRequest(context, message), null, config); this.message = message; this.callback = callback; + this.messageClient = messageClient; this.producer = producer; } public void finishHttpResult(HttpResult result) { - finishHttpResult(this.finest, ((HttpMessageRequest) this.request).getRespConvert(), this.message, this.callback, this.producer, message.getResptopic(), result); + finishHttpResult(this.finest, ((HttpMessageRequest) this.request).getRespConvert(), this.message, this.callback, this.messageClient, this.producer, message.getResptopic(), result); } - public static void finishHttpResult(boolean finest, Convert respConvert, MessageRecord msg, Runnable callback, MessageProducer producer, String resptopic, HttpResult result) { + public static void finishHttpResult(boolean finest, Convert respConvert, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageProducer producer, String resptopic, HttpResult result) { if (callback != null) callback.run(); if (resptopic == null || resptopic.isEmpty()) return; if (result.getResult() instanceof RetResult) { @@ -66,10 +70,10 @@ public class HttpMessageResponse extends HttpResponse { if (finest) { Object innerrs = result.getResult(); if (innerrs instanceof byte[]) innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); - producer.logger.log(Level.FINEST, "HttpMessageProcessor.process seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); + producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); } byte[] content = HttpResultCoder.getInstance().encode(result); - producer.apply(new MessageRecord(msg.getSeqid(), resptopic, null, content)); + producer.apply(messageClient.createMessageRecord(msg.getSeqid(), resptopic, null, content)); } @Override diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 14b5c2459..2b3567c98 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -224,7 +224,7 @@ public abstract class MessageAgent { String[] topics = generateHttpReqTopics(service); String consumerid = generateHttpConsumerid(topics, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.workExecutor, getHttpProducer(), ns, service, servlet); + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.workExecutor, httpMessageClient, getHttpProducer(), ns, service, servlet); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); } @@ -232,7 +232,7 @@ public abstract class MessageAgent { String topic = generateSncpReqTopic(service); String consumerid = generateSncpConsumerid(topic, service); if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat"); - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this.workExecutor, getSncpProducer(), ns, service, servlet); + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this.workExecutor, sncpMessageClient, getSncpProducer(), ns, service, servlet); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); } diff --git a/src/org/redkale/mq/MessageClient.java b/src/org/redkale/mq/MessageClient.java index 38e04e995..fd55b25a0 100644 --- a/src/org/redkale/mq/MessageClient.java +++ b/src/org/redkale/mq/MessageClient.java @@ -5,9 +5,11 @@ */ package org.redkale.mq; +import java.nio.charset.StandardCharsets; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; +import org.redkale.convert.Convert; /** * @@ -22,6 +24,8 @@ public abstract class MessageClient { protected final ConcurrentHashMap respNodes = new ConcurrentHashMap<>(); + protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); + protected final MessageAgent messageAgent; protected MessageConsumer respConsumer; @@ -102,4 +106,40 @@ public abstract class MessageClient { } protected abstract MessageProducers getProducer(); + + public MessageRecord createMessageRecord(String resptopic, String content) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + } + + public MessageRecord createMessageRecord(String topic, String resptopic, String content) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + } + + public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); + } + + public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); + } + + public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); + } + + public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + } + + public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { + return new MessageRecord(msgSeqno.incrementAndGet(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); + } + + public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { + return new MessageRecord(msgSeqno.incrementAndGet(), topic, resptopic, content); + } + + public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { + return new MessageRecord(seqid, topic, resptopic, content); + } } diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index ff09226a3..69c4d97db 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -65,47 +65,15 @@ public class MessageRecord implements Serializable { public MessageRecord() { } - public MessageRecord(String resptopic, String content) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); - } - - public MessageRecord(String topic, String resptopic, String content) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); - } - - public MessageRecord(int userid, String topic, String resptopic, String content) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); - } - - public MessageRecord(String topic, String resptopic, Convert convert, Object bean) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean)); - } - - public MessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean)); - } - - public MessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); - } - - public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { - this(System.nanoTime(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean)); - } - - public MessageRecord(String topic, String resptopic, byte[] content) { - this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content); - } - - public MessageRecord(long seqid, String topic, String resptopic, byte[] content) { + protected MessageRecord(long seqid, String topic, String resptopic, byte[] content) { this(seqid, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content); } - public MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { + protected MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) { this(seqid, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content); } - public MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { + protected MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) { this.seqid = seqid; this.version = version; this.flag = flag; diff --git a/src/org/redkale/mq/SncpMessageProcessor.java b/src/org/redkale/mq/SncpMessageProcessor.java index b24ed0ff3..ef193d846 100644 --- a/src/org/redkale/mq/SncpMessageProcessor.java +++ b/src/org/redkale/mq/SncpMessageProcessor.java @@ -31,6 +31,8 @@ public class SncpMessageProcessor implements MessageProcessor { protected final Logger logger; + protected MessageClient messageClient; + protected final MessageProducers producer; protected final NodeSncpServer server; @@ -49,11 +51,12 @@ public class SncpMessageProcessor implements MessageProcessor { if (cdl != null) cdl.countDown(); }; - public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { + public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageClient messageClient, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.finest = logger.isLoggable(Level.FINEST); this.finer = logger.isLoggable(Level.FINER); this.fine = logger.isLoggable(Level.FINE); + this.messageClient = messageClient; this.producer = producer; this.server = server; this.service = service; @@ -84,7 +87,7 @@ public class SncpMessageProcessor implements MessageProcessor { long e = now - starttime; SncpContext context = server.getSncpServer().getContext(); SncpMessageRequest request = new SncpMessageRequest(context, message); - response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message)); + response = new SncpMessageResponse(context, request, callback, null, messageClient, producer.getProducer(message)); servlet.execute(request, response); long o = System.currentTimeMillis() - now; if ((cha > 1000 || e > 100 || o > 1000) && fine) { diff --git a/src/org/redkale/mq/SncpMessageResponse.java b/src/org/redkale/mq/SncpMessageResponse.java index 067535faf..626d11969 100644 --- a/src/org/redkale/mq/SncpMessageResponse.java +++ b/src/org/redkale/mq/SncpMessageResponse.java @@ -23,23 +23,27 @@ import org.redkale.util.ObjectPool; */ public class SncpMessageResponse extends SncpResponse { + protected MessageClient messageClient; + protected MessageRecord message; protected MessageProducer producer; protected Runnable callback; - public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, ObjectPool responsePool, MessageProducer producer) { + public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, ObjectPool responsePool, MessageClient messageClient, MessageProducer producer) { super(context, request, responsePool); this.message = request.message; this.callback = callback; + this.messageClient = messageClient; this.producer = producer; } - public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, ObjectPool responsePool, MessageProducer producer) { + public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, ObjectPool responsePool, MessageClient messageClient, MessageProducer producer) { super(context, new SncpMessageRequest(context, message), responsePool); this.message = message; this.callback = callback; + this.messageClient = messageClient; this.producer = producer; } @@ -49,12 +53,12 @@ public class SncpMessageResponse extends SncpResponse { if (out == null) { final byte[] result = new byte[SncpRequest.HEADER_SIZE]; fillHeader(ByteBuffer.wrap(result), 0, retcode); - producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, (byte[]) null)); + producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getResptopic(), null, (byte[]) null)); return; } final int respBodyLength = out.count(); //body总长度 final byte[] result = out.toArray(); fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode); - producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, result)); + producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getResptopic(), null, result)); } } diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 46157b13d..6c98b428b 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -287,7 +287,7 @@ public final class SncpClient { fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength); String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic; if (targetTopic == null) targetTopic = this.topic; - MessageRecord message = new MessageRecord(targetTopic, null, reqbytes); + MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes); return messageClient.sendMessage(message).thenApply(msg -> { ByteBuffer buffer = ByteBuffer.wrap(msg.getContent()); checkResult(seqid, action, buffer);