diff --git a/src/org/redkale/boot/NodeHttpServer.java b/src/org/redkale/boot/NodeHttpServer.java index 27090dad6..688dddc69 100644 --- a/src/org/redkale/boot/NodeHttpServer.java +++ b/src/org/redkale/boot/NodeHttpServer.java @@ -122,7 +122,15 @@ public class NodeHttpServer extends NodeServer { resourceFactory.register(RESNAME_SNCP_ADDR, String.class, sncpResFactory.find(RESNAME_SNCP_ADDR, String.class)); } if (nodeService == null) { - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); + MessageAgent messageAgent = null; + try { + Field c = src.getClass().getDeclaredField("messageAgent"); + c.setAccessible(true); + messageAgent = (MessageAgent) c.get(src); + } catch (Exception ex) { + logger.log(Level.WARNING, "WebSocketServlet getMessageAgent error", ex); + } + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, messageAgent, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set) null, (AnyValue) null); regFactory.register(resourceName, WebSocketNode.class, nodeService); } resourceFactory.inject(nodeService, self); @@ -332,7 +340,7 @@ public class NodeHttpServer extends NodeServer { return; } restedObjects.add(stype); //避免重复创建Rest对象 - WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty()); + WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, messageAgent, prefix, en.getProperty()); if (servlet == null) return; //没有RestOnMessage方法的HttpServlet调用Rest.createRestWebSocketServlet就会返回null String prefix2 = prefix; WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 8013ae125..87fbdd03f 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -398,7 +398,7 @@ public abstract class NodeServer { if (nodeService == null) { final HashSet groups = new HashSet<>(); if (groups.isEmpty() && isSNCP() && NodeServer.this.sncpGroup != null) groups.add(NodeServer.this.sncpGroup); - nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null); + nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, Sncp.getMessageAgent((Service) src), application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null); (isSNCP() ? appResFactory : resourceFactory).register(resourceName, WebSocketNode.class, nodeService); ((WebSocketNodeService) nodeService).setName(resourceName); } diff --git a/src/org/redkale/net/http/HttpServer.java b/src/org/redkale/net/http/HttpServer.java index 90fa4efa4..511315890 100644 --- a/src/org/redkale/net/http/HttpServer.java +++ b/src/org/redkale/net/http/HttpServer.java @@ -16,6 +16,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.logging.Level; +import org.redkale.mq.MessageAgent; import org.redkale.net.*; import org.redkale.net.http.HttpContext.HttpContextConfig; import org.redkale.net.http.HttpResponse.HttpResponseConfig; @@ -215,13 +216,14 @@ public class HttpServer extends Server HttpServlet * @param classLoader ClassLoader * @param webSocketType WebSocket的类型 + * @param messageAgent MessageAgent * @param prefix url前缀 * @param conf 配置信息 * * @return RestServlet */ - public T addRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType, final String prefix, final AnyValue conf) { - T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType); + public T addRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType, MessageAgent messageAgent, final String prefix, final AnyValue conf) { + T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType, messageAgent); if (servlet != null) this.prepare.addServlet(servlet, prefix, conf); return servlet; } diff --git a/src/org/redkale/net/http/Rest.java b/src/org/redkale/net/http/Rest.java index dcb063fb0..a10b166e2 100644 --- a/src/org/redkale/net/http/Rest.java +++ b/src/org/redkale/net/http/Rest.java @@ -22,6 +22,7 @@ import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.convert.*; import org.redkale.convert.json.*; +import org.redkale.mq.MessageAgent; import org.redkale.net.Cryptor; import org.redkale.net.sncp.Sncp; import org.redkale.service.*; @@ -218,7 +219,7 @@ public final class Rest { return t == null ? defValue : t; } - public static T createRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType) { + public static T createRestWebSocketServlet(final ClassLoader classLoader, final Class webSocketType, MessageAgent messageAgent) { if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet"); if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet"); if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet"); @@ -726,6 +727,7 @@ public final class Rest { cryptorField.setAccessible(true); cryptorField.set(servlet, cryptor); } + if (messageAgent != null) ((WebSocketServlet) servlet).messageAgent = messageAgent; return servlet; } catch (Exception e) { throw new RuntimeException(e); diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 7d5ee607d..e1848c1c1 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -16,6 +16,7 @@ import javax.annotation.*; import org.redkale.boot.*; import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; +import org.redkale.mq.MessageAgent; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -59,6 +60,8 @@ public abstract class WebSocketNode { //当前节点的本地WebSocketEngine protected WebSocketEngine localEngine; + protected MessageAgent messageAgent; + protected Semaphore semaphore; private int tryAcquireSeconds = 12; @@ -84,6 +87,11 @@ public abstract class WebSocketNode { return semaphore; } + @Local + public final MessageAgent getMessageAgent() { + return messageAgent; + } + @Local protected void postDestroy(AnyValue conf) { if (this.localEngine == null) return; diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 2d68d989f..9e7f987cc 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -18,6 +18,7 @@ import java.util.logging.*; import java.util.zip.*; import javax.annotation.*; import org.redkale.convert.Convert; +import org.redkale.mq.MessageAgent; import org.redkale.net.Cryptor; import org.redkale.service.*; import org.redkale.util.*; @@ -100,6 +101,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl protected boolean permessageDeflate = false; + protected MessageAgent messageAgent; + @Resource(name = "jsonconvert") protected Convert jsonConvert; @@ -149,6 +152,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert; + if (this.messageAgent != null) this.node.messageAgent = this.messageAgent; { AnyValue props = conf; if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties"); diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index bad22d6e8..c9ddaba9b 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -20,6 +20,7 @@ import static org.redkale.asm.Opcodes.*; import org.redkale.asm.Type; import org.redkale.mq.MessageAgent; import org.redkale.net.TransportFactory; +import org.redkale.net.http.WebSocketNode; import org.redkale.net.sncp.SncpClient.SncpAction; import org.redkale.service.*; import org.redkale.util.*; @@ -516,6 +517,11 @@ public abstract class Sncp { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf"); c.setAccessible(true); c.set(service, conf); + if (service instanceof WebSocketNode) { + c = WebSocketNode.class.getDeclaredField("messageAgent"); + c.setAccessible(true); + c.set(service, messageAgent); + } } return service; } catch (RuntimeException rex) { @@ -611,6 +617,11 @@ public abstract class Sncp { Field m = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent"); m.setAccessible(true); m.set(service, messageAgent); + if (service instanceof WebSocketNode) { + c = WebSocketNode.class.getDeclaredField("messageAgent"); + c.setAccessible(true); + c.set(service, messageAgent); + } } if (transportFactory != null) transportFactory.addSncpService(service); return service; @@ -799,10 +810,15 @@ public abstract class Sncp { c.setAccessible(true); c.set(service, client); } - { + if (messageAgent != null) { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent"); c.setAccessible(true); c.set(service, messageAgent); + if (service instanceof WebSocketNode) { + c = WebSocketNode.class.getDeclaredField("messageAgent"); + c.setAccessible(true); + c.set(service, messageAgent); + } } { Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf");