This commit is contained in:
wentch
2015-12-29 11:00:25 +08:00
parent 867a0a2e98
commit 3fbb5d37b3
4 changed files with 16 additions and 7 deletions

View File

@@ -77,7 +77,7 @@ public final class NodeHttpServer extends NodeServer {
getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports); getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(resourceName, WebSocketNode.class, nodeService); regFactory.register(resourceName, WebSocketNode.class, nodeService);
factory.inject(nodeService, self); factory.inject(nodeService, self);
logger.fine("[" + Thread.currentThread().getName() + "] Load " + nodeService); logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + nodeService);
if (getSncpAddress() != null) { if (getSncpAddress() != null) {
NodeSncpServer sncpServer = null; NodeSncpServer sncpServer = null;
for (NodeServer node : application.servers) { for (NodeServer node : application.servers) {
@@ -101,7 +101,15 @@ public final class NodeHttpServer extends NodeServer {
final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null; final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null;
final String prefix = conf == null ? "" : conf.getValue("prefix", ""); final String prefix = conf == null ? "" : conf.getValue("prefix", "");
final String threadName = "[" + Thread.currentThread().getName() + "] "; final String threadName = "[" + Thread.currentThread().getName() + "] ";
for (FilterEntry<? extends Servlet> en : filter.getFilterEntrys()) { List<FilterEntry<? extends Servlet>> list = new ArrayList(filter.getFilterEntrys());
list.sort((FilterEntry<? extends Servlet> o1, FilterEntry<? extends Servlet> o2) -> { //必须保证WebSocketServlet优先加载 因为要确保其他的HttpServlet可以注入本地模式的WebSocketNode
boolean ws1 = WebSocketServlet.class.isAssignableFrom(o1.getType());
boolean ws2 = WebSocketServlet.class.isAssignableFrom(o2.getType());
if (ws1 == ws2) return 0;
return ws1 ? -1 : 1;
}
);
for (FilterEntry<? extends Servlet> en : list) {
Class<HttpServlet> clazz = (Class<HttpServlet>) en.getType(); Class<HttpServlet> clazz = (Class<HttpServlet>) en.getType();
if (Modifier.isAbstract(clazz.getModifiers())) continue; if (Modifier.isAbstract(clazz.getModifiers())) continue;
WebServlet ws = clazz.getAnnotation(WebServlet.class); WebServlet ws = clazz.getAnnotation(WebServlet.class);

View File

@@ -227,7 +227,7 @@ public abstract class NodeServer {
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), sncpDefaultGroups, null); ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), sncpDefaultGroups, null);
sncpServer.getSncpServer().addService(wrapper); sncpServer.getSncpServer().addService(wrapper);
} }
logger.fine("[" + Thread.currentThread().getName() + "] Load " + source); logger.fine("[" + Thread.currentThread().getName() + "] Load Source " + source);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e); logger.log(Level.SEVERE, "DataSource inject error", e);
} }
@@ -319,7 +319,7 @@ public abstract class NodeServer {
} }
Service service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports); Service service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports);
wrapper = new ServiceWrapper(type, service, this.sncpGroup, entry); wrapper = new ServiceWrapper(type, service, this.sncpGroup, entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] " + service + " loaded"); if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + service);
} else { } else {
sameGroupAddrs.remove(this.sncpAddress); sameGroupAddrs.remove(this.sncpAddress);
StringBuilder g = new StringBuilder(); StringBuilder g = new StringBuilder();
@@ -331,7 +331,7 @@ public abstract class NodeServer {
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")"); if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")");
Service service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs)); Service service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
wrapper = new ServiceWrapper(type, service, "", entry); wrapper = new ServiceWrapper(type, service, "", entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] " + service + " loaded"); if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + service);
} }
if (factory.find(wrapper.getName(), wrapper.getType()) == null) { if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService()); regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService());
@@ -367,7 +367,7 @@ public abstract class NodeServer {
remoteServiceWrappers.forEach(y -> { remoteServiceWrappers.forEach(y -> {
factory.inject(y.getService(), NodeServer.this); factory.inject(y.getService(), NodeServer.this);
if (sb != null) { if (sb != null) {
sb.append(threadName).append("RemoteService(").append(y.getType()).append(':').append(y.getName()).append(") loaded").append(LINE_SEPARATOR); sb.append(threadName).append("RemoteService(").append(y.getType()).append(':').append(y.getName()).append(") injected").append(LINE_SEPARATOR);
} }
}); });
//----------------- init ----------------- //----------------- init -----------------

View File

@@ -54,6 +54,7 @@ public abstract class WebSocketNode {
}); });
}); });
} }
protected abstract List<String> getOnlineRemoteAddresses(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid); protected abstract List<String> getOnlineRemoteAddresses(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid);
protected abstract int sendMessage(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last); protected abstract int sendMessage(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid, boolean recent, Serializable message, boolean last);

View File

@@ -32,7 +32,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override @Override
public List<String> getOnlineRemoteAddresses(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid) { public List<String> getOnlineRemoteAddresses(@DynTargetAddress InetSocketAddress targetAddress, Serializable groupid) {
if (localSncpAddress != null && !localSncpAddress.equals(targetAddress)) return null; if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return ((WebSocketNodeService) remoteNode).getOnlineRemoteAddresses(targetAddress, groupid);
final Set<String> engineids = localNodes.get(groupid); final Set<String> engineids = localNodes.get(groupid);
if (engineids == null || engineids.isEmpty()) return null; if (engineids == null || engineids.isEmpty()) return null;
final List<String> rs = new ArrayList<>(); final List<String> rs = new ArrayList<>();