This commit is contained in:
wentch
2016-01-12 15:18:45 +08:00
parent 585d5866e6
commit b7ac9e1663
2 changed files with 53 additions and 26 deletions

View File

@@ -12,17 +12,19 @@ import org.redkale.service.Service;
import org.redkale.net.sncp.SncpServer; import org.redkale.net.sncp.SncpServer;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
import org.redkale.util.Utility; import org.redkale.util.Utility;
import org.redkale.boot.ClassFilter;
import org.redkale.net.sncp.ServiceWrapper; import org.redkale.net.sncp.ServiceWrapper;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import org.redkale.watch.WatchFactory; import org.redkale.watch.WatchFactory;
import org.redkale.util.ResourceFactory; import org.redkale.util.ResourceFactory;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.util.*;
/** /**
* *
@@ -36,7 +38,7 @@ public class SncpTest {
private static final int port = 4040; private static final int port = 4040;
private static final int port2 = 0; // 4240; private static final int port2 = 4240;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -59,13 +61,34 @@ public class SncpTest {
} }
} }
public static AsynchronousChannelGroup newChannelGroup() throws IOException {
final AtomicInteger counter = new AtomicInteger();
ExecutorService transportExec = Executors.newFixedThreadPool(16, (Runnable r) -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("Transport-Thread-" + counter.incrementAndGet());
return t;
});
return AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1);
}
public static ObjectPool<ByteBuffer> newBufferPool() {
return new ObjectPool<>(new AtomicLong(), new AtomicLong(), 16,
(Object... params) -> ByteBuffer.allocateDirect(8192), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != 8192) return false;
e.clear();
return true;
});
}
private static void runClient() throws Exception { private static void runClient() throws Exception {
InetSocketAddress addr = new InetSocketAddress(myhost, port); InetSocketAddress addr = new InetSocketAddress(myhost, port);
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(addr); set.add(addr);
if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
final Transport transport = new Transport("", WatchFactory.root(), 50, set); //String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
final SncpTestService service = Sncp.createRemoteService(serviceName, null, SncpTestService.class, null, new LinkedHashSet<>(), transport); final Transport transport = new Transport("", WatchFactory.root(), newBufferPool(), newChannelGroup(), null, set);
final SncpTestService service = Sncp.createRemoteService(serviceName, null, SncpTestService.class, null, transport);
ResourceFactory.root().inject(service); ResourceFactory.root().inject(service);
// SncpTestBean bean = new SncpTestBean(); // SncpTestBean bean = new SncpTestBean();
@@ -123,18 +146,20 @@ public class SncpTest {
InetSocketAddress addr = new InetSocketAddress(myhost, port); InetSocketAddress addr = new InetSocketAddress(myhost, port);
final CountDownLatch cdl = new CountDownLatch(1); final CountDownLatch cdl = new CountDownLatch(1);
new Thread() { new Thread() {
{
setName("Thread-Server-01");
}
@Override @Override
public void run() { public void run() {
try { try {
SncpServer server = new SncpServer(); SncpServer server = new SncpServer();
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); if (port2 > 0) set.add(new InetSocketAddress(myhost, port2));
Transport transport = new Transport("", WatchFactory.root(), 50, set); //String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
List<Transport> sameTransports = new ArrayList<>(); final Transport transport = new Transport("", WatchFactory.root(), newBufferPool(), newChannelGroup(), null, set);
if (port2 > 0) sameTransports.add(transport); SncpTestService service = Sncp.createLocalService("", null, SncpTestService.class, addr, transport, null);
SncpTestService service = Sncp.createLocalService("", null, SncpTestService.class, addr, new LinkedHashSet<>(), sameTransports, null);
ResourceFactory.root().inject(service); ResourceFactory.root().inject(service);
server.addService(new ServiceWrapper(SncpTestService.class, service, "", new ClassFilter.FilterEntry(SncpTestService.class, null))); server.addService(new ServiceWrapper(SncpTestService.class, service, "", "", new HashSet<>(), null));
System.out.println(service); System.out.println(service);
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("host", "0.0.0.0"); conf.addValue("host", "0.0.0.0");
@@ -154,17 +179,19 @@ public class SncpTest {
InetSocketAddress addr = new InetSocketAddress(myhost, port2); InetSocketAddress addr = new InetSocketAddress(myhost, port2);
final CountDownLatch cdl = new CountDownLatch(1); final CountDownLatch cdl = new CountDownLatch(1);
new Thread() { new Thread() {
{
setName("Thread-Server-02");
}
@Override @Override
public void run() { public void run() {
try { try {
SncpServer server = new SncpServer(); SncpServer server = new SncpServer();
Set<InetSocketAddress> set = new LinkedHashSet<>(); Set<InetSocketAddress> set = new LinkedHashSet<>();
set.add(new InetSocketAddress(myhost, port)); set.add(new InetSocketAddress(myhost, port));
Transport transport = new Transport("", WatchFactory.root(), 50, set); //String name, WatchFactory, ObjectPool<ByteBuffer>, AsynchronousChannelGroup, InetSocketAddress clientAddress, Collection<InetSocketAddress>
List<Transport> sameTransports = new ArrayList<>(); final Transport transport = new Transport("", WatchFactory.root(), newBufferPool(), newChannelGroup(), null, set);
sameTransports.add(transport); Service service = Sncp.createLocalService("", null, SncpTestService.class, addr, transport, null);
Service service = Sncp.createLocalService("", null, SncpTestService.class, addr, new LinkedHashSet<>(), sameTransports, null); server.addService(new ServiceWrapper(SncpTestService.class, service, "", "", new HashSet<>(), null));
server.addService(new ServiceWrapper(SncpTestService.class, service, "", new ClassFilter.FilterEntry(SncpTestService.class, null)));
AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue();
conf.addValue("host", "0.0.0.0"); conf.addValue("host", "0.0.0.0");
conf.addValue("port", "" + port2); conf.addValue("port", "" + port2);

View File

@@ -7,7 +7,6 @@ package org.redkale.test.sncp;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.util.*;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.util.Attribute; import org.redkale.util.Attribute;
@@ -69,7 +68,7 @@ public class SncpTestService implements SncpTestIService {
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Service service = Sncp.createLocalService("", null, SncpTestService.class, new InetSocketAddress("127.0.0.1", 7070), new HashSet(), null, null); Service service = Sncp.createLocalService("", null, SncpTestService.class, new InetSocketAddress("127.0.0.1", 7070), null, null);
for (Method method : service.getClass().getDeclaredMethods()) { for (Method method : service.getClass().getDeclaredMethods()) {
System.out.println(method); System.out.println(method);
} }
@@ -78,7 +77,7 @@ public class SncpTestService implements SncpTestIService {
System.out.println(method); System.out.println(method);
} }
System.out.println("-----------------------------------"); System.out.println("-----------------------------------");
service = Sncp.createRemoteService("", null, SncpTestService.class, new InetSocketAddress("127.0.0.1", 7070), new HashSet(), null); service = Sncp.createRemoteService("", null, SncpTestService.class, new InetSocketAddress("127.0.0.1", 7070), null);
for (Method method : service.getClass().getDeclaredMethods()) { for (Method method : service.getClass().getDeclaredMethods()) {
System.out.println(method); System.out.println(method);
} }
@@ -86,7 +85,8 @@ public class SncpTestService implements SncpTestIService {
for (Method method : SncpClient.parseMethod(service.getClass())) { for (Method method : SncpClient.parseMethod(service.getClass())) {
System.out.println(method); System.out.println(method);
} }
System.out.println("-----------------------------------");service = Sncp.createRemoteService("", null, SncpTestIService.class, new InetSocketAddress("127.0.0.1", 7070), new HashSet(), null); System.out.println("-----------------------------------");
service = Sncp.createRemoteService("", null, SncpTestIService.class, new InetSocketAddress("127.0.0.1", 7070), null);
for (Method method : service.getClass().getDeclaredMethods()) { for (Method method : service.getClass().getDeclaredMethods()) {
System.out.println(method); System.out.println(method);
} }