diff --git a/src/main/java/org/redkale/mq/HttpMessageProcessor.java b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java similarity index 87% rename from src/main/java/org/redkale/mq/HttpMessageProcessor.java rename to src/main/java/org/redkale/mq/HttpMessageClientProcessor.java index 717d091a3..0467d06a4 100644 --- a/src/main/java/org/redkale/mq/HttpMessageProcessor.java +++ b/src/main/java/org/redkale/mq/HttpMessageClientProcessor.java @@ -23,7 +23,7 @@ import org.redkale.util.*; * * @since 2.1.0 */ -public class HttpMessageProcessor implements MessageProcessor { +public class HttpMessageClientProcessor implements MessageClientProcessor { protected final Logger logger; @@ -59,7 +59,7 @@ public class HttpMessageProcessor implements MessageProcessor { } }; - public HttpMessageProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { + public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.messageClient = messageClient; this.producers = producers; @@ -118,7 +118,7 @@ public class HttpMessageProcessor implements MessageProcessor { HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(), null, message, callback, messageClient, producers.getProducer(message), message.getRespTopic(), new HttpResult().status(500)); } - logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); + logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); } } @@ -128,7 +128,7 @@ public class HttpMessageProcessor implements MessageProcessor { try { this.cdl.await(30, TimeUnit.SECONDS); } catch (Exception ex) { - logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " commit error, restmodule=" + this.restModule, ex); + logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " commit error, restmodule=" + this.restModule, ex); } this.cdl = null; } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 2b98e74ac..9f3fe6ea5 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -67,8 +67,8 @@ public abstract class MessageAgent implements Resourcable { protected MessageCoder messageCoder = MessageRecordCoder.getInstance(); - //本地Service消息接收处理器, key:consumer - protected HashMap messageNodes = new LinkedHashMap<>(); + //本地Service消息接收处理器, key:consumerid + protected HashMap clientConsumerNodes = new LinkedHashMap<>(); public void init(ResourceFactory factory, AnyValue config) { this.name = checkName(config.getValue("name", "")); @@ -96,7 +96,7 @@ public abstract class MessageAgent implements Resourcable { } // application (it doesn't execute completion handlers). this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> { - Thread t = new Thread(r, "Redkale-MessageAgent-Timeout-Thread"); + Thread t = new Thread(r, "Redkale-MessageAgent-[" + name + "]-Timeout-Thread"); t.setDaemon(true); return t; }); @@ -109,7 +109,7 @@ public abstract class MessageAgent implements Resourcable { public CompletableFuture> start() { final LinkedHashMap map = new LinkedHashMap<>(); final List futures = new ArrayList<>(); - this.messageNodes.values().forEach(node -> { + this.clientConsumerNodes.values().forEach(node -> { long s = System.currentTimeMillis(); futures.add(node.consumer.startup().whenComplete((r, t) -> map.put(node.consumer.consumerid, System.currentTimeMillis() - s))); }); @@ -119,7 +119,7 @@ public abstract class MessageAgent implements Resourcable { //Application.shutdown 在执行server.shutdown之前执行 public CompletableFuture stop() { List futures = new ArrayList<>(); - this.messageNodes.values().forEach(node -> { + this.clientConsumerNodes.values().forEach(node -> { futures.add(node.consumer.shutdown()); }); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); @@ -153,7 +153,7 @@ public abstract class MessageAgent implements Resourcable { if (one != null) { consumers.add(one); } - consumers.addAll(messageNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList())); + consumers.addAll(clientConsumerNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList())); return consumers; } @@ -291,7 +291,7 @@ public abstract class MessageAgent implements Resourcable { public abstract boolean acceptsConf(AnyValue config); //创建指定topic的消费处理器 - public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageProcessor processor); + public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageClientProcessor processor); public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); @@ -312,11 +312,11 @@ public abstract class MessageAgent implements Resourcable { String consumerid = generateHttpConsumerid(topics, service); httpNodesLock.lock(); try { - if (messageNodes.containsKey(consumerid)) { + if (clientConsumerNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor))); + HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); + this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor))); } finally { httpNodesLock.unlock(); } @@ -335,11 +335,11 @@ public abstract class MessageAgent implements Resourcable { String consumerid = generateSncpConsumerid(topic, service); sncpNodesLock.lock(); try { - if (messageNodes.containsKey(consumerid)) { + if (clientConsumerNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor))); + SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); + this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor))); } finally { sncpNodesLock.unlock(); } @@ -407,7 +407,7 @@ public abstract class MessageAgent implements Resourcable { return protocol + ".resp.node" + nodeid; } - protected static class MessageConsumerNode { + protected static class MessageClientConsumerNode { public final NodeServer server; @@ -415,11 +415,11 @@ public abstract class MessageAgent implements Resourcable { public final Servlet servlet; - public final MessageProcessor processor; + public final MessageClientProcessor processor; public final MessageClientConsumer consumer; - public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageClientConsumer consumer) { + public MessageClientConsumerNode(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) { this.server = server; this.service = service; this.servlet = servlet; diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 316da4ccf..da328408d 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -67,7 +67,7 @@ public abstract class MessageClient { this.respConsumerid = "consumer-" + this.respTopic; } if (this.respConsumer == null) { - MessageProcessor processor = (msg, callback) -> { + MessageClientProcessor processor = (msg, callback) -> { long now = System.currentTimeMillis(); MessageRespFutureNode node = respNodes.remove(msg.getSeqid()); if (node == null) { diff --git a/src/main/java/org/redkale/mq/MessageClientConsumer.java b/src/main/java/org/redkale/mq/MessageClientConsumer.java index 540829284..6f6e0934a 100644 --- a/src/main/java/org/redkale/mq/MessageClientConsumer.java +++ b/src/main/java/org/redkale/mq/MessageClientConsumer.java @@ -27,13 +27,13 @@ public abstract class MessageClientConsumer { protected MessageAgent messageAgent; - protected final MessageProcessor processor; + protected final MessageClientProcessor processor; protected final Logger logger; protected volatile boolean closed; - protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) { + protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageClientProcessor processor) { Objects.requireNonNull(messageAgent); Objects.requireNonNull(topics); Objects.requireNonNull(consumerid); @@ -45,7 +45,7 @@ public abstract class MessageClientConsumer { this.processor = processor; } - public MessageProcessor getProcessor() { + public MessageClientProcessor getProcessor() { return processor; } diff --git a/src/main/java/org/redkale/mq/MessageProcessor.java b/src/main/java/org/redkale/mq/MessageClientProcessor.java similarity index 88% rename from src/main/java/org/redkale/mq/MessageProcessor.java rename to src/main/java/org/redkale/mq/MessageClientProcessor.java index 288c0fb06..444733689 100644 --- a/src/main/java/org/redkale/mq/MessageProcessor.java +++ b/src/main/java/org/redkale/mq/MessageClientProcessor.java @@ -15,7 +15,7 @@ package org.redkale.mq; * * @since 2.1.0 */ -public interface MessageProcessor { +public interface MessageClientProcessor { default void begin(int size, long starttime) { } diff --git a/src/main/java/org/redkale/mq/SncpMessageProcessor.java b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java similarity index 86% rename from src/main/java/org/redkale/mq/SncpMessageProcessor.java rename to src/main/java/org/redkale/mq/SncpMessageClientProcessor.java index 3851384e1..4972ad775 100644 --- a/src/main/java/org/redkale/mq/SncpMessageProcessor.java +++ b/src/main/java/org/redkale/mq/SncpMessageClientProcessor.java @@ -22,7 +22,7 @@ import org.redkale.util.Traces; * * @since 2.1.0 */ -public class SncpMessageProcessor implements MessageProcessor { +public class SncpMessageClientProcessor implements MessageClientProcessor { protected final Logger logger; @@ -46,7 +46,7 @@ public class SncpMessageProcessor implements MessageProcessor { } }; - public SncpMessageProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { + public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.messageClient = messageClient; this.producer = producer; @@ -90,7 +90,7 @@ public class SncpMessageProcessor implements MessageProcessor { if (response != null) { response.finish(SncpResponse.RETCODE_ILLSERVICEID, null); } - logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); + logger.log(Level.SEVERE, SncpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex); } } diff --git a/src/main/java/org/redkale/mq/SncpMessageResponse.java b/src/main/java/org/redkale/mq/SncpMessageResponse.java index 59dd57fa1..e3e027d70 100644 --- a/src/main/java/org/redkale/mq/SncpMessageResponse.java +++ b/src/main/java/org/redkale/mq/SncpMessageResponse.java @@ -50,14 +50,13 @@ public class SncpMessageResponse extends SncpResponse { callback.run(); } if (out == null) { - final ByteArray result = new ByteArray(SncpHeader.HEADER_SIZE); + final ByteArray result = onlyHeaderData; fillHeader(result, 0, retcode); producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null)); return; } - final int respBodyLength = out.count(); //body总长度 final ByteArray result = out.toByteArray(); - fillHeader(result, respBodyLength - SncpHeader.HEADER_SIZE, retcode); + fillHeader(result, result.length() - SncpHeader.HEADER_SIZE, retcode); producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes())); } } diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index dcbf82833..d907a3b75 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -36,7 +36,9 @@ import org.redkale.util.*; */ public abstract class Sncp { - private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).writeTo(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes(); + private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO) + .writeTo(new ByteArray(SncpHeader.HEADER_SIZE).putPlaceholder(SncpHeader.HEADER_SIZE), null, 0, 0, 0) + .getBytes(); private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length); diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index 2167bce49..20935ac45 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -118,6 +118,9 @@ public class SncpHeader { if (newAddrBytes.length != 4) { throw new SncpException("address bytes length must be 4, but " + newAddrBytes.length); } + if (array.length() < HEADER_SIZE) { + throw new SncpException("ByteArray length must more " + HEADER_SIZE); + } int offset = 0; array.putLong(offset, newSeqid); //8 offset += 8; diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index 774770796..20d1885eb 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -36,6 +36,8 @@ public class SncpResponse extends Response { protected final BsonWriter writer = new BsonWriter(); + protected final ByteArray onlyHeaderData = new ByteArray(HEADER_SIZE).putPlaceholder(HEADER_SIZE); + protected final CompletionHandler realHandler = new CompletionHandler() { @Override public void completed(Object result, Object attachment) { @@ -168,9 +170,9 @@ public class SncpResponse extends Response { //调用此方法时out已写入SncpHeader public void finish(final int retcode, final BsonWriter out) { if (out == null) { - final ByteArray buffer = new ByteArray(HEADER_SIZE); - fillHeader(buffer, 0, retcode); - finish(buffer); + final ByteArray array = onlyHeaderData; + fillHeader(array, 0, retcode); + finish(array); return; } final ByteArray array = out.toByteArray(); @@ -179,9 +181,9 @@ public class SncpResponse extends Response { finish(array); } - protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) { + protected void fillHeader(ByteArray array, int bodyLength, int retcode) { SncpHeader header = request.getHeader(); - header.writeTo(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode); + header.writeTo(array, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode); } }