From ebea6bb92c96456e04cf640a951cf570ac1382ab Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 3 Jun 2020 19:58:13 +0800 Subject: [PATCH] --- src/org/redkale/boot/NodeHttpServer.java | 2 +- src/org/redkale/boot/NodeServer.java | 21 ++++++++++--------- src/org/redkale/net/sncp/Sncp.java | 21 ++++++++++++------- src/org/redkale/net/sncp/SncpClient.java | 10 ++++++++- .../redkale/test/service/ABMainService.java | 10 ++++----- test/org/redkale/test/sncp/SncpTest.java | 8 +++---- .../test/sncp/SncpTestServiceImpl.java | 6 +++--- 7 files changed, 46 insertions(+), 32 deletions(-) diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 003738758..fffd8e7d2 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -121,7 +121,7 @@ public class NodeHttpServer extends NodeServer { resourceFactory.register(RESNAME_SNCP_ADDR, String.class, sncpResFactory.find(RESNAME_SNCP_ADDR, String.class)); } if (nodeService == null) { - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); } resourceFactory.inject(nodeService, self); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 282a9916f..7432be888 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -281,7 +281,7 @@ public abstract class NodeServer { SncpClient client = Sncp.getSncpClient(srcService); final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); final Set groups = new HashSet<>(); - source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); + source = (DataSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, client == null ? null : client.getMessageAgent(), appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf(srcService)); } } } @@ -321,7 +321,7 @@ public abstract class NodeServer { final Class sourceType = sourceConf == null ? CacheMemorySource.class : serverClassLoader.loadClass(sourceConf.getValue("value")); Object source = null; if (CacheSource.class.isAssignableFrom(sourceType)) { // CacheSource - source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, appResFactory, appSncpTranFactory, sncpAddr, null, Sncp.getConf(srcService)); + source = (CacheSource) Sncp.createLocalService(serverClassLoader, resourceName, sourceType, client == null ? null : client.getMessageAgent(), appResFactory, appSncpTranFactory, sncpAddr, null, Sncp.getConf(srcService)); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; Type valType = pt == null ? null : pt.getActualTypeArguments()[0]; @@ -366,7 +366,7 @@ public abstract class NodeServer { if (nodeService == null) { final HashSet groups = new HashSet<>(); if (groups.isEmpty() && isSNCP() && NodeServer.this.sncpGroup != null) groups.add(NodeServer.this.sncpGroup); - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null); + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null); (isSNCP() ? appResFactory : resourceFactory).register(resourceName, WebSocketNode.class, nodeService); ((WebSocketNodeService) nodeService).setName(resourceName); } @@ -431,12 +431,18 @@ public abstract class NodeServer { return; } + MessageAgent agent = null; + if (entry.getProperty() != null && entry.getProperty().getValue("mq") != null) { + agent = application.getMessageAgent(entry.getProperty().getValue("mq")); + if (agent != null) messageAgents.put(agent.getName(), agent); + } + Service service; boolean ws = src instanceof WebSocketServlet; if (ws || localed) { //本地模式 - service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); + service = Sncp.createLocalService(serverClassLoader, resourceName, serviceImplClass, agent, appResourceFactory, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } else { - service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); + service = Sncp.createRemoteService(serverClassLoader, resourceName, serviceImplClass, agent, appSncpTransFactory, NodeServer.this.sncpAddress, groups, entry.getProperty()); } final Class restype = Sncp.getResourceType(service); if (rf.find(resourceName, restype) == null) { @@ -444,11 +450,6 @@ public abstract class NodeServer { } else if (isSNCP() && !entry.isAutoload()) { throw new RuntimeException(restype.getSimpleName() + "(class:" + serviceImplClass.getName() + ", name:" + resourceName + ", group:" + groups + ") is repeat."); } - MessageAgent agent = null; - if (entry.getProperty() != null && entry.getProperty().getValue("mq") != null) { - agent = application.getMessageAgent(entry.getProperty().getValue("mq")); - if (agent != null) messageAgents.put(agent.getName(), agent); - } if (Sncp.isRemote(service)) { remoteServices.add(service); if (agent != null) sncpRemoteAgents.put(agent.getName(), agent); diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index 2eb64552e..970d99253 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -18,6 +18,7 @@ import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES; import org.redkale.asm.*; import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; +import org.redkale.mq.MessageAgent; import org.redkale.net.TransportFactory; import org.redkale.net.sncp.SncpClient.SncpAction; import org.redkale.service.*; @@ -413,9 +414,9 @@ public abstract class Sncp { } } - public static T createSimpleLocalService(final Class serviceImplClass, + public static T createSimpleLocalService(final Class serviceImplClass, final MessageAgent remoteAgent, final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { - return createLocalService(null, "", serviceImplClass, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null); + return createLocalService(null, "", serviceImplClass, remoteAgent, ResourceFactory.root(), transportFactory, clientSncpAddress, Utility.ofSet(groups), null); } /** @@ -426,6 +427,7 @@ public abstract class Sncp { * @param classLoader ClassLoader * @param name 资源名 * @param serviceImplClass Service类 + * @param remoteAgent MQ管理器 * @param resourceFactory ResourceFactory * @param transportFactory TransportFactory * @param clientSncpAddress 本地IP地址 @@ -439,6 +441,7 @@ public abstract class Sncp { final ClassLoader classLoader, final String name, final Class serviceImplClass, + final MessageAgent remoteAgent, final ResourceFactory resourceFactory, final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, @@ -459,7 +462,7 @@ public abstract class Sncp { if (!field.getType().isAssignableFrom(newClazz)) continue; field.setAccessible(true); if (remoteService == null && clientSncpAddress != null) { - remoteService = createRemoteService(classLoader, name, serviceImplClass, transportFactory, clientSncpAddress, groups, conf); + remoteService = createRemoteService(classLoader, name, serviceImplClass, remoteAgent, transportFactory, clientSncpAddress, groups, conf); } if (remoteService != null) field.set(rs, remoteService); } @@ -470,7 +473,7 @@ public abstract class Sncp { try { Field e = newClazz.getDeclaredField(FIELDPREFIX + "_client"); e.setAccessible(true); - client = new SncpClient(name, serviceImplClass, rs, transportFactory, false, newClazz, clientSncpAddress); + client = new SncpClient(name, serviceImplClass, rs, remoteAgent, transportFactory, false, newClazz, clientSncpAddress); e.set(rs, client); transportFactory.addSncpService(rs); } catch (NoSuchFieldException ne) { @@ -491,9 +494,9 @@ public abstract class Sncp { } } - public static T createSimpleRemoteService(final Class serviceImplClass, + public static T createSimpleRemoteService(final Class serviceImplClass, final MessageAgent agent, final TransportFactory transportFactory, final InetSocketAddress clientSncpAddress, final String... groups) { - return createRemoteService(null, "", serviceImplClass, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); + return createRemoteService(null, "", serviceImplClass, agent, transportFactory, clientSncpAddress, Utility.ofSet(groups), null); } /** @@ -530,6 +533,7 @@ public abstract class Sncp { * @param classLoader ClassLoader * @param name 资源名 * @param serviceTypeOrImplClass Service类 + * @param agent MQ管理器 * @param transportFactory TransportFactory * @param clientAddress 本地IP地址 * @param groups0 所有的组节点,包含自身 @@ -543,6 +547,7 @@ public abstract class Sncp { final ClassLoader classLoader, final String name, final Class serviceTypeOrImplClass, + final MessageAgent agent, final TransportFactory transportFactory, final InetSocketAddress clientAddress, final Set groups0, @@ -565,7 +570,7 @@ public abstract class Sncp { try { Class newClazz = loader.loadClass(newDynName.replace('/', '.')); T rs = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); Field c = newClazz.getDeclaredField(FIELDPREFIX + "_client"); @@ -746,7 +751,7 @@ public abstract class Sncp { }.loadClass(newDynName.replace('/', '.'), bytes); try { T rs = (T) newClazz.getDeclaredConstructor().newInstance(); - SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); + SncpClient client = new SncpClient(name, serviceTypeOrImplClass, rs, agent, transportFactory, true, realed ? createLocalServiceClass(loader, name, serviceTypeOrImplClass) : serviceTypeOrImplClass, clientAddress); client.setRemoteGroups(groups); client.setRemoteGroupTransport(transportFactory.loadTransport(clientAddress, groups)); { diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 1d2348b1c..3ba3b0e1c 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -17,6 +17,7 @@ import java.util.logging.*; import javax.annotation.Resource; import org.redkale.convert.bson.*; import org.redkale.convert.json.*; +import org.redkale.mq.MessageAgent; import org.redkale.net.*; import static org.redkale.net.sncp.SncpRequest.*; import org.redkale.service.*; @@ -56,6 +57,8 @@ public final class SncpClient { protected final ExecutorService executor; + protected final MessageAgent agent; + protected final Supplier bufferSupplier; @Resource @@ -70,11 +73,12 @@ public final class SncpClient { //远程模式, 可能为null protected Transport remoteGroupTransport; - public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, final TransportFactory factory, + public SncpClient(final String serviceName, final Class serviceTypeOrImplClass, final T service, MessageAgent agent, final TransportFactory factory, final boolean remote, final Class serviceClass, final InetSocketAddress clientSncpAddress) { this.remote = remote; this.executor = factory.getExecutor(); this.bufferSupplier = factory.getBufferSupplier(); + this.agent = agent; Class tn = serviceTypeOrImplClass; Version ver = tn.getAnnotation(Version.class); this.serviceClass = serviceClass; @@ -104,6 +108,10 @@ public final class SncpClient { return actions; } + public MessageAgent getMessageAgent() { + return agent; + } + public InetSocketAddress getClientAddress() { return clientSncpAddress; } diff --git a/test/org/redkale/test/service/ABMainService.java b/test/org/redkale/test/service/ABMainService.java index be7c3911a..b50ce40d1 100644 --- a/test/org/redkale/test/service/ABMainService.java +++ b/test/org/redkale/test/service/ABMainService.java @@ -46,7 +46,7 @@ public class ABMainService implements Service { resFactory.register(BsonConvert.root()); //------------------------ 初始化 CService ------------------------------------ - CService cservice = Sncp.createSimpleLocalService(CService.class, transFactory, new InetSocketAddress("127.0.0.1", 5577), "g77"); + CService cservice = Sncp.createSimpleLocalService(CService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 5577), "g77"); SncpServer cserver = new SncpServer(); cserver.getLogger().setLevel(Level.WARNING); cserver.addSncpServlet(cservice); @@ -54,8 +54,8 @@ public class ABMainService implements Service { cserver.start(); //------------------------ 初始化 BCService ------------------------------------ - BCService bcservice = Sncp.createSimpleLocalService(BCService.class, transFactory, new InetSocketAddress("127.0.0.1", 5588), "g88"); - CService remoteCService = Sncp.createSimpleRemoteService(CService.class, transFactory, new InetSocketAddress("127.0.0.1", 5588), "g77"); + BCService bcservice = Sncp.createSimpleLocalService(BCService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 5588), "g88"); + CService remoteCService = Sncp.createSimpleRemoteService(CService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 5588), "g77"); resFactory.inject(remoteCService); resFactory.register("", remoteCService); SncpServer bcserver = new SncpServer(); @@ -65,8 +65,8 @@ public class ABMainService implements Service { bcserver.start(); //------------------------ 初始化 ABMainService ------------------------------------ - ABMainService service = Sncp.createSimpleLocalService(ABMainService.class, transFactory, new InetSocketAddress("127.0.0.1", 5599), "g99"); - BCService remoteBCService = Sncp.createSimpleRemoteService(BCService.class, transFactory, new InetSocketAddress("127.0.0.1", 5599), "g88"); + ABMainService service = Sncp.createSimpleLocalService(ABMainService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 5599), "g99"); + BCService remoteBCService = Sncp.createSimpleRemoteService(BCService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 5599), "g88"); resFactory.inject(remoteBCService); resFactory.register("", remoteBCService); diff --git a/test/org/redkale/test/sncp/SncpTest.java b/test/org/redkale/test/sncp/SncpTest.java index fcdaca4ab..a14498364 100644 --- a/test/org/redkale/test/sncp/SncpTest.java +++ b/test/org/redkale/test/sncp/SncpTest.java @@ -81,7 +81,7 @@ public class SncpTest { if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("client", set); - final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, addr, "client"); + final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client"); ResourceFactory.root().inject(service); // SncpTestBean bean = new SncpTestBean(); @@ -94,7 +94,7 @@ public class SncpTest { SncpTestBean callbean = new SncpTestBean(); callbean.setId(1); callbean.setContent("数据X"); - service.queryLongResult("f", 3,33L); + service.queryLongResult("f", 3, 33L); service.insert(callbean); System.out.println("bean.id应该会被修改(id不会是1): " + callbean); @@ -159,7 +159,7 @@ public class SncpTest { if (port2 > 0) set.add(new InetSocketAddress(myhost, port2)); final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("server", set); - SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); + SncpTestIService service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, transFactory, addr, "server"); ResourceFactory.root().inject(service); server.addSncpServlet(service); System.out.println(service); @@ -194,7 +194,7 @@ public class SncpTest { final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("server", set); - Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, addr, "server"); + Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, transFactory, addr, "server"); server.addSncpServlet(service); AnyValue.DefaultAnyValue conf = new AnyValue.DefaultAnyValue(); conf.addValue("host", "0.0.0.0"); diff --git a/test/org/redkale/test/sncp/SncpTestServiceImpl.java b/test/org/redkale/test/sncp/SncpTestServiceImpl.java index 15432803b..3590a7fa2 100644 --- a/test/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/test/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -112,7 +112,7 @@ public class SncpTestServiceImpl implements SncpTestIService { final TransportFactory transFactory = TransportFactory.create(Executors.newSingleThreadExecutor(), newBufferPool(), newChannelGroup()); transFactory.addGroupInfo("g70", new InetSocketAddress("127.0.0.1", 7070)); - Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); + Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); for (Method method : service.getClass().getDeclaredMethods()) { System.out.println(method); } @@ -121,7 +121,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - service = Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); + service = Sncp.createSimpleRemoteService(SncpTestServiceImpl.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); for (Method method : service.getClass().getDeclaredMethods()) { System.out.println(method); } @@ -130,7 +130,7 @@ public class SncpTestServiceImpl implements SncpTestIService { System.out.println(method); } System.out.println("-----------------------------------"); - service = Sncp.createSimpleRemoteService(SncpTestIService.class, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); + service = Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, new InetSocketAddress("127.0.0.1", 7070), "g70"); for (Method method : service.getClass().getDeclaredMethods()) { System.out.println(method); }