diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index cacf4491c..be3ecb337 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -65,7 +65,7 @@ public abstract class MessageAgent implements Resourcable { protected MessageProducer messageBaseProducer; - protected Map messageProducerMap = new ConcurrentHashMap<>(); + protected Map messageProducerMap = new ConcurrentHashMap<>(); protected final CopyOnWriteArrayList messageConsumerList = new CopyOnWriteArrayList<>(); @@ -89,7 +89,7 @@ public abstract class MessageAgent implements Resourcable { protected MessageCoder clientMessageCoder = MessageRecordSerializer.getInstance(); //本地Service消息接收处理器, key:consumerid - protected HashMap clientConsumerNodes = new LinkedHashMap<>(); + protected HashMap clientConsumerNodes = new LinkedHashMap<>(); protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime()); @@ -185,7 +185,7 @@ public abstract class MessageAgent implements Resourcable { this.timeoutExecutor.shutdownNow(); } if (this.workExecutor != null && this.workExecutor != application.getWorkExecutor()) { - this.workExecutor.shutdownNow(); + this.workExecutor.shutdown(); } } @@ -208,7 +208,7 @@ public abstract class MessageAgent implements Resourcable { } MessageProducer producer = baseProducer; Objects.requireNonNull(producer); - return messageProducerMap.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t))); + return messageProducerMap.computeIfAbsent(ann.convertType(), t -> new MessageProducerWrapper(producer, ConvertFactory.findConvert(t))); } protected StringBuilder initMessageConsumer(List consumers) { @@ -455,7 +455,7 @@ public abstract class MessageAgent implements Resourcable { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet); - this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); + this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); } finally { serviceLock.unlock(); } @@ -478,7 +478,7 @@ public abstract class MessageAgent implements Resourcable { throw new RedkaleException("consumerid(" + consumerid + ") is repeat"); } SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet); - this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); + this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor))); } finally { serviceLock.unlock(); } @@ -621,13 +621,13 @@ public abstract class MessageAgent implements Resourcable { } - protected static class ConvertMessageProducer implements MessageProducer { + protected static class MessageProducerWrapper implements MessageProducer { private final MessageProducer producer; private final Convert convert; - public ConvertMessageProducer(MessageProducer producer, Convert convert) { + public MessageProducerWrapper(MessageProducer producer, Convert convert) { this.producer = producer; this.convert = convert; } @@ -639,7 +639,7 @@ public abstract class MessageAgent implements Resourcable { } - protected static class MessageClientConsumerNode { + protected static class MessageClientConsumerWrapper { public final NodeServer server; @@ -651,7 +651,7 @@ public abstract class MessageAgent implements Resourcable { public final MessageClientConsumer consumer; - public MessageClientConsumerNode(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) { + public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) { this.server = server; this.service = service; this.servlet = servlet; diff --git a/src/main/java/org/redkale/source/CacheMemorySource.java b/src/main/java/org/redkale/source/CacheMemorySource.java index 6b45b47e7..620fafe21 100644 --- a/src/main/java/org/redkale/source/CacheMemorySource.java +++ b/src/main/java/org/redkale/source/CacheMemorySource.java @@ -189,7 +189,7 @@ public final class CacheMemorySource extends AbstractCacheSource { scheduler.shutdownNow(); } if (subExecutor != null) { - subExecutor.shutdownNow(); + subExecutor.shutdown(); } }