mq优化命名

This commit is contained in:
redkale
2023-04-03 08:48:43 +08:00
parent 8727b5165a
commit 188a74423b
11 changed files with 51 additions and 60 deletions

View File

@@ -201,7 +201,7 @@ public class HttpMessageClient extends MessageClient {
} }
@Override @Override
protected MessageProducers getProducer() { protected MessageClientProducers getProducer() {
return messageAgent.getHttpProducer(); return messageAgent.getHttpMessageClientProducer();
} }
} }

View File

@@ -29,7 +29,7 @@ public class HttpMessageProcessor implements MessageProcessor {
protected HttpMessageClient messageClient; protected HttpMessageClient messageClient;
protected final MessageProducers producers; protected final MessageClientProducers producers;
protected final NodeHttpServer server; protected final NodeHttpServer server;
@@ -59,7 +59,7 @@ public class HttpMessageProcessor implements MessageProcessor {
} }
}; };
public HttpMessageProcessor(Logger logger, HttpMessageClient messageClient, MessageProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { public HttpMessageProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger; this.logger = logger;
this.messageClient = messageClient; this.messageClient = messageClient;
this.producers = producers; this.producers = producers;
@@ -134,7 +134,7 @@ public class HttpMessageProcessor implements MessageProcessor {
} }
} }
public MessageProducers getProducer() { public MessageClientProducers getProducer() {
return producers; return producers;
} }

View File

