diff --git a/src/main/java/org/redkale/mq/HttpMessageClient.java b/src/main/java/org/redkale/mq/HttpMessageClient.java index 4aba7b4ab..55e76a2c5 100644 --- a/src/main/java/org/redkale/mq/HttpMessageClient.java +++ b/src/main/java/org/redkale/mq/HttpMessageClient.java @@ -201,7 +201,7 @@ public class HttpMessageClient extends MessageClient { } @Override - protected MessageProducers getProducer() { - return messageAgent.getHttpProducer(); + protected MessageClientProducers getProducer() { + return messageAgent.getHttpMessageClientProducer(); } } diff --git a/src/main/java/org/redkale/mq/HttpMessageProcessor.java b/src/main/java/org/redkale/mq/HttpMessageProcessor.java index c0868bc57..717d091a3 100644 --- a/src/main/java/org/redkale/mq/HttpMessageProcessor.java +++ b/src/main/java/org/redkale/mq/HttpMessageProcessor.java @@ -29,7 +29,7 @@ public class HttpMessageProcessor implements MessageProcessor { protected HttpMessageClient messageClient; - protected final MessageProducers producers; + protected final MessageClientProducers producers; protected final NodeHttpServer server; @@ -59,7 +59,7 @@ public class HttpMessageProcessor implements MessageProcessor { } }; - public HttpMessageProcessor(Logger logger, HttpMessageClient messageClient, MessageProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { + public HttpMessageProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducers producers, NodeHttpServer server, Service service, HttpServlet servlet) { this.logger = logger; this.messageClient = messageClient; this.producers = producers; @@ -134,7 +134,7 @@ public class HttpMessageProcessor implements MessageProcessor { } } - public MessageProducers getProducer() { + public MessageClientProducers getProducer() { return producers; } diff --git a/src/main/java/org/redkale/mq/HttpMessageResponse.java b/src/main/java/org/redkale/mq/HttpMessageResponse.java index 33bdf7669..f2dbad651 100644 --- a/src/main/java/org/redkale/mq/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/HttpMessageResponse.java @@ -34,7 +34,7 @@ public class HttpMessageResponse extends HttpResponse { protected MessageRecord message; - protected MessageProducer producer; + protected MessageClientProducer producer; protected Runnable callback; @@ -45,16 +45,7 @@ public class HttpMessageResponse extends HttpResponse { this.messageClient = messageClient; } -// public HttpMessageResponse(HttpContext context, HttpMessageRequest request, Runnable callback, -// HttpResponseConfig config, HttpMessageClient messageClient, MessageProducer producer) { -// super(context, request, config); -// this.message = request.message; -// this.callback = callback; -// this.messageClient = messageClient; -// this.producer = producer; -// this.finest = producer.logger.isLoggable(Level.FINEST); -// } - public void prepare(MessageRecord message, Runnable callback, MessageProducer producer) { + public void prepare(MessageRecord message, Runnable callback, MessageClientProducer producer) { ((HttpMessageRequest) request).prepare(message); this.message = message; this.callback = callback; @@ -73,7 +64,7 @@ public class HttpMessageResponse extends HttpResponse { finishHttpResult(producer.logger.isLoggable(Level.FINEST), respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); } - public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageProducer producer, String resptopic, HttpResult result) { + public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) { if (callback != null) { callback.run(); } diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 5296e127e..2b98e74ac 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -43,9 +43,9 @@ public abstract class MessageAgent implements Resourcable { protected AnyValue config; - protected MessageProducers httpProducer; + protected MessageClientProducers httpProducer; - protected MessageProducers sncpProducer; + protected MessageClientProducers sncpProducer; protected final ReentrantLock httpProducerLock = new ReentrantLock(); @@ -143,9 +143,9 @@ public abstract class MessageAgent implements Resourcable { } } - protected List getAllMessageConsumer() { - List consumers = new ArrayList<>(); - MessageConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer; + protected List getMessageClientConsumers() { + List consumers = new ArrayList<>(); + MessageClientConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer; if (one != null) { consumers.add(one); } @@ -157,15 +157,15 @@ public abstract class MessageAgent implements Resourcable { return consumers; } - protected List getAllMessageProducer() { - List producers = new ArrayList<>(); + protected List getMessageClientProducers() { + List producers = new ArrayList<>(); if (this.httpProducer != null) { producers.addAll(Utility.ofList(this.httpProducer.producers)); } if (this.sncpProducer != null) { producers.addAll(Utility.ofList(this.sncpProducer.producers)); } - MessageProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); + MessageClientProducers one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer(); if (one != null) { producers.addAll(Utility.ofList(one.producers)); } @@ -225,15 +225,15 @@ public abstract class MessageAgent implements Resourcable { } //获取指定topic的生产处理器 - public MessageProducers getSncpProducer() { + public MessageClientProducers getSncpMessageClientProducer() { if (this.sncpProducer == null) { sncpProducerLock.lock(); try { if (this.sncpProducer == null) { long s = System.currentTimeMillis(); - MessageProducer[] producers = new MessageProducer[producerCount]; + MessageClientProducer[] producers = new MessageClientProducer[producerCount]; for (int i = 0; i < producers.length; i++) { - MessageProducer producer = createProducer("SncpProducer"); + MessageClientProducer producer = createMessageClientProducer("SncpProducer"); producer.startup().join(); producers[i] = producer; } @@ -241,7 +241,7 @@ public abstract class MessageAgent implements Resourcable { if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms"); } - this.sncpProducer = new MessageProducers(producers); + this.sncpProducer = new MessageClientProducers(producers); } } finally { sncpProducerLock.unlock(); @@ -250,15 +250,15 @@ public abstract class MessageAgent implements Resourcable { return this.sncpProducer; } - public MessageProducers getHttpProducer() { + public MessageClientProducers getHttpMessageClientProducer() { if (this.httpProducer == null) { httpProducerLock.lock(); try { if (this.httpProducer == null) { long s = System.currentTimeMillis(); - MessageProducer[] producers = new MessageProducer[producerCount]; + MessageClientProducer[] producers = new MessageClientProducer[producerCount]; for (int i = 0; i < producers.length; i++) { - MessageProducer producer = createProducer("HttpProducer"); + MessageClientProducer producer = createMessageClientProducer("HttpProducer"); producer.startup().join(); producers[i] = producer; } @@ -266,7 +266,7 @@ public abstract class MessageAgent implements Resourcable { if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms"); } - this.httpProducer = new MessageProducers(producers); + this.httpProducer = new MessageClientProducers(producers); } } finally { httpProducerLock.unlock(); @@ -276,7 +276,7 @@ public abstract class MessageAgent implements Resourcable { } //创建指定topic的生产处理器 - protected abstract MessageProducer createProducer(String name); + protected abstract MessageClientProducer createMessageClientProducer(String name); //创建topic,如果已存在则跳过 public abstract boolean createTopic(String... topics); @@ -291,7 +291,7 @@ public abstract class MessageAgent implements Resourcable { public abstract boolean acceptsConf(AnyValue config); //创建指定topic的消费处理器 - public abstract MessageConsumer createConsumer(String[] topics, String group, MessageProcessor processor); + public abstract MessageClientConsumer createMessageClientConsumer(String[] topics, String group, MessageProcessor processor); public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); @@ -315,8 +315,8 @@ public abstract class MessageAgent implements Resourcable { if (messageNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } - HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor))); + HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topics, consumerid, processor))); } finally { httpNodesLock.unlock(); } @@ -338,8 +338,8 @@ public abstract class MessageAgent implements Resourcable { if (messageNodes.containsKey(consumerid)) { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } - SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpProducer(), ns, service, servlet); - this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor))); + SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); + this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(new String[]{topic}, consumerid, processor))); } finally { sncpNodesLock.unlock(); } @@ -417,9 +417,9 @@ public abstract class MessageAgent implements Resourcable { public final MessageProcessor processor; - public final MessageConsumer consumer; + public final MessageClientConsumer consumer; - public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageConsumer consumer) { + public MessageConsumerNode(NodeServer server, Service service, Servlet servlet, MessageProcessor processor, MessageClientConsumer consumer) { this.server = server; this.service = service; this.servlet = servlet; diff --git a/src/main/java/org/redkale/mq/MessageClient.java b/src/main/java/org/redkale/mq/MessageClient.java index 3647c683f..316da4ccf 100644 --- a/src/main/java/org/redkale/mq/MessageClient.java +++ b/src/main/java/org/redkale/mq/MessageClient.java @@ -35,7 +35,7 @@ public abstract class MessageClient { protected final AtomicLong msgSeqno; - protected MessageConsumer respConsumer; + protected MessageClientConsumer respConsumer; protected String respTopic; @@ -96,7 +96,7 @@ public abstract class MessageClient { } }; long ones = System.currentTimeMillis(); - MessageConsumer one = messageAgent.createConsumer(new String[]{respTopic}, respConsumerid, processor); + MessageClientConsumer one = messageAgent.createMessageClientConsumer(new String[]{respTopic}, respConsumerid, processor); one.startup().join(); long onee = System.currentTimeMillis() - ones; if (finest) { @@ -136,7 +136,7 @@ public abstract class MessageClient { return message; } - protected abstract MessageProducers getProducer(); + protected abstract MessageClientProducers getProducer(); public MessageRecord createMessageRecord(String resptopic, String content) { return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, Traces.currTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); diff --git a/src/main/java/org/redkale/mq/MessageConsumer.java b/src/main/java/org/redkale/mq/MessageClientConsumer.java similarity index 84% rename from src/main/java/org/redkale/mq/MessageConsumer.java rename to src/main/java/org/redkale/mq/MessageClientConsumer.java index a1ffd20e3..540829284 100644 --- a/src/main/java/org/redkale/mq/MessageConsumer.java +++ b/src/main/java/org/redkale/mq/MessageClientConsumer.java @@ -19,7 +19,7 @@ import java.util.logging.Logger; * * @since 2.1.0 */ -public abstract class MessageConsumer { +public abstract class MessageClientConsumer { protected final String[] topics; @@ -33,7 +33,7 @@ public abstract class MessageConsumer { protected volatile boolean closed; - protected MessageConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) { + protected MessageClientConsumer(MessageAgent messageAgent, String[] topics, final String consumerid, MessageProcessor processor) { Objects.requireNonNull(messageAgent); Objects.requireNonNull(topics); Objects.requireNonNull(consumerid); diff --git a/src/main/java/org/redkale/mq/MessageProducer.java b/src/main/java/org/redkale/mq/MessageClientProducer.java similarity index 85% rename from src/main/java/org/redkale/mq/MessageProducer.java rename to src/main/java/org/redkale/mq/MessageClientProducer.java index 2a990c5a3..8d6093846 100644 --- a/src/main/java/org/redkale/mq/MessageProducer.java +++ b/src/main/java/org/redkale/mq/MessageClientProducer.java @@ -18,7 +18,7 @@ import java.util.logging.Logger; * * @since 2.1.0 */ -public abstract class MessageProducer { +public abstract class MessageClientProducer { protected final Logger logger; @@ -26,7 +26,7 @@ public abstract class MessageProducer { protected final AtomicBoolean closed = new AtomicBoolean(); - protected MessageProducer(String name, Logger logger) { + protected MessageClientProducer(String name, Logger logger) { this.name = name; this.logger = logger; } diff --git a/src/main/java/org/redkale/mq/MessageProducers.java b/src/main/java/org/redkale/mq/MessageClientProducers.java similarity index 82% rename from src/main/java/org/redkale/mq/MessageProducers.java rename to src/main/java/org/redkale/mq/MessageClientProducers.java index 764979730..a460a0513 100644 --- a/src/main/java/org/redkale/mq/MessageProducers.java +++ b/src/main/java/org/redkale/mq/MessageClientProducers.java @@ -17,17 +17,17 @@ import java.util.concurrent.atomic.AtomicInteger; * * @since 2.1.0 */ -public class MessageProducers { +public class MessageClientProducers { - protected final MessageProducer[] producers; + protected final MessageClientProducer[] producers; protected final AtomicInteger index = new AtomicInteger(); - public MessageProducers(MessageProducer[] producers) { + public MessageClientProducers(MessageClientProducer[] producers) { this.producers = producers; } - public MessageProducer getProducer(MessageRecord message) { + public MessageClientProducer getProducer(MessageRecord message) { if (this.producers.length == 1) { return this.producers[0]; } diff --git a/src/main/java/org/redkale/mq/SncpMessageClient.java b/src/main/java/org/redkale/mq/SncpMessageClient.java index 1f9e2fd9f..e3b0dce2b 100644 --- a/src/main/java/org/redkale/mq/SncpMessageClient.java +++ b/src/main/java/org/redkale/mq/SncpMessageClient.java @@ -25,8 +25,8 @@ public class SncpMessageClient extends MessageClient { } @Override - protected MessageProducers getProducer() { - return messageAgent.getSncpProducer(); + protected MessageClientProducers getProducer() { + return messageAgent.getSncpMessageClientProducer(); } public String getRespTopic() { diff --git a/src/main/java/org/redkale/mq/SncpMessageProcessor.java b/src/main/java/org/redkale/mq/SncpMessageProcessor.java index 576925155..3851384e1 100644 --- a/src/main/java/org/redkale/mq/SncpMessageProcessor.java +++ b/src/main/java/org/redkale/mq/SncpMessageProcessor.java @@ -28,7 +28,7 @@ public class SncpMessageProcessor implements MessageProcessor { protected MessageClient messageClient; - protected final MessageProducers producer; + protected final MessageClientProducers producer; protected final NodeSncpServer server; @@ -46,7 +46,7 @@ public class SncpMessageProcessor implements MessageProcessor { } }; - public SncpMessageProcessor(Logger logger, SncpMessageClient messageClient, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { + public SncpMessageProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) { this.logger = logger; this.messageClient = messageClient; this.producer = producer; @@ -105,7 +105,7 @@ public class SncpMessageProcessor implements MessageProcessor { } } - public MessageProducers getProducer() { + public MessageClientProducers getProducer() { return producer; } diff --git a/src/main/java/org/redkale/mq/SncpMessageResponse.java b/src/main/java/org/redkale/mq/SncpMessageResponse.java index 3be4ddf61..59dd57fa1 100644 --- a/src/main/java/org/redkale/mq/SncpMessageResponse.java +++ b/src/main/java/org/redkale/mq/SncpMessageResponse.java @@ -24,11 +24,11 @@ public class SncpMessageResponse extends SncpResponse { protected MessageRecord message; - protected MessageProducer producer; + protected MessageClientProducer producer; protected Runnable callback; - public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, MessageClient messageClient, MessageProducer producer) { + public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, MessageClient messageClient, MessageClientProducer producer) { super(context, request); this.message = request.message; this.callback = callback; @@ -36,7 +36,7 @@ public class SncpMessageResponse extends SncpResponse { this.producer = producer; } - public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, MessageClient messageClient, MessageProducer producer) { + public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, MessageClient messageClient, MessageClientProducer producer) { super(context, new SncpMessageRequest(context, message)); this.message = message; this.callback = callback;