This commit is contained in:
wentch
2016-01-27 14:08:32 +08:00
parent f1da45322e
commit f34623a5c9
5 changed files with 31 additions and 22 deletions

View File

@@ -73,8 +73,6 @@ public final class Application {
final Map<String, String> globalGroupProtocols = new HashMap<>();
final Map<String, Transport> transports = new HashMap<>();
final InetAddress localAddress;
final List<CacheSource> cacheSources = new CopyOnWriteArrayList<>();

View File

@@ -75,7 +75,7 @@ public final class NodeHttpServer extends NodeServer {
synchronized (regFactory) {
Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class);
if (nodeService == null) {
nodeService = Sncp.createLocalService(resourceName, getExecutor(), WebSocketNodeService.class, (InetSocketAddress) null, (Transport) null, (Collection<Transport>) null);
nodeService = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), WebSocketNodeService.class, (InetSocketAddress) null, (Transport) null, (Collection<Transport>) null);
regFactory.register(resourceName, WebSocketNode.class, nodeService);
factory.inject(nodeService, self);
logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + nodeService);

View File

@@ -161,14 +161,14 @@ public abstract class NodeServer {
private void initResource() {
final NodeServer self = this;
//---------------------------------------------------------------------------------------------
final ResourceFactory regFactory = application.getResourceFactory();
final ResourceFactory appResFactory = application.getResourceFactory();
factory.add(DataSource.class, (ResourceFactory rf, final Object src, String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 DataSource
DataSource source = new DataDefaultSource(resourceName);
application.dataSources.add(source);
regFactory.register(resourceName, DataSource.class, source);
appResFactory.register(resourceName, DataSource.class, source);
SncpClient client = null;
Transport sameGroupTransport = null;
@@ -190,8 +190,8 @@ public abstract class NodeServer {
}
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
if ((src instanceof DataSource) && sncpAddr != null && factory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
regFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpServer.getSncpGroup(), gs, null);
@@ -231,14 +231,14 @@ public abstract class NodeServer {
throw new RuntimeException(src.getClass().getName() + " not found _sameGroupTransport or _diffGroupTransports at " + field, e);
}
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
final CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), appResFactory, CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
source.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
if (field.getAnnotation(Transient.class) != null) source.setNeedStore(false); //必须在setStoreType之后
application.cacheSources.add(source);
regFactory.register(resourceName, CacheSource.class, source);
appResFactory.register(resourceName, CacheSource.class, source);
field.set(src, source);
rf.inject(source, self); //
((Service) source).init(null);
@@ -280,7 +280,7 @@ public abstract class NodeServer {
Service service;
if (localed) { //本地模式
service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(this.sncpGroup), loadTransports(groups));
service = Sncp.createLocalService(entry.getName(), getExecutor(), application.getResourceFactory(), type, this.sncpAddress, loadTransport(this.sncpGroup), loadTransports(groups));
} else {
service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups));
}
@@ -355,7 +355,7 @@ public abstract class NodeServer {
flag = true;
}
final String groupid = sb.toString();
Transport transport = application.transports.get(groupid);
Transport transport = application.resourceFactory.find(groupid, Transport.class);
if (transport != null) return transport;
final List<Transport> transports = new ArrayList<>();
for (String group : groups) {
@@ -370,11 +370,11 @@ public abstract class NodeServer {
Transport first = transports.get(0);
Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.transports) {
transport = application.transports.get(groupid);
synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(groupid, Transport.class);
if (transport == null) {
transport = newTransport;
application.transports.put(groupid, transport);
application.resourceFactory.register(groupid, transport);
}
}
return transport;
@@ -383,8 +383,8 @@ public abstract class NodeServer {
protected Transport loadTransport(final String group) {
if (group == null) return null;
Transport transport;
synchronized (application.transports) {
transport = application.transports.get(group);
synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(group, Transport.class);
if (transport != null) {
if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) {
throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress());
@@ -395,7 +395,7 @@ public abstract class NodeServer {
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(),
application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.transports.put(group, transport);
application.resourceFactory.register(group, transport);
}
return transport;
}

View File

@@ -83,7 +83,7 @@ public final class Transport {
if (first == null) first = t;
tmpgroup.add(t.name);
}
Collections.sort(tmpgroup); //按字母排列顺序
Collections.sort(tmpgroup); //必须按字母排列顺序确保相同内容的transport列表组合的name相同而不会因为list的顺序不同产生不同的name
boolean flag = false;
StringBuilder sb = new StringBuilder();
for (String g : tmpgroup) {

View File

@@ -673,12 +673,12 @@ public abstract class Sncp {
* @param executor 线程池
* @param serviceClass Service类
* @param clientAddress 本地IP地址
* @param sameGroupTransport 同组的通信组件
* @param sameGroupTransport 同组的通信组件
* @param diffGroupTransports 异组的通信组件列表
* @return Service的本地模式实例
*/
@SuppressWarnings("unchecked")
public static <T extends Service> T createLocalService(final String name, final Consumer<Runnable> executor,
public static <T extends Service> T createLocalService(final String name, final Consumer<Runnable> executor, final ResourceFactory resourceFactory,
final Class<T> serviceClass, final InetSocketAddress clientAddress, final Transport sameGroupTransport, final Collection<Transport> diffGroupTransports) {
try {
final Class newClazz = createLocalServiceClass(name, serviceClass);
@@ -699,7 +699,18 @@ public abstract class Sncp {
List<Transport> list = new ArrayList<>();
if (sameGroupTransport != null) list.add(sameGroupTransport);
if (diffGroupTransports != null) list.addAll(diffGroupTransports);
if (!list.isEmpty()) remoteTransport = new Transport(list);
if (!list.isEmpty()) {
Transport tmp = new Transport(list);
synchronized (resourceFactory) {
Transport old = resourceFactory.find(tmp.getName(), Transport.class);
if (old != null) {
remoteTransport = old;
} else {
remoteTransport = tmp;
resourceFactory.register(tmp.getName(), tmp);
}
}
}
}
if (remoteService == null && remoteTransport != null) {
remoteService = createRemoteService(name, executor, serviceClass, clientAddress, remoteTransport);
@@ -825,7 +836,7 @@ public abstract class Sncp {
* @param serviceClass Service类
* @param clientAddress 本地IP地址
* @param transport 通信组件
*
*
* @return Service的远程模式实例
*/
@SuppressWarnings("unchecked")