shutdownNow
This commit is contained in:
@@ -65,7 +65,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
|
||||
protected MessageProducer messageBaseProducer;
|
||||
|
||||
protected Map<ConvertType, ConvertMessageProducer> messageProducerMap = new ConcurrentHashMap<>();
|
||||
protected Map<ConvertType, MessageProducerWrapper> messageProducerMap = new ConcurrentHashMap<>();
|
||||
|
||||
protected final CopyOnWriteArrayList<MessageConsumer> messageConsumerList = new CopyOnWriteArrayList<>();
|
||||
|
||||
@@ -89,7 +89,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
protected MessageCoder<MessageRecord> clientMessageCoder = MessageRecordSerializer.getInstance();
|
||||
|
||||
//本地Service消息接收处理器, key:consumerid
|
||||
protected HashMap<String, MessageClientConsumerNode> clientConsumerNodes = new LinkedHashMap<>();
|
||||
protected HashMap<String, MessageClientConsumerWrapper> clientConsumerNodes = new LinkedHashMap<>();
|
||||
|
||||
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
|
||||
|
||||
@@ -185,7 +185,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
this.timeoutExecutor.shutdownNow();
|
||||
}
|
||||
if (this.workExecutor != null && this.workExecutor != application.getWorkExecutor()) {
|
||||
this.workExecutor.shutdownNow();
|
||||
this.workExecutor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,7 +208,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
}
|
||||
MessageProducer producer = baseProducer;
|
||||
Objects.requireNonNull(producer);
|
||||
return messageProducerMap.computeIfAbsent(ann.convertType(), t -> new ConvertMessageProducer(producer, ConvertFactory.findConvert(t)));
|
||||
return messageProducerMap.computeIfAbsent(ann.convertType(), t -> new MessageProducerWrapper(producer, ConvertFactory.findConvert(t)));
|
||||
}
|
||||
|
||||
protected StringBuilder initMessageConsumer(List<MessageConsumer> consumers) {
|
||||
@@ -455,7 +455,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
|
||||
}
|
||||
HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
|
||||
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
|
||||
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
|
||||
} finally {
|
||||
serviceLock.unlock();
|
||||
}
|
||||
@@ -478,7 +478,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
|
||||
}
|
||||
SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
|
||||
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerNode(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
|
||||
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
|
||||
} finally {
|
||||
serviceLock.unlock();
|
||||
}
|
||||
@@ -621,13 +621,13 @@ public abstract class MessageAgent implements Resourcable {
|
||||
|
||||
}
|
||||
|
||||
protected static class ConvertMessageProducer implements MessageProducer {
|
||||
protected static class MessageProducerWrapper implements MessageProducer {
|
||||
|
||||
private final MessageProducer producer;
|
||||
|
||||
private final Convert convert;
|
||||
|
||||
public ConvertMessageProducer(MessageProducer producer, Convert convert) {
|
||||
public MessageProducerWrapper(MessageProducer producer, Convert convert) {
|
||||
this.producer = producer;
|
||||
this.convert = convert;
|
||||
}
|
||||
@@ -639,7 +639,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
|
||||
}
|
||||
|
||||
protected static class MessageClientConsumerNode {
|
||||
protected static class MessageClientConsumerWrapper {
|
||||
|
||||
public final NodeServer server;
|
||||
|
||||
@@ -651,7 +651,7 @@ public abstract class MessageAgent implements Resourcable {
|
||||
|
||||
public final MessageClientConsumer consumer;
|
||||
|
||||
public MessageClientConsumerNode(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) {
|
||||
public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) {
|
||||
this.server = server;
|
||||
this.service = service;
|
||||
this.servlet = servlet;
|
||||
|
||||
@@ -189,7 +189,7 @@ public final class CacheMemorySource extends AbstractCacheSource {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
if (subExecutor != null) {
|
||||
subExecutor.shutdownNow();
|
||||
subExecutor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user