优化NodeServer

This commit is contained in:
redkale
2023-05-01 09:30:17 +08:00
parent 4911aa3f15
commit eb5142a125
14 changed files with 112 additions and 52 deletions

View File

@@ -1119,7 +1119,7 @@ public final class Application {
loadCacheSource(sourceName, false); loadCacheSource(sourceName, false);
} }
this.resourceFactory.inject(clusterAgent); this.resourceFactory.inject(clusterAgent);
clusterAgent.init(this.resourceFactory, clusterAgent.getConfig()); clusterAgent.init(clusterAgent.getConfig());
this.resourceFactory.register(ClusterAgent.class, clusterAgent); this.resourceFactory.register(ClusterAgent.class, clusterAgent);
logger.info("ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") init in " + (System.currentTimeMillis() - s) + " ms"); logger.info("ClusterAgent (type = " + this.clusterAgent.getClass().getSimpleName() + ") init in " + (System.currentTimeMillis() - s) + " ms");
} }
@@ -1130,7 +1130,7 @@ public final class Application {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
for (MessageAgent agent : this.messageAgents) { for (MessageAgent agent : this.messageAgents) {
this.resourceFactory.inject(agent); this.resourceFactory.inject(agent);
agent.init(this.resourceFactory, agent.getConfig()); agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient()); this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient());
//this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); //不需要给开发者使用 //this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); //不需要给开发者使用

View File

@@ -607,6 +607,12 @@ public final class ClassFilter<T> {
if (classname.startsWith("com.mysql.")) { if (classname.startsWith("com.mysql.")) {
break; break;
} }
if (classname.startsWith("org.junit.")) {
break;
}
if (classname.startsWith("org.openjfx.")) {
break;
}
if (classname.startsWith("org.mariadb.")) { if (classname.startsWith("org.mariadb.")) {
break; break;
} }
@@ -701,11 +707,10 @@ public final class ClassFilter<T> {
return; return;
} }
File[] lfs = root.listFiles(); File[] lfs = root.listFiles();
if (lfs == null) { if (lfs != null) {
throw new RedkaleException("File(" + root + ") cannot listFiles()"); for (File f : lfs) {
} loadClassFiles(exclude, f, files);
for (File f : lfs) { }
loadClassFiles(exclude, f, files);
} }
} }
} }

View File

