diff --git a/src/main/java/org/redkale/mq/HttpMessageServlet.java b/src/main/java/org/redkale/mq/HttpMessageServlet.java index 2a10bf9d0..0e5748976 100644 --- a/src/main/java/org/redkale/mq/HttpMessageServlet.java +++ b/src/main/java/org/redkale/mq/HttpMessageServlet.java @@ -6,7 +6,6 @@ package org.redkale.mq; import java.util.logging.*; -import org.redkale.boot.NodeHttpServer; import org.redkale.net.Context; import org.redkale.net.Request; import org.redkale.net.Response; @@ -25,9 +24,9 @@ import org.redkale.service.Service; */ public class HttpMessageServlet extends MessageServlet { - public HttpMessageServlet(MessageClient messageClient, NodeHttpServer server, + public HttpMessageServlet(MessageClient messageClient, Context context, Service service, HttpServlet servlet, String topic) { - super(messageClient, server, service, servlet, topic); + super(messageClient, context, service, servlet, topic); } @Override diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 0322b64b6..9c70dd0d4 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -25,7 +25,6 @@ import org.redkale.convert.Convert; import org.redkale.convert.ConvertFactory; import org.redkale.convert.ConvertType; import org.redkale.convert.json.JsonConvert; -import org.redkale.net.Servlet; import org.redkale.net.WorkThread; import org.redkale.net.http.*; import org.redkale.net.sncp.*; @@ -59,6 +58,7 @@ public abstract class MessageAgent implements Resourcable { protected AnyValue config; + @Nonnull private ExecutorService workExecutor; private int timeoutSeconds; @@ -88,13 +88,13 @@ public abstract class MessageAgent implements Resourcable { protected MessageClient sncpMessageClient; - protected MessageClientProducer clientMessageProducer; + protected MessageClientProducer messageClientProducer; protected final ReentrantLock clientConsumerLock = new ReentrantLock(); protected final ReentrantLock clientProducerLock = new ReentrantLock(); - protected MessageCoder clientMessageCoder = MessageRecordSerializer.getInstance(); + protected MessageCoder messageRecordCoder = MessageRecordSerializer.getInstance(); protected ScheduledThreadPoolExecutor timeoutExecutor; @@ -110,8 +110,8 @@ public abstract class MessageAgent implements Resourcable { this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s") : WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s"); } - this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic, Rest.getHttpReqTopicPrefix()); - this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic, Sncp.getSncpReqTopicPrefix()); + this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic); + this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic); String coderType = config.getValue("coder", ""); if (!coderType.trim().isEmpty()) { @@ -125,7 +125,7 @@ public abstract class MessageAgent implements Resourcable { if (coder instanceof Service) { ((Service) coder).init(config); } - this.clientMessageCoder = coder; + this.messageRecordCoder = coder; } catch (RuntimeException ex) { throw ex; } catch (Exception e) { @@ -146,21 +146,23 @@ public abstract class MessageAgent implements Resourcable { return workExecutor.submit(event); } + public void execute(Runnable event) { + workExecutor.execute(event); + } + public void start(List consumers) { StringBuilder loginfo = initMessageConsumer(consumers); startMessageConsumer(); if (loginfo.length() > 0) { logger.log(Level.INFO, loginfo.toString()); } - - this.clientMessageProducer = createMessageClientProducer("redkale-message-producer"); + //----------------- MessageClient ----------------- if (this.httpRpcClient != null || !this.httpMessageClient.isEmpty()) { this.httpMessageClient.putMessageRespProcessor(); } if (!this.sncpMessageClient.isEmpty()) { this.sncpMessageClient.putMessageRespProcessor(); } - this.startMessageClientConsumers(); List topics = new ArrayList<>(); if (!this.httpMessageClient.isEmpty()) { topics.addAll(this.httpMessageClient.getTopics()); @@ -168,7 +170,9 @@ public abstract class MessageAgent implements Resourcable { if (!this.sncpMessageClient.isEmpty()) { topics.addAll(this.sncpMessageClient.getTopics()); } - if (!topics.isEmpty()) { + if (!topics.isEmpty()) { //存在需要订阅的主题 + this.messageClientProducer = startMessageClientProducer(); + this.startMessageClientConsumer(); Collections.sort(topics); loginfo = new StringBuilder(); loginfo.append(MessageClientConsumer.class.getSimpleName() + " subscribe topics:\r\n"); @@ -183,7 +187,7 @@ public abstract class MessageAgent implements Resourcable { public void stop() { this.stopMessageConsumer(); this.stopMessageProducer(); - this.stopMessageClientConsumers(); + this.stopMessageClientConsumer(); } //Application.stop 在所有server.shutdown执行后执行 @@ -196,11 +200,11 @@ public abstract class MessageAgent implements Resourcable { //-------------- MessageClient -------------- this.httpMessageClient.stop(); this.sncpMessageClient.stop(); - if (this.clientMessageProducer != null) { - this.clientMessageProducer.stop(); + if (this.messageClientProducer != null) { + this.messageClientProducer.stop(); } - if (this.clientMessageCoder instanceof Service) { - ((Service) this.clientMessageCoder).destroy(config); + if (this.messageRecordCoder instanceof Service) { + ((Service) this.messageRecordCoder).destroy(config); } if (this.timeoutExecutor != null) { this.timeoutExecutor.shutdownNow(); @@ -290,15 +294,6 @@ public abstract class MessageAgent implements Resourcable { return sb; } - static String alignString(String value, int maxlen) { - StringBuilder sb = new StringBuilder(maxlen); - sb.append(value); - for (int i = 0; i < maxlen - value.length(); i++) { - sb.append(' '); - } - return sb.toString(); - } - @Override public String resourceName() { return name; @@ -365,19 +360,15 @@ public abstract class MessageAgent implements Resourcable { return name; } - public MessageCoder getClientMessageCoder() { - return this.clientMessageCoder; + public MessageCoder getMessageRecordCoder() { + return this.messageRecordCoder; } public MessageClientProducer getMessageClientProducer() { - return this.clientMessageProducer; + return this.messageClientProducer; } // - protected abstract void startMessageClientConsumers(); - - protected abstract void stopMessageClientConsumers(); - protected abstract void startMessageConsumer(); protected abstract void stopMessageConsumer(); @@ -386,6 +377,14 @@ public abstract class MessageAgent implements Resourcable { protected abstract void stopMessageProducer(); + //----------------- MessageClient ----------------- + protected abstract void startMessageClientConsumer(); + + protected abstract void stopMessageClientConsumer(); + + protected abstract MessageClientProducer startMessageClientProducer(); + + //--------------------------------------------------- @ResourceListener public abstract void onResourceChange(ResourceEvent[] events); @@ -401,9 +400,6 @@ public abstract class MessageAgent implements Resourcable { //ServiceLoader时判断配置是否符合当前实现类 public abstract boolean acceptsConf(AnyValue config); - //创建指定topic的生产处理器 - protected abstract MessageClientProducer createMessageClientProducer(String producerName); - public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) { @@ -423,7 +419,7 @@ public abstract class MessageAgent implements Resourcable { throw new RedkaleException("Application.node not config in WebSocket Cluster"); } String topic = Rest.generateHttpReqTopic(service, this.nodeid); - MessageServlet processor = new HttpMessageServlet(this.httpMessageClient, ns, service, servlet, topic); + MessageServlet processor = new HttpMessageServlet(this.httpMessageClient, ns.getHttpServer().getContext(), service, servlet, topic); this.httpMessageClient.putMessageServlet(processor); } @@ -440,7 +436,7 @@ public abstract class MessageAgent implements Resourcable { throw new RedkaleException("Application.node not config in WebSocket Cluster"); } String topic = Sncp.generateSncpReqTopic(service, this.nodeid); - MessageServlet processor = new SncpMessageServlet(this.sncpMessageClient, ns, service, servlet, topic); + MessageServlet processor = new SncpMessageServlet(this.sncpMessageClient, ns.getSncpServer().getContext(), service, servlet, topic); this.sncpMessageClient.putMessageServlet(processor); } @@ -456,6 +452,15 @@ public abstract class MessageAgent implements Resourcable { return Rest.getHttpRespTopicPrefix() + "app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; } + static String alignString(String value, int maxlen) { + StringBuilder sb = new StringBuilder(maxlen); + sb.append(value); + for (int i = 0; i < maxlen - value.length(); i++) { + sb.append(' '); + } + return sb.toString(); + } + public final String getHttpAppRespTopic() { return this.httpAppRespTopic; } @@ -570,26 +575,4 @@ public abstract class MessageAgent implements Resourcable { } - protected static class MessageClientConsumerWrapper { - - public final NodeServer server; - - public final Service service; - - public final Servlet servlet; - - public final MessageServlet processor; - - public final MessageClientConsumer consumer; - - public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageServlet processor, MessageClientConsumer consumer) { - this.server = server; - this.service = service; - this.servlet = servlet; - this.processor = processor; - this.consumer = consumer; - } - - } - } diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 9227e7424..cde302dad 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -44,8 +44,6 @@ public class MessageClient implements ClusterRpcClient respQueue = new ConcurrentHashMap<>(); - protected MessageClient(MessageAgent messageAgent, String appRespTopic, String reqTopicPrefix) { + protected MessageClient(MessageAgent messageAgent, String appRespTopic) { this.messageAgent = messageAgent; this.appRespTopic = appRespTopic; - this.reqTopicPrefix = reqTopicPrefix; this.msgSeqno = messageAgent.msgSeqno; } @@ -220,7 +217,7 @@ public class MessageClient implements ClusterRpcClient getClientMessageCoder() { - return this.messageAgent.getClientMessageCoder(); + return this.messageAgent.getMessageRecordCoder(); } public MessageClientProducer getProducer() { @@ -231,8 +228,4 @@ public class MessageClient implements ClusterRpcClient 1000 || cha2 > 1000) && logger.isLoggable(Level.FINE)) { - logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms) mqresp.msg: " + msg); - } else if ((cha > 50 || cha2 > 50) && logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms) mqresp.msg: " + msg); - } else if (finest) { - logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms) mqresp.msg: " + msg); - } + messageClient.getMessageAgent().execute(() -> { + resp.future.complete(msg); + long cha2 = System.currentTimeMillis() - now; + if ((cha > 1000 || cha2 > 1000) && logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms) mqresp.msg: " + msg); + } else if ((cha > 50 || cha2 > 50) && logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms) mqresp.msg: " + msg); + } else if (finest) { + logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms) mqresp.msg: " + msg); + } + }); } } diff --git a/src/main/java/org/redkale/mq/MessageServlet.java b/src/main/java/org/redkale/mq/MessageServlet.java index 3a1993d14..96a1c0dcd 100644 --- a/src/main/java/org/redkale/mq/MessageServlet.java +++ b/src/main/java/org/redkale/mq/MessageServlet.java @@ -8,7 +8,6 @@ package org.redkale.mq; import java.util.concurrent.CompletionException; import java.util.logging.Level; import java.util.logging.Logger; -import org.redkale.boot.NodeServer; import org.redkale.net.Context; import org.redkale.net.Request; import org.redkale.net.Response; @@ -32,7 +31,7 @@ public abstract class MessageServlet implements MessageProcessor { protected final MessageClient messageClient; - protected final NodeServer server; + protected final Context context; protected final Service service; @@ -40,9 +39,9 @@ public abstract class MessageServlet implements MessageProcessor { protected final String topic; - public MessageServlet(MessageClient messageClient, NodeServer server, Service service, Servlet servlet, String topic) { + public MessageServlet(MessageClient messageClient, Context context, Service service, Servlet servlet, String topic) { this.messageClient = messageClient; - this.server = server; + this.context = context; this.service = service; this.servlet = servlet; this.topic = topic; @@ -56,7 +55,6 @@ public abstract class MessageServlet implements MessageProcessor { long now = System.currentTimeMillis(); long cha = now - message.createTime; long e = now - time; - Context context = server.getServer().getContext(); Request request = createRequest(context, message); response = createResponse(context, request); //执行逻辑 @@ -83,8 +81,8 @@ public abstract class MessageServlet implements MessageProcessor { protected abstract void onError(Response response, MessageRecord message, Throwable t); - public NodeServer getServer() { - return server; + public Context getContext() { + return context; } public Service getService() { diff --git a/src/main/java/org/redkale/mq/SncpMessageServlet.java b/src/main/java/org/redkale/mq/SncpMessageServlet.java index 1671bceef..1d1545cce 100644 --- a/src/main/java/org/redkale/mq/SncpMessageServlet.java +++ b/src/main/java/org/redkale/mq/SncpMessageServlet.java @@ -5,7 +5,6 @@ */ package org.redkale.mq; -import org.redkale.boot.NodeSncpServer; import org.redkale.net.Context; import org.redkale.net.Request; import org.redkale.net.Response; @@ -24,9 +23,9 @@ import org.redkale.service.Service; */ public class SncpMessageServlet extends MessageServlet { - public SncpMessageServlet(MessageClient messageClient, NodeSncpServer server, + public SncpMessageServlet(MessageClient messageClient, Context context, Service service, SncpServlet servlet, String topic) { - super(messageClient, server, service, servlet, topic); + super(messageClient, context, service, servlet, topic); } @Override