From 756e4634d9b7a5ac52396dca7a9af07c758ffb66 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 29 May 2017 15:03:27 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 13 +++--- src/org/redkale/boot/NodeServer.java | 4 +- src/org/redkale/net/TransportFactory.java | 10 +++++ src/org/redkale/net/TransportGroupInfo.java | 24 +++++++---- src/org/redkale/net/sncp/Sncp.java | 20 ++++++--- src/org/redkale/net/sncp/SncpClient.java | 15 +------ .../redkale/test/service/ABMainService.java | 41 ++++++++++--------- test/org/redkale/test/sncp/SncpTest.java | 21 +++++----- .../test/sncp/SncpTestServiceImpl.java | 14 ++++--- 9 files changed, 95 insertions(+), 67 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index a5071d659..7c1287221 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -103,7 +103,7 @@ public final class Application { final List servers = new CopyOnWriteArrayList<>(); //传输端的TransportFactory - final TransportFactory transportFactory; + final SncpTransportFactory transportFactory; //全局根ResourceFactory final ResourceFactory resourceFactory = ResourceFactory.root(); @@ -260,21 +260,20 @@ public final class Application { logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); } } - this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup); + this.transportFactory = new SncpTransportFactory(transportExec, transportPool, transportGroup); } public ResourceFactory getResourceFactory() { return resourceFactory; } - public TransportFactory getTransportFactory() { + public SncpTransportFactory getTransportFactory() { return transportFactory; } - + // public WatchFactory getWatchFactory() { // return watchFactory; // } - public List getNodeServers() { return new ArrayList<>(servers); } @@ -359,6 +358,8 @@ public final class Application { Class type = field.getType(); if (type == Application.class) { field.set(src, application); + } else if (type == SncpTransportFactory.class) { + field.set(src, application.transportFactory); } else if (type == NodeSncpServer.class) { NodeServer server = null; for (NodeServer ns : application.getNodeServers()) { @@ -403,7 +404,7 @@ public final class Application { return false; } - }, Application.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); + }, Application.class, SncpTransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class); //-------------------------------------------------------------------------- initResources(); } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 9c6dfa9e4..01c0127a5 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -188,7 +188,7 @@ public abstract class NodeServer { final NodeServer self = this; //--------------------------------------------------------------------------------------------- final ResourceFactory appResFactory = application.getResourceFactory(); - final TransportFactory appTranFactory = application.getTransportFactory(); + final SncpTransportFactory appTranFactory = application.getTransportFactory(); final AnyValue resources = application.config.getAnyValue("resources"); final Map cacheResource = new HashMap<>(); //final Map dataResources = new HashMap<>(); @@ -321,7 +321,7 @@ public abstract class NodeServer { final Set> entrys = (Set) serviceFilter.getAllFilterEntrys(); ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory; final ResourceFactory appResourceFactory = application.getResourceFactory(); - final TransportFactory appTransportFactory = application.getTransportFactory(); + final SncpTransportFactory appTransportFactory = application.getTransportFactory(); for (FilterEntry entry : entrys) { //service实现类 final Class serviceImplClass = entry.getType(); if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过 diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 886d2242e..3e1e3bacd 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -52,6 +52,16 @@ public class TransportFactory { return groupInfos.get(group); } + public TransportFactory addGroupInfo(String name, InetSocketAddress... addrs) { + addGroupInfo(new TransportGroupInfo(name, addrs)); + return this; + } + + public TransportFactory addGroupInfo(String name, Set addrs) { + addGroupInfo(new TransportGroupInfo(name, addrs)); + return this; + } + public boolean addGroupInfo(TransportGroupInfo info) { if (info == null) throw new RuntimeException("TransportGroupInfo can not null"); if (info.addresses == null) throw new RuntimeException("TransportGroupInfo.addresses can not null"); diff --git a/src/org/redkale/net/TransportGroupInfo.java b/src/org/redkale/net/TransportGroupInfo.java index ed0073f2d..747af158d 100644 --- a/src/org/redkale/net/TransportGroupInfo.java +++ b/src/org/redkale/net/TransportGroupInfo.java @@ -8,6 +8,7 @@ package org.redkale.net; import java.net.InetSocketAddress; import java.util.*; import org.redkale.convert.json.JsonConvert; +import org.redkale.util.Utility; /** * 协议地址组合对象, 对应application.xml 中 resources->group 节点信息 @@ -30,13 +31,23 @@ public class TransportGroupInfo { public TransportGroupInfo() { } + public TransportGroupInfo(String name, InetSocketAddress... addrs) { + this(name, "TCP", "", Utility.ofSet(addrs)); + } + + public TransportGroupInfo(String name, Set addrs) { + this(name, "TCP", "", addrs); + } + + public TransportGroupInfo(String name, String protocol, String subprotocol, InetSocketAddress... addrs) { + this(name, protocol, subprotocol, Utility.ofSet(addrs)); + } + public TransportGroupInfo(String name, String protocol, String subprotocol, Set addrs) { Objects.requireNonNull(name, "Transport.group.name can not null"); - Objects.requireNonNull(protocol, "Transport.group.protocol can not null"); - Objects.requireNonNull(subprotocol, "Transport.group.subprotocol can not null"); this.name = name; - this.protocol = protocol; - this.subprotocol = subprotocol; + this.protocol = protocol == null ? "TCP" : protocol; + this.subprotocol = subprotocol == null ? "" : subprotocol; this.addresses = addrs; } @@ -54,8 +65,7 @@ public class TransportGroupInfo { } public void setProtocol(String protocol) { - Objects.requireNonNull(protocol, "Transport.group.protocol can not null"); - this.protocol = protocol; + this.protocol = protocol == null ? "TCP" : protocol; } public String getSubprotocol() { @@ -63,7 +73,7 @@ public class TransportGroupInfo { } public void setSubprotocol(String subprotocol) { - Objects.requireNonNull(subprotocol, "Transport.group.subprotocol can not null"); + this.subprotocol = subprotocol == null ? "" : subprotocol; this.subprotocol = subprotocol; } diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 785b130b2..5974b00cb 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -15,7 +15,6 @@ import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.Opcodes.*; import jdk.internal.org.objectweb.asm.Type; -import org.redkale.net.*; import org.redkale.net.sncp.SncpClient.SncpAction; import org.redkale.service.*; import org.redkale.util.*; @@ -750,6 +749,11 @@ public abstract class Sncp { } } + public static T createSimpleLocalService(final Class serviceImplClass, + final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { + return createLocalService("", serviceImplClass, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null); + } + /** * * 创建本地模式Service实例 @@ -758,7 +762,7 @@ public abstract class Sncp { * @param name 资源名 * @param serviceImplClass Service类 * @param resourceFactory ResourceFactory - * @param transportFactory TransportFactory + * @param transportFactory SncpTransportFactory * @param clientSncpAddress 本地IP地址 * @param groups 所有的组节点,包含自身 * @param conf 启动配置项 @@ -770,7 +774,7 @@ public abstract class Sncp { final String name, final Class serviceImplClass, final ResourceFactory resourceFactory, - final TransportFactory transportFactory, + final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final Set groups, final AnyValue conf) { @@ -825,6 +829,11 @@ public abstract class Sncp { } } + public static T createSimpleRemoteService(final Class serviceImplClass, + final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { + return createRemoteService("", serviceImplClass, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); + } + /** *
      * @Resource(name = "")
@@ -889,7 +898,7 @@ public abstract class Sncp {
      * @param                     Service泛型
      * @param name                   资源名
      * @param serviceTypeOrImplClass Service类
-     * @param transportFactory       TransportFactory
+     * @param transportFactory       SncpTransportFactory
      * @param clientAddress          本地IP地址
      * @param groups                 所有的组节点,包含自身
      * @param conf                   启动配置项
@@ -897,10 +906,11 @@ public abstract class Sncp {
      * @return Service的远程模式实例
      */
     @SuppressWarnings("unchecked")
+
     public static  T createRemoteService(
         final String name,
         final Class serviceTypeOrImplClass,
-        final TransportFactory transportFactory,
+        final SncpTransportFactory transportFactory,
         final InetSocketAddress clientAddress,
         final Set groups,
         final AnyValue conf) {
diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java
index e36c39411..54dd4ed95 100644
--- a/src/org/redkale/net/sncp/SncpClient.java
+++ b/src/org/redkale/net/sncp/SncpClient.java
@@ -81,7 +81,7 @@ public final class SncpClient {
     //本地模式
     protected Transport[] diffGroupTransports;
 
-    public  SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final TransportFactory factory,
+    public  SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final SncpTransportFactory factory,
         final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) {
         this.remote = remote;
         this.executor = factory.getExecutor();
@@ -101,6 +101,7 @@ public final class SncpClient {
         this.actions = methodens.toArray(new SncpAction[methodens.size()]);
         this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
         this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
+        factory.addSncpClient(this);
     }
 
     static List getSncpActions(final Class serviceClass) {
@@ -337,18 +338,6 @@ public final class SncpClient {
     }
 
     private CompletableFuture remote0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
-        if ("rest".equalsIgnoreCase(transport.getSubprotocol())) {
-            return remoteRest0(handler, transport, addr0, action, params);
-        }
-        return remoteSncp0(handler, transport, addr0, action, params);
-    }
-
-    //尚未实现
-    private CompletableFuture remoteRest0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
-        return null;
-    }
-
-    private CompletableFuture remoteSncp0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
         final Type[] myparamtypes = action.paramTypes;
         final Class[] myparamclass = action.paramClass;
         if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java
index 8a6b4effb..7a832fc84 100644
--- a/test/org/redkale/test/service/ABMainService.java
+++ b/test/org/redkale/test/service/ABMainService.java
@@ -9,14 +9,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousChannelGroup;
-import java.util.HashSet;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.logging.*;
 import javax.annotation.Resource;
 import org.redkale.convert.bson.BsonConvert;
 import org.redkale.convert.json.JsonConvert;
-import org.redkale.net.*;
 import org.redkale.net.http.*;
 import org.redkale.net.sncp.*;
 import org.redkale.service.Service;
@@ -36,11 +34,18 @@ public class ABMainService implements Service {
     public static void remotemain(String[] args) throws Throwable {
         System.out.println("------------------- 远程模式调用 -----------------------------------");
         final int abport = 8888;
-        ResourceFactory factory = ResourceFactory.root();
-        factory.register(JsonConvert.root());
-        factory.register(BsonConvert.root());
+        ResourceFactory resFactory = ResourceFactory.root();
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final SncpTransportFactory transFactory = new SncpTransportFactory(executor, newBufferPool(), newChannelGroup());
+        transFactory.addGroupInfo("g77", new InetSocketAddress("127.0.0.1", 5577));
+        transFactory.addGroupInfo("g88", new InetSocketAddress("127.0.0.1", 5588));
+        transFactory.addGroupInfo("g99", new InetSocketAddress("127.0.0.1", 5599));
+
+        resFactory.register(JsonConvert.root());
+        resFactory.register(BsonConvert.root());
+
         //------------------------ 初始化 CService ------------------------------------
-        CService cservice = Sncp.createLocalService("", null, ResourceFactory.root(), CService.class, new InetSocketAddress("127.0.0.1", 5577), "", new HashSet<>(), (AnyValue) null, null, null);
+        CService cservice = Sncp.createSimpleLocalService(CService.class, transFactory, new InetSocketAddress("127.0.0.1", 5577), "g77");
         SncpServer cserver = new SncpServer();
         cserver.getLogger().setLevel(Level.WARNING);
         cserver.addSncpServlet(cservice);
@@ -48,11 +53,10 @@ public class ABMainService implements Service {
         cserver.start();
 
         //------------------------ 初始化 BCService ------------------------------------
-        final Transport bctransport = new Transport("", "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577)));
-        BCService bcservice = Sncp.createLocalService("", null, ResourceFactory.root(), BCService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport, null);
-        CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport);
-        factory.inject(remoteCService);
-        factory.register("", remoteCService);
+        BCService bcservice = Sncp.createSimpleLocalService(BCService.class, transFactory, new InetSocketAddress("127.0.0.1", 5588), "g88");
+        CService remoteCService = Sncp.createSimpleRemoteService(CService.class, transFactory, new InetSocketAddress("127.0.0.1", 5588), "g77");
+        resFactory.inject(remoteCService);
+        resFactory.register("", remoteCService);
         SncpServer bcserver = new SncpServer();
         bcserver.getLogger().setLevel(Level.WARNING);
         bcserver.addSncpServlet(bcservice);
@@ -60,20 +64,19 @@ public class ABMainService implements Service {
         bcserver.start();
 
         //------------------------ 初始化 ABMainService ------------------------------------
-        final Transport abtransport = new Transport("", "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588)));
-        ABMainService service = Sncp.createLocalService("", null, ResourceFactory.root(), ABMainService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, bctransport, null);
-        BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, abtransport);
-        factory.inject(remoteBCService);
-        factory.register("", remoteBCService);
+        ABMainService service = Sncp.createSimpleLocalService(ABMainService.class, transFactory, new InetSocketAddress("127.0.0.1", 5599), "g99");
+        BCService remoteBCService = Sncp.createSimpleRemoteService(BCService.class, transFactory, new InetSocketAddress("127.0.0.1", 5599), "g88");
+        resFactory.inject(remoteBCService);
+        resFactory.register("", remoteBCService);
 
         HttpServer server = new HttpServer();
         server.getLogger().setLevel(Level.WARNING);
 
         server.addRestServlet("", ABMainService.class, service, HttpServlet.class, "/pipes");
 
-        factory.inject(cservice);
-        factory.inject(bcservice);
-        factory.inject(service);
+        resFactory.inject(cservice);
+        resFactory.inject(bcservice);
+        resFactory.inject(service);
 
         server.init(DefaultAnyValue.create("port", abport));
         server.start();
diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java
index 3ec4cac49..fc8441d12 100644
--- a/test/org/redkale/test/sncp/SncpTest.java
+++ b/test/org/redkale/test/sncp/SncpTest.java
@@ -14,7 +14,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.logging.LogManager;
 import org.redkale.convert.bson.*;
-import org.redkale.net.Transport;
 import org.redkale.net.sncp.*;
 import org.redkale.service.Service;
 import org.redkale.util.*;
@@ -79,9 +78,9 @@ public class SncpTest {
         Set set = new LinkedHashSet<>();
         set.add(addr);
         if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
-        //String name, WatchFactory, ObjectPool, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection
-        final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set);
-        final SncpTestIService service = Sncp.createSimpleRemoteService(serviceName, SncpTestIService.class, addr, transport);
+        final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
+        transFactory.addGroupInfo("client", set);
+        final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, addr, "client");
         ResourceFactory.root().inject(service);
 
 //        SncpTestBean bean = new SncpTestBean();
@@ -156,9 +155,9 @@ public class SncpTest {
                     SncpServer server = new SncpServer();
                     Set set = new LinkedHashSet<>();
                     if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
-                    //String name, WatchFactory, ObjectPool, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection
-                    final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set);
-                    SncpTestIService service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport);
+                    final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
+                    transFactory.addGroupInfo("server", set);
+                    SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server");
                     ResourceFactory.root().inject(service);
                     server.addSncpServlet(service);
                     System.out.println(service);
@@ -190,9 +189,10 @@ public class SncpTest {
                     SncpServer server = new SncpServer();
                     Set set = new LinkedHashSet<>();
                     set.add(new InetSocketAddress(myhost, port));
-                    //String name, WatchFactory, ObjectPool, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection
-                    final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set);
-                    Service service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport);
+
+                    final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
+                    transFactory.addGroupInfo("server", set);
+                    Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server");
                     server.addSncpServlet(service);
                     AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
                     conf.addValue("host", "0.0.0.0");
@@ -207,4 +207,5 @@ public class SncpTest {
         }.start();
         cdl.await();
     }
+
 }
diff --git a/test/org/redkale/test/sncp/SncpTestServiceImpl.java b/test/org/redkale/test/sncp/SncpTestServiceImpl.java
index 50b4c18dc..64f14a06d 100644
--- a/test/org/redkale/test/sncp/SncpTestServiceImpl.java
+++ b/test/org/redkale/test/sncp/SncpTestServiceImpl.java
@@ -7,11 +7,11 @@ package org.redkale.test.sncp;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.*;
 import org.redkale.net.sncp.*;
 import org.redkale.service.*;
 import org.redkale.source.DataCallArrayAttribute;
+import static org.redkale.test.sncp.SncpTest.*;
 import org.redkale.util.*;
 
 /**
@@ -98,7 +98,11 @@ public class SncpTestServiceImpl implements SncpTestIService {
     }
 
     public static void main(String[] args) throws Exception {
-        Service service = Sncp.createLocalService("", null, ResourceFactory.root(), SncpTestServiceImpl.class, new InetSocketAddress("127.0.0.1", 7070), "", new HashSet<>(), (AnyValue) null, null, null);
+
+        final SncpTransportFactory transFactory = new SncpTransportFactory(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup());
+
+        transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070));
+        Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");
         for (Method method : service.getClass().getDeclaredMethods()) {
             System.out.println(method);
         }
@@ -107,7 +111,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
             System.out.println(method);
         }
         System.out.println("-----------------------------------");
-        service = Sncp.createSimpleRemoteService("", SncpTestServiceImpl.class, new InetSocketAddress("127.0.0.1", 7070), null);
+        service = Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");
         for (Method method : service.getClass().getDeclaredMethods()) {
             System.out.println(method);
         }
@@ -116,7 +120,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
             System.out.println(method);
         }
         System.out.println("-----------------------------------");
-        service = Sncp.createSimpleRemoteService("", SncpTestIService.class, new InetSocketAddress("127.0.0.1", 7070), null);
+        service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70");
         for (Method method : service.getClass().getDeclaredMethods()) {
             System.out.println(method);
         }