This commit is contained in:
wentch
2016-01-13 11:13:23 +08:00
parent 6aefd6be8a
commit d36baa430a
7 changed files with 66 additions and 58 deletions

View File

@@ -358,14 +358,7 @@ public final class ClassFilter<T> {
for (final URL url : urljares) {
Set<String> classes = cache.get(url);
if (classes == null) {
synchronized (cache) {
if (cache.get(url) == null) {
classes = new CopyOnWriteArraySet<>();
cache.put(url, classes);
} else {
classes = cache.get(url);
}
}
classes = new LinkedHashSet<>();
try (JarFile jar = new JarFile(URLDecoder.decode(url.getFile(), "UTF-8"))) {
Enumeration<JarEntry> it = jar.entries();
while (it.hasMoreElements()) {
@@ -381,6 +374,7 @@ public final class ClassFilter<T> {
}
}
}
cache.put(url, classes);
} else {
for (String classname : classes) {
for (final ClassFilter filter : filters) {
@@ -392,14 +386,7 @@ public final class ClassFilter<T> {
for (final URL url : urlfiles) {
Set<String> classes = cache.get(url);
if (classes == null) {
synchronized (cache) {
if (cache.get(url) == null) {
classes = new CopyOnWriteArraySet<>();
cache.put(url, classes);
} else {
classes = cache.get(url);
}
}
classes = new LinkedHashSet<>();
files.clear();
File root = new File(url.getFile());
String rootpath = root.getPath();
@@ -413,6 +400,7 @@ public final class ClassFilter<T> {
if (filter != null) filter.filter(null, classname);
}
}
cache.put(url, classes);
} else {
for (String classname : classes) {
for (final ClassFilter filter : filters) {

View File

@@ -96,10 +96,10 @@ public final class NodeHttpServer extends NodeServer {
list.sort((FilterEntry<? extends Servlet> o1, FilterEntry<? extends Servlet> o2) -> { //必须保证WebSocketServlet优先加载 因为要确保其他的HttpServlet可以注入本地模式的WebSocketNode
boolean ws1 = WebSocketServlet.class.isAssignableFrom(o1.getType());
boolean ws2 = WebSocketServlet.class.isAssignableFrom(o2.getType());
if (ws1 == ws2) return 0;
if (ws1 == ws2) return o1.getType().getName().compareTo(o2.getType().getName());
return ws1 ? -1 : 1;
}
);
});
final List<AbstractMap.SimpleEntry<String, String[]>> ss = sb == null ? null : new ArrayList<>();
for (FilterEntry<? extends Servlet> en : list) {
Class<HttpServlet> clazz = (Class<HttpServlet>) en.getType();
if (Modifier.isAbstract(clazz.getModifiers())) continue;
@@ -122,7 +122,21 @@ public final class NodeHttpServer extends NodeServer {
}
}
this.httpServer.addHttpServlet(servlet, servletConf, mappings);
if (sb != null) sb.append(threadName).append(" Loaded ").append(clazz.getName()).append(" --> ").append(Arrays.toString(mappings)).append(LINE_SEPARATOR);
if (ss != null) ss.add(new AbstractMap.SimpleEntry<>(clazz.getName(), mappings));
}
if (ss != null) {
Collections.sort(ss, (AbstractMap.SimpleEntry<String, String[]> o1, AbstractMap.SimpleEntry<String, String[]> o2) -> o1.getKey().compareTo(o2.getKey()));
int max = 0;
for (AbstractMap.SimpleEntry<String, String[]> as : ss) {
if (as.getKey().length() > max) max = as.getKey().length();
}
for (AbstractMap.SimpleEntry<String, String[]> as : ss) {
sb.append(threadName).append(" Loaded ").append(as.getKey());
for (int i = 0; i < max - as.getKey().length(); i++) {
sb.append(' ');
}
sb.append(" mapping to ").append(Arrays.toString(as.getValue())).append(LINE_SEPARATOR);
}
}
if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString());
}

View File

@@ -18,6 +18,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.logging.*;
import javax.annotation.*;
@@ -57,7 +58,7 @@ public abstract class NodeServer {
private String sncpGroup = null; //当前Server的SNCP协议的组
private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点没有分布式结构
private InetSocketAddress sncpAddress; //SNCP服务的地址 非SNCP为null
protected Consumer<ServiceWrapper> consumer;
@@ -175,10 +176,10 @@ public abstract class NodeServer {
ts.setAccessible(true);
client = (SncpClient) ts.get(src);
} catch (Exception e) {
//src 不含 MultiRun 方法
throw new RuntimeException(src.getClass().getName() + " not found _sameGroupTransport or _diffGroupTransports at " + field, e);
}
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
if (sncpAddr != null && factory.find(resourceName, DataCacheListener.class) == null) {
if ((src instanceof DataSource) && sncpAddr != null && factory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
regFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
@@ -216,10 +217,10 @@ public abstract class NodeServer {
ts.setAccessible(true);
client = (SncpClient) ts.get(src);
} catch (Exception e) {
//src 不含 MultiRun 方法
throw new RuntimeException(src.getClass().getName() + " not found _sameGroupTransport or _diffGroupTransports at " + field, e);
}
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
final CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
@@ -231,7 +232,7 @@ public abstract class NodeServer {
rf.inject(source, self); //
((Service) source).init(null);
if (sncpAddr != null) {
if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheSourceService
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, sncpServer.getSncpGroup(), gs, null);
@@ -304,16 +305,19 @@ public abstract class NodeServer {
Collections.sort(swlist);
localServiceWrappers.clear();
localServiceWrappers.addAll(swlist);
final List<String> slist = sb == null ? null : new CopyOnWriteArrayList<>();
localServiceWrappers.parallelStream().forEach(y -> {
long s = System.currentTimeMillis();
y.getService().init(y.getConf());
long e = System.currentTimeMillis() - s;
if (sb != null) {
synchronized (sb) { //parallelStream 必须要锁
sb.append(threadName).append(y.toSimpleString()).append(" loaded and init ").append(e).append(" ms").append(LINE_SEPARATOR);
}
}
if (slist != null) slist.add(new StringBuilder().append(threadName).append(y.toSimpleString()).append(" loaded and init ").append(e).append(" ms").append(LINE_SEPARATOR).toString());
});
Collections.sort(slist);
if (slist != null && sb != null) {
for (String s : slist) {
sb.append(s);
}
}
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
}

View File

@@ -128,7 +128,7 @@ public abstract class Server {
serverChannel.bind(address, backlog);
serverChannel.accept();
final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + "." + protocol + " listen: " + address
logger.info(threadName + this.getClass().getSimpleName() + ("TCP".equalsIgnoreCase(protocol) ? "" : ("." + protocol)) + " listen: " + address
+ ", threads: " + threads + ", bufferCapacity: " + bufferCapacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
+ ", started in " + (System.currentTimeMillis() - context.getServerStartTime()) + " ms");
}
@@ -156,8 +156,8 @@ public abstract class Server {
return new DecimalFormat(sf);
}
public static void loadLib(final Logger logger, final String lib) throws Exception {
if (lib == null || lib.isEmpty()) return;
public static URL[] loadLib(final Logger logger, final String lib) throws Exception {
if (lib == null || lib.isEmpty()) return new URL[0];
final Set<URL> set = new HashSet<>();
for (String s : lib.split(";")) {
if (s.isEmpty()) continue;
@@ -173,7 +173,7 @@ public abstract class Server {
if (f.canRead()) set.add(f.toURI().toURL());
}
}
if (set.isEmpty()) return;
if (set.isEmpty()) return new URL[0];
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl instanceof URLClassLoader) {
URLClassLoader loader = (URLClassLoader) cl;
@@ -186,6 +186,9 @@ public abstract class Server {
} else {
Thread.currentThread().setContextClassLoader(new URLClassLoader(set.toArray(new URL[set.size()]), cl));
}
List<URL> list = new ArrayList<>(set);
Collections.sort(list, (URL o1, URL o2) -> o1.getFile().compareTo(o2.getFile()));
return list.toArray(new URL[list.size()]);
}
}

View File

@@ -37,7 +37,7 @@ public abstract class WebSocketNode {
protected WebSocketNode remoteNode;
//存放所有用户分布在节点上的队列信息,Set<InetSocketAddress> 为 sncpnode 的集合
@Resource(name = "$_webscoket_source")
@Resource(name = "$_websocket_nodes")
protected CacheSource<Serializable, InetSocketAddress> source;
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合

View File

@@ -52,8 +52,8 @@ public final class ServiceWrapper<T extends Service> implements Comparable<Servi
ResourceType rty = service.getClass().getAnnotation(ResourceType.class);
this.resTypes = rty == null ? new Class[]{this.type} : rty.value();
maxNameLength = Math.max(maxNameLength, name.length() + 1);
maxClassNameLength = Math.max(maxClassNameLength, type.getName().length());
maxNameLength = Math.max(maxNameLength, name.length());
maxClassNameLength = Math.max(maxClassNameLength, type.getName().length() + 1);
}
public String toSimpleString() {

View File

@@ -184,7 +184,6 @@ public abstract class Sncp {
if (!java.lang.reflect.Modifier.isPublic(mod)) return serviceClass;
if (java.lang.reflect.Modifier.isAbstract(mod)) return serviceClass;
final List<Method> methods = SncpClient.parseMethod(serviceClass);
final boolean hasMultiRun = methods.stream().filter(x -> x.getAnnotation(MultiRun.class) != null).findAny().isPresent();
final String supDynName = serviceClass.getName().replace('.', '/');
final String clientName = SncpClient.class.getName().replace('.', '/');
final String clientDesc = Type.getDescriptor(SncpClient.class);
@@ -238,26 +237,26 @@ public abstract class Sncp {
}
av0.visitEnd();
}
if (hasMultiRun) {
{
fv = cw.visitField(ACC_PRIVATE, "_convert", convertDesc, null, null);
av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true);
av0.visitEnd();
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_sameGroupTransport", transportDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_diffGroupTransports", transportsDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_client", clientDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_convert", convertDesc, null, null);
av0 = fv.visitAnnotation("Ljavax/annotation/Resource;", true);
av0.visitEnd();
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_sameGroupTransport", transportDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_diffGroupTransports", transportsDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_client", clientDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, "_selfstring", "Ljava/lang/String;", null, null);
fv.visitEnd();