@@ -41,10 +41,13 @@ public class NodeHttpServer extends NodeServer {
protected final HttpServer httpServer; protected final HttpServer httpServer;
protected ClassFilter<? extends WebSocket> webSocketFilter;
public NodeHttpServer(Application application, AnyValue serconf) { public NodeHttpServer(Application application, AnyValue serconf) {
super(application, createServer(application, serconf)); super(application, createServer(application, serconf));
this.httpServer = (HttpServer) server; this.httpServer = (HttpServer) server;
this.rest = serconf == null ? false : serconf.getAnyValue("rest") != null; this.rest = serconf == null ? false : serconf.getAnyValue("rest") != null;
} }
private static Server createServer(Application application, AnyValue serconf) { private static Server createServer(Application application, AnyValue serconf) {
@@ -79,18 +82,33 @@ public class NodeHttpServer extends NodeServer {
} }
@Override @Override
protected ClassFilter createOtherClassFilter() { protected List<ClassFilter> createOtherClassFilters() {
return createClassFilter(null, RestWebSocket.class, WebSocket.class, null, null, "rest", "websocket"); this.webSocketFilter = createClassFilter(null, RestWebSocket.class, WebSocket.class, null, null, "rest", "websocket");
List<ClassFilter> filters = super.createOtherClassFilters();
if (filters == null) {
filters = new ArrayList<>();
}
filters.add(webSocketFilter);
return filters;
} }
@Override @Override
protected void loadService(ClassFilter<? extends Service> serviceFilter, ClassFilter otherFilter) throws Exception { protected void loadOthers(List<ClassFilter> otherFilters) throws Exception {
super.loadService(serviceFilter, otherFilter); List<ClassFilter> filters = otherFilters;
if (filters != null) {
filters.remove(this.webSocketFilter); //webSocketFilter会在loadHttpFilter中处理先剔除
}
super.loadOthers(filters);
}
@Override
protected void loadService(ClassFilter<? extends Service> serviceFilter) throws Exception {
super.loadService(serviceFilter);
initWebSocketService(); initWebSocketService();
} }
@Override @Override
protected void loadFilter(ClassFilter<? extends Filter> filterFilter, ClassFilter otherFilter) throws Exception { protected void loadFilter(ClassFilter<? extends Filter> filterFilter) throws Exception {
if (httpServer != null) { if (httpServer != null) {
loadHttpFilter(filterFilter); loadHttpFilter(filterFilter);
} }
@@ -98,9 +116,9 @@ public class NodeHttpServer extends NodeServer {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void loadServlet(ClassFilter<? extends Servlet> servletFilter, ClassFilter otherFilter) throws Exception { protected void loadServlet(ClassFilter<? extends Servlet> servletFilter) throws Exception {
if (httpServer != null) { if (httpServer != null) {
loadHttpServlet(servletFilter, otherFilter); loadHttpServlet(servletFilter);
} }
} }
@@ -192,7 +210,7 @@ public class NodeHttpServer extends NodeServer {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void loadHttpServlet(final ClassFilter<? extends Servlet> servletFilter, ClassFilter<? extends WebSocket> webSocketFilter) throws Exception { protected void loadHttpServlet(final ClassFilter<? extends Servlet> servletFilter) throws Exception {
RedkaleClassLoader.putReflectionPublicClasses(HttpServlet.class.getName()); RedkaleClassLoader.putReflectionPublicClasses(HttpServlet.class.getName());
RedkaleClassLoader.putReflectionPublicClasses(HttpDispatcherServlet.class.getName()); RedkaleClassLoader.putReflectionPublicClasses(HttpDispatcherServlet.class.getName());
RedkaleClassLoader.putReflectionDeclaredConstructors(HttpResourceServlet.class, HttpResourceServlet.class.getName()); RedkaleClassLoader.putReflectionDeclaredConstructors(HttpResourceServlet.class, HttpResourceServlet.class.getName());
@@ -258,6 +276,7 @@ public class NodeHttpServer extends NodeServer {
for (AnyValue restConf : serverConf.getAnyValues("rest")) { for (AnyValue restConf : serverConf.getAnyValues("rest")) {
loadRestServlet(webSocketFilter, restConf, restedObjects, restedLock, sb, rests, webss); loadRestServlet(webSocketFilter, restConf, restedObjects, restedLock, sb, rests, webss);
} }
this.webSocketFilter = null;
} }
int max = 0; int max = 0;
if (ss != null && sb != null) { if (ss != null && sb != null) {

View File

@@ -197,15 +197,29 @@ public abstract class NodeServer {
} }
ClassFilter<Filter> filterFilter = createFilterClassFilter(); ClassFilter<Filter> filterFilter = createFilterClassFilter();
ClassFilter<Servlet> servletFilter = createServletClassFilter(); ClassFilter<Servlet> servletFilter = createServletClassFilter();
ClassFilter otherFilter = createOtherClassFilter(); List<ClassFilter> otherFilters = createOtherClassFilters();
List<ClassFilter> filters = new ArrayList<>();
if (serviceFilter != null) {
filters.add(serviceFilter);
}
if (filterFilter != null) {
filters.add(filterFilter);
}
if (servletFilter != null) {
filters.add(servletFilter);
}
if (otherFilters != null) {
filters.addAll(otherFilters);
}
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
ClassFilter.Loader.load(application.getHome(), serverClassLoader, ((application.excludelibs != null ? (application.excludelibs + ";") : "") + serverConf.getValue("excludelibs", "")).split(";"), serviceFilter, filterFilter, servletFilter, otherFilter); ClassFilter.Loader.load(application.getHome(), serverClassLoader, ((application.excludelibs != null ? (application.excludelibs + ";") : "") + serverConf.getValue("excludelibs", "")).split(";"), filters.toArray(new ClassFilter[filters.size()]));
long e = System.currentTimeMillis() - s; long e = System.currentTimeMillis() - s;
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms"); logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(serviceFilter, otherFilter); //必须在servlet之前 loadService(serviceFilter); //必须在servlet之前
loadOthers(otherFilters);
if (!application.isSingletonMode()) { //非singleton模式下才加载Filter、Servlet if (!application.isSingletonMode()) { //非singleton模式下才加载Filter、Servlet
loadFilter(filterFilter, otherFilter); loadFilter(filterFilter);
loadServlet(servletFilter, otherFilter); loadServlet(servletFilter);
postLoadServlets(); postLoadServlets();
} }
if (this.interceptor != null) { if (this.interceptor != null) {
@@ -213,15 +227,17 @@ public abstract class NodeServer {
} }
} }
protected abstract void loadFilter(ClassFilter<? extends Filter> filterFilter, ClassFilter otherFilter) throws Exception; protected void loadOthers(List<ClassFilter> otherFilters) throws Exception {
}
protected abstract void loadServlet(ClassFilter<? extends Servlet> servletFilter, ClassFilter otherFilter) throws Exception; protected abstract void loadFilter(ClassFilter<? extends Filter> filterFilter) throws Exception;
protected abstract void loadServlet(ClassFilter<? extends Servlet> servletFilter) throws Exception;
private void initResource() { private void initResource() {
final NodeServer self = this; final NodeServer self = this;
//--------------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------------
final ResourceFactory appResFactory = application.getResourceFactory(); final ResourceFactory appResFactory = application.getResourceFactory();
final String confURI = appResFactory.find(RESNAME_APP_CONF_DIR, String.class);
//------------------------------------- 注册 Resource -------------------------------------------------------- //------------------------------------- 注册 Resource --------------------------------------------------------
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
try { try {
@@ -408,7 +424,7 @@ public abstract class NodeServer {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void loadService(ClassFilter<? extends Service> serviceFilter, ClassFilter otherFilter) throws Exception { protected void loadService(ClassFilter<? extends Service> serviceFilter) throws Exception {
if (serviceFilter == null) { if (serviceFilter == null) {
return; return;
} }
@@ -446,11 +462,11 @@ public abstract class NodeServer {
if (!entry.isEmptyGroup() && !entry.isRemote() && rpcGroups.containsGroup(entry.getGroup())) { if (!entry.isEmptyGroup() && !entry.isRemote() && rpcGroups.containsGroup(entry.getGroup())) {
throw new RedkaleException("Not found group(" + entry.getGroup() + ")"); throw new RedkaleException("Not found group(" + entry.getGroup() + ")");
} }
Service oldother = resourceFactory.find(entry.getName(), serviceImplClass); Service oldOther = resourceFactory.find(entry.getName(), serviceImplClass);
if (oldother != null) { //Server加载Service时需要判断是否已经加载过了。 if (oldOther != null) { //Server加载Service时需要判断是否已在其他协议服务中加载
if (!Sncp.isRemote(oldother)) { if (!Sncp.isRemote(oldOther)) {
if (!Sncp.isComponent(oldother)) { if (!Sncp.isComponent(oldOther)) {
servletServices.add(oldother); servletServices.add(oldOther);
} }
} }
continue; continue;
@@ -705,7 +721,7 @@ public abstract class NodeServer {
protected abstract ClassFilter<Servlet> createServletClassFilter(); protected abstract ClassFilter<Servlet> createServletClassFilter();
protected ClassFilter createOtherClassFilter() { protected List<ClassFilter> createOtherClassFilters() {
return null; return null;
} }

View File

@@ -102,7 +102,7 @@ public class NodeSncpServer extends NodeServer {
} }
@Override @Override
protected void loadFilter(ClassFilter<? extends Filter> filterFilter, ClassFilter otherFilter) throws Exception { protected void loadFilter(ClassFilter<? extends Filter> filterFilter) throws Exception {
if (sncpServer != null) { if (sncpServer != null) {
loadSncpFilter(this.serverConf.getAnyValue("fliters"), filterFilter); loadSncpFilter(this.serverConf.getAnyValue("fliters"), filterFilter);
} }
@@ -132,7 +132,7 @@ public class NodeSncpServer extends NodeServer {
} }
@Override @Override
protected void loadServlet(ClassFilter<? extends Servlet> servletFilter, ClassFilter otherFilter) throws Exception { protected void loadServlet(ClassFilter<? extends Servlet> servletFilter) throws Exception {
RedkaleClassLoader.putReflectionPublicClasses(SncpServlet.class.getName()); RedkaleClassLoader.putReflectionPublicClasses(SncpServlet.class.getName());
if (!application.isSingletonMode()) { if (!application.isSingletonMode()) {
this.servletServices.stream() this.servletServices.stream()

View File

@@ -6,8 +6,9 @@
package org.redkale.boot; package org.redkale.boot;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.util.List;
import org.redkale.net.*; import org.redkale.net.*;
import org.redkale.net.http.*; import org.redkale.net.http.WebServlet;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.watch.*; import org.redkale.watch.*;
@@ -42,8 +43,8 @@ public class NodeWatchServer extends NodeHttpServer {
} }
@Override @Override
protected ClassFilter createOtherClassFilter() { protected List<ClassFilter> createOtherClassFilters() {
return null; return null; //不调用 super.createOtherClassFilters()
} }
@Override @Override

View File

@@ -51,8 +51,8 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> mqtpAddressMap = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<String, Set<InetSocketAddress>> mqtpAddressMap = new ConcurrentHashMap<>();
@Override @Override
public void init(ResourceFactory factory, AnyValue config) { public void init(AnyValue config) {
super.init(factory, config); super.init(config);
this.sourceName = getSourceName(); this.sourceName = getSourceName();
this.ttls = config.getIntValue("ttls", 10); this.ttls = config.getIntValue("ttls", 10);
if (this.ttls < 5) { if (this.ttls < 5) {

View File

@@ -67,7 +67,7 @@ public abstract class ClusterAgent {
protected final ConcurrentHashMap<String, ClusterEntry> remoteEntrys = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<String, ClusterEntry> remoteEntrys = new ConcurrentHashMap<>();
public void init(ResourceFactory factory, AnyValue config) { public void init(AnyValue config) {
this.config = config; this.config = config;
this.name = config.getValue("name", ""); this.name = config.getValue("name", "");
this.waits = config.getBoolValue("waits", false); this.waits = config.getBoolValue("waits", false);

View File

@@ -36,6 +36,9 @@ public abstract class MessageAgent implements Resourcable {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Resource(required = false)
protected Application application;
@Resource(name = RESNAME_APP_NODEID) @Resource(name = RESNAME_APP_NODEID)
protected int nodeid; protected int nodeid;
@@ -72,7 +75,7 @@ public abstract class MessageAgent implements Resourcable {
//本地Service消息接收处理器 key:consumerid //本地Service消息接收处理器 key:consumerid
protected HashMap<String, MessageClientConsumerNode> clientConsumerNodes = new LinkedHashMap<>(); protected HashMap<String, MessageClientConsumerNode> clientConsumerNodes = new LinkedHashMap<>();
public void init(ResourceFactory factory, AnyValue config) { public void init(AnyValue config) {
this.name = checkName(config.getValue("name", "")); this.name = checkName(config.getValue("name", ""));
this.httpMessageClient = new HttpMessageClient(this); this.httpMessageClient = new HttpMessageClient(this);
this.sncpMessageClient = new SncpMessageClient(this); this.sncpMessageClient = new SncpMessageClient(this);
@@ -83,8 +86,8 @@ public abstract class MessageAgent implements Resourcable {
Class<MessageCoder<MessageRecord>> coderClass = (Class) Thread.currentThread().getContextClassLoader().loadClass(coderType); Class<MessageCoder<MessageRecord>> coderClass = (Class) Thread.currentThread().getContextClassLoader().loadClass(coderType);
RedkaleClassLoader.putReflectionPublicConstructors(coderClass, coderClass.getName()); RedkaleClassLoader.putReflectionPublicConstructors(coderClass, coderClass.getName());
MessageCoder<MessageRecord> coder = coderClass.getConstructor().newInstance(); MessageCoder<MessageRecord> coder = coderClass.getConstructor().newInstance();
if (factory != null) { if (application != null) {
factory.inject(coder); application.getResourceFactory().inject(coder);
} }
if (coder instanceof Service) { if (coder instanceof Service) {
((Service) coder).init(config); ((Service) coder).init(config);

View File

@@ -6,6 +6,7 @@ package org.redkale.mq;
import static java.lang.annotation.ElementType.TYPE; import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*; import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
/** /**
* MQ资源注解 * MQ资源注解
@@ -27,4 +28,6 @@ public @interface MessageConsumer {
String group() default ""; String group() default "";
String[] topics(); String[] topics();
ConvertType convertType() default ConvertType.JSON;
} }

View File

@@ -4,7 +4,8 @@
package org.redkale.mq; package org.redkale.mq;
import org.redkale.annotation.Component; import org.redkale.annotation.Component;
import org.redkale.service.*; import org.redkale.service.Local;
import org.redkale.util.AnyValue;
/** /**
* MQ资源注解 * MQ资源注解
@@ -18,7 +19,13 @@ import org.redkale.service.*;
*/ */
@Local @Local
@Component @Component
public interface MessageConsumerListener<T> extends Service { public interface MessageConsumerListener<T> {
default void init(AnyValue config) {
}
public void onMessage(String topic, T message); public void onMessage(String topic, T message);
default void destroy(AnyValue config) {
}
} }

View File

@@ -5,9 +5,9 @@
*/ */
package org.redkale.mq; package org.redkale.mq;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*; import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*; import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/** /**
* 多消费组,需要同 &#64;RestService 一起使用 * 多消费组,需要同 &#64;RestService 一起使用
@@ -41,13 +41,14 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
* </pre></blockquote> * </pre></blockquote>
* *
* <p> * <p>
* 注: 标记 &#64;MessageMultiConsumer 的Service的&#64;RestMapping方法都只能是void返回类型 * 注: 标记 &#64;MessageMultiConsumer 的Service的&#64;RestMapping方法都只能是void返回类型 <br>
* * 由 MessageConsumerListener 代替
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* *
* @author zhangjx * @author zhangjx
* @deprecated
* *
* @since 2.1.0 * @since 2.1.0
*/ */
@@ -55,6 +56,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Documented @Documented
@Target({TYPE}) @Target({TYPE})
@Retention(RUNTIME) @Retention(RUNTIME)
@Deprecated(since = "2.8.0")
public @interface MessageMultiConsumer { public @interface MessageMultiConsumer {
String module(); String module();

View File

@@ -6,6 +6,7 @@ package org.redkale.mq;
import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*; import java.lang.annotation.*;
import org.redkale.convert.ConvertType;
/** /**
* MQ资源注解, 只能标记在MessageProducerSender类型字段上 * MQ资源注解, 只能标记在MessageProducerSender类型字段上
@@ -23,4 +24,7 @@ import java.lang.annotation.*;
public @interface MessageProducer { public @interface MessageProducer {
String mq(); String mq();
ConvertType convertType() default ConvertType.JSON;
} }

View File

@@ -19,13 +19,13 @@ import org.redkale.convert.Convert;
*/ */
public interface MessageProducerSender { public interface MessageProducerSender {
public CompletableFuture<Void> send(String topic, Object value); public CompletableFuture<Void> sendMessage(String topic, Object value);
default CompletableFuture<Void> send(String topic, Convert convert, Object value) { default CompletableFuture<Void> sendMessage(String topic, Convert convert, Object value) {
return send(topic, convert.convertToBytes(value)); return sendMessage(topic, convert.convertToBytes(value));
} }
default CompletableFuture<Void> send(String topic, Convert convert, Type type, Object value) { default CompletableFuture<Void> sendMessage(String topic, Convert convert, Type type, Object value) {
return send(topic, convert.convertToBytes(type, value)); return sendMessage(topic, convert.convertToBytes(type, value));
} }
} }