This commit is contained in:
Redkale
2020-06-07 11:27:40 +08:00
parent 7ec91802b2
commit adfa0be79e
6 changed files with 64 additions and 44 deletions

View File

@@ -65,7 +65,6 @@
MQ管理接口配置
不同MQ节点所配置的MQ集群不能重复。
name: 服务的名称用于监控识别多个mq节点时只能有一个name为空的节点mq.name不能重复,命名规则: 字母、数字、下划线
names: 服务的扩展名称用于监控识别一个mq可以使用多个资源名称多个名称用分号;隔开, 且name需要全局唯一
value 实现类名必须是org.redkale.mq.MessageAgent的子类
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
-->

View File

@@ -434,11 +434,6 @@ public final class Application {
this.resourceFactory.inject(agent);
agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
if (agent.getNames() != null) {
for (String n : agent.getNames()) {
this.resourceFactory.register(n, MessageAgent.class, agent);
}
}
}
}
this.messageAgents = mqs;
@@ -462,11 +457,6 @@ public final class Application {
if (messageAgents == null) return null;
for (MessageAgent agent : messageAgents) {
if (agent.getName().equals(name)) return agent;
if (agent.getNames() != null) {
for (String n : agent.getNames()) {
if (n.equals(name)) return agent;
}
}
}
return null;
}

View File

