增加 ThreadHashExecutor

This commit is contained in:
Redkale
2020-08-28 15:15:26 +08:00
parent 72155e31dd
commit ba4100e22d
4 changed files with 78 additions and 11 deletions

View File

@@ -10,6 +10,7 @@ import java.util.logging.*;
import org.redkale.boot.NodeHttpServer;
import org.redkale.net.http.*;
import org.redkale.service.Service;
import org.redkale.util.ThreadHashExecutor;
/**
*
@@ -30,7 +31,7 @@ public class HttpMessageProcessor implements MessageProcessor {
protected final NodeHttpServer server;
protected final ThreadPoolExecutor workExecutor;
protected final ThreadHashExecutor workExecutor;
protected final Service service;
@@ -48,7 +49,7 @@ public class HttpMessageProcessor implements MessageProcessor {
if (cdl != null) cdl.countDown();
};
public HttpMessageProcessor(Logger logger, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) {
public HttpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.finest = logger.isLoggable(Level.FINEST);
this.producer = producer;
@@ -59,7 +60,7 @@ public class HttpMessageProcessor implements MessageProcessor {
this.multiconsumer = mmc != null;
this.restmodule = "/" + Rest.getRestModule(service) + "/";
this.multimodule = mmc != null ? ("/" + mmc.module() + "/") : null;
this.workExecutor = server.getServer().getWorkExecutor();
this.workExecutor = workExecutor;
}
@Override
@@ -72,7 +73,7 @@ public class HttpMessageProcessor implements MessageProcessor {
if (this.workExecutor == null) {
execute(message, innerCallback);
} else {
this.workExecutor.execute(() -> execute(message, innerCallback));
this.workExecutor.execute(message.hash(), () -> execute(message, innerCallback));
}
}

View File

@@ -53,6 +53,8 @@ public abstract class MessageAgent {
protected ScheduledThreadPoolExecutor timeoutExecutor;
protected ThreadHashExecutor workExecutor;
//本地Service消息接收处理器 key:consumer
protected HashMap<String, MessageConsumerNode> messageNodes = new LinkedHashMap<>();
@@ -60,6 +62,7 @@ public abstract class MessageAgent {
this.name = checkName(config.getValue("name", ""));
this.httpMessageClient = new HttpMessageClient(this);
this.sncpMessageClient = new SncpMessageClient(this);
this.workExecutor = new ThreadHashExecutor(Math.max(4, Runtime.getRuntime().availableProcessors()));
// application (it doesn't execute completion handlers).
this.timeoutExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, (Runnable r) -> {
Thread t = new Thread(r);
@@ -93,6 +96,7 @@ public abstract class MessageAgent {
public void destroy(AnyValue config) {
this.httpMessageClient.close().join();
this.sncpMessageClient.close().join();
this.workExecutor.shutdown();
if (this.timeoutExecutor != null) this.timeoutExecutor.shutdown();
if (this.sncpProducer != null) this.sncpProducer.shutdown().join();
if (this.httpProducer != null) this.httpProducer.shutdown().join();
@@ -211,7 +215,7 @@ public abstract class MessageAgent {
String[] topics = generateHttpReqTopics(service);
String consumerid = generateHttpConsumerid(topics, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, getHttpProducer(), ns, service, servlet);
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.workExecutor, getHttpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor)));
}
@@ -219,7 +223,7 @@ public abstract class MessageAgent {
String topic = generateSncpReqTopic(service);
String consumerid = generateSncpConsumerid(topic, service);
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, getSncpProducer(), ns, service, servlet);
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this.workExecutor, getSncpProducer(), ns, service, servlet);
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor)));
}

View File

@@ -10,6 +10,7 @@ import java.util.logging.*;
import org.redkale.boot.NodeSncpServer;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
import org.redkale.util.ThreadHashExecutor;
/**
*
@@ -28,7 +29,7 @@ public class SncpMessageProcessor implements MessageProcessor {
protected final NodeSncpServer server;
protected final ThreadPoolExecutor workExecutor;
protected final ThreadHashExecutor workExecutor;
protected final Service service;
@@ -40,13 +41,13 @@ public class SncpMessageProcessor implements MessageProcessor {
if (cdl != null) cdl.countDown();
};
public SncpMessageProcessor(Logger logger, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger;
this.producer = producer;
this.server = server;
this.service = service;
this.servlet = servlet;
this.workExecutor = server.getServer().getWorkExecutor();
this.workExecutor = workExecutor;
}
@Override
@@ -59,7 +60,7 @@ public class SncpMessageProcessor implements MessageProcessor {
if (this.workExecutor == null) {
execute(message, innerCallback);
} else {
this.workExecutor.execute(() -> execute(message, innerCallback));
this.workExecutor.execute(message.hash(), () -> execute(message, innerCallback));
}
}
@@ -69,7 +70,7 @@ public class SncpMessageProcessor implements MessageProcessor {
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message));
servlet.execute(request, response);
servlet.execute(request, response);
} catch (Throwable ex) {
if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);

View File

@@ -0,0 +1,61 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.util;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程池
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class ThreadHashExecutor {
private final AtomicInteger index = new AtomicInteger();
private final ExecutorService[] executors;
public ThreadHashExecutor(int size) {
ExecutorService[] array = new ExecutorService[size];
for (int i = 0; i < array.length; i++) {
array[i] = Executors.newSingleThreadExecutor();
}
this.executors = array;
}
public void execute(int hash, Runnable command) {
if (hash < 1) {
this.executors[index.incrementAndGet() % this.executors.length].execute(command);
} else {
this.executors[hash % this.executors.length].execute(command);
}
}
public void shutdown() {
for (ExecutorService executor : this.executors) {
executor.shutdown();
}
}
public List<Runnable> shutdownNow() {
List<Runnable> list = new ArrayList<>();
for (ExecutorService executor : this.executors) {
list.addAll(executor.shutdownNow());
}
return list;
}
public boolean isShutdown() {
return this.executors[0].isShutdown();
}
}