This commit is contained in:
Redkale
2020-06-03 19:58:13 +08:00
parent 4806d6ada0
commit ebea6bb92c
7 changed files with 46 additions and 32 deletions

View File

@@ -121,7 +121,7 @@ public class NodeHttpServer extends NodeServer {
resourceFactory.register(RESNAME_SNCP_ADDR, String.class, sncpResFactory.find(RESNAME_SNCP_ADDR, String.class));
}
if (nodeService == null) {
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
regFactory.register(resourceName, WebSocketNode.class, nodeService);
}
resourceFactory.inject(nodeService, self);

View File

@@ -281,7 +281,7 @@ public abstract class NodeServer {
SncpClient client = Sncp.getSncpClient(srcService);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
final Set<String> groups = new HashSet<>();
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, client == null ? null : client.getMessageAgent(), appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService));
}
}
}
@@ -321,7 +321,7 @@ public abstract class NodeServer {
final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value"));
Object source = null;
if (CacheSource.class.isAssignableFrom(sourceType)) { // CacheSource
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, null, Sncp.getConf(srcService));
source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, client == null ? null : client.getMessageAgent(), appResFactory, appSncpTranFactory, sncpAddr, null, Sncp.getConf(srcService));
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[0];
@@ -366,7 +366,7 @@ public abstract class NodeServer {
if (nodeService == null) {
final HashSet<String> groups = new HashSet<>();
if (groups.isEmpty() && isSNCP() && NodeServer.this.sncpGroup != null) groups.add(NodeServer.this.sncpGroup);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null);
(isSNCP() ? appResFactory : resourceFactory).register(resourceName, WebSocketNode.class, nodeService);
((WebSocketNodeService) nodeService).setName(resourceName);
}
@@ -431,12 +431,18 @@ public abstract class NodeServer {
return;
}
MessageAgent agent = null;
if (entry.getProperty() != null && entry.getProperty().getValue("mq") != null) {
agent = application.getMessageAgent(entry.getProperty().getValue("mq"));
if (agent != null) messageAgents.put(agent.getName(), agent);
}
Service service;
boolean ws = src instanceof WebSocketServlet;
if (ws || localed) { //本地模式
service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, agent, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
final Class restype = Sncp.getResourceType(service);
if (rf.find(resourceName, restype) == null) {
@@ -444,11 +450,6 @@ public abstract class NodeServer {
} else if (isSNCP() && !entry.isAutoload()) {
throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + groups + ") is repeat.");
}
MessageAgent agent = null;
if (entry.getProperty() != null && entry.getProperty().getValue("mq") != null) {
agent = application.getMessageAgent(entry.getProperty().getValue("mq"));
if (agent != null) messageAgents.put(agent.getName(), agent);
}
if (Sncp.isRemote(service)) {
remoteServices.add(service);
if (agent != null) sncpRemoteAgents.put(agent.getName(), agent);

View File

@@ -18,6 +18,7 @@ import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.mq.MessageAgent;
import org.redkale.net.TransportFactory;
import org.redkale.net.sncp.SncpClient.SncpAction;
import org.redkale.service.*;
@@ -413,9 +414,9 @@ public abstract class Sncp {
}
}
public static <T extends Service> T createSimpleLocalService(final Class<T> serviceImplClass,
public static <T extends Service> T createSimpleLocalService(final Class<T> serviceImplClass, final MessageAgent remoteAgent,
final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
return createLocalService(null, "", serviceImplClass, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
return createLocalService(null, "", serviceImplClass, remoteAgent, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
}
/**
@@ -426,6 +427,7 @@ public abstract class Sncp {
* @param classLoader ClassLoader
* @param name 资源名
* @param serviceImplClass Service类
* @param remoteAgent MQ管理器
* @param resourceFactory ResourceFactory
* @param transportFactory TransportFactory
* @param clientSncpAddress 本地IP地址
@@ -439,6 +441,7 @@ public abstract class Sncp {
final ClassLoader classLoader,
final String name,
final Class<T> serviceImplClass,
final MessageAgent remoteAgent,
final ResourceFactory resourceFactory,
final TransportFactory transportFactory,
final InetSocketAddress clientSncpAddress,
@@ -459,7 +462,7 @@ public abstract class Sncp {
if (!field.getType().isAssignableFrom(newClazz)) continue;
field.setAccessible(true);
if (remoteService == null && clientSncpAddress != null) {
remoteService = createRemoteService(classLoader, name, serviceImplClass, transportFactory, clientSncpAddress, groups, conf);
remoteService = createRemoteService(classLoader, name, serviceImplClass, remoteAgent, transportFactory, clientSncpAddress, groups, conf);
}
if (remoteService != null) field.set(rs, remoteService);
}
@@ -470,7 +473,7 @@ public abstract class Sncp {
try {
Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client");
e.setAccessible(true);
client = new SncpClient(name, serviceImplClass, rs, transportFactory, false, newClazz, clientSncpAddress);
client = new SncpClient(name, serviceImplClass, rs, remoteAgent, transportFactory, false, newClazz, clientSncpAddress);
e.set(rs, client);
transportFactory.addSncpService(rs);
} catch (NoSuchFieldException ne) {
@@ -491,9 +494,9 @@ public abstract class Sncp {
}
}
public static <T extends Service> T createSimpleRemoteService(final Class<T> serviceImplClass,
public static <T extends Service> T createSimpleRemoteService(final Class<T> serviceImplClass, final MessageAgent agent,
final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
return createRemoteService(null, "", serviceImplClass, transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
return createRemoteService(null, "", serviceImplClass, agent, transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
}
/**
@@ -530,6 +533,7 @@ public abstract class Sncp {
* @param classLoader ClassLoader
* @param name 资源名
* @param serviceTypeOrImplClass Service类
* @param agent MQ管理器
* @param transportFactory TransportFactory
* @param clientAddress 本地IP地址
* @param groups0 所有的组节点,包含自身
@@ -543,6 +547,7 @@ public abstract class Sncp {
final ClassLoader classLoader,
final String name,
final Class<T> serviceTypeOrImplClass,
final MessageAgent agent,
final TransportFactory transportFactory,
final InetSocketAddress clientAddress,
final Set<String> groups0,
@@ -565,7 +570,7 @@ public abstract class Sncp {
try {
Class newClazz = loader.loadClass(newDynName.replace('/', '.'));
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
@@ -746,7 +751,7 @@ public abstract class Sncp {
}.loadClass(newDynName.replace('/', '.'), bytes);
try {
T rs = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
{

View File

@@ -17,6 +17,7 @@ import java.util.logging.*;
import javax.annotation.Resource;
import org.redkale.convert.bson.*;
import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import static org.redkale.net.sncp.SncpRequest.*;
import org.redkale.service.*;
@@ -56,6 +57,8 @@ public final class SncpClient {
protected final ExecutorService executor;
protected final MessageAgent agent;
protected final Supplier<ByteBuffer> bufferSupplier;
@Resource
@@ -70,11 +73,12 @@ public final class SncpClient {
//远程模式, 可能为null
protected Transport remoteGroupTransport;
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, final TransportFactory factory,
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceTypeOrImplClass, final T service, MessageAgent agent, final TransportFactory factory,
final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) {
this.remote = remote;
this.executor = factory.getExecutor();
this.bufferSupplier = factory.getBufferSupplier();
this.agent = agent;
Class<?> tn = serviceTypeOrImplClass;
Version ver = tn.getAnnotation(Version.class);
this.serviceClass = serviceClass;
@@ -104,6 +108,10 @@ public final class SncpClient {
return actions;
}
public MessageAgent getMessageAgent() {
return agent;
}
public InetSocketAddress getClientAddress() {
return clientSncpAddress;
}