From 69cc09e76d0868c1fd1b1b5c417a46aabffe5b58 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sun, 14 May 2017 16:05:41 +0800 Subject: [PATCH] --- src/org/redkale/boot/Application.java | 19 +++++++++---------- src/org/redkale/boot/NodeServer.java | 5 ++--- src/org/redkale/net/Transport.java | 12 ++++-------- .../redkale/test/service/ABMainService.java | 9 ++++----- test/org/redkale/test/sncp/SncpTest.java | 7 +++---- 5 files changed, 22 insertions(+), 30 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index bb9ccb520..db6f21cb5 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -130,8 +130,7 @@ public final class Application { private final boolean singletonrun; //根WatchFactory - private final WatchFactory watchFactory = WatchFactory.root(); - + //private final WatchFactory watchFactory = WatchFactory.root(); //进程根目录 private final File home; @@ -245,8 +244,8 @@ public final class Application { if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue(); if (transportConf != null) { //--------------transportBufferPool----------- - AtomicLong createBufferCounter = watchFactory == null ? new AtomicLong() : watchFactory.createWatchNumber(Transport.class.getSimpleName() + ".Buffer.creatCounter"); - AtomicLong cycleBufferCounter = watchFactory == null ? new AtomicLong() : watchFactory.createWatchNumber(Transport.class.getSimpleName() + ".Buffer.cycleCounter"); + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); final int bufferCapacity = transportConf.getIntValue("bufferCapacity", 8 * 1024); final int bufferPoolSize = transportConf.getIntValue("bufferPoolSize", groupsize * Runtime.getRuntime().availableProcessors() * 8); final int threads = transportConf.getIntValue("threads", groupsize * Runtime.getRuntime().availableProcessors() * 8); @@ -281,10 +280,10 @@ public final class Application { return resourceFactory; } - public WatchFactory getWatchFactory() { - return watchFactory; - } - +// public WatchFactory getWatchFactory() { +// return watchFactory; +// } + public List getNodeServers() { return new ArrayList<>(servers); } @@ -370,8 +369,8 @@ public final class Application { Class type = field.getType(); if (type == Application.class) { field.set(src, application); - } else if (type == WatchFactory.class) { - field.set(src, application.watchFactory); +// } else if (type == WatchFactory.class) { +// field.set(src, application.watchFactory); } } catch (Exception e) { logger.log(Level.SEVERE, "Resource inject error", e); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index c49089b84..44bd42c65 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -445,7 +445,7 @@ public abstract class NodeServer { transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses()))); Transport first = transports.get(0); GroupInfo ginfo = application.findGroupInfo(first.getName()); - Transport newTransport = new Transport(groupid, ginfo.getProtocol(), application.getWatchFactory(), + Transport newTransport = new Transport(groupid, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); synchronized (application.resourceFactory) { transport = application.resourceFactory.find(groupid, Transport.class); @@ -471,8 +471,7 @@ public abstract class NodeServer { GroupInfo ginfo = application.findGroupInfo(group); Set addrs = ginfo.copyAddrs(); if (addrs == null) throw new RuntimeException("Not found = " + group + " on "); - transport = new Transport(group, ginfo.getProtocol(), application.getWatchFactory(), - ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); + transport = new Transport(group, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); application.resourceFactory.register(group, transport); } return transport; diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index d05065e4b..213d5ade9 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -13,7 +13,6 @@ import java.util.concurrent.*; import java.util.function.Supplier; import java.util.stream.Collectors; import org.redkale.util.ObjectPool; -import org.redkale.watch.WatchFactory; /** * 传输客户端 @@ -50,8 +49,6 @@ public final class Transport { protected final String protocol; - protected final WatchFactory watch; - protected final AsynchronousChannelGroup group; protected final InetSocketAddress clientAddress; @@ -62,15 +59,14 @@ public final class Transport { protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); - public Transport(String name, WatchFactory watch, String subprotocol, final ObjectPool transportBufferPool, + public Transport(String name, String subprotocol, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses) { - this(name, DEFAULT_PROTOCOL, watch, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses); + this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses); } - public Transport(String name, String protocol, WatchFactory watch, String subprotocol, final ObjectPool transportBufferPool, + public Transport(String name, String protocol, String subprotocol, final ObjectPool transportBufferPool, final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses) { this.name = name; - this.watch = watch; this.subprotocol = subprotocol == null ? "" : subprotocol.trim(); this.protocol = protocol; this.tcp = "TCP".equalsIgnoreCase(protocol); @@ -92,7 +88,7 @@ public final class Transport { if (first == null) throw new NullPointerException("Collection is null or empty"); //必须按字母排列顺序确保,相同内容的transport列表组合的name相同,而不会因为list的顺序不同产生不同的name this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";")); - this.watch = first.watch; + //this.watch = first.watch; this.subprotocol = first.subprotocol; this.protocol = first.protocol; this.tcp = "TCP".equalsIgnoreCase(first.protocol); diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java index c73e6ead1..8a6b4effb 100644 --- a/test/org/redkale/test/service/ABMainService.java +++ b/test/org/redkale/test/service/ABMainService.java @@ -22,7 +22,6 @@ import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.*; -import org.redkale.watch.WatchFactory; /** * @@ -49,7 +48,7 @@ public class ABMainService implements Service { cserver.start(); //------------------------ 初始化 BCService ------------------------------------ - final Transport bctransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577))); + final Transport bctransport = new Transport("", "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577))); BCService bcservice = Sncp.createLocalService("", null, ResourceFactory.root(), BCService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport, null); CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport); factory.inject(remoteCService); @@ -61,7 +60,7 @@ public class ABMainService implements Service { bcserver.start(); //------------------------ 初始化 ABMainService ------------------------------------ - final Transport abtransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588))); + final Transport abtransport = new Transport("", "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588))); ABMainService service = Sncp.createLocalService("", null, ResourceFactory.root(), ABMainService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, bctransport, null); BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, abtransport); factory.inject(remoteBCService); @@ -91,7 +90,7 @@ public class ABMainService implements Service { //异步方法 url = "http://127.0.0.1:" + abport + "/pipes/abmain/asyncabtime2/张先生"; System.out.println(Utility.postHttpContent(url)); - + server.shutdown(); } @@ -193,7 +192,7 @@ public class ABMainService implements Service { @Override public int id2() { - return 2; + return 2; } }, name); } diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index c9483b2e2..3ec4cac49 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -18,7 +18,6 @@ import org.redkale.net.Transport; import org.redkale.net.sncp.*; import org.redkale.service.Service; import org.redkale.util.*; -import org.redkale.watch.WatchFactory; /** * @@ -81,7 +80,7 @@ public class SncpTest { set.add(addr); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); //String name, WatchFactory, ObjectPool, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection - final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set); + final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set); final SncpTestIService service = Sncp.createSimpleRemoteService(serviceName, SncpTestIService.class, addr, transport); ResourceFactory.root().inject(service); @@ -158,7 +157,7 @@ public class SncpTest { Set set = new LinkedHashSet<>(); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); //String name, WatchFactory, ObjectPool, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection - final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set); + final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set); SncpTestIService service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport); ResourceFactory.root().inject(service); server.addSncpServlet(service); @@ -192,7 +191,7 @@ public class SncpTest { Set set = new LinkedHashSet<>(); set.add(new InetSocketAddress(myhost, port)); //String name, WatchFactory, ObjectPool, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection - final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set); + final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set); Service service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport); server.addSncpServlet(service); AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();