This commit is contained in:
Redkale
2017-05-14 16:05:41 +08:00
parent ef28e32e04
commit 69cc09e76d
5 changed files with 22 additions and 30 deletions

View File

@@ -130,8 +130,7 @@ public final class Application {
private final boolean singletonrun; private final boolean singletonrun;
//根WatchFactory //根WatchFactory
private final WatchFactory watchFactory = WatchFactory.root(); //private final WatchFactory watchFactory = WatchFactory.root();
//进程根目录 //进程根目录
private final File home; private final File home;
@@ -245,8 +244,8 @@ public final class Application {
if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue(); if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue();
if (transportConf != null) { if (transportConf != null) {
//--------------transportBufferPool----------- //--------------transportBufferPool-----------
AtomicLong createBufferCounter = watchFactory == null ? new AtomicLong() : watchFactory.createWatchNumber(Transport.class.getSimpleName() + ".Buffer.creatCounter"); AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = watchFactory == null ? new AtomicLong() : watchFactory.createWatchNumber(Transport.class.getSimpleName() + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = new AtomicLong();
final int bufferCapacity = transportConf.getIntValue("bufferCapacity", 8 * 1024); final int bufferCapacity = transportConf.getIntValue("bufferCapacity", 8 * 1024);
final int bufferPoolSize = transportConf.getIntValue("bufferPoolSize", groupsize * Runtime.getRuntime().availableProcessors() * 8); final int bufferPoolSize = transportConf.getIntValue("bufferPoolSize", groupsize * Runtime.getRuntime().availableProcessors() * 8);
final int threads = transportConf.getIntValue("threads", groupsize * Runtime.getRuntime().availableProcessors() * 8); final int threads = transportConf.getIntValue("threads", groupsize * Runtime.getRuntime().availableProcessors() * 8);
@@ -281,10 +280,10 @@ public final class Application {
return resourceFactory; return resourceFactory;
} }
public WatchFactory getWatchFactory() { // public WatchFactory getWatchFactory() {
return watchFactory; // return watchFactory;
} // }
public List<NodeServer> getNodeServers() { public List<NodeServer> getNodeServers() {
return new ArrayList<>(servers); return new ArrayList<>(servers);
} }
@@ -370,8 +369,8 @@ public final class Application {
Class type = field.getType(); Class type = field.getType();
if (type == Application.class) { if (type == Application.class) {
field.set(src, application); field.set(src, application);
} else if (type == WatchFactory.class) { // } else if (type == WatchFactory.class) {
field.set(src, application.watchFactory); // field.set(src, application.watchFactory);
} }
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Resource inject error", e); logger.log(Level.SEVERE, "Resource inject error", e);

View File

@@ -445,7 +445,7 @@ public abstract class NodeServer {
transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses()))); transports.forEach(t -> addrs.addAll(Arrays.asList(t.getRemoteAddresses())));
Transport first = transports.get(0); Transport first = transports.get(0);
GroupInfo ginfo = application.findGroupInfo(first.getName()); GroupInfo ginfo = application.findGroupInfo(first.getName());
Transport newTransport = new Transport(groupid, ginfo.getProtocol(), application.getWatchFactory(), Transport newTransport = new Transport(groupid, ginfo.getProtocol(),
ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs); ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
synchronized (application.resourceFactory) { synchronized (application.resourceFactory) {
transport = application.resourceFactory.find(groupid, Transport.class); transport = application.resourceFactory.find(groupid, Transport.class);
@@ -471,8 +471,7 @@ public abstract class NodeServer {
GroupInfo ginfo = application.findGroupInfo(group); GroupInfo ginfo = application.findGroupInfo(group);
Set<InetSocketAddress> addrs = ginfo.copyAddrs(); Set<InetSocketAddress> addrs = ginfo.copyAddrs();
if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> "); if (addrs == null) throw new RuntimeException("Not found <group> = " + group + " on <resources> ");
transport = new Transport(group, ginfo.getProtocol(), application.getWatchFactory(), transport = new Transport(group, ginfo.getProtocol(), ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
ginfo.getSubprotocol(), application.transportBufferPool, application.transportChannelGroup, this.sncpAddress, addrs);
application.resourceFactory.register(group, transport); application.resourceFactory.register(group, transport);
} }
return transport; return transport;

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.*;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.redkale.util.ObjectPool; import org.redkale.util.ObjectPool;
import org.redkale.watch.WatchFactory;
/** /**
* 传输客户端 * 传输客户端
@@ -50,8 +49,6 @@ public final class Transport {
protected final String protocol; protected final String protocol;
protected final WatchFactory watch;
protected final AsynchronousChannelGroup group; protected final AsynchronousChannelGroup group;
protected final InetSocketAddress clientAddress; protected final InetSocketAddress clientAddress;
@@ -62,15 +59,14 @@ public final class Transport {
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(String name, WatchFactory watch, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool, public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) { final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
this(name, DEFAULT_PROTOCOL, watch, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses); this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses);
} }
public Transport(String name, String protocol, WatchFactory watch, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool, public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) { final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
this.name = name; this.name = name;
this.watch = watch;
this.subprotocol = subprotocol == null ? "" : subprotocol.trim(); this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
this.protocol = protocol; this.protocol = protocol;
this.tcp = "TCP".equalsIgnoreCase(protocol); this.tcp = "TCP".equalsIgnoreCase(protocol);
@@ -92,7 +88,7 @@ public final class Transport {
if (first == null) throw new NullPointerException("Collection<Transport> is null or empty"); if (first == null) throw new NullPointerException("Collection<Transport> is null or empty");
//必须按字母排列顺序确保相同内容的transport列表组合的name相同而不会因为list的顺序不同产生不同的name //必须按字母排列顺序确保相同内容的transport列表组合的name相同而不会因为list的顺序不同产生不同的name
this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";")); this.name = tmpgroup.stream().sorted().collect(Collectors.joining(";"));
this.watch = first.watch; //this.watch = first.watch;
this.subprotocol = first.subprotocol; this.subprotocol = first.subprotocol;
this.protocol = first.protocol; this.protocol = first.protocol;
this.tcp = "TCP".equalsIgnoreCase(first.protocol); this.tcp = "TCP".equalsIgnoreCase(first.protocol);

View File

@@ -22,7 +22,6 @@ import org.redkale.net.sncp.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue.DefaultAnyValue; import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.WatchFactory;
/** /**
* *
@@ -49,7 +48,7 @@ public class ABMainService implements Service {
cserver.start(); cserver.start();
//------------------------ 初始化 BCService ------------------------------------ //------------------------ 初始化 BCService ------------------------------------
final Transport bctransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577))); final Transport bctransport = new Transport("", "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5577)));
BCService bcservice = Sncp.createLocalService("", null, ResourceFactory.root(), BCService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport, null); BCService bcservice = Sncp.createLocalService("", null, ResourceFactory.root(), BCService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport, null);
CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport); CService remoteCService = Sncp.createRemoteService("", null, CService.class, new InetSocketAddress("127.0.0.1", 5588), "", new HashSet<>(), (AnyValue) null, bctransport);
factory.inject(remoteCService); factory.inject(remoteCService);
@@ -61,7 +60,7 @@ public class ABMainService implements Service {
bcserver.start(); bcserver.start();
//------------------------ 初始化 ABMainService ------------------------------------ //------------------------ 初始化 ABMainService ------------------------------------
final Transport abtransport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588))); final Transport abtransport = new Transport("", "", newBufferPool(), newChannelGroup(), null, Utility.ofSet(new InetSocketAddress("127.0.0.1", 5588)));
ABMainService service = Sncp.createLocalService("", null, ResourceFactory.root(), ABMainService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, bctransport, null); ABMainService service = Sncp.createLocalService("", null, ResourceFactory.root(), ABMainService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, bctransport, null);
BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, abtransport); BCService remoteBCService = Sncp.createRemoteService("", null, BCService.class, new InetSocketAddress("127.0.0.1", 5599), "", new HashSet<>(), (AnyValue) null, abtransport);
factory.inject(remoteBCService); factory.inject(remoteBCService);
@@ -91,7 +90,7 @@ public class ABMainService implements Service {
//异步方法 //异步方法
url = "http://127.0.0.1:" + abport + "/pipes/abmain/asyncabtime2/张先生"; url = "http://127.0.0.1:" + abport + "/pipes/abmain/asyncabtime2/张先生";
System.out.println(Utility.postHttpContent(url)); System.out.println(Utility.postHttpContent(url));
server.shutdown(); server.shutdown();
} }
@@ -193,7 +192,7 @@ public class ABMainService implements Service {
@Override @Override
public int id2() { public int id2() {
return 2; return 2;
} }
}, name); }, name);
} }

View File

@@ -18,7 +18,6 @@ import org.redkale.net.Transport;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.watch.WatchFactory;
/** /**
* *
@@ -81,7 +80,7 @@ public class SncpTest {
set.add(addr); set.add(addr);
if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
//String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress> //String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set); final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set);
final SncpTestIService service = Sncp.createSimpleRemoteService(serviceName, SncpTestIService.class, addr, transport); final SncpTestIService service = Sncp.createSimpleRemoteService(serviceName, SncpTestIService.class, addr, transport);
ResourceFactory.root().inject(service); ResourceFactory.root().inject(service);
@@ -158,7 +157,7 @@ public class SncpTest {
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
//String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress> //String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set); final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set);
SncpTestIService service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport); SncpTestIService service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport);
ResourceFactory.root().inject(service); ResourceFactory.root().inject(service);
server.addSncpServlet(service); server.addSncpServlet(service);
@@ -192,7 +191,7 @@ public class SncpTest {
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(new InetSocketAddress(myhost, port)); set.add(new InetSocketAddress(myhost, port));
//String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress> //String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
final Transport transport = new Transport("", WatchFactory.root(), "", newBufferPool(), newChannelGroup(), null, set); final Transport transport = new Transport("", "", newBufferPool(), newChannelGroup(), null, set);
Service service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport); Service service = Sncp.createSimpleLocalService("", SncpTestServiceImpl.class, addr, transport);
server.addSncpServlet(service); server.addSncpServlet(service);
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();