This commit is contained in:
@@ -103,7 +103,7 @@ public final class Application {
|
||||
final List<NodeServer> servers = new CopyOnWriteArrayList<>();
|
||||
|
||||
//传输端的TransportFactory
|
||||
final SncpTransportFactory transportFactory;
|
||||
final TransportFactory transportFactory;
|
||||
|
||||
//全局根ResourceFactory
|
||||
final ResourceFactory resourceFactory = ResourceFactory.root();
|
||||
@@ -260,14 +260,14 @@ public final class Application {
|
||||
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
||||
}
|
||||
}
|
||||
this.transportFactory = new SncpTransportFactory(transportExec, transportPool, transportGroup);
|
||||
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup);
|
||||
}
|
||||
|
||||
public ResourceFactory getResourceFactory() {
|
||||
return resourceFactory;
|
||||
}
|
||||
|
||||
public SncpTransportFactory getTransportFactory() {
|
||||
public TransportFactory getTransportFactory() {
|
||||
return transportFactory;
|
||||
}
|
||||
|
||||
@@ -358,7 +358,7 @@ public final class Application {
|
||||
Class type = field.getType();
|
||||
if (type == Application.class) {
|
||||
field.set(src, application);
|
||||
} else if (type == SncpTransportFactory.class) {
|
||||
} else if (type == TransportFactory.class) {
|
||||
field.set(src, application.transportFactory);
|
||||
} else if (type == NodeSncpServer.class) {
|
||||
NodeServer server = null;
|
||||
@@ -404,7 +404,7 @@ public final class Application {
|
||||
return false;
|
||||
}
|
||||
|
||||
}, Application.class, SncpTransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
|
||||
}, Application.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
|
||||
//--------------------------------------------------------------------------
|
||||
initResources();
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ public abstract class NodeServer {
|
||||
final NodeServer self = this;
|
||||
//---------------------------------------------------------------------------------------------
|
||||
final ResourceFactory appResFactory = application.getResourceFactory();
|
||||
final SncpTransportFactory appTranFactory = application.getTransportFactory();
|
||||
final TransportFactory appTranFactory = application.getTransportFactory();
|
||||
final AnyValue resources = application.config.getAnyValue("resources");
|
||||
final Map<String, AnyValue> cacheResource = new HashMap<>();
|
||||
//final Map<String, AnyValue> dataResources = new HashMap<>();
|
||||
@@ -321,7 +321,7 @@ public abstract class NodeServer {
|
||||
final Set<FilterEntry<? extends Service>> entrys = (Set) serviceFilter.getAllFilterEntrys();
|
||||
ResourceFactory regFactory = isSNCP() ? application.getResourceFactory() : resourceFactory;
|
||||
final ResourceFactory appResourceFactory = application.getResourceFactory();
|
||||
final SncpTransportFactory appTransportFactory = application.getTransportFactory();
|
||||
final TransportFactory appTransportFactory = application.getTransportFactory();
|
||||
for (FilterEntry<? extends Service> entry : entrys) { //service实现类
|
||||
final Class<? extends Service> serviceImplClass = entry.getType();
|
||||
if (Modifier.isFinal(serviceImplClass.getModifiers())) continue; //修饰final的类跳过
|
||||
|
||||
@@ -5,12 +5,14 @@
|
||||
*/
|
||||
package org.redkale.net;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.logging.*;
|
||||
import org.redkale.service.Service;
|
||||
import org.redkale.util.ObjectPool;
|
||||
|
||||
/**
|
||||
@@ -36,6 +38,8 @@ public class TransportFactory {
|
||||
//协议地址的Group集合
|
||||
protected final Map<String, TransportGroupInfo> groupInfos = new HashMap<>();
|
||||
|
||||
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
|
||||
|
||||
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||
this.executor = executor;
|
||||
this.bufferPool = bufferPool;
|
||||
@@ -57,11 +61,11 @@ public class TransportFactory {
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportFactory addGroupInfo(String name, Set<InetSocketAddress> addrs) {
|
||||
public TransportFactory addGroupInfo(String name, Set<InetSocketAddress> addrs) {
|
||||
addGroupInfo(new TransportGroupInfo(name, addrs));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public boolean addGroupInfo(TransportGroupInfo info) {
|
||||
if (info == null) throw new RuntimeException("TransportGroupInfo can not null");
|
||||
if (info.addresses == null) throw new RuntimeException("TransportGroupInfo.addresses can not null");
|
||||
@@ -124,6 +128,20 @@ public class TransportFactory {
|
||||
return executor;
|
||||
}
|
||||
|
||||
public void addSncpService(Service service) {
|
||||
if (service == null) return;
|
||||
services.add(new WeakReference<>(service));
|
||||
}
|
||||
|
||||
public List<Service> getServices() {
|
||||
List<Service> rs = new ArrayList<>();
|
||||
for (WeakReference<Service> ref : services) {
|
||||
Service service = ref.get();
|
||||
if (service != null) rs.add(service);
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
|
||||
public void shutdownNow() {
|
||||
try {
|
||||
this.channelGroup.shutdownNow();
|
||||
|
||||
@@ -15,6 +15,7 @@ import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
|
||||
import jdk.internal.org.objectweb.asm.*;
|
||||
import static jdk.internal.org.objectweb.asm.Opcodes.*;
|
||||
import jdk.internal.org.objectweb.asm.Type;
|
||||
import org.redkale.net.TransportFactory;
|
||||
import org.redkale.net.sncp.SncpClient.SncpAction;
|
||||
import org.redkale.service.*;
|
||||
import org.redkale.util.*;
|
||||
@@ -732,7 +733,7 @@ public abstract class Sncp {
|
||||
}
|
||||
|
||||
public static <T extends Service> T createSimpleLocalService(final Class<T> serviceImplClass,
|
||||
final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
|
||||
final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
|
||||
return createLocalService("", serviceImplClass, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
|
||||
}
|
||||
|
||||
@@ -744,7 +745,7 @@ public abstract class Sncp {
|
||||
* @param name 资源名
|
||||
* @param serviceImplClass Service类
|
||||
* @param resourceFactory ResourceFactory
|
||||
* @param transportFactory SncpTransportFactory
|
||||
* @param transportFactory TransportFactory
|
||||
* @param clientSncpAddress 本地IP地址
|
||||
* @param groups 所有的组节点,包含自身
|
||||
* @param conf 启动配置项
|
||||
@@ -756,7 +757,7 @@ public abstract class Sncp {
|
||||
final String name,
|
||||
final Class<T> serviceImplClass,
|
||||
final ResourceFactory resourceFactory,
|
||||
final SncpTransportFactory transportFactory,
|
||||
final TransportFactory transportFactory,
|
||||
final InetSocketAddress clientSncpAddress,
|
||||
final Set<String> groups,
|
||||
final AnyValue conf) {
|
||||
@@ -795,6 +796,7 @@ public abstract class Sncp {
|
||||
client.setSameGroupTransport(transportFactory.loadSameGroupTransport(clientSncpAddress));
|
||||
client.setDiffGroupTransports(transportFactory.loadDiffGroupTransports(clientSncpAddress, diffGroups));
|
||||
e.set(rs, client);
|
||||
transportFactory.addSncpService(rs);
|
||||
} catch (NoSuchFieldException ne) {
|
||||
ne.printStackTrace();
|
||||
}
|
||||
@@ -814,7 +816,7 @@ public abstract class Sncp {
|
||||
}
|
||||
|
||||
public static <T extends Service> T createSimpleRemoteService(final Class<T> serviceImplClass,
|
||||
final SncpTransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
|
||||
final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
|
||||
return createRemoteService("", serviceImplClass, transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
|
||||
}
|
||||
|
||||
@@ -861,7 +863,7 @@ public abstract class Sncp {
|
||||
* @param <T> Service泛型
|
||||
* @param name 资源名
|
||||
* @param serviceTypeOrImplClass Service类
|
||||
* @param transportFactory SncpTransportFactory
|
||||
* @param transportFactory TransportFactory
|
||||
* @param clientAddress 本地IP地址
|
||||
* @param groups 所有的组节点,包含自身
|
||||
* @param conf 启动配置项
|
||||
@@ -873,7 +875,7 @@ public abstract class Sncp {
|
||||
public static <T extends Service> T createRemoteService(
|
||||
final String name,
|
||||
final Class<T> serviceTypeOrImplClass,
|
||||
final SncpTransportFactory transportFactory,
|
||||
final TransportFactory transportFactory,
|
||||
final InetSocketAddress clientAddress,
|
||||
final Set<String> groups,
|
||||
final AnyValue conf) {
|
||||
@@ -898,6 +900,7 @@ public abstract class Sncp {
|
||||
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
|
||||
c.setAccessible(true);
|
||||
c.set(rs, client);
|
||||
transportFactory.addSncpService(rs);
|
||||
return rs;
|
||||
} catch (Throwable ex) {
|
||||
}
|
||||
@@ -1097,6 +1100,7 @@ public abstract class Sncp {
|
||||
c.setAccessible(true);
|
||||
c.set(rs, conf);
|
||||
}
|
||||
transportFactory.addSncpService(rs);
|
||||
return rs;
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
|
||||
@@ -81,7 +81,7 @@ public final class SncpClient {
|
||||
//本地模式
|
||||
protected Transport[] diffGroupTransports;
|
||||
|
||||
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final SncpTransportFactory factory,
|
||||
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final TransportFactory factory,
|
||||
final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) {
|
||||
this.remote = remote;
|
||||
this.executor = factory.getExecutor();
|
||||
@@ -101,7 +101,6 @@ public final class SncpClient {
|
||||
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
|
||||
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
|
||||
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
|
||||
factory.addSncpClient(this);
|
||||
}
|
||||
|
||||
static List<SncpAction> getSncpActions(final Class serviceClass) {
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.net.sncp;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousChannelGroup;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.util.ObjectPool;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class SncpTransportFactory extends TransportFactory {
|
||||
|
||||
protected final List<WeakReference<SncpClient>> clients = new CopyOnWriteArrayList<>();
|
||||
|
||||
public SncpTransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||
super(executor, bufferPool, channelGroup);
|
||||
}
|
||||
|
||||
public SncpTransportFactory addGroupInfo(String name, InetSocketAddress... addrs) {
|
||||
addGroupInfo(new TransportGroupInfo(name, addrs));
|
||||
return this;
|
||||
}
|
||||
|
||||
public SncpTransportFactory addGroupInfo(String name, Set<InetSocketAddress> addrs) {
|
||||
addGroupInfo(new TransportGroupInfo(name, addrs));
|
||||
return this;
|
||||
}
|
||||
|
||||
void addSncpClient(SncpClient client) {
|
||||
clients.add(new WeakReference<>(client));
|
||||
}
|
||||
|
||||
public List<SncpClient> getSncpClients() {
|
||||
List<SncpClient> rs = new ArrayList<>();
|
||||
for (WeakReference<SncpClient> ref : clients) {
|
||||
SncpClient client = ref.get();
|
||||
if (client != null) rs.add(client);
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user