NodeServer优化

This commit is contained in:
redkale
2023-04-20 17:58:09 +08:00
parent 698269f8b0
commit b82a7d86e5
6 changed files with 104 additions and 58 deletions

View File

@@ -391,8 +391,8 @@ public class NodeHttpServer extends NodeServer {
}
final ClassFilter restFilter = ClassFilter.create(serverClassLoader, null, application.isCompileMode() ? "" : restConf.getValue("includes", ""), application.isCompileMode() ? "" : restConf.getValue("excludes", ""), includeValues, excludeValues);
final CountDownLatch scdl = new CountDownLatch(super.interceptorServices.size());
Stream<Service> stream = super.interceptorServices.stream();
final CountDownLatch scdl = new CountDownLatch(super.servletServices.size());
Stream<Service> stream = super.servletServices.stream();
if (!application.isCompileMode()) {
stream = stream.parallel(); //不能并行否则在maven plugin运行环境下ClassLoader不对
}

View File

@@ -13,7 +13,6 @@ import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.logging.*;
import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.Command;
@@ -62,9 +61,6 @@ public abstract class NodeServer {
protected final Thread serverThread;
//加载Service时的处理函数
protected BiConsumer<MessageAgent, Service> consumer;
//server节点的配置
protected AnyValue serverConf;
@@ -73,15 +69,15 @@ public abstract class NodeServer {
//加载server节点后的拦截器
protected NodeInterceptor interceptor;
//供interceptor使用的Service对象集合
protected final Set<Service> interceptorServices = new LinkedHashSet<>();
//本地模式的Service对象集合
protected final Set<Service> localServices = new LinkedHashSet<>();
//远程模式的Service对象集合
protected final Set<Service> remoteServices = new LinkedHashSet<>();
//需要转换成Servlet的Service对象集合, Component的Service不在其内
protected final Set<Service> servletServices = new LinkedHashSet<>();
//存在SncpServlet、RestServlet
protected final Map<Service, Servlet> dynServletMap = new LinkedHashMap<>();
@@ -297,7 +293,7 @@ public abstract class NodeServer {
}
//ResourceFactory resfactory = (isSNCP() ? appResFactory : resourceFactory);
Service service = Modifier.isFinal(resServiceType.getModifiers()) || resServiceType.getAnnotation(Component.class) != null
Service service = Modifier.isFinal(resServiceType.getModifiers()) || Sncp.isComponent(resServiceType)
? (Service) resServiceType.getConstructor().newInstance()
: Sncp.createLocalService(serverClassLoader, resourceName, resServiceType, appResFactory, application.getSncpRpcGroups(), sncpClient, null, null, null);
appResFactory.register(resourceName, resServiceType, service);
@@ -393,9 +389,8 @@ public abstract class NodeServer {
} else {
rf.inject(resourceName, nodeService); //动态加载的Service也存在按需加载的注入资源
localServices.add(nodeService);
interceptorServices.add(nodeService);
if (consumer != null) {
consumer.accept(null, nodeService);
if (!Sncp.isComponent(nodeService)) {
servletServices.add(nodeService);
}
}
return nodeService;
@@ -454,15 +449,16 @@ public abstract class NodeServer {
Service oldother = resourceFactory.find(entry.getName(), serviceImplClass);
if (oldother != null) { //Server加载Service时需要判断是否已经加载过了。
if (!Sncp.isRemote(oldother)) {
interceptorServices.add(oldother);
if (!Sncp.isComponent(oldother)) {
servletServices.add(oldother);
}
}
continue;
}
boolean isLocalGroup0 = rpcGroups.isLocalGroup(this.sncpGroup, this.sncpAddress, entry);
final String group = isLocalGroup0 ? null : entry.getGroup();
final boolean localMode = serviceImplClass.getAnnotation(Local.class) != null || isLocalGroup0;//本地模式
if ((localMode || serviceImplClass.getAnnotation(Component.class) != null)
&& (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) {
if ((localMode || Sncp.isComponent(serviceImplClass)) && Utility.isAbstractOrInterface(serviceImplClass)) {
continue; //本地模式或Component不能实例化接口和抽象类的Service类
}
@@ -479,8 +475,7 @@ public abstract class NodeServer {
MessageAgent agent = getMessageAgent(entry.getProperty());
Service service;
final boolean ws = srcObj instanceof WebSocketServlet;
final boolean component = serviceImplClass.getAnnotation(Component.class) != null;
if (component) { //Component
if (Sncp.isComponent(serviceImplClass)) { //Component
RedkaleClassLoader.putReflectionPublicConstructors(serviceImplClass, serviceImplClass.getName());
if (!acceptsComponent(serviceImplClass)) {
return null;
@@ -507,11 +502,8 @@ public abstract class NodeServer {
rf.inject(resourceName, service); //动态加载的Service也存在按需加载的注入资源
}
localServices.add(service);
if (!component) {
interceptorServices.add(service);
}
if (consumer != null) {
consumer.accept(agent, service);
if (!Sncp.isComponent(service)) {
servletServices.add(service);
}
}
serviceCount.incrementAndGet();
@@ -593,18 +585,18 @@ public abstract class NodeServer {
long s = System.currentTimeMillis();
y.init(Sncp.getResourceConf(y));
long e = System.currentTimeMillis() - s;
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
if (slist != null) {
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
slist.add(new StringBuilder().append(serstr).append(" load and init in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString());
}
});
localServices.stream().forEach(y -> {
if (y.getClass().getAnnotation(Component.class) != null) {
if (Sncp.isComponent(y)) {
long s = System.currentTimeMillis();
interceptComponent(y);
boolean rs = interceptComponent(y);
long e = System.currentTimeMillis() - s;
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
if (slist != null) {
if (rs && slist != null) {
String serstr = Sncp.toSimpleString(y, maxNameLength, maxTypeLength);
slist.add(new StringBuilder().append(serstr).append(" component-start in ").append(e < 10 ? " " : (e < 100 ? " " : "")).append(e).append(" ms").append(LINE_SEPARATOR).toString());
}
}
@@ -615,7 +607,7 @@ public abstract class NodeServer {
for (String s : wlist) {
sb.append(s);
}
sb.append("All " + localServices.size() + " Services load in ").append(System.currentTimeMillis() - starts).append(" ms");
sb.append("All ").append(localServices.size()).append(" Services load in ").append(System.currentTimeMillis() - starts).append(" ms");
}
if (sb != null && preinite > 10) {
sb.append(ClusterAgent.class.getSimpleName()).append(" register in ").append(preinite).append(" ms" + LINE_SEPARATOR);
@@ -626,7 +618,8 @@ public abstract class NodeServer {
}
private void calcMaxLength(Service y) { //计算toString中的长度
maxNameLength = Math.max(maxNameLength, Sncp.getResourceName(y).length());
String n = Sncp.getResourceName(y);
maxNameLength = Math.max(maxNameLength, n == null ? 1 : n.length()); //#
maxTypeLength = Math.max(maxTypeLength, Sncp.getResourceType(y).getName().length() + 1);
}
@@ -644,12 +637,14 @@ public abstract class NodeServer {
return true;
}
protected void interceptComponent(Service service) {
protected boolean interceptComponent(Service service) {
if (service instanceof MessageConsumerListener) {
MessageConsumer mqConsumer = service.getClass().getAnnotation(MessageConsumer.class);
MessageAgent mqAgent = application.getMessageAgent(mqConsumer.mq());
mqAgent.addConsumerListener((MessageConsumerListener) service);
return true;
}
return false;
}
protected MessageAgent getMessageAgent(AnyValue serviceConf) {
@@ -954,8 +949,8 @@ public abstract class NodeServer {
return (T) server;
}
public Set<Service> getInterceptorServices() {
return new LinkedHashSet<>(interceptorServices);
public Set<Service> getServletServices() {
return new LinkedHashSet<>(servletServices);
}
public Set<Service> getLocalServices() {

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.boot;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.logging.Level;
@@ -13,7 +12,7 @@ import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.service.Local;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.util.*;
@@ -33,16 +32,6 @@ public class NodeSncpServer extends NodeServer {
private NodeSncpServer(Application application, AnyValue serconf) {
super(application, createServer(application, serconf));
this.sncpServer = (SncpServer) this.server;
this.consumer = sncpServer == null || application.isSingletonMode() ? null : (agent, x) -> {//singleton模式下不生成SncpServlet
if (x.getClass().getAnnotation(Local.class) != null) {
return; //本地模式的Service不生成SncpServlet
}
SncpServlet servlet = sncpServer.addSncpServlet(x);
dynServletMap.put(x, servlet);
if (agent != null) {
agent.putService(this, x, servlet);
}
};
}
public static NodeServer createNodeServer(Application application, AnyValue serconf) {
@@ -58,12 +47,6 @@ public class NodeSncpServer extends NodeServer {
return sncpServer == null ? null : sncpServer.getSocketAddress();
}
public void consumerAccept(MessageAgent messageAgent, Service service) {
if (this.consumer != null) {
this.consumer.accept(messageAgent, service);
}
}
@Override
public void init(AnyValue config) throws Exception {
super.init(config);
@@ -131,7 +114,7 @@ public class NodeSncpServer extends NodeServer {
List<FilterEntry<? extends Filter>> list = new ArrayList(classFilter.getFilterEntrys());
for (FilterEntry<? extends Filter> en : list) {
Class<SncpFilter> clazz = (Class<SncpFilter>) en.getType();
if (Modifier.isAbstract(clazz.getModifiers())) {
if (Utility.isAbstractOrInterface(clazz)) {
continue;
}
RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, clazz.getName());
@@ -151,6 +134,19 @@ public class NodeSncpServer extends NodeServer {
@Override
protected void loadServlet(ClassFilter<? extends Servlet> servletFilter, ClassFilter otherFilter) throws Exception {
RedkaleClassLoader.putReflectionPublicClasses(SncpServlet.class.getName());
if (!application.isSingletonMode()) {
this.servletServices.stream()
.filter(x -> x.getClass().getAnnotation(Local.class) == null) //Local模式的Service不生成SncpServlet
.forEach(x -> {
SncpServlet servlet = sncpServer.addSncpServlet(x);
dynServletMap.put(x, servlet);
String mq = Sncp.getResourceMQ(x);
if (mq != null) {
MessageAgent agent = application.getMessageAgent(mq);
agent.putService(this, x, servlet);
}
});
}
}
@Override

View File

@@ -5,7 +5,6 @@
*/
package org.redkale.boot;
import java.lang.reflect.Modifier;
import org.redkale.annotation.Bean;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.convert.Decodeable;
@@ -13,6 +12,7 @@ import org.redkale.convert.bson.BsonFactory;
import org.redkale.convert.json.*;
import org.redkale.persistence.Entity;
import org.redkale.source.*;
import org.redkale.util.Utility;
/**
* 执行一次Application.run提前获取所有动态类
@@ -51,7 +51,7 @@ public class PrepareCompiler {
for (FilterEntry en : entityFilter.getFilterEntrys()) {
Class clz = en.getType();
if (clz.isInterface() || Modifier.isAbstract(clz.getModifiers())) {
if (Utility.isAbstractOrInterface(clz)) {
continue;
}
try {
@@ -70,7 +70,7 @@ public class PrepareCompiler {
}
for (FilterEntry en : entityFilter2.getFilterEntrys()) {
Class clz = en.getType();
if (clz.isInterface() || Modifier.isAbstract(clz.getModifiers())) {
if (Utility.isAbstractOrInterface(clz)) {
continue;
}
try {
@@ -89,7 +89,7 @@ public class PrepareCompiler {
}
for (FilterEntry en : beanFilter.getFilterEntrys()) {
Class clz = en.getType();
if (clz.isInterface() || Modifier.isAbstract(clz.getModifiers())) {
if (Utility.isAbstractOrInterface(clz)) {
continue;
}
try {
@@ -107,7 +107,7 @@ public class PrepareCompiler {
}
for (FilterEntry en : beanFilter2.getFilterEntrys()) {
Class clz = en.getType();
if (clz.isInterface() || Modifier.isAbstract(clz.getModifiers())) {
if (Utility.isAbstractOrInterface(clz)) {
continue;
}
try {
@@ -125,7 +125,7 @@ public class PrepareCompiler {
}
for (FilterEntry en : filterFilter.getFilterEntrys()) {
Class clz = en.getType();
if (clz.isInterface() || Modifier.isAbstract(clz.getModifiers())) {
if (Utility.isAbstractOrInterface(clz)) {
continue;
}
try {

View File

@@ -220,6 +220,14 @@ public abstract class Sncp {
return serviceType.getAnnotation(SncpDyn.class) != null;
}
public static boolean isComponent(Service service) {
return service.getClass().getAnnotation(Component.class) != null;
}
public static boolean isComponent(Class serviceType) {
return serviceType.getAnnotation(Component.class) != null;
}
public static int getVersion(Service service) {
return -1; //预留功能,暂不实现
}
@@ -262,6 +270,19 @@ public abstract class Sncp {
}
}
public static String getResourceMQ(Service service) {
if (service == null || !isSncpDyn(service)) {
return null;
}
try {
Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_mq");
ts.setAccessible(true);
return (String) ts.get(service);
} catch (Exception e) {
throw new SncpException(service + " not found " + FIELDPREFIX + "_mq");
}
}
static void checkAsyncModifier(Class param, Method method) {
if (param == CompletionHandler.class) {
return;
@@ -307,6 +328,9 @@ public abstract class Sncp {
int len;
Class type = getResourceType(service);
String name = getResourceName(service);
if(name==null) {
name = "#";
}
sb.append("(type= ").append(type.getName());
len = maxTypeLength - type.getName().length();
for (int i = 0; i < len; i++) {
@@ -485,6 +509,10 @@ public abstract class Sncp {
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_conf", anyValueDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_mq", Type.getDescriptor(String.class), null, null);
fv.visitEnd();
}
{ //构造函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
//mv.setDebug(true);
@@ -507,6 +535,9 @@ public abstract class Sncp {
try {
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf");
RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c);
c = newClazz.getDeclaredField(FIELDPREFIX + "_mq");
RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c);
} catch (Exception e) {
}
return (Class<T>) newClazz;
@@ -580,6 +611,11 @@ public abstract class Sncp {
c.setAccessible(true);
c.set(service, conf);
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_mq");
c.setAccessible(true);
c.set(service, agent == null ? null : agent.getName());
}
return service;
} catch (RuntimeException rex) {
throw rex;
@@ -700,6 +736,11 @@ public abstract class Sncp {
c.setAccessible(true);
c.set(service, conf);
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_mq");
c.setAccessible(true);
c.set(service, agent == null ? null : agent.getName());
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_sncp");
c.setAccessible(true);
@@ -745,6 +786,10 @@ public abstract class Sncp {
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_conf", anyValueDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_mq", Type.getDescriptor(String.class), null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_sncp", sncpInfoDesc, null, null);
fv.visitEnd();
@@ -890,6 +935,12 @@ public abstract class Sncp {
c.set(service, conf);
RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c);
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_mq");
c.setAccessible(true);
c.set(service, agent == null ? null : agent.getName());
RedkaleClassLoader.putReflectionField(newDynName.replace('/', '.'), c);
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_sncp");
c.setAccessible(true);

View File

@@ -257,6 +257,10 @@ public final class Utility {
}
}
public static boolean isAbstractOrInterface(Class clazz) {
return clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers());
}
/**
* @param value from which next positive power of two will be found.
*