This commit is contained in:
redkale
2023-10-14 08:13:25 +08:00
parent bad571aecb
commit 48c1263ea1
6 changed files with 63 additions and 89 deletions

View File

@@ -6,7 +6,6 @@
package org.redkale.mq;
import java.util.logging.*;
import org.redkale.boot.NodeHttpServer;
import org.redkale.net.Context;
import org.redkale.net.Request;
import org.redkale.net.Response;
@@ -25,9 +24,9 @@ import org.redkale.service.Service;
*/
public class HttpMessageServlet extends MessageServlet {
public HttpMessageServlet(MessageClient messageClient, NodeHttpServer server,
public HttpMessageServlet(MessageClient messageClient, Context context,
Service service, HttpServlet servlet, String topic) {
super(messageClient, server, service, servlet, topic);
super(messageClient, context, service, servlet, topic);
}
@Override

View File

@@ -25,7 +25,6 @@ import org.redkale.convert.Convert;
import org.redkale.convert.ConvertFactory;
import org.redkale.convert.ConvertType;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.Servlet;
import org.redkale.net.WorkThread;
import org.redkale.net.http.*;
import org.redkale.net.sncp.*;
@@ -59,6 +58,7 @@ public abstract class MessageAgent implements Resourcable {
protected AnyValue config;
@Nonnull
private ExecutorService workExecutor;
private int timeoutSeconds;
@@ -88,13 +88,13 @@ public abstract class MessageAgent implements Resourcable {
protected MessageClient sncpMessageClient;
protected MessageClientProducer clientMessageProducer;
protected MessageClientProducer messageClientProducer;
protected final ReentrantLock clientConsumerLock = new ReentrantLock();
protected final ReentrantLock clientProducerLock = new ReentrantLock();
protected MessageCoder<MessageRecord> clientMessageCoder = MessageRecordSerializer.getInstance();
protected MessageCoder<MessageRecord> messageRecordCoder = MessageRecordSerializer.getInstance();
protected ScheduledThreadPoolExecutor timeoutExecutor;
@@ -110,8 +110,8 @@ public abstract class MessageAgent implements Resourcable {
this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s")
: WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s");
}
this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic, Rest.getHttpReqTopicPrefix());
this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic, Sncp.getSncpReqTopicPrefix());
this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic);
this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic);
String coderType = config.getValue("coder", "");
if (!coderType.trim().isEmpty()) {
@@ -125,7 +125,7 @@ public abstract class MessageAgent implements Resourcable {
if (coder instanceof Service) {
((Service) coder).init(config);
}
this.clientMessageCoder = coder;
this.messageRecordCoder = coder;
} catch (RuntimeException ex) {
throw ex;
} catch (Exception e) {
@@ -146,21 +146,23 @@ public abstract class MessageAgent implements Resourcable {
return workExecutor.submit(event);
}
public void execute(Runnable event) {
workExecutor.execute(event);
}
public void start(List<MessageConsumer> consumers) {
StringBuilder loginfo = initMessageConsumer(consumers);
startMessageConsumer();
if (loginfo.length() > 0) {
logger.log(Level.INFO, loginfo.toString());
}
this.clientMessageProducer = createMessageClientProducer("redkale-message-producer");
//----------------- MessageClient -----------------
if (this.httpRpcClient != null || !this.httpMessageClient.isEmpty()) {
this.httpMessageClient.putMessageRespProcessor();
}
if (!this.sncpMessageClient.isEmpty()) {
this.sncpMessageClient.putMessageRespProcessor();
}
this.startMessageClientConsumers();
List<String> topics = new ArrayList<>();
if (!this.httpMessageClient.isEmpty()) {
topics.addAll(this.httpMessageClient.getTopics());
@@ -168,7 +170,9 @@ public abstract class MessageAgent implements Resourcable {
if (!this.sncpMessageClient.isEmpty()) {
topics.addAll(this.sncpMessageClient.getTopics());
}
if (!topics.isEmpty()) {
if (!topics.isEmpty()) { //存在需要订阅的主题
this.messageClientProducer = startMessageClientProducer();
this.startMessageClientConsumer();
Collections.sort(topics);
loginfo = new StringBuilder();
loginfo.append(MessageClientConsumer.class.getSimpleName() + " subscribe topics:\r\n");
@@ -183,7 +187,7 @@ public abstract class MessageAgent implements Resourcable {
public void stop() {
this.stopMessageConsumer();
this.stopMessageProducer();
this.stopMessageClientConsumers();
this.stopMessageClientConsumer();
}
//Application.stop 在所有server.shutdown执行后执行
@@ -196,11 +200,11 @@ public abstract class MessageAgent implements Resourcable {
//-------------- MessageClient --------------
this.httpMessageClient.stop();
this.sncpMessageClient.stop();
if (this.clientMessageProducer != null) {
this.clientMessageProducer.stop();
if (this.messageClientProducer != null) {
this.messageClientProducer.stop();
}
if (this.clientMessageCoder instanceof Service) {
((Service) this.clientMessageCoder).destroy(config);
if (this.messageRecordCoder instanceof Service) {
((Service) this.messageRecordCoder).destroy(config);
}
if (this.timeoutExecutor != null) {
this.timeoutExecutor.shutdownNow();
@@ -290,15 +294,6 @@ public abstract class MessageAgent implements Resourcable {
return sb;
}
static String alignString(String value, int maxlen) {
StringBuilder sb = new StringBuilder(maxlen);
sb.append(value);
for (int i = 0; i < maxlen - value.length(); i++) {
sb.append(' ');
}
return sb.toString();
}
@Override
public String resourceName() {
return name;
@@ -365,19 +360,15 @@ public abstract class MessageAgent implements Resourcable {
return name;
}
public MessageCoder<MessageRecord> getClientMessageCoder() {
return this.clientMessageCoder;
public MessageCoder<MessageRecord> getMessageRecordCoder() {
return this.messageRecordCoder;
}
public MessageClientProducer getMessageClientProducer() {
return this.clientMessageProducer;
return this.messageClientProducer;
}
//
protected abstract void startMessageClientConsumers();
protected abstract void stopMessageClientConsumers();
protected abstract void startMessageConsumer();
protected abstract void stopMessageConsumer();
@@ -386,6 +377,14 @@ public abstract class MessageAgent implements Resourcable {
protected abstract void stopMessageProducer();
//----------------- MessageClient -----------------
protected abstract void startMessageClientConsumer();
protected abstract void stopMessageClientConsumer();
protected abstract MessageClientProducer startMessageClientProducer();
//---------------------------------------------------
@ResourceListener
public abstract void onResourceChange(ResourceEvent[] events);
@@ -401,9 +400,6 @@ public abstract class MessageAgent implements Resourcable {
//ServiceLoader时判断配置是否符合当前实现类
public abstract boolean acceptsConf(AnyValue config);
//创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer(String producerName);
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) {
@@ -423,7 +419,7 @@ public abstract class MessageAgent implements Resourcable {
throw new RedkaleException("Application.node not config in WebSocket Cluster");
}
String topic = Rest.generateHttpReqTopic(service, this.nodeid);
MessageServlet processor = new HttpMessageServlet(this.httpMessageClient, ns, service, servlet, topic);
MessageServlet processor = new HttpMessageServlet(this.httpMessageClient, ns.getHttpServer().getContext(), service, servlet, topic);
this.httpMessageClient.putMessageServlet(processor);
}
@@ -440,7 +436,7 @@ public abstract class MessageAgent implements Resourcable {
throw new RedkaleException("Application.node not config in WebSocket Cluster");
}
String topic = Sncp.generateSncpReqTopic(service, this.nodeid);
MessageServlet processor = new SncpMessageServlet(this.sncpMessageClient, ns, service, servlet, topic);
MessageServlet processor = new SncpMessageServlet(this.sncpMessageClient, ns.getSncpServer().getContext(), service, servlet, topic);
this.sncpMessageClient.putMessageServlet(processor);
}
@@ -456,6 +452,15 @@ public abstract class MessageAgent implements Resourcable {
return Rest.getHttpRespTopicPrefix() + "app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
}
static String alignString(String value, int maxlen) {
StringBuilder sb = new StringBuilder(maxlen);
sb.append(value);
for (int i = 0; i < maxlen - value.length(); i++) {
sb.append(' ');
}
return sb.toString();
}
public final String getHttpAppRespTopic() {
return this.httpAppRespTopic;
}
@@ -570,26 +575,4 @@ public abstract class MessageAgent implements Resourcable {
}
protected static class MessageClientConsumerWrapper {
public final NodeServer server;
public final Service service;
public final Servlet servlet;
public final MessageServlet processor;
public final MessageClientConsumer consumer;
public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageServlet processor, MessageClientConsumer consumer) {
this.server = server;
this.service = service;
this.servlet = servlet;
this.processor = processor;
this.consumer = consumer;
}
}
}

View File

@@ -44,8 +44,6 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
private final String appRespTopic;
private final String reqTopicPrefix;
protected final ReentrantLock processorLock = new ReentrantLock();
protected final AtomicLong msgSeqno;
@@ -55,10 +53,9 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
final ConcurrentHashMap<Long, MessageRespFuture> respQueue = new ConcurrentHashMap<>();
protected MessageClient(MessageAgent messageAgent, String appRespTopic, String reqTopicPrefix) {
protected MessageClient(MessageAgent messageAgent, String appRespTopic) {
this.messageAgent = messageAgent;
this.appRespTopic = appRespTopic;
this.reqTopicPrefix = reqTopicPrefix;
this.msgSeqno = messageAgent.msgSeqno;
}
@@ -220,7 +217,7 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
}
public MessageCoder<MessageRecord> getClientMessageCoder() {
return this.messageAgent.getClientMessageCoder();
return this.messageAgent.getMessageRecordCoder();
}
public MessageClientProducer getProducer() {
@@ -231,8 +228,4 @@ public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRec
return appRespTopic;
}
public String getReqTopicPrefix() {
return reqTopicPrefix;
}
}

View File

@@ -41,14 +41,16 @@ public class MessageRespProcessor implements MessageProcessor {
if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")");
}
resp.future.complete(msg);
long cha2 = System.currentTimeMillis() - now;
if ((cha > 1000 || cha2 > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms) mqresp.msg: " + msg);
} else if ((cha > 50 || cha2 > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms) mqresp.msg: " + msg);
} else if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms) mqresp.msg: " + msg);
}
messageClient.getMessageAgent().execute(() -> {
resp.future.complete(msg);
long cha2 = System.currentTimeMillis() - now;
if ((cha > 1000 || cha2 > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms) mqresp.msg: " + msg);
} else if ((cha > 50 || cha2 > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms) mqresp.msg: " + msg);
} else if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms) mqresp.msg: " + msg);
}
});
}
}

View File

@@ -8,7 +8,6 @@ package org.redkale.mq;
import java.util.concurrent.CompletionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.boot.NodeServer;
import org.redkale.net.Context;
import org.redkale.net.Request;
import org.redkale.net.Response;
@@ -32,7 +31,7 @@ public abstract class MessageServlet implements MessageProcessor {
protected final MessageClient messageClient;
protected final NodeServer server;
protected final Context context;
protected final Service service;
@@ -40,9 +39,9 @@ public abstract class MessageServlet implements MessageProcessor {
protected final String topic;
public MessageServlet(MessageClient messageClient, NodeServer server, Service service, Servlet servlet, String topic) {
public MessageServlet(MessageClient messageClient, Context context, Service service, Servlet servlet, String topic) {
this.messageClient = messageClient;
this.server = server;
this.context = context;
this.service = service;
this.servlet = servlet;
this.topic = topic;
@@ -56,7 +55,6 @@ public abstract class MessageServlet implements MessageProcessor {
long now = System.currentTimeMillis();
long cha = now - message.createTime;
long e = now - time;
Context context = server.getServer().getContext();
Request request = createRequest(context, message);
response = createResponse(context, request);
//执行逻辑
@@ -83,8 +81,8 @@ public abstract class MessageServlet implements MessageProcessor {
protected abstract void onError(Response response, MessageRecord message, Throwable t);
public NodeServer getServer() {
return server;
public Context getContext() {
return context;
}
public Service getService() {

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.mq;
import org.redkale.boot.NodeSncpServer;
import org.redkale.net.Context;
import org.redkale.net.Request;
import org.redkale.net.Response;
@@ -24,9 +23,9 @@ import org.redkale.service.Service;
*/
public class SncpMessageServlet extends MessageServlet {
public SncpMessageServlet(MessageClient messageClient, NodeSncpServer server,
public SncpMessageServlet(MessageClient messageClient, Context context,
Service service, SncpServlet servlet, String topic) {
super(messageClient, server, service, servlet, topic);
super(messageClient, context, service, servlet, topic);
}
@Override