@@ -403,6 +403,8 @@ public abstract class NodeServer {
((WebSocketNodeService) nodeService).setName(resourceName);
}
resourceFactory.inject(nodeService, self);
MessageAgent messageAgent = Sncp.getMessageAgent((Service) src);
if (messageAgent != null && Sncp.getMessageAgent(nodeService) == null) Sncp.setMessageAgent(nodeService, messageAgent);
field.set(src, nodeService);
if (Sncp.isRemote(nodeService)) {
remoteServices.add(nodeService);
@@ -470,13 +472,16 @@ public abstract class NodeServer {
}
Service service;
boolean ws = src instanceof WebSocketServlet;
final boolean ws = src instanceof WebSocketServlet;
if (ws || localed) { //本地模式
service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, agent, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
} else {
service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty());
}
if (service instanceof WebSocketNodeService) ((WebSocketNodeService) service).setName(resourceName);
if (service instanceof WebSocketNodeService) {
((WebSocketNodeService) service).setName(resourceName);
if (agent != null) Sncp.setMessageAgent(service, agent);
}
final Class restype = Sncp.getResourceType(service);
if (rf.find(resourceName, restype) == null) {
regFactory.register(resourceName, restype, service);

View File

@@ -38,8 +38,6 @@ public abstract class MessageAgent {
protected String name;
protected String[] names;
protected AnyValue config;
protected MessageProducer producer;
@@ -58,15 +56,6 @@ public abstract class MessageAgent {
public void init(AnyValue config) {
this.name = checkName(config.getValue("name", ""));
String namex = config.getValue("names");
if (namex != null && !namex.isEmpty()) {
List<String> list = new ArrayList<>();
for (String n : namex.split(";")) {
if (n.trim().isEmpty()) continue;
list.add(n.trim());
}
if (!list.isEmpty()) this.names = list.toArray(new String[list.size()]);
}
}
//ServiceLoader时判断配置是否符合当前实现类
@@ -115,10 +104,6 @@ public abstract class MessageAgent {
return name;
}
public String[] getNames() {
return names;
}
public AnyValue getConfig() {
return config;
}

View File

@@ -16,7 +16,6 @@ import javax.annotation.*;
import org.redkale.boot.*;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.MessageAgent;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -57,9 +56,6 @@ public abstract class WebSocketNode {
@Resource(name = "$")
protected CacheSource<InetSocketAddress> sncpNodeAddresses;
@Resource(name = "$")
protected MessageAgent messageAgent;
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;

View File

@@ -146,6 +146,28 @@ public abstract class Sncp {
}
}
public static MessageAgent getMessageAgent(Service service) {
if (service == null) return null;
try {
Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_messageagent");
ts.setAccessible(true);
return (MessageAgent) ts.get(service);
} catch (Exception e) {
throw new RuntimeException(service + " not found " + FIELDPREFIX + "_messageagent");
}
}
public static void setMessageAgent(Service service, MessageAgent messageAgent) {
if (service == null) return;
try {
Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_messageagent");
ts.setAccessible(true);
ts.set(service, messageAgent);
} catch (Exception e) {
throw new RuntimeException(service + " not found " + FIELDPREFIX + "_messageagent");
}
}
public static boolean updateTransport(Service service,
final TransportFactory transportFactory, String name, String protocol, InetSocketAddress clientAddress,
final Set<String> groups, final Collection<InetSocketAddress> addresses) {
@@ -329,6 +351,10 @@ public abstract class Sncp {
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_client", clientDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_messageagent", Type.getDescriptor(MessageAgent.class), null, null);
fv.visitEnd();
}
{ //构造函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
//mv.setDebug(true);
@@ -414,9 +440,9 @@ public abstract class Sncp {
}
}
public static <T extends Service> T createSimpleLocalService(final Class<T> serviceImplClass, final MessageAgent remoteAgent,
public static <T extends Service> T createSimpleLocalService(final Class<T> serviceImplClass, final MessageAgent messageAgent,
final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) {
return createLocalService(null, "", serviceImplClass, remoteAgent, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
return createLocalService(null, "", serviceImplClass, messageAgent, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null);
}
/**
@@ -427,7 +453,7 @@ public abstract class Sncp {
* @param classLoader ClassLoader
* @param name 资源名
* @param serviceImplClass Service类
* @param remoteAgent MQ管理器
* @param messageAgent MQ管理器
* @param resourceFactory ResourceFactory
* @param transportFactory TransportFactory
* @param clientSncpAddress 本地IP地址
@@ -441,7 +467,7 @@ public abstract class Sncp {
final ClassLoader classLoader,
final String name,
final Class<T> serviceImplClass,
final MessageAgent remoteAgent,
final MessageAgent messageAgent,
final ResourceFactory resourceFactory,
final TransportFactory transportFactory,
final InetSocketAddress clientSncpAddress,
@@ -462,7 +488,7 @@ public abstract class Sncp {
if (!field.getType().isAssignableFrom(newClazz)) continue;
field.setAccessible(true);
if (remoteService == null && clientSncpAddress != null) {
remoteService = createRemoteService(classLoader, name, serviceImplClass, remoteAgent, transportFactory, clientSncpAddress, groups, conf);
remoteService = createRemoteService(classLoader, name, serviceImplClass, messageAgent, transportFactory, clientSncpAddress, groups, conf);
}
if (remoteService != null) field.set(service, remoteService);
}
@@ -471,15 +497,20 @@ public abstract class Sncp {
SncpClient client = null;
{
try {
Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client");
e.setAccessible(true);
client = new SncpClient(name, serviceImplClass, service, remoteAgent, transportFactory, false, newClazz, clientSncpAddress);
e.set(service, client);
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
c.setAccessible(true);
client = new SncpClient(name, serviceImplClass, service, messageAgent, transportFactory, false, newClazz, clientSncpAddress);
c.set(service, client);
if (transportFactory != null) transportFactory.addSncpService(service);
} catch (NoSuchFieldException ne) {
ne.printStackTrace();
}
}
if (messageAgent != null) {
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent");
c.setAccessible(true);
c.set(service, messageAgent);
}
if (client == null) return service;
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf");
@@ -533,7 +564,7 @@ public abstract class Sncp {
* @param classLoader ClassLoader
* @param name 资源名
* @param serviceTypeOrImplClass Service类
* @param messageAgent MQ管理器
* @param messageAgent MQ管理器
* @param transportFactory TransportFactory
* @param clientAddress 本地IP地址
* @param groups0 所有的组节点,包含自身
@@ -572,10 +603,15 @@ public abstract class Sncp {
T service = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
if (transportFactory != null) client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
c.setAccessible(true);
c.set(service, client);
if (messageAgent != null) {
Field m = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent");
m.setAccessible(true);
m.set(service, messageAgent);
}
if (transportFactory != null) transportFactory.addSncpService(service);
return service;
} catch (Throwable ex) {
@@ -617,6 +653,10 @@ public abstract class Sncp {
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_client", clientDesc, null, null);
fv.visitEnd();
}
{
fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_messageagent", Type.getDescriptor(MessageAgent.class), null, null);
fv.visitEnd();
}
{ //构造函数
mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null));
//mv.setDebug(true);
@@ -753,12 +793,17 @@ public abstract class Sncp {
T service = (T) newClazz.getDeclaredConstructor().newInstance();
SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress);
client.setRemoteGroups(groups);
client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
if (transportFactory != null) client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups));
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client");
c.setAccessible(true);
c.set(service, client);
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent");
c.setAccessible(true);
c.set(service, messageAgent);
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf");
c.setAccessible(true);