diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index 54f44b1bc..eb9061646 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -65,7 +65,6 @@ MQ管理接口配置 不同MQ节点所配置的MQ集群不能重复。 name: 服务的名称,用于监控识别,多个mq节点时只能有一个name为空的节点,mq.name不能重复,命名规则: 字母、数字、下划线 - names: 服务的扩展名称,用于监控识别,一个mq可以使用多个资源名称,多个名称用分号;隔开, 且name需要全局唯一 value: 实现类名,必须是org.redkale.mq.MessageAgent的子类 MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置 --> diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 91b1496b7..a958264b1 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -434,11 +434,6 @@ public final class Application { this.resourceFactory.inject(agent); agent.init(agent.getConfig()); this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); - if (agent.getNames() != null) { - for (String n : agent.getNames()) { - this.resourceFactory.register(n, MessageAgent.class, agent); - } - } } } this.messageAgents = mqs; @@ -462,11 +457,6 @@ public final class Application { if (messageAgents == null) return null; for (MessageAgent agent : messageAgents) { if (agent.getName().equals(name)) return agent; - if (agent.getNames() != null) { - for (String n : agent.getNames()) { - if (n.equals(name)) return agent; - } - } } return null; } diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 3e79ee545..8013ae125 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -403,6 +403,8 @@ public abstract class NodeServer { ((WebSocketNodeService) nodeService).setName(resourceName); } resourceFactory.inject(nodeService, self); + MessageAgent messageAgent = Sncp.getMessageAgent((Service) src); + if (messageAgent != null && Sncp.getMessageAgent(nodeService) == null) Sncp.setMessageAgent(nodeService, messageAgent); field.set(src, nodeService); if (Sncp.isRemote(nodeService)) { remoteServices.add(nodeService); @@ -470,13 +472,16 @@ public abstract class NodeServer { } Service service; - boolean ws = src instanceof WebSocketServlet; + final boolean ws = src instanceof WebSocketServlet; if (ws || localed) { //本地模式 service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, agent, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } else { service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } - if (service instanceof WebSocketNodeService) ((WebSocketNodeService) service).setName(resourceName); + if (service instanceof WebSocketNodeService) { + ((WebSocketNodeService) service).setName(resourceName); + if (agent != null) Sncp.setMessageAgent(service, agent); + } final Class restype = Sncp.getResourceType(service); if (rf.find(resourceName, restype) == null) { regFactory.register(resourceName, restype, service); diff --git a/src/org/redkale/mq/MessageAgent.java b/src/org/redkale/mq/MessageAgent.java index 76cd4d00b..494fe3435 100644 --- a/src/org/redkale/mq/MessageAgent.java +++ b/src/org/redkale/mq/MessageAgent.java @@ -38,8 +38,6 @@ public abstract class MessageAgent { protected String name; - protected String[] names; - protected AnyValue config; protected MessageProducer producer; @@ -58,15 +56,6 @@ public abstract class MessageAgent { public void init(AnyValue config) { this.name = checkName(config.getValue("name", "")); - String namex = config.getValue("names"); - if (namex != null && !namex.isEmpty()) { - List list = new ArrayList<>(); - for (String n : namex.split(";")) { - if (n.trim().isEmpty()) continue; - list.add(n.trim()); - } - if (!list.isEmpty()) this.names = list.toArray(new String[list.size()]); - } } //ServiceLoader时判断配置是否符合当前实现类 @@ -115,10 +104,6 @@ public abstract class MessageAgent { return name; } - public String[] getNames() { - return names; - } - public AnyValue getConfig() { return config; } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 9578e370d..7d5ee607d 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -16,7 +16,6 @@ import javax.annotation.*; import org.redkale.boot.*; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; -import org.redkale.mq.MessageAgent; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -57,9 +56,6 @@ public abstract class WebSocketNode { @Resource(name = "$") protected CacheSource sncpNodeAddresses; - @Resource(name = "$") - protected MessageAgent messageAgent; - //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine; diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 902ef7a96..bad22d6e8 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -146,6 +146,28 @@ public abstract class Sncp { } } + public static MessageAgent getMessageAgent(Service service) { + if (service == null) return null; + try { + Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_messageagent"); + ts.setAccessible(true); + return (MessageAgent) ts.get(service); + } catch (Exception e) { + throw new RuntimeException(service + " not found " + FIELDPREFIX + "_messageagent"); + } + } + + public static void setMessageAgent(Service service, MessageAgent messageAgent) { + if (service == null) return; + try { + Field ts = service.getClass().getDeclaredField(FIELDPREFIX + "_messageagent"); + ts.setAccessible(true); + ts.set(service, messageAgent); + } catch (Exception e) { + throw new RuntimeException(service + " not found " + FIELDPREFIX + "_messageagent"); + } + } + public static boolean updateTransport(Service service, final TransportFactory transportFactory, String name, String protocol, InetSocketAddress clientAddress, final Set groups, final Collection addresses) { @@ -329,6 +351,10 @@ public abstract class Sncp { fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_client", clientDesc, null, null); fv.visitEnd(); } + { + fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_messageagent", Type.getDescriptor(MessageAgent.class), null, null); + fv.visitEnd(); + } { //构造函数 mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "", "()V", null, null)); //mv.setDebug(true); @@ -414,9 +440,9 @@ public abstract class Sncp { } } - public static T createSimpleLocalService(final Class serviceImplClass, final MessageAgent remoteAgent, + public static T createSimpleLocalService(final Class serviceImplClass, final MessageAgent messageAgent, final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { - return createLocalService(null, "", serviceImplClass, remoteAgent, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null); + return createLocalService(null, "", serviceImplClass, messageAgent, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null); } /** @@ -427,7 +453,7 @@ public abstract class Sncp { * @param classLoader ClassLoader * @param name 资源名 * @param serviceImplClass Service类 - * @param remoteAgent MQ管理器 + * @param messageAgent MQ管理器 * @param resourceFactory ResourceFactory * @param transportFactory TransportFactory * @param clientSncpAddress 本地IP地址 @@ -441,7 +467,7 @@ public abstract class Sncp { final ClassLoader classLoader, final String name, final Class serviceImplClass, - final MessageAgent remoteAgent, + final MessageAgent messageAgent, final ResourceFactory resourceFactory, final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, @@ -462,7 +488,7 @@ public abstract class Sncp { if (!field.getType().isAssignableFrom(newClazz)) continue; field.setAccessible(true); if (remoteService == null && clientSncpAddress != null) { - remoteService = createRemoteService(classLoader, name, serviceImplClass, remoteAgent, transportFactory, clientSncpAddress, groups, conf); + remoteService = createRemoteService(classLoader, name, serviceImplClass, messageAgent, transportFactory, clientSncpAddress, groups, conf); } if (remoteService != null) field.set(service, remoteService); } @@ -471,15 +497,20 @@ public abstract class Sncp { SncpClient client = null; { try { - Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client"); - e.setAccessible(true); - client = new SncpClient(name, serviceImplClass, service, remoteAgent, transportFactory, false, newClazz, clientSncpAddress); - e.set(service, client); + Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); + c.setAccessible(true); + client = new SncpClient(name, serviceImplClass, service, messageAgent, transportFactory, false, newClazz, clientSncpAddress); + c.set(service, client); if (transportFactory != null) transportFactory.addSncpService(service); } catch (NoSuchFieldException ne) { ne.printStackTrace(); } } + if (messageAgent != null) { + Field c = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent"); + c.setAccessible(true); + c.set(service, messageAgent); + } if (client == null) return service; { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf"); @@ -533,7 +564,7 @@ public abstract class Sncp { * @param classLoader ClassLoader * @param name 资源名 * @param serviceTypeOrImplClass Service类 - * @param messageAgent MQ管理器 + * @param messageAgent MQ管理器 * @param transportFactory TransportFactory * @param clientAddress 本地IP地址 * @param groups0 所有的组节点,包含自身 @@ -572,10 +603,15 @@ public abstract class Sncp { T service = (T) newClazz.getDeclaredConstructor().newInstance(); SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); - client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); + if (transportFactory != null) client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); c.set(service, client); + if (messageAgent != null) { + Field m = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent"); + m.setAccessible(true); + m.set(service, messageAgent); + } if (transportFactory != null) transportFactory.addSncpService(service); return service; } catch (Throwable ex) { @@ -617,6 +653,10 @@ public abstract class Sncp { fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_client", clientDesc, null, null); fv.visitEnd(); } + { + fv = cw.visitField(ACC_PRIVATE, FIELDPREFIX + "_messageagent", Type.getDescriptor(MessageAgent.class), null, null); + fv.visitEnd(); + } { //构造函数 mv = new MethodDebugVisitor(cw.visitMethod(ACC_PUBLIC, "", "()V", null, null)); //mv.setDebug(true); @@ -753,12 +793,17 @@ public abstract class Sncp { T service = (T) newClazz.getDeclaredConstructor().newInstance(); SncpClient client = new SncpClient(name, serviceTypeOrImplClass, service, messageAgent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); - client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); + if (transportFactory != null) client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); c.setAccessible(true); c.set(service, client); } + { + Field c = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent"); + c.setAccessible(true); + c.set(service, messageAgent); + } { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf"); c.setAccessible(true);