diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index 2bb59a717..56be5a6dd 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -26,16 +26,25 @@ - - + + @@ -113,7 +122,7 @@ 当Server为HTTP协议时, request节点才有效。 remoteaddr 节点: 替换请求方节点的IP地址, 通常请求方是由nginx等web静态服务器转发过的则需要配置该节点。 且value值只能是以request.headers.开头,表示从request.headers中获取对应的header值。 - 例如下面例子获取request.getRemoteAddr()值,如果header存在X-RemoteAddress值则返回X-RemoteAddress值,不存在返回request.getRemoteAddress()。 + 例如下面例子获取request.getRemoteAddr()值,如果header存在X-RemoteAddress值则返回X-RemoteAddress值,不存在返回getRemoteAddress()。 --> diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 2a26d9d29..90029b947 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -63,19 +63,18 @@ public final class Application { public static final String RESNAME_APP_NODES = "APP_NODES"; //当前Service的IP地址+端口 类型: SocketAddress、InetSocketAddress、String - public static final String RESNAME_SERVER_ADDR = "SERVER_ADDR"; // SERVER_ADDR + public static final String RESNAME_SERVER_ADDR = "SERVER_ADDR"; //当前SNCP Server所属的组 类型: String public static final String RESNAME_SERVER_GROUP = "SERVER_GROUP"; - //当前Service所属的组 类型: Set、String[] - public static final String RESNAME_SNCP_GROUPS = Sncp.RESNAME_SNCP_GROUPS; // SNCP_GROUPS - final Map globalNodes = new HashMap<>(); final Map> globalGroups = new HashMap<>(); - final List transports = new ArrayList<>(); + final Map globalGroupProtocols = new HashMap<>(); + + final Map transports = new HashMap<>(); final InetAddress localAddress; @@ -87,12 +86,18 @@ public final class Application { CountDownLatch servicecdl; //会出现两次赋值 + final ObjectPool transportBufferPool; + + final ExecutorService transportExecutor; + + final AsynchronousChannelGroup transportChannelGroup; + //-------------------------------------------------------------------------------------------- private final ResourceFactory factory = ResourceFactory.root(); private final WatchFactory watch = WatchFactory.root(); - private File home; + private final File home; private final Logger logger; @@ -185,6 +190,47 @@ public final class Application { } this.logger = Logger.getLogger(this.getClass().getSimpleName()); this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1); + //------------------配置 节点 ------------------ + ObjectPool transportPool = null; + ExecutorService transportExec = null; + AsynchronousChannelGroup transportGroup = null; + final AnyValue resources = config.getAnyValue("resources"); + if (resources != null) { + AnyValue transportConf = resources.getAnyValue("transport"); + int groupsize = resources.getAnyValues("group").length; + if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue(); + if (transportConf != null) { + //--------------transportBufferPool----------- + AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + ".Buffer.creatCounter"); + AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + ".Buffer.cycleCounter"); + final int bufferCapacity = transportConf.getIntValue("bufferCapacity", 8 * 1024); + final int bufferPoolSize = transportConf.getIntValue("bufferPoolSize", groupsize * Runtime.getRuntime().availableProcessors() * 8); + final int threads = transportConf.getIntValue("threads", groupsize * Runtime.getRuntime().availableProcessors() * 8); + transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; + e.clear(); + return true; + }); + //-----------transportChannelGroup-------------- + try { + final AtomicInteger counter = new AtomicInteger(); + transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("Transport-Thread-" + counter.incrementAndGet()); + return t; + }); + transportGroup = AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1); + } catch (Exception e) { + throw new RuntimeException(e); + } + logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); + } + } + this.transportBufferPool = transportPool; + this.transportExecutor = transportExec; + this.transportChannelGroup = transportGroup; } public ResourceFactory getResourceFactory() { @@ -279,13 +325,14 @@ public final class Application { for (AnyValue conf : resources.getAnyValues("group")) { final String group = conf.getValue("name", ""); - String protocol = conf.getValue("protocol", Transport.DEFAULT_PROTOCOL).toUpperCase(); + final String protocol = conf.getValue("protocol", Transport.DEFAULT_PROTOCOL).toUpperCase(); if (!"TCP".equalsIgnoreCase(protocol) && !"UDP".equalsIgnoreCase(protocol)) { throw new RuntimeException("Not supported Transport Protocol " + conf.getValue("protocol")); } Set addrs = globalGroups.get(group); if (addrs == null) { addrs = new LinkedHashSet<>(); + globalGroupProtocols.put(group, protocol); globalGroups.put(group, addrs); } for (AnyValue node : conf.getAnyValues("node")) { @@ -479,8 +526,8 @@ public final class Application { public static T singleton(Class serviceClass, boolean remote) throws Exception { final Application application = Application.create(); Consumer executor = (x) -> Executors.newFixedThreadPool(8).submit(x); - T service = remote ? Sncp.createRemoteService("", executor, serviceClass, null, new LinkedHashSet<>(), null) - : Sncp.createLocalService("", executor, serviceClass, null, new LinkedHashSet<>(), null, null); + T service = remote ? Sncp.createRemoteService("", executor, serviceClass, null, null) + : Sncp.createLocalService("", executor, serviceClass, null, null, null); application.init(); application.factory.register(service); application.servicecdl = new CountDownLatch(1); @@ -515,6 +562,11 @@ public final class Application { System.exit(0); } + String findGroupProtocol(String group) { + if (group == null) return null; + return globalGroupProtocols.get(group); + } + Set findGlobalGroup(String group) { if (group == null) return null; Set set = globalGroups.get(group); @@ -546,6 +598,13 @@ public final class Application { logger.log(Level.FINER, "close CacheSource erroneous", e); } } + if (this.transportChannelGroup != null) { + try { + this.transportChannelGroup.shutdownNow(); + } catch (Exception e) { + logger.log(Level.FINER, "close transportChannelGroup erroneous", e); + } + } } private static AnyValue load(final InputStream in0) { diff --git a/src/org/redkale/boot/ClassFilter.java b/src/org/redkale/boot/ClassFilter.java index 334f63cbd..3a56176ca 100644 --- a/src/org/redkale/boot/ClassFilter.java +++ b/src/org/redkale/boot/ClassFilter.java @@ -269,6 +269,10 @@ public final class ClassFilter { public FilterEntry(Class type, final boolean autoload, AnyValue property) { this.type = type; String str = property == null ? null : property.getValue("groups"); + if (str != null) { + str = str.trim(); + if (str.endsWith(";")) str = str.substring(0, str.length() - 1); + } if (str != null) groups.addAll(Arrays.asList(str.split(";"))); this.property = property; this.autoload = autoload; diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index c314cd51f..c85fc96a2 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -25,7 +25,9 @@ import org.redkale.util.*; /** * HTTP Server节点的配置Server * - *

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ @NodeProtocol({"HTTP"}) @@ -34,7 +36,7 @@ public final class NodeHttpServer extends NodeServer { private final HttpServer httpServer; public NodeHttpServer(Application application, AnyValue serconf) { - super(application, application.getResourceFactory().createChild(), createServer(application, serconf)); + super(application, createServer(application, serconf)); this.httpServer = (HttpServer) server; } @@ -54,7 +56,7 @@ public final class NodeHttpServer extends NodeServer { @Override protected void loadServlet(ClassFilter servletFilter) throws Exception { - if (httpServer != null) loadHttpServlet(this.nodeConf.getAnyValue("servlets"), servletFilter); + if (httpServer != null) loadHttpServlet(this.serverConf.getAnyValue("servlets"), servletFilter); } @Override @@ -66,28 +68,17 @@ public final class NodeHttpServer extends NodeServer { private void initWebSocketService() { final NodeServer self = this; final ResourceFactory regFactory = application.getResourceFactory(); - factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, Object attachment) -> { + factory.add(WebSocketNode.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, Object attachment) -> { //主要用于单点的服务 try { if (field.getAnnotation(Resource.class) == null) return; if (!(src instanceof WebSocketServlet)) return; synchronized (regFactory) { Service nodeService = (Service) rf.find(resourceName, WebSocketNode.class); if (nodeService == null) { - nodeService = Sncp.createLocalService(resourceName, getExecutor(), (Class) WebSocketNodeService.class, - getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports); + nodeService = Sncp.createLocalService(resourceName, getExecutor(), WebSocketNodeService.class, (InetSocketAddress) null, (Transport) null, (Collection) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); factory.inject(nodeService, self); logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + nodeService); - if (getSncpAddress() != null) { - NodeSncpServer sncpServer = null; - for (NodeServer node : application.servers) { - if (node.isSNCP() && getSncpAddress().equals(node.getSncpAddress())) { - sncpServer = (NodeSncpServer) node; - } - } - ServiceWrapper wrapper = new ServiceWrapper(WebSocketNodeService.class, nodeService, resourceName, getSncpGroup(), sncpDefaultGroups, null); - sncpServer.getSncpServer().addService(wrapper); - } } field.set(src, nodeService); } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 21a9a615e..e485f4027 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -23,7 +23,6 @@ import java.util.logging.*; import javax.annotation.*; import javax.persistence.*; import org.redkale.net.*; -import org.redkale.net.http.*; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -57,27 +56,19 @@ public abstract class NodeServer { private String sncpGroup = null; //当前Server的SNCP协议的组 - private String nodeProtocol = Transport.DEFAULT_PROTOCOL; - private InetSocketAddress sncpAddress; //HttpServer中的sncpAddress 为所属group对应的SncpServer, 为null表示只是单节点,没有分布式结构 protected Consumer consumer; - protected AnyValue nodeConf; - - protected final HashSet sncpDefaultGroups = new LinkedHashSet<>(); - - protected final List sncpSameGroupTransports = new ArrayList<>(); - - protected final List sncpDiffGroupTransports = new ArrayList<>(); + protected AnyValue serverConf; protected final Set localServiceWrappers = new LinkedHashSet<>(); protected final Set remoteServiceWrappers = new LinkedHashSet<>(); - public NodeServer(Application application, ResourceFactory factory, Server server) { + public NodeServer(Application application, Server server) { this.application = application; - this.factory = factory; + this.factory = application.getResourceFactory().createChild(); this.server = server; this.logger = Logger.getLogger(this.getClass().getSimpleName()); this.fine = logger.isLoggable(Level.FINE); @@ -115,16 +106,15 @@ public abstract class NodeServer { } public void init(AnyValue config) throws Exception { - this.nodeConf = config == null ? AnyValue.create() : config; + this.serverConf = config == null ? AnyValue.create() : config; if (isSNCP()) { // SNCP协议 - String host = this.nodeConf.getValue("host", "0.0.0.0").replace("0.0.0.0", ""); - this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.nodeConf.getIntValue("port")); + String host = this.serverConf.getValue("host", "0.0.0.0").replace("0.0.0.0", ""); + this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port")); this.sncpGroup = application.globalNodes.get(this.sncpAddress); if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found info"); - if (server != null) this.nodeProtocol = server.getProtocol(); } - initGroup(); - if (this.sncpAddress != null) this.factory.register(RESNAME_SERVER_ADDR, this.sncpAddress); + + if (this.sncpAddress != null) this.factory.register(RESNAME_SERVER_ADDR, this.sncpAddress); //单点服务不会有 sncpAddress、sncpGroup if (this.sncpGroup != null) this.factory.register(RESNAME_SERVER_GROUP, this.sncpGroup); { //设置root文件夹 @@ -137,8 +127,9 @@ public abstract class NodeServer { Server.loadLib(logger, config.getValue("lib", "") + ";" + homepath + "/lib/*;" + homepath + "/classes"); if (server != null) server.init(config); } - initResource(); - //prepare(); + + initResource(); //给 DataSource、CacheSource 注册依赖注入时的监听回调事件。 + ClassFilter servletFilter = createServletClassFilter(); ClassFilter serviceFilter = createServiceClassFilter(); long s = System.currentTimeMillis(); @@ -166,25 +157,23 @@ public abstract class NodeServer { DataSource source = new DataDefaultSource(resourceName); application.dataSources.add(source); regFactory.register(resourceName, DataSource.class, source); - List sameGroupTransports = sncpSameGroupTransports; - List diffGroupTransports = sncpDiffGroupTransports; + Transport sameGroupTransport = null; + List diffGroupTransports = null; try { - Field ts = src.getClass().getDeclaredField("_sameGroupTransports"); + Field ts = src.getClass().getDeclaredField("_sameGroupTransport"); ts.setAccessible(true); - Transport[] lts = (Transport[]) ts.get(src); - sameGroupTransports = Arrays.asList(lts); + sameGroupTransport = (Transport) ts.get(src); ts = src.getClass().getDeclaredField("_diffGroupTransports"); ts.setAccessible(true); - lts = (Transport[]) ts.get(src); - diffGroupTransports = Arrays.asList(lts); + diffGroupTransports = Arrays.asList((Transport[]) ts.get(src)); } catch (Exception e) { //src 不含 MultiRun 方法 } if (factory.find(resourceName, DataCacheListener.class) == null) { - Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); + Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports); regFactory.register(resourceName, DataCacheListener.class, cacheListenerService); - ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, sncpDefaultGroups, null); + ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, null, null); localServiceWrappers.add(wrapper); if (consumer != null) consumer.accept(wrapper); rf.inject(cacheListenerService, self); @@ -199,22 +188,20 @@ public abstract class NodeServer { try { if (field.getAnnotation(Resource.class) == null) return; if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource - List sameGroupTransports = sncpSameGroupTransports; - List diffGroupTransports = sncpDiffGroupTransports; + Transport sameGroupTransport = null; + List diffGroupTransports = null; try { - Field ts = src.getClass().getDeclaredField("_sameGroupTransports"); + Field ts = src.getClass().getDeclaredField("_sameGroupTransport"); ts.setAccessible(true); - Transport[] lts = (Transport[]) ts.get(src); - sameGroupTransports = Arrays.asList(lts); + sameGroupTransport = (Transport) ts.get(src); ts = src.getClass().getDeclaredField("_diffGroupTransports"); ts.setAccessible(true); - lts = (Transport[]) ts.get(src); - diffGroupTransports = Arrays.asList(lts); + diffGroupTransports = Arrays.asList((Transport[]) ts.get(src)); } catch (Exception e) { //src 不含 MultiRun 方法 } - CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sncpDefaultGroups, sameGroupTransports, diffGroupTransports); + CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; @@ -232,7 +219,7 @@ public abstract class NodeServer { sncpServer = (NodeSncpServer) node; } } - ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), sncpDefaultGroups, null); + ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), null, null); sncpServer.getSncpServer().addService(wrapper); } logger.fine("[" + Thread.currentThread().getName() + "] Load Source " + source); @@ -242,120 +229,37 @@ public abstract class NodeServer { }); } - private void initGroup() { - final AnyValue[] services = this.nodeConf.getAnyValues("services"); - final String[] groups = services.length < 1 ? new String[]{""} : services[0].getValue("groups", "").split(";"); - this.sncpDefaultGroups.addAll(Arrays.asList(groups)); - if (!isSNCP()) { - NodeSncpServer sncpServer = null; - for (NodeServer node : application.servers) { - if (!node.isSNCP()) continue; - if (!this.sncpDefaultGroups.contains(node.sncpGroup)) continue; - sncpServer = (NodeSncpServer) node; - break; - } - if (sncpServer == null && (groups.length == 1 && groups[0].isEmpty())) { - for (NodeServer node : application.servers) { - if (!node.isSNCP()) continue; - sncpServer = (NodeSncpServer) node; - break; - } - } - if (sncpServer != null) { - this.sncpAddress = sncpServer.getSncpAddress(); - this.sncpGroup = sncpServer.getSncpGroup(); - this.sncpDefaultGroups.clear(); - this.sncpDefaultGroups.addAll(sncpServer.sncpDefaultGroups); - this.sncpSameGroupTransports.addAll(sncpServer.sncpSameGroupTransports); - this.sncpDiffGroupTransports.addAll(sncpServer.sncpDiffGroupTransports); - return; - } - } - final Set sameGroupAddrs = application.findGlobalGroup(this.sncpGroup); - final Map> diffGroupAddrs = new HashMap<>(); - for (String groupitem : groups) { - final Set addrs = application.findGlobalGroup(groupitem); - if (addrs == null || groupitem.equals(this.sncpGroup)) continue; - diffGroupAddrs.put(groupitem, addrs); - } - if (sameGroupAddrs != null) { - sameGroupAddrs.remove(this.sncpAddress); - for (InetSocketAddress iaddr : sameGroupAddrs) { - sncpSameGroupTransports.add(loadTransport(this.sncpGroup, getNodeProtocol(), iaddr)); - } - } - diffGroupAddrs.forEach((k, v) -> sncpDiffGroupTransports.add(loadTransport(k, getNodeProtocol(), v))); - } - @SuppressWarnings("unchecked") protected void loadService(ClassFilter serviceFilter) throws Exception { if (serviceFilter == null) return; final String threadName = "[" + Thread.currentThread().getName() + "] "; final Set> entrys = serviceFilter.getFilterEntrys(); ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : factory; - final Set sg = application.findGlobalGroup(this.sncpGroup); + for (FilterEntry entry : entrys) { //service实现类 final Class type = entry.getType(); - if (Modifier.isFinal(type.getModifiers())) continue; + if (Modifier.isFinal(type.getModifiers())) continue; //修饰final的类跳过 if (!Modifier.isPublic(type.getModifiers())) continue; - if (!isSNCP() && factory.find(entry.getName(), type) != null) continue; - final Set sameGroupAddrs = new LinkedHashSet<>(); - final Map> diffGroupAddrs = new HashMap<>(); - final HashSet groups = entry.getGroups(); - for (String g : groups) { - if (g.isEmpty()) continue; - if (g.equals(this.sncpGroup) && sg != null) sameGroupAddrs.addAll(sg); - Set set = application.findGlobalGroup(g); - if (set == null) throw new RuntimeException(type.getName() + " has illegal group (" + groups + ")"); - if (!g.equals(this.sncpGroup)) { - diffGroupAddrs.put(g, set); - } - } - List diffGroupTransports = new ArrayList<>(); - diffGroupAddrs.forEach((k, v) -> diffGroupTransports.add(loadTransport(k, server.getProtocol(), v))); - final boolean localed = (sameGroupAddrs.isEmpty() && diffGroupAddrs.isEmpty()) || sameGroupAddrs.contains(this.sncpAddress) || type.getAnnotation(LocalService.class) != null;//本地模式 + if (entry.getName().contains("$")) throw new RuntimeException(" value cannot contains '$' in " + entry.getProperty()); + if (!isSNCP() && factory.find(entry.getName(), type) != null) continue; //非SNCP的Server加载Service时需要判断是否在SNCP的Server已经加载过了。 + final HashSet groups = entry.getGroups(); //groups.isEmpty()表示没有配置groups属性。 + if (groups.isEmpty() && isSNCP()) groups.add(this.sncpGroup); + + final boolean localed = this.sncpAddress == null //非SNCP的Server,通常是单点服务 + || groups.contains(this.sncpGroup) //本地IP含在内的 + || type.getAnnotation(LocalService.class) != null;//本地模式 if (localed && (type.isInterface() || Modifier.isAbstract(type.getModifiers()))) continue; //本地模式不能实例化接口和抽象类的Service类 - final ServiceResource st = type.getAnnotation(ServiceResource.class); - final Class resType = st == null ? type : st.value(); - if (st != null && (!isSNCP() && factory.find(entry.getName(), resType) != null)) continue; - ServiceWrapper wrapper; + + Service service; if (localed) { //本地模式 - sameGroupAddrs.remove(this.sncpAddress); - List sameGroupTransports = new ArrayList<>(); - for (InetSocketAddress iaddr : sameGroupAddrs) { - Set tset = new HashSet<>(); - tset.add(iaddr); - sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset)); - } - Service service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports); - wrapper = new ServiceWrapper(resType, service, this.sncpGroup, entry); - if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + service); + service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(this.sncpGroup), loadTransports(groups)); } else { - sameGroupAddrs.remove(this.sncpAddress); - StringBuilder g = new StringBuilder(); - diffGroupAddrs.forEach((k, v) -> { - if (g.length() > 0) g.append(';'); - g.append(k); - sameGroupAddrs.addAll(v); - }); - if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")"); - Service service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs)); - wrapper = new ServiceWrapper(resType, service, "", entry); - if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + service); + service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups)); } + final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty()); + if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService()); if (factory.find(wrapper.getName(), wrapper.getType()) == null) { - regFactory.register(wrapper.getName(), wrapper.getType(), wrapper.getService()); - if (wrapper.getService() instanceof DataSource) { - regFactory.register(wrapper.getName(), DataSource.class, wrapper.getService()); - } else if (wrapper.getService() instanceof CacheSource) { - regFactory.register(wrapper.getName(), CacheSource.class, wrapper.getService()); - } else if (wrapper.getService() instanceof DataCacheListener) { - regFactory.register(wrapper.getName(), DataCacheListener.class, wrapper.getService()); - } else if (wrapper.getService() instanceof DataSQLListener) { - regFactory.register(wrapper.getName(), DataSQLListener.class, wrapper.getService()); - } else if (wrapper.getService() instanceof WebSocketNode) { - regFactory.register(wrapper.getName(), WebSocketNode.class, wrapper.getService()); - } + regFactory.register(wrapper.getName(), wrapper.getService()); if (wrapper.isRemote()) { remoteServiceWrappers.add(wrapper); } else { @@ -392,30 +296,69 @@ public abstract class NodeServer { if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); } - protected Transport loadTransport(String group, String protocol, InetSocketAddress addr) { - if (addr == null) return null; - Set set = new HashSet<>(); - set.add(addr); - return loadTransport(group, protocol, set); + protected List loadTransports(final HashSet groups) { + if (groups == null) return null; + final List transports = new ArrayList<>(); + for (String group : groups) { + transports.add(loadTransport(group)); + } + return transports; } - protected Transport loadTransport(String group, String protocol, Set addrs) { - Transport transport = null; - if (!addrs.isEmpty()) { - synchronized (application.transports) { - for (Transport tran : application.transports) { - if (tran.match(addrs)) { - transport = tran; - break; - } - } - if (transport == null) { - transport = new Transport(group + "_" + application.transports.size(), protocol, application.getWatchFactory(), 32, addrs); - logger.info(transport + " created"); - application.transports.add(transport); - } + protected Transport loadTransport(final HashSet groups) { + if (groups == null || groups.isEmpty()) return null; + List tmpgroup = new ArrayList<>(groups); + Collections.sort(tmpgroup); //按字母排列顺序 + boolean flag = false; + StringBuilder sb = new StringBuilder(); + for (String g : tmpgroup) { + if (flag) sb.append(';'); + sb.append(g); + flag = true; + } + final String groupid = sb.toString(); + Transport transport = application.transports.get(groupid); + if (transport != null) return transport; + final List transports = new ArrayList<>(); + for (String group : groups) { + transports.add(loadTransport(group)); + } + Set addrs = new HashSet(); + for (Transport t : transports) { + for (InetSocketAddress addr : t.getRemoteAddresses()) { + addrs.add(addr); } } + Transport first = transports.get(0); + Transport newTransport = new Transport(groupid, application.findGroupProtocol(first.getName()), application.getWatchFactory(), + application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); + synchronized (application.transports) { + transport = application.transports.get(groupid); + if (transport == null) { + transport = newTransport; + application.transports.put(groupid, transport); + } + } + return transport; + } + + protected Transport loadTransport(final String group) { + if (group == null) return null; + Transport transport; + synchronized (application.transports) { + transport = application.transports.get(group); + if (transport != null) { + if (this.sncpAddress != null && !this.sncpAddress.equals(transport.getClientAddress())) { + throw new RuntimeException(transport + "repeat create on newClientAddress = " + this.sncpAddress + ", oldClientAddress = " + transport.getClientAddress()); + } + return transport; + } + Set addrs = application.findGlobalGroup(group); + if (addrs == null) throw new RuntimeException("Not found = " + group + " on "); + transport = new Transport(group, application.findGroupProtocol(group), application.getWatchFactory(), + application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); + application.transports.put(group, transport); + } return transport; } @@ -429,13 +372,17 @@ public abstract class NodeServer { Class inter, Class ref2, String properties, String property) { ClassFilter cf = new ClassFilter(ref, inter, null); if (properties == null && properties == null) return cf; - if (this.nodeConf == null) return cf; - AnyValue[] proplist = this.nodeConf.getAnyValues(properties); + if (this.serverConf == null) return cf; + AnyValue[] proplist = this.serverConf.getAnyValues(properties); if (proplist == null || proplist.length < 1) return cf; cf = null; for (AnyValue list : proplist) { DefaultAnyValue prop = null; String sc = list.getValue("groups"); + if (sc != null) { + sc = sc.trim(); + if (sc.endsWith(";")) sc = sc.substring(0, sc.length() - 1); + } if (sc == null) sc = localGroup; if (sc != null) { prop = new AnyValue.DefaultAnyValue(); @@ -496,10 +443,6 @@ public abstract class NodeServer { return sncpGroup; } - public String getNodeProtocol() { - return nodeProtocol; - } - public void start() throws IOException { server.start(); } diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index 1cdfb9df5..efae0982e 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -13,7 +13,9 @@ import org.redkale.util.*; /** * - *

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ @NodeProtocol({"SNCP"}) @@ -22,7 +24,7 @@ public final class NodeSncpServer extends NodeServer { private final SncpServer sncpServer; public NodeSncpServer(Application application, AnyValue serconf) { - super(application, application.getResourceFactory().createChild(), createServer(application, serconf)); + super(application, createServer(application, serconf)); this.sncpServer = (SncpServer) this.server; this.consumer = sncpServer == null ? null : x -> sncpServer.addService(x); } diff --git a/src/org/redkale/net/Server.java b/src/org/redkale/net/Server.java index 9489d0e0c..bbd09f741 100644 --- a/src/org/redkale/net/Server.java +++ b/src/org/redkale/net/Server.java @@ -19,7 +19,9 @@ import org.redkale.watch.*; /** * - *

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ public abstract class Server { @@ -49,7 +51,7 @@ public abstract class Server { protected ProtocolServer serverChannel; - protected int capacity; + protected int bufferCapacity; protected int threads; @@ -82,8 +84,8 @@ public abstract class Server { this.backlog = config.getIntValue("backlog", 8 * 1024); this.readTimeoutSecond = config.getIntValue("readTimeoutSecond", 0); this.writeTimeoutSecond = config.getIntValue("writeTimeoutSecond", 0); - this.capacity = config.getIntValue("capacity", 8 * 1024); this.maxbody = config.getIntValue("maxbody", 64 * 1024); + this.bufferCapacity = config.getIntValue("bufferCapacity", 8 * 1024); this.threads = config.getIntValue("threads", Runtime.getRuntime().availableProcessors() * 16); this.bufferPoolSize = config.getIntValue("bufferPoolSize", Runtime.getRuntime().availableProcessors() * 512); this.responsePoolSize = config.getIntValue("responsePoolSize", Runtime.getRuntime().availableProcessors() * 256); @@ -127,7 +129,7 @@ public abstract class Server { serverChannel.accept(); final String threadName = "[" + Thread.currentThread().getName() + "] "; logger.info(threadName + this.getClass().getSimpleName() + "." + protocol + " listen: " + address - + ", threads: " + threads + ", bufferCapacity: " + capacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize + + ", threads: " + threads + ", bufferCapacity: " + bufferCapacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize + ", started in " + (System.currentTimeMillis() - context.getServerStartTime()) + " ms"); } diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index 9a00ba157..28da98f92 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -10,7 +10,6 @@ import java.nio.*; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; import java.util.function.*; import org.redkale.util.*; import org.redkale.watch.*; @@ -42,97 +41,104 @@ public final class Transport { supportTcpNoDelay = tcpNoDelay; } - protected final String name; - - protected final int bufferPoolSize; - - protected final int bufferCapacity; + protected final String name; //即的name属性 protected final boolean tcp; protected final String protocol; + protected final WatchFactory watch; + protected final AsynchronousChannelGroup group; - protected final InetSocketAddress[] remoteAddres; + protected final InetSocketAddress clientAddress; + + protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0]; protected final ObjectPool bufferPool; protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); - public Transport(Transport transport, InetSocketAddress localAddress, Collection transports) { - this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports)); + public Transport(String name, WatchFactory watch, final ObjectPool transportBufferPool, + final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses) { + this(name, DEFAULT_PROTOCOL, watch, transportBufferPool, transportChannelGroup, clientAddress, addresses); } - public Transport(String name, WatchFactory watch, int bufferPoolSize, Collection addresses) { - this(name, DEFAULT_PROTOCOL, watch, bufferPoolSize, addresses); - } - - public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection addresses) { + public Transport(String name, String protocol, WatchFactory watch, final ObjectPool transportBufferPool, + final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses) { this.name = name; + this.watch = watch; this.protocol = protocol; this.tcp = "TCP".equalsIgnoreCase(protocol); - this.bufferPoolSize = bufferPoolSize; - this.bufferCapacity = 8192; - AsynchronousChannelGroup g = null; - try { - final AtomicInteger counter = new AtomicInteger(); - ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 8, (Runnable r) -> { - Thread t = new Thread(r); - t.setDaemon(true); - t.setName("Transport-" + name + "-Thread-" + counter.incrementAndGet()); - return t; - }); - g = AsynchronousChannelGroup.withCachedThreadPool(executor, 1); - } catch (Exception e) { - throw new RuntimeException(e); - } - this.group = g; - AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "-" + name + "-" + protocol + ".Buffer.creatCounter"); - AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "-" + name + "-" + protocol + ".Buffer.cycleCounter"); - final int rcapacity = bufferCapacity; - this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; - e.clear(); - return true; - }); - this.remoteAddres = addresses.toArray(new InetSocketAddress[addresses.size()]); + this.group = transportChannelGroup; + this.bufferPool = transportBufferPool; + this.clientAddress = clientAddress; + updateRemoteAddresses(addresses); } - private static Collection parse(InetSocketAddress addr, Collection transports) { - final Set set = new LinkedHashSet<>(); + public Transport(final Collection transports) { + Transport first = null; + List tmpgroup = new ArrayList<>(); for (Transport t : transports) { - set.addAll(Arrays.asList(t.remoteAddres)); + if (first == null) first = t; + tmpgroup.add(t.name); } - set.remove(addr); - return set; + Collections.sort(tmpgroup); //按字母排列顺序 + boolean flag = false; + StringBuilder sb = new StringBuilder(); + for (String g : tmpgroup) { + if (flag) sb.append(';'); + sb.append(g); + flag = true; + } + this.name = sb.toString(); + this.watch = first.watch; + this.protocol = first.protocol; + this.tcp = "TCP".equalsIgnoreCase(first.protocol); + this.group = first.group; + this.bufferPool = first.bufferPool; + this.clientAddress = first.clientAddress; + Set addrs = new HashSet(); + for (Transport t : transports) { + for (InetSocketAddress addr : t.getRemoteAddresses()) { + addrs.add(addr); + } + } + updateRemoteAddresses(addrs); + } + + public final InetSocketAddress[] updateRemoteAddresses(final Collection addresses) { + InetSocketAddress[] oldAddresses = this.remoteAddres; + List list = new ArrayList<>(); + if (addresses != null) { + for (InetSocketAddress addr : addresses) { + if (clientAddress != null && clientAddress.equals(addr)) continue; + list.add(addr); + } + } + this.remoteAddres = list.toArray(new InetSocketAddress[list.size()]); + return oldAddresses; + } + + public String getName() { + return name; } public void close() { connPool.forEach((k, v) -> v.forEach(c -> c.dispose())); } - public boolean match(Collection addrs) { - if (addrs == null) return false; - if (addrs.size() != this.remoteAddres.length) return false; - for (InetSocketAddress addr : this.remoteAddres) { - if (!addrs.contains(addr)) return false; - } - return true; + public InetSocketAddress getClientAddress() { + return clientAddress; } - public InetSocketAddress[] getRemoteAddress() { + public InetSocketAddress[] getRemoteAddresses() { return remoteAddres; } @Override public String toString() { - return Transport.class.getSimpleName() + "{name=" + name + ",protocol=" + protocol + ",remoteAddres=" + Arrays.toString(remoteAddres) + "}"; - } - - public int getBufferCapacity() { - return bufferCapacity; + return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}"; } public ByteBuffer pollBuffer() { @@ -207,7 +213,7 @@ public final class Transport { } public void offerConnection(final boolean forceClose, AsyncConnection conn) { - if (!forceClose && conn.isTCP()) { //暂时每次都关闭 + if (!forceClose && conn.isTCP()) { if (conn.isOpen()) { BlockingQueue queue = connPool.get(conn.getRemoteAddress()); if (queue == null) { diff --git a/src/org/redkale/net/http/HttpServer.java b/src/org/redkale/net/http/HttpServer.java index 4cde19729..4c8358013 100644 --- a/src/org/redkale/net/http/HttpServer.java +++ b/src/org/redkale/net/http/HttpServer.java @@ -47,7 +47,7 @@ public final class HttpServer extends Server { final int port = this.address.getPort(); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.creatCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter"); - final int rcapacity = Math.max(this.capacity, 16 * 1024 + 8); //兼容 HTTP 2.0 + final int rcapacity = Math.max(this.bufferCapacity, 16 * 1024 + 8); //兼容 HTTP 2.0 ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index d9b67dfb3..6ac8a59b6 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -12,6 +12,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.logging.*; import javax.annotation.*; +import org.redkale.boot.*; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -29,7 +30,7 @@ public abstract class WebSocketNode { protected final boolean finest = logger.isLoggable(Level.FINEST); - @Resource(name = "SERVER_ADDR") + @Resource(name = Application.RESNAME_SERVER_GROUP) protected InetSocketAddress localSncpAddress; //为SncpServer的服务address @DynRemote diff --git a/src/org/redkale/net/sncp/ServiceWrapper.java b/src/org/redkale/net/sncp/ServiceWrapper.java index 3ac9e677a..8d50dcc21 100644 --- a/src/org/redkale/net/sncp/ServiceWrapper.java +++ b/src/org/redkale/net/sncp/ServiceWrapper.java @@ -8,12 +8,14 @@ package org.redkale.net.sncp; import org.redkale.service.Service; import org.redkale.util.AnyValue; import java.util.*; -import org.redkale.boot.*; +import org.redkale.util.*; /** - * Service对象的封装类 + * Service对象的封装类 + * + *

+ * 详情见: http://www.redkale.org * - *

详情见: http://www.redkale.org * @author zhangjx * @param Service的子类 */ @@ -25,26 +27,26 @@ public final class ServiceWrapper { private final AnyValue conf; - private final String group; + private final String sncpGroup; //自身的组节点名 可能为null - private final Set groups; + private final Set groups; //所有的组节点,包含自身 private final String name; private final boolean remote; - public ServiceWrapper(Class type, T service, String group, ClassFilter.FilterEntry entry) { - this(type, service, entry.getName(), group, entry.getGroups(), entry.getProperty()); - } + private final Class[] resTypes; - public ServiceWrapper(Class type, T service, String name, String group, Set groups, AnyValue conf) { + public ServiceWrapper(Class type, T service, String name, String sncpGroup, Set groups, AnyValue conf) { this.type = type == null ? (Class) service.getClass() : type; this.service = service; this.conf = conf; - this.group = group; + this.sncpGroup = sncpGroup; this.groups = groups; this.name = name; this.remote = Sncp.isRemote(service); + ResourceType rty = service.getClass().getAnnotation(ResourceType.class); + this.resTypes = rty == null ? new Class[]{this.type} : rty.value(); } @Override @@ -53,14 +55,14 @@ public final class ServiceWrapper { if (obj == null) return false; if (!(obj instanceof ServiceWrapper)) return false; ServiceWrapper other = (ServiceWrapper) obj; - return (this.type.equals(other.type) && this.remote == other.remote && this.name.equals(other.name) && this.group.equals(other.group)); + return (this.type.equals(other.type) && this.remote == other.remote && this.name.equals(other.name) && Objects.equals(this.sncpGroup, other.sncpGroup)); } @Override public int hashCode() { int hash = 3; hash = 67 * hash + Objects.hashCode(this.type); - hash = 67 * hash + Objects.hashCode(this.group); + hash = 67 * hash + Objects.hashCode(this.sncpGroup); hash = 67 * hash + Objects.hashCode(this.name); hash = 67 * hash + (this.remote ? 1 : 0); return hash; @@ -70,6 +72,10 @@ public final class ServiceWrapper { return type; } + public Class[] getResTypes() { + return resTypes; + } + public Service getService() { return service; } diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index b8b92b45e..0edbca630 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -12,7 +12,6 @@ import java.net.*; import java.security.*; import java.util.*; import java.util.function.*; -import javax.annotation.*; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import static jdk.internal.org.objectweb.asm.Opcodes.*; @@ -34,12 +33,6 @@ import org.redkale.service.DynRemote; */ public abstract class Sncp { - //当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress、String - public static final String RESNAME_SNCP_ADDR = "SNCP_ADDR"; - - //当前Service所属的组 类型: Set、String[] - public static final String RESNAME_SNCP_GROUPS = "SNCP_GROUPS"; - private static final java.lang.reflect.Type GROUPS_TYPE1 = new TypeToken>() { }.getType(); @@ -128,12 +121,13 @@ public abstract class Sncp { *

      * @Resource(name = "")
      * @SncpDyn(remote = false)
+     * @ResourceType({TestService.class})
      * public final class _DynLocalTestService extends TestService{
      *
      *      @Resource
      *      private BsonConvert _convert;
      *
-     *      private Transport[] _sameGroupTransports;
+     *      private Transport _sameGroupTransport;
      *
      *      private Transport[] _diffGroupTransports;
      *
@@ -155,8 +149,8 @@ public abstract class Sncp {
      *      public void _createSomeThing(boolean selfrunnable, boolean samerunnable, boolean diffrunnable, TestBean bean){
      *          if(selfrunnable) super.createSomeThing(bean);
      *          if (_client== null) return;
-     *          if (samerunnable) _client.remote(_convert, _sameGroupTransports, 0, true, false, false, bean);
-     *          if (diffrunnable) _client.remote(_convert, _diffGroupTransports, 0, true, true, false, bean);
+     *          if (samerunnable) _client.remoteSameGroup(_convert, _sameGroupTransport, 0, true, false, false, bean);
+     *          if (diffrunnable) _client.remoteDiffGroup(_convert, _diffGroupTransports, 0, true, true, false, bean);
      *      }
      *
      *      @Override
@@ -168,8 +162,8 @@ public abstract class Sncp {
      *      public String _updateSomeThing(boolean selfrunnable, boolean samerunnable, boolean diffrunnable, String id){
      *          String rs = super.updateSomeThing(id);
      *          if (_client== null) return;
-     *          if (samerunnable) _client.remote(_convert, _sameGroupTransports, 1, true, false, false, id);
-     *          if (diffrunnable) _client.remote(_convert, _diffGroupTransports, 1, true, true, false, id);
+     *          if (samerunnable) _client.remoteSameGroup(_convert, _sameGroupTransport, 1, true, false, false, id);
+     *          if (diffrunnable) _client.remoteDiffGroup(_convert, _diffGroupTransports, 1, true, true, false, id);
      *          return rs;
      *      }
      * }
@@ -196,6 +190,7 @@ public abstract class Sncp {
         final String clientDesc = Type.getDescriptor(SncpClient.class);
         final String convertDesc = Type.getDescriptor(BsonConvert.class);
         final String sncpDynDesc = Type.getDescriptor(SncpDyn.class);
+        final String transportDesc = Type.getDescriptor(Transport.class);
         final String transportsDesc = Type.getDescriptor(Transport[].class);
         ClassLoader loader = Sncp.class.getClassLoader();
         String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + LOCALPREFIX + serviceClass.getSimpleName();
@@ -227,6 +222,22 @@ public abstract class Sncp {
             av0.visit("remote", Boolean.FALSE);
             av0.visitEnd();
         }
+        {
+            av0 = cw.visitAnnotation(Type.getDescriptor(ResourceType.class), true);
+            {
+                AnnotationVisitor av1 = av0.visitArray("value");
+                ResourceType rty = serviceClass.getAnnotation(ResourceType.class);
+                if (rty == null) {
+                    av1.visit(null, Type.getType(Type.getDescriptor(serviceClass)));
+                } else {
+                    for (Class cl : rty.value()) {
+                        av1.visit(null, Type.getType(Type.getDescriptor(cl)));
+                    }
+                }
+                av1.visitEnd();
+            }
+            av0.visitEnd();
+        }
         if (hasMultiRun) {
             {
                 fv = cw.visitField(ACC_PRIVATE, "_convert", convertDesc, null, null);
@@ -235,7 +246,7 @@ public abstract class Sncp {
                 fv.visitEnd();
             }
             {
-                fv = cw.visitField(ACC_PRIVATE, "_sameGroupTransports", transportsDesc, null, null);
+                fv = cw.visitField(ACC_PRIVATE, "_sameGroupTransport", transportDesc, null, null);
                 fv.visitEnd();
             }
             {
@@ -437,8 +448,8 @@ public abstract class Sncp {
                 mv.visitFieldInsn(GETFIELD, newDynName, "_client", clientDesc);
                 mv.visitVarInsn(ALOAD, 0);  //传递 _convert
                 mv.visitFieldInsn(GETFIELD, newDynName, "_convert", convertDesc);
-                mv.visitVarInsn(ALOAD, 0);  //传递 _sameGroupTransports
-                mv.visitFieldInsn(GETFIELD, newDynName, "_sameGroupTransports", transportsDesc);
+                mv.visitVarInsn(ALOAD, 0);  //传递 _sameGroupTransport
+                mv.visitFieldInsn(GETFIELD, newDynName, "_sameGroupTransport", transportDesc);
 
                 if (index <= 5) {  //第几个 SncpAction 
                     mv.visitInsn(ICONST_0 + index);
@@ -498,7 +509,7 @@ public abstract class Sncp {
                     }
                     mv.visitInsn(AASTORE);
                 }
-                mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemote" : "remote", "(" + convertDesc + transportsDesc + "I[Ljava/lang/Object;)V", false);
+                mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemoteSameGroup" : "remoteSameGroup", "(" + convertDesc + transportDesc + "I[Ljava/lang/Object;)V", false);
                 mv.visitLabel(sameLabel);
                 //---------------------------- 调用diffrun ---------------------------------
                 mv.visitVarInsn(ILOAD, 3); //读取 diffrunnable
@@ -570,7 +581,7 @@ public abstract class Sncp {
                     }
                     mv.visitInsn(AASTORE);
                 }
-                mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemote" : "remote", "(" + convertDesc + transportsDesc + "I[Ljava/lang/Object;)V", false);
+                mv.visitMethodInsn(INVOKEVIRTUAL, clientName, mrun.async() ? "asyncRemoteDiffGroup" : "remoteDiffGroup", "(" + convertDesc + transportsDesc + "I[Ljava/lang/Object;)V", false);
                 mv.visitLabel(diffLabel);
 
                 if (returnType == void.class) {
@@ -669,53 +680,33 @@ public abstract class Sncp {
      * @return Service的本地模式实例
      */
     @SuppressWarnings("unchecked")
-    public static  T createLocalService(final String name, final Consumer executor, final Class serviceClass,
-            final InetSocketAddress clientAddress, HashSet groups, Collection sameGroupTransports, Collection diffGroupTransports) {
+    public static  T createLocalService(final String name, final Consumer executor,
+            final Class serviceClass, final InetSocketAddress clientAddress, final Transport sameGroupTransport, final Collection diffGroupTransports) {
         try {
             final Class newClazz = createLocalServiceClass(name, serviceClass);
             T rs = (T) newClazz.newInstance();
             //--------------------------------------            
-            if (sameGroupTransports == null) sameGroupTransports = new ArrayList<>();
-            if (diffGroupTransports == null) diffGroupTransports = new ArrayList<>();
+            Service remoteService = null;
             Transport remoteTransport = null;
             {
                 Class loop = newClazz;
-                String[] groupArray = null;
                 do {
                     for (Field field : loop.getDeclaredFields()) {
                         int mod = field.getModifiers();
                         if (Modifier.isFinal(mod) || Modifier.isStatic(mod)) continue;
-                        if (field.getAnnotation(DynRemote.class) != null) {
-                            field.setAccessible(true);
-                            if (remoteTransport == null) {
-                                List list = new ArrayList<>();
-                                list.addAll(sameGroupTransports);
-                                list.addAll(diffGroupTransports);
-                                if (!list.isEmpty()) remoteTransport = new Transport(list.get(0), clientAddress, list);
-                            }
-                            if (field.getType().isAssignableFrom(newClazz) && remoteTransport != null) {
-                                field.set(rs, createRemoteService(name, executor, serviceClass, clientAddress, groups, remoteTransport));
-                            }
-                            continue;
-                        }
-                        Resource res = field.getAnnotation(Resource.class);
-                        if (res == null) continue;
+                        if (field.getAnnotation(DynRemote.class) == null) continue;
+                        if (!field.getType().isAssignableFrom(newClazz)) continue;
                         field.setAccessible(true);
-                        if (res.name().equals(RESNAME_SNCP_GROUPS)) {
-                            if (groups == null) groups = new LinkedHashSet<>();
-                            if (groupArray == null) groupArray = groups.toArray(new String[groups.size()]);
-                            if (field.getGenericType().equals(GROUPS_TYPE1)) {
-                                field.set(rs, groups);
-                            } else if (field.getGenericType().equals(GROUPS_TYPE2)) {
-                                field.set(rs, groupArray);
-                            }
-                        } else if (res.name().endsWith(RESNAME_SNCP_ADDR)) {
-                            if (field.getType() == String.class) {
-                                field.set(rs, clientAddress == null ? null : (clientAddress.getHostString() + ":" + clientAddress.getPort()));
-                            } else {
-                                field.set(rs, clientAddress);
-                            }
+                        if (remoteTransport == null) {
+                            List list = new ArrayList<>();
+                            if (sameGroupTransport != null) list.add(sameGroupTransport);
+                            if (diffGroupTransports != null) list.addAll(diffGroupTransports);
+                            if (!list.isEmpty()) remoteTransport = new Transport(list);
                         }
+                        if (remoteService == null && remoteTransport != null) {
+                            remoteService = createRemoteService(name, executor, serviceClass, clientAddress, remoteTransport);
+                        }
+                        if (remoteService != null) field.set(rs, remoteService);
                     }
                 } while ((loop = loop.getSuperclass()) != Object.class);
             }
@@ -724,7 +715,7 @@ public abstract class Sncp {
                 try {
                     Field e = newClazz.getDeclaredField("_client");
                     e.setAccessible(true);
-                    client = new SncpClient(name, executor, hash(serviceClass), false, newClazz, clientAddress, groups);
+                    client = new SncpClient(name, executor, hash(serviceClass), false, newClazz, clientAddress);
                     e.set(rs, client);
                 } catch (NoSuchFieldException ne) {
                 }
@@ -735,19 +726,23 @@ public abstract class Sncp {
                 if (client != null) {
                     sb.append(", nameid = ").append(client.getNameid()).append(", serviceid = ").append(client.getServiceid());
                     sb.append(", action.size = ").append(client.getActionCount());
-
+                    List groups = new ArrayList<>();
+                    if (sameGroupTransport != null) groups.add(sameGroupTransport.getName());
+                    if (diffGroupTransports != null) {
+                        for (Transport t : diffGroupTransports) {
+                            groups.add(t.getName());
+                        }
+                    }
                     sb.append(", address = ").append(clientAddress).append(", groups = ").append(groups);
-                    List addrs = new ArrayList<>();
-                    for (Transport t : sameGroupTransports) {
-                        addrs.addAll(Arrays.asList(t.getRemoteAddress()));
-                    }
-                    sb.append(", samegroups = ").append(addrs);
+                    sb.append(", sameaddrs = ").append(sameGroupTransport == null ? null : Arrays.asList(sameGroupTransport.getRemoteAddresses()));
 
-                    addrs.clear();
-                    for (Transport t : diffGroupTransports) {
-                        addrs.addAll(Arrays.asList(t.getRemoteAddress()));
+                    List addrs = new ArrayList<>();
+                    if (diffGroupTransports != null) {
+                        for (Transport t : diffGroupTransports) {
+                            addrs.addAll(Arrays.asList(t.getRemoteAddresses()));
+                        }
                     }
-                    sb.append(", diffgroups = ").append(addrs);
+                    sb.append(", diffaddrs = ").append(addrs);
                 } else {
                     sb.append(", ").append(MultiRun.class.getSimpleName().toLowerCase()).append(" = false");
                 }
@@ -758,11 +753,11 @@ public abstract class Sncp {
             }
             if (client == null) return rs;
             {
-                Field c = newClazz.getDeclaredField("_sameGroupTransports");
+                Field c = newClazz.getDeclaredField("_sameGroupTransport");
                 c.setAccessible(true);
-                c.set(rs, sameGroupTransports.toArray(new Transport[sameGroupTransports.size()]));
+                c.set(rs, sameGroupTransport);
             }
-            {
+            if (diffGroupTransports != null) {
                 Field t = newClazz.getDeclaredField("_diffGroupTransports");
                 t.setAccessible(true);
                 t.set(rs, diffGroupTransports.toArray(new Transport[diffGroupTransports.size()]));
@@ -780,6 +775,7 @@ public abstract class Sncp {
      * 
      * @Resource(name = "")
      * @SncpDyn(remote = true)
+     * @ResourceType({TestService.class})
      * public final class _DynRemoteTestService extends TestService{
      *
      *      @Resource
@@ -837,7 +833,7 @@ public abstract class Sncp {
      */
     @SuppressWarnings("unchecked")
     public static  T createRemoteService(final String name, final Consumer executor, final Class serviceClass,
-            final InetSocketAddress clientAddress, HashSet groups, final Transport transport) {
+            final InetSocketAddress clientAddress, final Transport transport) {
         if (serviceClass == null) return null;
         if (!Service.class.isAssignableFrom(serviceClass)) return null;
         int mod = serviceClass.getModifiers();
@@ -852,7 +848,7 @@ public abstract class Sncp {
         final String anyValueDesc = Type.getDescriptor(AnyValue.class);
         ClassLoader loader = Sncp.class.getClassLoader();
         String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceClass.getSimpleName();
-        final SncpClient client = new SncpClient(name, executor, hash(serviceClass), true, realed ? createLocalServiceClass(name, serviceClass) : serviceClass, clientAddress, groups);
+        final SncpClient client = new SncpClient(name, executor, hash(serviceClass), true, realed ? createLocalServiceClass(name, serviceClass) : serviceClass, clientAddress);
         try {
             Class newClazz = Class.forName(newDynName.replace('/', '.'));
             T rs = (T) newClazz.newInstance();
@@ -867,8 +863,8 @@ public abstract class Sncp {
                 sb.append(newClazz.getName()).append("{name = ").append(name);
                 sb.append(", nameid = ").append(client.getNameid()).append(", serviceid = ").append(client.getServiceid());
                 sb.append(", action.size = ").append(client.getActionCount());
-                sb.append(", address = ").append(clientAddress).append(", groups = ").append(groups);
-                sb.append(", remotes = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddress()));
+                sb.append(", address = ").append(clientAddress).append(", groups = ").append(transport == null ? null : transport.getName());
+                sb.append(", remoteaddrs = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddresses()));
                 sb.append("}");
                 Field s = newClazz.getDeclaredField("_selfstring");
                 s.setAccessible(true);
@@ -889,6 +885,22 @@ public abstract class Sncp {
             av0.visit("name", name);
             av0.visitEnd();
         }
+        {
+            av0 = cw.visitAnnotation(Type.getDescriptor(ResourceType.class), true);
+            {
+                AnnotationVisitor av1 = av0.visitArray("value");
+                ResourceType rty = serviceClass.getAnnotation(ResourceType.class);
+                if (rty == null) {
+                    av1.visit(null, Type.getType(Type.getDescriptor(serviceClass)));
+                } else {
+                    for (Class cl : rty.value()) {
+                        av1.visit(null, Type.getType(Type.getDescriptor(cl)));
+                    }
+                }
+                av1.visitEnd();
+            }
+            av0.visitEnd();
+        }
         {
             av0 = cw.visitAnnotation(sncpDynDesc, true);
             av0.visit("remote", Boolean.TRUE);
@@ -1070,8 +1082,8 @@ public abstract class Sncp {
                 sb.append(newClazz.getName()).append("{name = ").append(name);
                 sb.append(", nameid = ").append(client.getNameid()).append(", serviceid = ").append(client.getServiceid());
                 sb.append(", action.size = ").append(client.getActionCount());
-                sb.append(", address = ").append(clientAddress).append(", groups = ").append(groups);
-                sb.append(", remotes = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddress()));
+                sb.append(", address = ").append(clientAddress).append(", groups = ").append(transport == null ? null : transport.getName());
+                sb.append(", remotes = ").append(transport == null ? null : Arrays.asList(transport.getRemoteAddresses()));
                 sb.append("}");
                 Field s = newClazz.getDeclaredField("_selfstring");
                 s.setAccessible(true);
diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java
index 1c0190ab0..15c27f7bb 100644
--- a/src/org/redkale/net/sncp/SncpClient.java
+++ b/src/org/redkale/net/sncp/SncpClient.java
@@ -129,8 +129,6 @@ public final class SncpClient {
 
     protected final InetSocketAddress address;
 
-    protected final HashSet groups;
-
     private final byte[] addrBytes;
 
     private final int addrPort;
@@ -144,12 +142,11 @@ public final class SncpClient {
     protected final Consumer executor;
 
     public SncpClient(final String serviceName, final Consumer executor, final DLong serviceid, boolean remote, final Class serviceClass,
-            final InetSocketAddress clientAddress, final HashSet groups) {
+            final InetSocketAddress clientAddress) {
         this.remote = remote;
         this.executor = executor;
         this.serviceClass = serviceClass;
         this.address = clientAddress;
-        this.groups = groups;
         //if (subLocalClass != null && !serviceClass.isAssignableFrom(subLocalClass)) throw new RuntimeException(subLocalClass + " is not " + serviceClass + " sub class ");
         this.name = serviceName;
         this.nameid = Sncp.hash(serviceName);
@@ -187,7 +184,7 @@ public final class SncpClient {
         if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX);
         return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", nameid = " + nameid
                 + ", name = '" + name + "', address = " + (address == null ? "" : (address.getHostString() + ":" + address.getPort()))
-                + ", groups = " + groups + ", actions.size = " + actions.length + ")";
+                + ", actions.size = " + actions.length + ")";
     }
 
     public static List parseMethod(final Class serviceClass) {
@@ -230,11 +227,50 @@ public final class SncpClient {
         return multis;
     }
 
+    public void remoteSameGroup(final BsonConvert convert, Transport transport, final int index, final Object... params) {
+        final SncpAction action = actions[index];
+        if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了
+        for (InetSocketAddress addr : transport.getRemoteAddresses()) {
+            remote0(null, convert, transport, addr, action, params);
+        }
+    }
+
+    public void asyncRemoteSameGroup(final BsonConvert convert, Transport transport, final int index, final Object... params) {
+        if (executor != null) {
+            executor.accept(() -> {
+                remoteSameGroup(convert, transport, index, params);
+            });
+        } else {
+            remoteSameGroup(convert, transport, index, params);
+        }
+    }
+
+    public void remoteDiffGroup(final BsonConvert convert, Transport[] transports, final int index, final Object... params) {
+        if (transports == null || transports.length < 1) return;
+        final SncpAction action = actions[index];
+        if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null; //不能让远程调用handler,因为之前本地方法已经调用过了
+        for (Transport transport : transports) {
+            remote0(null, convert, transport, null, action, params);
+        }
+    }
+
+    public void asyncRemoteDiffGroup(final BsonConvert convert, Transport[] transports, final int index, final Object... params) {
+        if (transports == null || transports.length < 1) return;
+        if (executor != null) {
+            executor.accept(() -> {
+                remoteDiffGroup(convert, transports, index, params);
+            });
+        } else {
+            remoteDiffGroup(convert, transports, index, params);
+        }
+    }
+
+    //只给远程模式调用的
     public  T remote(final BsonConvert convert, Transport transport, final int index, final Object... params) {
         final SncpAction action = actions[index];
         final CompletionHandler handlerFunc = action.handlerFuncParamIndex >= 0 ? (CompletionHandler) params[action.handlerFuncParamIndex] : null;
         if (action.handlerFuncParamIndex >= 0) params[action.handlerFuncParamIndex] = null;
-        Future future = remote0(handlerFunc, convert, transport, action, params);
+        Future future = remote0(handlerFunc, convert, transport, null, action, params);
         if (handlerFunc != null) return null;
         final BsonReader reader = convert.pollBsonReader();
         try {
@@ -257,22 +293,11 @@ public final class SncpClient {
         if (transports == null || transports.length < 1) return;
         remote(convert, transports[0], index, params);
         for (int i = 1; i < transports.length; i++) {
-            remote0(null, convert, transports[i], actions[index], params);
+            remote0(null, convert, transports[i], null, actions[index], params);
         }
     }
 
-    public  void asyncRemote(final BsonConvert convert, Transport[] transports, final int index, final Object... params) {
-        if (transports == null || transports.length < 1) return;
-        if (executor != null) {
-            executor.accept(() -> {
-                remote(convert, transports, index, params);
-            });
-        } else {
-            remote(convert, transports, index, params);
-        }
-    }
-
-    private Future remote0(final CompletionHandler handler, final BsonConvert convert, final Transport transport, final SncpAction action, final Object... params) {
+    private Future remote0(final CompletionHandler handler, final BsonConvert convert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
         Type[] myparamtypes = action.paramTypes;
         if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.address;
         final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
@@ -283,7 +308,7 @@ public final class SncpClient {
         final int reqBodyLength = writer.count() - HEADER_SIZE; //body总长度
         final long seqid = System.nanoTime();
         final DLong actionid = action.actionid;
-        final SocketAddress addr = action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null;
+        final SocketAddress addr = addr0 == null ? (action.addressTargetParamIndex >= 0 ? (SocketAddress) params[action.addressTargetParamIndex] : null) : addr0;
         final AsyncConnection conn = transport.pollConnection(addr);
         if (conn == null || !conn.isOpen()) {
             logger.log(Level.SEVERE, action.method + " sncp (params: " + jsonConvert.convertTo(params) + ") cannot connect " + (conn == null ? addr : conn.getRemoteAddress()));
diff --git a/src/org/redkale/net/sncp/SncpDynServlet.java b/src/org/redkale/net/sncp/SncpDynServlet.java
index 706b88ba9..046320432 100644
--- a/src/org/redkale/net/sncp/SncpDynServlet.java
+++ b/src/org/redkale/net/sncp/SncpDynServlet.java
@@ -154,9 +154,9 @@ public final class SncpDynServlet extends SncpServlet {
          *
          *      @Override
          *      public void action(final BsonReader in, final BsonWriter out) throws Throwable {
-         *          TestBean arg1 = convert.convertFrom(in, paramTypes[1]);
-         *          String arg2 = convert.convertFrom(in, paramTypes[2]);
-         *          int arg3 = convert.convertFrom(in, paramTypes[3]);
+         *          TestBean arg1 = convert.convertFrom(paramTypes[1], in);
+         *          String arg2 = convert.convertFrom(paramTypes[2], in);
+         *          int arg3 = convert.convertFrom(paramTypes[3], in);
          *          Object rs = service.change(arg1, arg2, arg3);
          *          callParameter(out, arg1, arg2, arg3);
          *          convert.convertTo(out, paramTypes[0], rs);
@@ -210,9 +210,9 @@ public final class SncpDynServlet extends SncpServlet {
                 mv.visitMaxs(1, 1);
                 mv.visitEnd();
             }
-            String convertFromDesc = "(" + convertReaderDesc + "Ljava/lang/reflect/Type;)Ljava/lang/Object;";
+            String convertFromDesc = "(Ljava/lang/reflect/Type;" + convertReaderDesc + ")Ljava/lang/Object;";
             try {
-                convertFromDesc = Type.getMethodDescriptor(BsonConvert.class.getMethod("convertFrom", BsonReader.class, java.lang.reflect.Type.class));
+                convertFromDesc = Type.getMethodDescriptor(BsonConvert.class.getMethod("convertFrom", java.lang.reflect.Type.class, BsonReader.class));
             } catch (Exception ex) {
                 throw new RuntimeException(ex); //不可能会发生
             }
@@ -227,7 +227,6 @@ public final class SncpDynServlet extends SncpServlet {
                 for (int i = 0; i < paramClasses.length; i++) { //参数
                     mv.visitVarInsn(ALOAD, 0);
                     mv.visitFieldInsn(GETFIELD, newDynName, "convert", Type.getDescriptor(BsonConvert.class));
-                    mv.visitVarInsn(ALOAD, 1);
                     mv.visitVarInsn(ALOAD, 0);
                     mv.visitFieldInsn(GETFIELD, newDynName, "paramTypes", "[Ljava/lang/reflect/Type;");
                     if (iconst > ICONST_5) {
@@ -236,6 +235,7 @@ public final class SncpDynServlet extends SncpServlet {
                         mv.visitInsn(iconst);  //
                     }
                     mv.visitInsn(AALOAD);
+                    mv.visitVarInsn(ALOAD, 1);
 
                     mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertFrom", convertFromDesc, false);
                     int load = ALOAD;
diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java
index b10de5e16..603863f37 100644
--- a/src/org/redkale/net/sncp/SncpServer.java
+++ b/src/org/redkale/net/sncp/SncpServer.java
@@ -48,7 +48,7 @@ public final class SncpServer extends Server {
         final int port = this.address.getPort();
         AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.creatCounter");
         AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter");
-        final int rcapacity = Math.max(this.capacity, 4 * 1024);
+        final int rcapacity = Math.max(this.bufferCapacity, 4 * 1024);
         ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
                 (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
                     if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
diff --git a/src/org/redkale/service/CacheSourceService.java b/src/org/redkale/service/CacheSourceService.java
index 2e92db6e3..7e40dd239 100644
--- a/src/org/redkale/service/CacheSourceService.java
+++ b/src/org/redkale/service/CacheSourceService.java
@@ -29,6 +29,7 @@ import org.redkale.util.*;
  * @author zhangjx
  */
 @AutoLoad(false)
+@ResourceType({CacheSourceService.class, CacheSource.class})
 public class CacheSourceService implements CacheSource, Service, AutoCloseable {
 
     @Resource(name = "APP_HOME")
diff --git a/src/org/redkale/service/DataCacheListenerService.java b/src/org/redkale/service/DataCacheListenerService.java
index 565502927..1e5bf37be 100644
--- a/src/org/redkale/service/DataCacheListenerService.java
+++ b/src/org/redkale/service/DataCacheListenerService.java
@@ -12,10 +12,13 @@ import org.redkale.util.*;
 
 /**
  *
- * 

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ @AutoLoad(false) +@ResourceType({DataCacheListenerService.class, DataCacheListener.class}) public class DataCacheListenerService implements DataCacheListener, Service { @Resource(name = "$") diff --git a/src/org/redkale/service/DataSQLListenerService.java b/src/org/redkale/service/DataSQLListenerService.java index fabd2907c..d74504620 100644 --- a/src/org/redkale/service/DataSQLListenerService.java +++ b/src/org/redkale/service/DataSQLListenerService.java @@ -15,6 +15,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.logging.*; import javax.annotation.Resource; +import org.redkale.util.*; /** * 暂时不实现 @@ -25,6 +26,7 @@ import javax.annotation.Resource; * @author zhangjx */ @AutoLoad(false) +@ResourceType({DataSQLListenerService.class, DataSQLListener.class}) public class DataSQLListenerService implements DataSQLListener, Service { private static final String format = "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%tL"; diff --git a/src/org/redkale/service/DataSourceService.java b/src/org/redkale/service/DataSourceService.java index 2d7bd144c..0260e7686 100644 --- a/src/org/redkale/service/DataSourceService.java +++ b/src/org/redkale/service/DataSourceService.java @@ -15,10 +15,13 @@ import org.redkale.util.*; /** * DataSource对应的Service类, 该类主要特点是将所有含FilterBean参数的方法重载成FilterNode对应的方法。 * - *

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ @AutoLoad(false) +@ResourceType({DataSourceService.class, DataSource.class}) public class DataSourceService implements DataSource, Service, AutoCloseable { @Resource(name = "$") diff --git a/src/org/redkale/service/Service.java b/src/org/redkale/service/Service.java index f8d7d2dd2..0e9dae46c 100644 --- a/src/org/redkale/service/Service.java +++ b/src/org/redkale/service/Service.java @@ -8,9 +8,16 @@ package org.redkale.service; import org.redkale.util.*; /** - * 所有Service的实现类不得声明为final, 允许远程模式的public方法和public String name()方法都不能声明为final。 + * 所有Service的实现类不得声明为final, 允许远程模式的public方法都不能声明为final。 * 注意: "$"是一个很特殊的Service.name值 。 被标记为@Resource(name = "$") 的Service的资源名与所属父Service的资源名一致。 - * + *

+ *

+ * Service的资源类型
+ * 业务逻辑的Service通常有两种编写方式:
+ *    1、只写一个Service实现类。
+ *    2、先定义业务的Service接口或抽象类,再编写具体实现类。
+ * 第二种方式需要在具体实现类上使用@ResourceType指明资源注入的类型。
+ * 
*

* @Resource(name = ".*") * private HashMap<String, XXXService> nodemap; diff --git a/src/org/redkale/service/ServiceResource.java b/src/org/redkale/service/ServiceResource.java deleted file mode 100644 index f86e90146..000000000 --- a/src/org/redkale/service/ServiceResource.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.service; - -import java.lang.annotation.*; -import static java.lang.annotation.ElementType.TYPE; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - *

- * Service的资源类型
- * 业务逻辑的Service通常有两种编写方式:
- *    1、只写一个Service实现类。
- *    2、先定义业务的Service接口或抽象类,再编写具体实现类。
- * @ServiceResource用于第二种方式, 在具体实现类上需要使用@ServiceResource指明资源注入的类型。
- * 
- *

- * 详情见: http://www.redkale.org - * - * @author zhangjx - */ -@Inherited -@Documented -@Target({TYPE}) -@Retention(RUNTIME) -public @interface ServiceResource { - - Class value(); - -} diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index b8c206f01..2809debb1 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -14,10 +14,13 @@ import org.redkale.util.*; /** * - *

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ @AutoLoad(false) +@ResourceType({WebSocketNodeService.class, WebSocketNode.class}) public class WebSocketNodeService extends WebSocketNode implements Service { @Override diff --git a/src/org/redkale/util/ResourceFactory.java b/src/org/redkale/util/ResourceFactory.java index eaebba383..098eedc27 100644 --- a/src/org/redkale/util/ResourceFactory.java +++ b/src/org/redkale/util/ResourceFactory.java @@ -67,7 +67,15 @@ public final class ResourceFactory { } public void register(final String name, final Object rs) { - register(name, rs.getClass(), rs); + final Class claz = rs.getClass(); + ResourceType rtype = (ResourceType) claz.getAnnotation(ResourceType.class); + if (rtype == null) { + register(name, claz, rs); + } else { + for (Class cl : rtype.value()) { + register(name, cl, rs); + } + } } public void register(final String name, final Class clazz, final A rs) { diff --git a/src/org/redkale/util/ResourceType.java b/src/org/redkale/util/ResourceType.java new file mode 100644 index 000000000..3cb19031c --- /dev/null +++ b/src/org/redkale/util/ResourceType.java @@ -0,0 +1,26 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.util; + +import java.lang.annotation.*; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * 显式的指明资源类型 + *

+ * 详情见: http://www.redkale.org + * + * @author zhangjx + */ +@Inherited +@Documented +@Target({TYPE}) +@Retention(RUNTIME) +public @interface ResourceType { + + Class[] value(); +}