@@ -34,7 +34,7 @@ public class HttpMessageResponse extends HttpResponse {
protected MessageRecord message; protected MessageRecord message;
protected MessageProducer producer; protected MessageClientProducer producer;
protected Runnable callback; protected Runnable callback;
@@ -45,16 +45,7 @@ public class HttpMessageResponse extends HttpResponse {
this.messageClient = messageClient; this.messageClient = messageClient;
} }
// public HttpMessageResponse(HttpContext context, HttpMessageRequest request, Runnable callback, public void prepare(MessageRecord message, Runnable callback, MessageClientProducer producer) {
// HttpResponseConfig config, HttpMessageClient messageClient, MessageProducer producer) {
// super(context, request, config);
// this.message = request.message;
// this.callback = callback;
// this.messageClient = messageClient;
// this.producer = producer;
// this.finest = producer.logger.isLoggable(Level.FINEST);
// }
public void prepare(MessageRecord message, Runnable callback, MessageProducer producer) {
((HttpMessageRequest) request).prepare(message); ((HttpMessageRequest) request).prepare(message);
this.message = message; this.message = message;
this.callback = callback; this.callback = callback;
@@ -73,7 +64,7 @@ public class HttpMessageResponse extends HttpResponse {
finishHttpResult(producer.logger.isLoggable(Level.FINEST), respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); finishHttpResult(producer.logger.isLoggable(Level.FINEST), respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result);
} }
public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageProducer producer, String resptopic, HttpResult result) { public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) {
if (callback != null) { if (callback != null) {
callback.run(); callback.run();
} }

View File

@@ -43,9 +43,9 @@ public abstract class MessageAgent implements Resourcable {
protected AnyValue config; protected AnyValue config;
protected MessageProducers httpProducer; protected MessageClientProducers httpProducer;
protected MessageProducers sncpProducer; protected MessageClientProducers sncpProducer;
protected final ReentrantLock httpProducerLock = new ReentrantLock(); protected final ReentrantLock httpProducerLock = new ReentrantLock();
@@ -143,9 +143,9 @@ public abstract class MessageAgent implements Resourcable {
} }
} }
protected List<MessageConsumer> getAllMessageConsumer() { protected List<MessageClientConsumer> getMessageClientConsumers() {
List<MessageConsumer> consumers = new ArrayList<>(); List<MessageClientConsumer> consumers = new ArrayList<>();
MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer; MessageClientConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer;
if (one != null) { if (one != null) {
consumers.add(one); consumers.add(one);
} }
@@ -157,15 +157,15 @@ public abstract class MessageAgent implements Resourcable {
return consumers; return consumers;
} }
protected List<MessageProducer> getAllMessageProducer() { protected List<MessageClientProducer> getMessageClientProducers() {
List<MessageProducer> producers = new ArrayList<>(); List<MessageClientProducer> producers = new ArrayList<>();
if (this.httpProducer != null) { if (this.httpProducer != null) {
producers.addAll(Utility.ofList(this.httpProducer.producers)); producers.addAll(Utility.ofList(this.httpProducer.producers));
} }
if (this.sncpProducer != null) { if (this.sncpProducer != null) {
producers.addAll(Utility.ofList(this.sncpProducer.producers)); producers.addAll(Utility.ofList(this.sncpProducer.producers));
} }
MessageProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); MessageClientProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer();
if (one != null) { if (one != null) {
producers.addAll(Utility.ofList(one.producers)); producers.addAll(Utility.ofList(one.producers));
} }
@@ -225,15 +225,15 @@ public abstract class MessageAgent implements Resourcable {
} }
//获取指定topic的生产处理器 //获取指定topic的生产处理器
public MessageProducers getSncpProducer() { public MessageClientProducers getSncpMessageClientProducer() {
if (this.sncpProducer == null) { if (this.sncpProducer == null) {
sncpProducerLock.lock(); sncpProducerLock.lock();
try { try {
if (this.sncpProducer == null) { if (this.sncpProducer == null) {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
MessageProducer[] producers = new MessageProducer[producerCount]; MessageClientProducer[] producers = new MessageClientProducer[producerCount];
for (int i = 0; i < producers.length; i++) { for (int i = 0; i < producers.length; i++) {
MessageProducer producer = createProducer("SncpProducer"); MessageClientProducer producer = createMessageClientProducer("SncpProducer");
producer.startup().join(); producer.startup().join();
producers[i] = producer; producers[i] = producer;
} }
@@ -241,7 +241,7 @@ public abstract class MessageAgent implements Resourcable {
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms"); logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms");
} }
this.sncpProducer = new MessageProducers(producers); this.sncpProducer = new MessageClientProducers(producers);
} }
} finally { } finally {
sncpProducerLock.unlock(); sncpProducerLock.unlock();
@@ -250,15 +250,15 @@ public abstract class MessageAgent implements Resourcable {
return this.sncpProducer; return this.sncpProducer;
} }
public MessageProducers getHttpProducer() { public MessageClientProducers getHttpMessageClientProducer() {
if (this.httpProducer == null) { if (this.httpProducer == null) {
httpProducerLock.lock(); httpProducerLock.lock();
try { try {
if (this.httpProducer == null) { if (this.httpProducer == null) {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
MessageProducer[] producers = new MessageProducer[producerCount]; MessageClientProducer[] producers = new MessageClientProducer[producerCount];
for (int i = 0; i < producers.length; i++) { for (int i = 0; i < producers.length; i++) {
MessageProducer producer = createProducer("HttpProducer"); MessageClientProducer producer = createMessageClientProducer("HttpProducer");
producer.startup().join(); producer.startup().join();
producers[i] = producer; producers[i] = producer;
} }
@@ -266,7 +266,7 @@ public abstract class MessageAgent implements Resourcable {
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms");
} }
this.httpProducer = new MessageProducers(producers); this.httpProducer = new MessageClientProducers(producers);
} }
} finally { } finally {
httpProducerLock.unlock(); httpProducerLock.unlock();
@@ -276,7 +276,7 @@ public abstract class MessageAgent implements Resourcable {
} }
//创建指定topic的生产处理器 //创建指定topic的生产处理器
protected abstract MessageProducer createProducer(String name); protected abstract MessageClientProducer createMessageClientProducer(String name);
//创建topic如果已存在则跳过 //创建topic如果已存在则跳过
public abstract boolean createTopic(String... topics); public abstract boolean createTopic(String... topics);
@@ -291,7 +291,7 @@ public abstract class MessageAgent implements Resourcable {
public abstract boolean acceptsConf(AnyValue config); public abstract boolean acceptsConf(AnyValue config);
//创建指定topic的消费处理器 //创建指定topic的消费处理器
public abstract MessageConsumer createConsumer(String[] topics, String group, MessageProcessor processor); public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageProcessor processor);
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
@@ -315,8 +315,8 @@ public abstract class MessageAgent implements Resourcable {
if (messageNodes.containsKey(consumerid)) { if (messageNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
} }
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpProducer(), ns, service, servlet); HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor)));
} finally { } finally {
httpNodesLock.unlock(); httpNodesLock.unlock();
} }
@@ -338,8 +338,8 @@ public abstract class MessageAgent implements Resourcable {
if (messageNodes.containsKey(consumerid)) { if (messageNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
} }
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpProducer(), ns, service, servlet); SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor)));
} finally { } finally {
sncpNodesLock.unlock(); sncpNodesLock.unlock();
} }
@@ -417,9 +417,9 @@ public abstract class MessageAgent implements Resourcable {
public final MessageProcessor processor; public final MessageProcessor processor;
public final MessageConsumer consumer; public final MessageClientConsumer consumer;
public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) { public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageClientConsumer consumer) {
this.server = server; this.server = server;
this.service = service; this.service = service;
this.servlet = servlet; this.servlet = servlet;

View File

@@ -35,7 +35,7 @@ public abstract class MessageClient {
protected final AtomicLong msgSeqno; protected final AtomicLong msgSeqno;
protected MessageConsumer respConsumer; protected MessageClientConsumer respConsumer;
protected String respTopic; protected String respTopic;
@@ -96,7 +96,7 @@ public abstract class MessageClient {
} }
}; };
long ones = System.currentTimeMillis(); long ones = System.currentTimeMillis();
MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor); MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{respTopic}, respConsumerid, processor);
one.startup().join(); one.startup().join();
long onee = System.currentTimeMillis() - ones; long onee = System.currentTimeMillis() - ones;
if (finest) { if (finest) {
@@ -136,7 +136,7 @@ public abstract class MessageClient {
return message; return message;
} }
protected abstract MessageProducers getProducer(); protected abstract MessageClientProducers getProducer();
public MessageRecord createMessageRecord(String resptopic, String content) { public MessageRecord createMessageRecord(String resptopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));

View File

@@ -19,7 +19,7 @@ import java.util.logging.Logger;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class MessageConsumer { public abstract class MessageClientConsumer {
protected final String[] topics; protected final String[] topics;
@@ -33,7 +33,7 @@ public abstract class MessageConsumer {
protected volatile boolean closed; protected volatile boolean closed;
protected MessageConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) { protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) {
Objects.requireNonNull(messageAgent); Objects.requireNonNull(messageAgent);
Objects.requireNonNull(topics); Objects.requireNonNull(topics);
Objects.requireNonNull(consumerid); Objects.requireNonNull(consumerid);

View File

@@ -18,7 +18,7 @@ import java.util.logging.Logger;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class MessageProducer { public abstract class MessageClientProducer {
protected final Logger logger; protected final Logger logger;
@@ -26,7 +26,7 @@ public abstract class MessageProducer {
protected final AtomicBoolean closed = new AtomicBoolean(); protected final AtomicBoolean closed = new AtomicBoolean();
protected MessageProducer(String name, Logger logger) { protected MessageClientProducer(String name, Logger logger) {
this.name = name; this.name = name;
this.logger = logger; this.logger = logger;
} }

View File

@@ -17,17 +17,17 @@ import java.util.concurrent.atomic.AtomicInteger;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class MessageProducers { public class MessageClientProducers {
protected final MessageProducer[] producers; protected final MessageClientProducer[] producers;
protected final AtomicInteger index = new AtomicInteger(); protected final AtomicInteger index = new AtomicInteger();
public MessageProducers(MessageProducer[] producers) { public MessageClientProducers(MessageClientProducer[] producers) {
this.producers = producers; this.producers = producers;
} }
public MessageProducer getProducer(MessageRecord message) { public MessageClientProducer getProducer(MessageRecord message) {
if (this.producers.length == 1) { if (this.producers.length == 1) {
return this.producers[0]; return this.producers[0];
} }

View File

@@ -25,8 +25,8 @@ public class SncpMessageClient extends MessageClient {
} }
@Override @Override
protected MessageProducers getProducer() { protected MessageClientProducers getProducer() {
return messageAgent.getSncpProducer(); return messageAgent.getSncpMessageClientProducer();
} }
public String getRespTopic() { public String getRespTopic() {

View File

@@ -28,7 +28,7 @@ public class SncpMessageProcessor implements MessageProcessor {
protected MessageClient messageClient; protected MessageClient messageClient;
protected final MessageProducers producer; protected final MessageClientProducers producer;
protected final NodeSncpServer server; protected final NodeSncpServer server;
@@ -46,7 +46,7 @@ public class SncpMessageProcessor implements MessageProcessor {
} }
}; };
public SncpMessageProcessor(Logger logger, SncpMessageClient messageClient, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { public SncpMessageProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger; this.logger = logger;
this.messageClient = messageClient; this.messageClient = messageClient;
this.producer = producer; this.producer = producer;
@@ -105,7 +105,7 @@ public class SncpMessageProcessor implements MessageProcessor {
} }
} }
public MessageProducers getProducer() { public MessageClientProducers getProducer() {
return producer; return producer;
} }

View File

@@ -24,11 +24,11 @@ public class SncpMessageResponse extends SncpResponse {
protected MessageRecord message; protected MessageRecord message;
protected MessageProducer producer; protected MessageClientProducer producer;
protected Runnable callback; protected Runnable callback;
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, MessageClient messageClient, MessageProducer producer) { public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, MessageClient messageClient, MessageClientProducer producer) {
super(context, request); super(context, request);
this.message = request.message; this.message = request.message;
this.callback = callback; this.callback = callback;
@@ -36,7 +36,7 @@ public class SncpMessageResponse extends SncpResponse {
this.producer = producer; this.producer = producer;
} }
public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, MessageClient messageClient, MessageProducer producer) { public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, MessageClient messageClient, MessageClientProducer producer) {
super(context, new SncpMessageRequest(context, message)); super(context, new SncpMessageRequest(context, message));
this.message = message; this.message = message;
this.callback = callback; this.callback = callback;