This commit is contained in:
Redkale
2017-05-29 15:03:27 +08:00
parent 06773ccdc0
commit 756e4634d9
9 changed files with 95 additions and 67 deletions

View File

@@ -103,7 +103,7 @@ public final class Application {
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
//传输端的TransportFactory
final TransportFactory transportFactory;
final SncpTransportFactory transportFactory;
//全局根ResourceFactory
final ResourceFactory resourceFactory = ResourceFactory.root();
@@ -260,21 +260,20 @@ public final class Application {
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
}
}
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup);
this.transportFactory = new SncpTransportFactory(transportExec, transportPool, transportGroup);
}
public ResourceFactory getResourceFactory() {
return resourceFactory;
}
public TransportFactory getTransportFactory() {
public SncpTransportFactory getTransportFactory() {
return transportFactory;
}
// public WatchFactory getWatchFactory() {
// return watchFactory;
// }
public List<NodeServer> getNodeServers() {
return new ArrayList<>(servers);
}
@@ -359,6 +358,8 @@ public final class Application {
Class type = field.getType();
if (type == Application.class) {
field.set(src, application);
} else if (type == SncpTransportFactory.class) {
field.set(src, application.transportFactory);
} else if (type == NodeSncpServer.class) {
NodeServer server = null;
for (NodeServer ns : application.getNodeServers()) {
@@ -403,7 +404,7 @@ public final class Application {
return false;
}
}, Application.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
}, Application.class, SncpTransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
//--------------------------------------------------------------------------
initResources();
}

View File

@@ -188,7 +188,7 @@ public abstract class NodeServer {
final NodeServer self = this;
//---------------------------------------------------------------------------------------------
final ResourceFactory appResFactory = application.getResourceFactory();
final TransportFactory appTranFactory = application.getTransportFactory();
final SncpTransportFactory appTranFactory = application.getTransportFactory();
final AnyValue resources = application.config.getAnyValue("resources");
final Map<String, AnyValue> cacheResource = new HashMap<>();
//final Map<String, AnyValue> dataResources = new HashMap<>();
@@ -321,7 +321,7 @@ public abstract class NodeServer {
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
final ResourceFactory appResourceFactory = application.getResourceFactory();
final TransportFactory appTransportFactory = application.getTransportFactory();
final SncpTransportFactory appTransportFactory = application.getTransportFactory();
for (FilterEntry<? extends Service> entry : entrys) { //service实现类
final Class<? extends Service> serviceImplClass = entry.getType();
if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过

View File

@@ -52,6 +52,16 @@ public class TransportFactory {
return groupInfos.get(group);
}
public TransportFactory addGroupInfo(String name, InetSocketAddress... addrs) {
addGroupInfo(new TransportGroupInfo(name, addrs));
return this;
}
public TransportFactory addGroupInfo(String name, Set<InetSocketAddress> addrs) {
addGroupInfo(new TransportGroupInfo(name, addrs));
return this;
}
public boolean addGroupInfo(TransportGroupInfo info) {
if (info == null) throw new RuntimeException("TransportGroupInfo can not null");
if (info.addresses == null) throw new RuntimeException("TransportGroupInfo.addresses can not null");

View File

@@ -8,6 +8,7 @@ package org.redkale.net;
import java.net.InetSocketAddress;
import java.util.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility;
/**
* 协议地址组合对象, 对应application.xml 中 resources-&#62;group 节点信息
@@ -30,13 +31,23 @@ public class TransportGroupInfo {
public TransportGroupInfo() {
}
public TransportGroupInfo(String name, InetSocketAddress... addrs) {
this(name, "TCP", "", Utility.ofSet(addrs));
}
public TransportGroupInfo(String name, Set<InetSocketAddress> addrs) {
this(name, "TCP", "", addrs);
}
public TransportGroupInfo(String name, String protocol, String subprotocol, InetSocketAddress... addrs) {
this(name, protocol, subprotocol, Utility.ofSet(addrs));
}
public TransportGroupInfo(String name, String protocol, String subprotocol, Set<InetSocketAddress> addrs) {
Objects.requireNonNull(name, "Transport.group.name can not null");
Objects.requireNonNull(protocol, "Transport.group.protocol can not null");
Objects.requireNonNull(subprotocol, "Transport.group.subprotocol can not null");
this.name = name;
this.protocol = protocol;
this.subprotocol = subprotocol;
this.protocol = protocol == null ? "TCP" : protocol;
this.subprotocol = subprotocol == null ? "" : subprotocol;
this.addresses = addrs;
}
@@ -54,8 +65,7 @@ public class TransportGroupInfo {
}
public void setProtocol(String protocol) {
Objects.requireNonNull(protocol, "Transport.group.protocol can not null");
this.protocol = protocol;
this.protocol = protocol == null ? "TCP" : protocol;
}
public String getSubprotocol() {
@@ -63,7 +73,7 @@ public class TransportGroupInfo {
}
public void setSubprotocol(String subprotocol) {
Objects.requireNonNull(subprotocol, "Transport.group.subprotocol can not null");
this.subprotocol = subprotocol == null ? "" : subprotocol;
this.subprotocol = subprotocol;
}

View File

@@ -15,7 +15,6 @@ import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.Opcodes.*;
import jdk.internal.org.objectweb.asm.Type;
import org.redkale.net.*;
import org.redkale.net.sncp.SncpClient.SncpAction;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -750,6 +749,11 @@ public abstract class Sncp {
}
}
public static <T extends Service> T createSimpleLocalService(final Class<T> serviceImplClass,
final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
return createLocalService("", serviceImplClass, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
}
/**
*
* 创建本地模式Service实例
@@ -758,7 +762,7 @@ public abstract class Sncp {
* @param name 资源名
* @param serviceImplClass Service类
* @param resourceFactory ResourceFactory
* @param transportFactory TransportFactory
* @param transportFactory SncpTransportFactory
* @param clientSncpAddress 本地IP地址
* @param groups 所有的组节点,包含自身
* @param conf 启动配置项
@@ -770,7 +774,7 @@ public abstract class Sncp {
final String name,
final Class<T> serviceImplClass,
final ResourceFactory resourceFactory,
final TransportFactory transportFactory,
final SncpTransportFactory transportFactory,
final InetSocketAddress clientSncpAddress,
final Set<String> groups,
final AnyValue conf) {
@@ -825,6 +829,11 @@ public abstract class Sncp {
}
}
public static <T extends Service> T createSimpleRemoteService(final Class<T> serviceImplClass,
final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
return createRemoteService("", serviceImplClass, transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
}
/**
* <blockquote><pre>
* &#64;Resource(name = "")
@@ -889,7 +898,7 @@ public abstract class Sncp {
* @param <T> Service泛型
* @param name 资源名
* @param serviceTypeOrImplClass Service类
* @param transportFactory TransportFactory
* @param transportFactory SncpTransportFactory
* @param clientAddress 本地IP地址
* @param groups 所有的组节点,包含自身
* @param conf 启动配置项
@@ -897,10 +906,11 @@ public abstract class Sncp {
* @return Service的远程模式实例
*/
@SuppressWarnings("unchecked")
public static <T extends Service> T createRemoteService(
final String name,
final Class<T> serviceTypeOrImplClass,
final TransportFactory transportFactory,
final SncpTransportFactory transportFactory,
final InetSocketAddress clientAddress,
final Set<String> groups,
final AnyValue conf) {

View File

@@ -81,7 +81,7 @@ public final class SncpClient {
//本地模式
protected Transport[] diffGroupTransports;
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final TransportFactory factory,
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final SncpTransportFactory factory,
final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) {
this.remote = remote;
this.executor = factory.getExecutor();
@@ -101,6 +101,7 @@ public final class SncpClient {
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
factory.addSncpClient(this);
}
static List<SncpAction> getSncpActions(final Class serviceClass) {
@@ -337,18 +338,6 @@ public final class SncpClient {
}
private CompletableFuture<byte[]> remote0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
if ("rest".equalsIgnoreCase(transport.getSubprotocol())) {
return remoteRest0(handler, transport, addr0, action, params);
}
return remoteSncp0(handler, transport, addr0, action, params);
}
//尚未实现
private CompletableFuture<byte[]> remoteRest0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
return null;
}
private CompletableFuture<byte[]> remoteSncp0(final AsyncHandler handler, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;