This commit is contained in:
Redkale
2020-06-28 07:23:07 +08:00
parent 76a7e92787
commit 2d1a6e0edc
7 changed files with 79 additions and 54 deletions

View File

@@ -154,9 +154,9 @@ public abstract class NodeServer {
//必须要进行初始化, 构建Service时需要使用Context中的ExecutorService
server.init(this.serverConf);
//init之后才有Executor
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ExecutorService.class, server.getExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ThreadPoolExecutor.class, server.getExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, Executor.class, server.getWorkExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ExecutorService.class, server.getWorkExecutor());
resourceFactory.register(Server.RESNAME_SERVER_EXECUTOR, ThreadPoolExecutor.class, server.getWorkExecutor());
initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。
String interceptorClass = this.serverConf.getValue("interceptor", "");

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.logging.*;
import org.redkale.boot.NodeHttpServer;
import org.redkale.net.http.*;
@@ -29,6 +30,8 @@ public class HttpMessageProcessor implements MessageProcessor {
protected final NodeHttpServer server;
protected final ThreadPoolExecutor workExecutor;
protected final Service service;
protected final HttpServlet servlet;
@@ -39,6 +42,8 @@ public class HttpMessageProcessor implements MessageProcessor {
protected final String multimodule; // 前后有/, 例如: /userstat/
protected CountDownLatch cdl;
public HttpMessageProcessor(Logger logger, MessageProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.finest = logger.isLoggable(Level.FINEST);
@@ -50,10 +55,24 @@ public class HttpMessageProcessor implements MessageProcessor {
this.multiconsumer = mmc != null;
this.restmodule = "/" + Rest.getRestModule(service) + "/";
this.multimodule = mmc != null ? ("/" + mmc.module() + "/") : null;
this.workExecutor = server.getServer().getWorkExecutor();
}
@Override
public void process(MessageRecord message, Runnable callback) {
public void begin(final int size) {
if (this.workExecutor != null) this.cdl = new CountDownLatch(size);
}
@Override
public void process(final MessageRecord message, final Runnable callback) {
if (this.workExecutor == null) {
execute(message, callback);
} else {
this.workExecutor.execute(() -> execute(message, callback));
}
}
private void execute(final MessageRecord message, final Runnable callback) {
try {
if (finest) logger.log(Level.FINEST, "HttpMessageProcessor.process message: " + message);
if (multiconsumer) message.setResptopic(null); //不容许有响应
@@ -69,6 +88,18 @@ public class HttpMessageProcessor implements MessageProcessor {
HttpMessageResponse.finishHttpResult(message, callback, producer, message.getResptopic(), new HttpResult().status(500));
}
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
} finally {
if (cdl != null) cdl.countDown();
}
}
@Override
public void commit() {
if (this.cdl != null) {
try {
this.cdl.await();
} catch (Exception ex) {
}
}
}

View File

@@ -1,38 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CountDownLatch;
/**
*
* @author zhangjx
*/
public class MessageMultiThreadProcessor implements MessageProcessor {
protected final MessageAgent messageAgent;
protected final MessageProcessor processor;
protected CountDownLatch cdl;
public MessageMultiThreadProcessor(MessageAgent messageAgent, MessageProcessor processor) {
this.messageAgent = messageAgent;
this.processor = processor;
}
@Override
public void begin(int size) {
}
@Override
public void process(MessageRecord message, Runnable callback) {
}
@Override
public void commit() {
}
}

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.logging.*;
import org.redkale.boot.NodeSncpServer;
import org.redkale.net.sncp.*;
@@ -27,28 +28,59 @@ public class SncpMessageProcessor implements MessageProcessor {
protected final NodeSncpServer server;
protected final ThreadPoolExecutor workExecutor;
protected final Service service;
protected final SncpServlet servlet;
protected CountDownLatch cdl;
public SncpMessageProcessor(Logger logger, MessageProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger;
this.producer = producer;
this.server = server;
this.service = service;
this.servlet = servlet;
this.workExecutor = server.getServer().getWorkExecutor();
}
@Override
public void process(MessageRecord message, Runnable callback) {
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
SncpMessageResponse response = new SncpMessageResponse(context, request, callback, null, producer);
public void begin(final int size) {
if (this.workExecutor != null) this.cdl = new CountDownLatch(size);
}
@Override
public void process(final MessageRecord message, final Runnable callback) {
if (this.workExecutor == null) {
execute(message, callback);
} else {
this.workExecutor.execute(() -> execute(message, callback));
}
}
private void execute(final MessageRecord message, final Runnable callback) {
SncpMessageResponse response = null;
try {
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, null, producer);
servlet.execute(request, response);
} catch (Exception ex) {
response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
if (response != null) response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
logger.log(Level.SEVERE, SncpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex);
} finally {
if (cdl != null) cdl.countDown();
}
}
@Override
public void commit() {
if (this.cdl != null) {
try {
this.cdl.await();
} catch (Exception ex) {
}
}
}

View File

@@ -86,7 +86,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
protected int threads;
//线程池
protected ThreadPoolExecutor executor;
protected ThreadPoolExecutor workExecutor;
//ByteBuffer池大小
protected int bufferPoolSize;
@@ -151,8 +151,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
final AtomicInteger counter = new AtomicInteger();
final Format f = createFormat();
final String n = name;
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new WorkThread(executor, r);
this.workExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new WorkThread(workExecutor, r);
t.setName("Redkale-" + n + "-ServletThread-" + f.format(counter.incrementAndGet()));
return t;
});
@@ -187,8 +187,8 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
return resourceFactory;
}
public ThreadPoolExecutor getExecutor() {
return executor;
public ThreadPoolExecutor getWorkExecutor() {
return workExecutor;
}
public InetSocketAddress getSocketAddress() {

View File

@@ -436,7 +436,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
final HttpContextConfig contextConfig = new HttpContextConfig();
contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger;
contextConfig.executor = this.executor;
contextConfig.executor = this.workExecutor;
contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.maxconns = this.maxconns;

View File

@@ -112,7 +112,7 @@ public class SncpServer extends Server<DLong, SncpContext, SncpRequest, SncpResp
final SncpContextConfig contextConfig = new SncpContextConfig();
contextConfig.serverStartTime = this.serverStartTime;
contextConfig.logger = this.logger;
contextConfig.executor = this.executor;
contextConfig.executor = this.workExecutor;
contextConfig.sslContext = this.sslContext;
contextConfig.bufferCapacity = this.bufferCapacity;
contextConfig.maxconns = this.maxconns;