This commit is contained in:
地平线
2015-08-31 12:40:21 +08:00
parent 32834fe086
commit ea69085b57
4 changed files with 108 additions and 72 deletions

View File

@@ -8,7 +8,6 @@ package com.wentch.redkale.boot;
import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.convert.json.*;
import com.wentch.redkale.net.*;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.service.*;
import com.wentch.redkale.source.*;
@@ -65,10 +64,6 @@ public final class Application {
//当前Service所属的组 类型: Set<String>、String[]
public static final String RESNAME_SNCP_GROUPS = Sncp.RESNAME_SNCP_GROUPS; // SNCP_GROUPS
protected final ResourceFactory factory = ResourceFactory.root();
protected final WatchFactory watch = WatchFactory.root();
protected final Map<InetSocketAddress, String> globalNodes = new HashMap<>();
private final Map<String, Set<InetSocketAddress>> globalGroups = new HashMap<>();
@@ -81,7 +76,13 @@ public final class Application {
protected final List<NodeServer> servers = new CopyOnWriteArrayList<>();
//--------------------------------------------------------------------------------------------
protected CountDownLatch servicecdl; //会出现两次赋值
//--------------------------------------------------------------------------------------------
private final ResourceFactory factory = ResourceFactory.root();
private final WatchFactory watch = WatchFactory.root();
private File home;
private final Logger logger;
@@ -176,10 +177,22 @@ public final class Application {
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
}
public ResourceFactory getResourceFactory() {
return factory;
}
public WatchFactory getWatchFactory() {
return watch;
}
public File getHome() {
return home;
}
public long getStartTime() {
return startTime;
}
private void initLogging() {
}
@@ -376,7 +389,7 @@ public final class Application {
@SuppressWarnings("unchecked")
private void runServers(CountDownLatch timecd, final List<AnyValue> serconfs) throws Exception {
CountDownLatch servicecdl = new CountDownLatch(serconfs.size());
this.servicecdl = new CountDownLatch(serconfs.size());
CountDownLatch sercdl = new CountDownLatch(serconfs.size());
for (final AnyValue serconf : serconfs) {
Thread thread = new Thread() {
@@ -391,18 +404,12 @@ public final class Application {
try {
//Thread ctd = Thread.currentThread();
//ctd.setContextClassLoader(new URLClassLoader(new URL[0], ctd.getContextClassLoader()));
String protocol = serconf.getValue("protocol", "");
String subprotocol = Sncp.DEFAULT_PROTOCOL;
int pos = protocol.indexOf('.');
if (pos > 0) {
subprotocol = protocol.substring(pos + 1);
protocol = protocol.substring(0, pos);
}
final String protocol = serconf.getValue("protocol", "").replaceFirst("\\..+", "");
NodeServer server = null;
if ("SNCP".equalsIgnoreCase(protocol)) {
server = new NodeSncpServer(Application.this, servicecdl, new SncpServer(startTime, subprotocol, watch));
server = new NodeSncpServer(Application.this, serconf);
} else if ("HTTP".equalsIgnoreCase(protocol)) {
server = new NodeHttpServer(Application.this, servicecdl, new HttpServer(startTime, watch));
server = new NodeHttpServer(Application.this, serconf);
}
if (server == null) {
logger.log(Level.SEVERE, "Not found Server Class for protocol({0})", serconf.getValue("protocol"));
@@ -434,7 +441,8 @@ public final class Application {
: Sncp.createLocalService("", serviceClass, null, new LinkedHashSet<>(), null, null);
application.init();
application.factory.register(service);
final NodeServer server = new NodeHttpServer(application, new CountDownLatch(1), null);
application.servicecdl = new CountDownLatch(1);
final NodeServer server = new NodeHttpServer(application, null);
server.init(application.config);
server.factory.inject(service);
return service;
@@ -444,7 +452,6 @@ public final class Application {
final String home = new File(System.getProperty(RESNAME_APP_HOME, "")).getCanonicalPath();
System.setProperty(RESNAME_APP_HOME, home);
File appfile = new File(home, "conf/application.xml");
//System.setProperty(DataConnection.PERSIST_FILEPATH, appfile.getCanonicalPath());
return new Application(load(new FileInputStream(appfile)));
}

View File

@@ -10,6 +10,7 @@ import com.wentch.redkale.net.http.HttpServer;
import com.wentch.redkale.net.http.HttpServlet;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.boot.ClassFilter.FilterEntry;
import com.wentch.redkale.net.*;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.service.*;
@@ -17,7 +18,6 @@ import com.wentch.redkale.util.*;
import java.lang.reflect.*;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.logging.*;
import javax.annotation.*;
@@ -28,35 +28,40 @@ import javax.annotation.*;
*/
public final class NodeHttpServer extends NodeServer {
private final HttpServer server;
private final HttpServer httpServer;
public NodeHttpServer(Application application, CountDownLatch servicecdl, HttpServer server) {
super(application, application.factory.createChild(), servicecdl, server);
this.server = server;
public NodeHttpServer(Application application, AnyValue serconf) {
super(application, application.getResourceFactory().createChild(), createServer(application, serconf));
this.httpServer = (HttpServer) server;
}
private static Server createServer(Application application, AnyValue serconf) {
return new HttpServer(application.getStartTime(), application.getWatchFactory());
}
@Override
public InetSocketAddress getSocketAddress() {
return server == null ? null : server.getSocketAddress();
return httpServer == null ? null : httpServer.getSocketAddress();
}
@Override
public void prepare() throws Exception {
ClassFilter<HttpServlet> httpFilter = createClassFilter(null, WebServlet.class, HttpServlet.class, null, "servlets", "servlet");
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
ClassFilter.Loader.load(application.getHome(), serviceFilter, httpFilter);
long e = System.currentTimeMillis() - s;
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(serviceFilter); //必须在servlet之前
protected ClassFilter<Servlet> createServletClassFilter() {
return createClassFilter(null, WebServlet.class, HttpServlet.class, null, "servlets", "servlet");
}
@Override
protected void loadServlet(ClassFilter<? extends Servlet> servletFilter) throws Exception {
if (httpServer != null) loadHttpServlet(this.nodeConf.getAnyValue("servlets"), servletFilter);
}
@Override
protected void loadService(ClassFilter serviceFilter) throws Exception {
super.loadService(serviceFilter);
initWebSocketService();
if (server != null) loadHttpServlet(this.nodeConf.getAnyValue("servlets"), httpFilter);
}
private void initWebSocketService() {
final boolean fine = logger.isLoggable(Level.FINE);
final ResourceFactory regFactory = application.factory;
final ResourceFactory regFactory = application.getResourceFactory();
factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, Field field) -> {
try {
Resource rs = field.getAnnotation(Resource.class);
@@ -91,12 +96,12 @@ public final class NodeHttpServer extends NodeServer {
});
}
protected void loadHttpServlet(final AnyValue conf, final ClassFilter<HttpServlet> filter) throws Exception {
protected void loadHttpServlet(final AnyValue conf, final ClassFilter<? extends Servlet> filter) throws Exception {
final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null;
final String prefix = conf == null ? "" : conf.getValue("prefix", "");
final String threadName = "[" + Thread.currentThread().getName() + "] ";
for (FilterEntry<HttpServlet> en : filter.getFilterEntrys()) {
Class<HttpServlet> clazz = en.getType();
for (FilterEntry<? extends Servlet> en : filter.getFilterEntrys()) {
Class<HttpServlet> clazz = (Class<HttpServlet>) en.getType();
if (Modifier.isAbstract(clazz.getModifiers())) continue;
WebServlet ws = clazz.getAnnotation(WebServlet.class);
if (ws == null || ws.value().length == 0) continue;
@@ -108,7 +113,7 @@ public final class NodeHttpServer extends NodeServer {
mappings[i] = prefix + mappings[i];
}
}
this.server.addHttpServlet(servlet, en.getProperty(), mappings);
this.httpServer.addHttpServlet(servlet, en.getProperty(), mappings);
if (sb != null) sb.append(threadName).append(" Loaded ").append(clazz.getName()).append(" --> ").append(Arrays.toString(mappings)).append(LINE_SEPARATOR);
}
if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString());

View File

@@ -23,7 +23,6 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.logging.*;
import javax.annotation.*;
@@ -44,9 +43,7 @@ public abstract class NodeServer {
protected final ResourceFactory factory;
private final CountDownLatch servicecdl;
private final Server server;
protected final Server server;
private String sncpGroup = null; //当前Server的SNCP协议的组
@@ -68,17 +65,14 @@ public abstract class NodeServer {
protected final Set<ServiceWrapper> remoteServiceWrappers = new LinkedHashSet<>();
public NodeServer(Application application, ResourceFactory factory, CountDownLatch servicecdl, Server server) {
public NodeServer(Application application, ResourceFactory factory, Server server) {
this.application = application;
this.factory = factory;
this.servicecdl = servicecdl;
this.server = server;
this.logger = Logger.getLogger(this.getClass().getSimpleName());
this.fine = logger.isLoggable(Level.FINE);
}
protected abstract void prepare() throws Exception;
public void init(AnyValue config) throws Exception {
this.nodeConf = config == null ? AnyValue.create() : config;
if (isSNCP()) { // SNCP协议
@@ -103,12 +97,26 @@ public abstract class NodeServer {
if (server != null) server.init(config);
}
initResource();
prepare();
//prepare();
ClassFilter<Servlet> servletFilter = createServletClassFilter();
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
if (servletFilter == null) {
ClassFilter.Loader.load(application.getHome(), serviceFilter);
} else {
ClassFilter.Loader.load(application.getHome(), serviceFilter, servletFilter);
}
long e = System.currentTimeMillis() - s;
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(serviceFilter); //必须在servlet之前
loadServlet(servletFilter);
}
protected abstract void loadServlet(ClassFilter<? extends Servlet> servletFilter) throws Exception;
private void initResource() {
//---------------------------------------------------------------------------------------------
final ResourceFactory regFactory = application.factory;
final ResourceFactory regFactory = application.getResourceFactory();
factory.add(DataSource.class, (ResourceFactory rf, final Object src, Field field) -> {
try {
Resource rs = field.getAnnotation(Resource.class);
@@ -183,7 +191,7 @@ public abstract class NodeServer {
if (serviceFilter == null) return;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
final Set<FilterEntry<Service>> entrys = serviceFilter.getFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.factory : factory;
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : factory;
for (FilterEntry<Service> entry : entrys) { //service实现类
final Class<? extends Service> type = entry.getType();
if (type.isInterface()) continue;
@@ -254,8 +262,8 @@ public abstract class NodeServer {
throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat.");
}
}
servicecdl.countDown();
servicecdl.await();
application.servicecdl.countDown();
application.servicecdl.await();
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
//---------------- inject ----------------
@@ -295,7 +303,7 @@ public abstract class NodeServer {
}
}
if (transport == null) {
transport = new Transport(group + "_" + application.transports.size(), protocol, application.watch, 32, addrs);
transport = new Transport(group + "_" + application.transports.size(), protocol, application.getWatchFactory(), 32, addrs);
application.transports.add(transport);
}
}
@@ -303,6 +311,8 @@ public abstract class NodeServer {
return transport;
}
protected abstract ClassFilter<Servlet> createServletClassFilter();
protected ClassFilter<Service> createServiceClassFilter() {
return createClassFilter(this.sncpGroup, null, Service.class, Annotation.class, "services", "service");
}

View File

@@ -5,10 +5,10 @@
*/
package com.wentch.redkale.boot;
import com.wentch.redkale.net.*;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.service.Service;
import com.wentch.redkale.util.*;
import java.net.*;
import java.util.concurrent.CountDownLatch;
import java.util.logging.*;
/**
@@ -17,32 +17,37 @@ import java.util.logging.*;
*/
public final class NodeSncpServer extends NodeServer {
private final SncpServer server;
private final SncpServer sncpServer;
public NodeSncpServer(Application application, CountDownLatch regcdl, SncpServer server) {
super(application, application.factory.createChild(), regcdl, server);
this.server = server;
this.consumer = server == null ? null : x -> server.addService(x);
public NodeSncpServer(Application application, AnyValue serconf) {
super(application, application.getResourceFactory().createChild(), createServer(application, serconf));
this.sncpServer = (SncpServer) this.server;
this.consumer = sncpServer == null ? null : x -> sncpServer.addService(x);
}
private static Server createServer(Application application, AnyValue serconf) {
String proto = serconf.getValue("protocol", "");
String subprotocol = Sncp.DEFAULT_PROTOCOL;
int pos = proto.indexOf('.');
if (pos > 0) {
subprotocol = proto.substring(pos + 1);
}
return new SncpServer(application.getStartTime(), subprotocol, application.getWatchFactory());
}
@Override
public InetSocketAddress getSocketAddress() {
return server == null ? null : server.getSocketAddress();
return sncpServer == null ? null : sncpServer.getSocketAddress();
}
@Override
public void prepare() throws Exception {
ClassFilter<Service> serviceFilter = createServiceClassFilter();
long s = System.currentTimeMillis();
ClassFilter.Loader.load(application.getHome(), serviceFilter);
long e = System.currentTimeMillis() - s;
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(serviceFilter); //必须在servlet之前
public void init(AnyValue config) throws Exception {
super.init(config);
//-------------------------------------------------------------------
if (server == null) return; //调试时server才可能为null
if (sncpServer == null) return; //调试时server才可能为null
final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
for (SncpServlet en : server.getSncpServlets()) {
for (SncpServlet en : sncpServer.getSncpServlets()) {
if (sb != null) sb.append(threadName).append(" Loaded ").append(en).append(LINE_SEPARATOR);
}
if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString());
@@ -54,6 +59,15 @@ public final class NodeSncpServer extends NodeServer {
}
public SncpServer getSncpServer() {
return server;
return sncpServer;
}
@Override
protected void loadServlet(ClassFilter<? extends Servlet> servletFilter) throws Exception {
}
@Override
protected ClassFilter<Servlet> createServletClassFilter() {
return null;
}
}