This commit is contained in:
Redkale
2017-05-27 12:31:58 +08:00
parent 174a8a2a0c
commit 82a2a513f5
3 changed files with 67 additions and 52 deletions

View File

@@ -21,6 +21,7 @@ import static org.redkale.boot.Application.*;
import org.redkale.boot.ClassFilter.FilterEntry;
import org.redkale.net.Filter;
import org.redkale.net.*;
import org.redkale.net.http.WebSocketServlet;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.source.*;
@@ -267,44 +268,51 @@ public abstract class NodeServer {
}, DataSource.class);
//------------------------------------- 注册CacheSource --------------------------------------------------------
resourceFactory.register((ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource
final Service srcService = (Service) src;
SncpClient client = Sncp.getSncpClient(srcService);
Transport sameGroupTransport = Sncp.getSameGroupTransport(srcService);
Transport[] dts = Sncp.getDiffGroupTransports((Service) src);
List<Transport> diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final AnyValue sourceConf = cacheResource.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type"));
@SuppressWarnings("unchecked")
final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, getExecutor(), appResFactory, (Class<? extends Service>) sourceType, sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
if (sourceType == CacheMemorySource.class) {
CacheMemorySource memorySource = (CacheMemorySource) source;
memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后
}
application.cacheSources.add(source);
appResFactory.register(resourceName, genericType, source);
appResFactory.register(resourceName, CacheSource.class, source);
field.set(src, source);
rf.inject(source, self); //
if (source instanceof Service) ((Service) source).init(sourceConf);
resourceFactory.register(new ResourceFactory.ResourceLoader() {
public void load(ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不需要注入 CacheSource
final Service srcService = (Service) src;
SncpClient client = Sncp.getSncpClient(srcService);
Transport sameGroupTransport = Sncp.getSameGroupTransport(srcService);
Transport[] dts = Sncp.getDiffGroupTransports((Service) src);
List<Transport> diffGroupTransports = dts == null ? new ArrayList<>() : Arrays.asList(dts);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final AnyValue sourceConf = cacheResource.get(resourceName);
final Class sourceType = sourceConf == null ? CacheMemorySource.class : Class.forName(sourceConf.getValue("type"));
@SuppressWarnings("unchecked")
final CacheSource source = (CacheSource) Sncp.createLocalService(resourceName, getExecutor(), appResFactory, (Class<? extends Service>) sourceType,
sncpAddr, Sncp.getSncpGroup(srcService), Sncp.getGroups(srcService), Sncp.getConf(srcService), sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
if (sourceType == CacheMemorySource.class) {
CacheMemorySource memorySource = (CacheMemorySource) source;
memorySource.setStoreType(pt == null ? Serializable.class : (Class) pt.getActualTypeArguments()[0], valType instanceof Class ? (Class) valType : Object.class);
if (field.getAnnotation(Transient.class) != null) memorySource.setNeedStore(false); //必须在setStoreType之后
}
application.cacheSources.add(source);
appResFactory.register(resourceName, genericType, source);
appResFactory.register(resourceName, CacheSource.class, source);
field.set(src, source);
rf.inject(source, self); //
if (source instanceof Service) ((Service) source).init(sourceConf);
if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
sncpServer.getSncpServer().addSncpServlet((Service) source);
logger.info("[" + Thread.currentThread().getName() + "] Load Service " + source);
if ((src instanceof WebSocketNodeService) && sncpAddr != null) { //只有WebSocketNodeService的服务才需要给SNCP服务注入CacheMemorySource
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
sncpServer.getSncpServer().addSncpServlet((Service) source);
//logger.info("[" + Thread.currentThread().getName() + "] Load Service " + source);
}
logger.info("[" + Thread.currentThread().getName() + "] Load Source " + source);
} catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
logger.info("[" + Thread.currentThread().getName() + "] Load Source " + source);
} catch (Exception e) {
logger.log(Level.SEVERE, "DataSource inject error", e);
}
public boolean autoNone() {
return false;
}
}, CacheSource.class);
}
@@ -341,28 +349,29 @@ public abstract class NodeServer {
|| (this.sncpGroup == null && entry.isEmptyGroups()) //空的SNCP配置
|| serviceImplClass.getAnnotation(Local.class) != null;//本地模式
if (localed && (serviceImplClass.isInterface() || Modifier.isAbstract(serviceImplClass.getModifiers()))) continue; //本地模式不能实例化接口和抽象类的Service类
final BiConsumer<ResourceFactory, Boolean> runner = (ResourceFactory rf, Boolean needinject) -> {
final ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
Service service;
if (localed) { //本地模式
service = Sncp.createLocalService(entry.getName(), getExecutor(), application.getResourceFactory(), serviceImplClass,
boolean ws = src instanceof WebSocketServlet;
if (ws || localed) { //本地模式
service = Sncp.createLocalService(resourceName, getExecutor(), application.getResourceFactory(), serviceImplClass,
NodeServer.this.sncpAddress, NodeServer.this.sncpGroup, groups, entry.getProperty(), loadTransport(NodeServer.this.sncpGroup), loadTransports(groups));
} else {
service = Sncp.createRemoteService(entry.getName(), getExecutor(), serviceImplClass, NodeServer.this.sncpAddress, null, groups, entry.getProperty(), loadTransport(groups));
service = Sncp.createRemoteService(resourceName, getExecutor(), serviceImplClass, NodeServer.this.sncpAddress, null, groups, entry.getProperty(), loadTransport(groups));
}
if (SncpClient.parseMethod(serviceImplClass).isEmpty()) return; //class没有可用的方法 通常为BaseService
//final ServiceWrapper wrapper = new ServiceWrapper(serviceImplClass, service, entry.getName(), localed ? NodeServer.this.sncpGroup : null, groups, entry.getProperty());
//final ServiceWrapper wrapper = new ServiceWrapper(serviceImplClass, service, resourceName, localed ? NodeServer.this.sncpGroup : null, groups, entry.getProperty());
for (final Class restype : Sncp.getResourceTypes(service)) {
if (resourceFactory.find(entry.getName(), restype) == null) {
regFactory.register(entry.getName(), restype, service);
if (needinject) rf.inject(service); //动态加载的Service也存在按需加载的注入资源
if (rf.find(resourceName, restype) == null) {
regFactory.register(resourceName, restype, service);
} else if (isSNCP() && !entry.isAutoload()) {
throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat.");
throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + groups + ") is repeat.");
}
}
if (Sncp.isRemote(service)) {
remoteServices.add(service);
} else {
if (field != null) rf.inject(service); //动态加载的Service也存在按需加载的注入资源
localServices.add(service);
interceptorServices.add(service);
if (consumer != null) consumer.accept(service);
@@ -374,16 +383,13 @@ public abstract class NodeServer {
}
};
if (entry.isExpect()) {
ResourceFactory.ResourceLoader resourceLoader = (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
runner.accept(rf, true);
};
ResourceType rty = entry.getType().getAnnotation(ResourceType.class);
Class[] resTypes = rty == null ? new Class[]{} : rty.value();
for (final Class restype : resTypes) {
resourceFactory.register(resourceLoader, restype);
}
} else {
runner.accept(resourceFactory, false);
resourceLoader.load(resourceFactory, null, entry.getName(), null, false);
}
}