This commit is contained in:
Redkale
2020-06-07 15:56:55 +08:00
parent adfa0be79e
commit ecf831c0f5
7 changed files with 47 additions and 7 deletions

View File

@@ -122,7 +122,15 @@ public class NodeHttpServer extends NodeServer {
resourceFactory.register(RESNAME_SNCP_ADDR, String.class, sncpResFactory.find(RESNAME_SNCP_ADDR, String.class));
}
if (nodeService == null) {
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
MessageAgent messageAgent = null;
try {
Field c = src.getClass().getDeclaredField("messageAgent");
c.setAccessible(true);
messageAgent = (MessageAgent) c.get(src);
} catch (Exception ex) {
logger.log(Level.WARNING, "WebSocketServlet getMessageAgent error", ex);
}
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, messageAgent, application.getResourceFactory(), application.getSncpTransportFactory(), (InetSocketAddress) null, (Set<String>) null, (AnyValue) null);
regFactory.register(resourceName, WebSocketNode.class, nodeService);
}
resourceFactory.inject(nodeService, self);
@@ -332,7 +340,7 @@ public class NodeHttpServer extends NodeServer {
return;
}
restedObjects.add(stype); //避免重复创建Rest对象
WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, prefix, en.getProperty());
WebSocketServlet servlet = httpServer.addRestWebSocketServlet(serverClassLoader, stype, messageAgent, prefix, en.getProperty());
if (servlet == null) return; //没有RestOnMessage方法的HttpServlet调用Rest.createRestWebSocketServlet就会返回null
String prefix2 = prefix;
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);

View File

@@ -398,7 +398,7 @@ public abstract class NodeServer {
if (nodeService == null) {
final HashSet<String> groups = new HashSet<>();
if (groups.isEmpty() && isSNCP() && NodeServer.this.sncpGroup != null) groups.add(NodeServer.this.sncpGroup);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, null, application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null);
nodeService = Sncp.createLocalService(serverClassLoader, resourceName, WebSocketNodeService.class, Sncp.getMessageAgent((Service) src), application.getResourceFactory(), application.getSncpTransportFactory(), NodeServer.this.sncpAddress, groups, (AnyValue) null);
(isSNCP() ? appResFactory : resourceFactory).register(resourceName, WebSocketNode.class, nodeService);
((WebSocketNodeService) nodeService).setName(resourceName);
}

View File

@@ -16,6 +16,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.redkale.mq.MessageAgent;
import org.redkale.net.*;
import org.redkale.net.http.HttpContext.HttpContextConfig;
import org.redkale.net.http.HttpResponse.HttpResponseConfig;
@@ -215,13 +216,14 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
* @param <T> HttpServlet
* @param classLoader ClassLoader
* @param webSocketType WebSocket的类型
* @param messageAgent MessageAgent
* @param prefix url前缀
* @param conf 配置信息
*
* @return RestServlet
*/
public <S extends WebSocket, T extends WebSocketServlet> T addRestWebSocketServlet(final ClassLoader classLoader, final Class<S> webSocketType, final String prefix, final AnyValue conf) {
T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType);
public <S extends WebSocket, T extends WebSocketServlet> T addRestWebSocketServlet(final ClassLoader classLoader, final Class<S> webSocketType, MessageAgent messageAgent, final String prefix, final AnyValue conf) {
T servlet = Rest.createRestWebSocketServlet(classLoader, webSocketType, messageAgent);
if (servlet != null) this.prepare.addServlet(servlet, prefix, conf);
return servlet;
}

View File

@@ -22,6 +22,7 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.convert.*;
import org.redkale.convert.json.*;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.*;
@@ -218,7 +219,7 @@ public final class Rest {
return t == null ? defValue : t;
}
public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType) {
public static <T extends WebSocketServlet> T createRestWebSocketServlet(final ClassLoader classLoader, final Class<? extends WebSocket> webSocketType, MessageAgent messageAgent) {
if (webSocketType == null) throw new RuntimeException("Rest WebSocket Class is null on createRestWebSocketServlet");
if (Modifier.isAbstract(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot abstract on createRestWebSocketServlet");
if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException("Rest WebSocket Class(" + webSocketType + ") cannot final on createRestWebSocketServlet");
@@ -726,6 +727,7 @@ public final class Rest {
cryptorField.setAccessible(true);
cryptorField.set(servlet, cryptor);
}
if (messageAgent != null) ((WebSocketServlet) servlet).messageAgent = messageAgent;
return servlet;
} catch (Exception e) {
throw new RuntimeException(e);

View File

@@ -16,6 +16,7 @@ import javax.annotation.*;
import org.redkale.boot.*;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.MessageAgent;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -59,6 +60,8 @@ public abstract class WebSocketNode {
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;
protected MessageAgent messageAgent;
protected Semaphore semaphore;
private int tryAcquireSeconds = 12;
@@ -84,6 +87,11 @@ public abstract class WebSocketNode {
return semaphore;
}
@Local
public final MessageAgent getMessageAgent() {
return messageAgent;
}
@Local
protected void postDestroy(AnyValue conf) {
if (this.localEngine == null) return;

View File

@@ -18,6 +18,7 @@ import java.util.logging.*;
import java.util.zip.*;
import javax.annotation.*;
import org.redkale.convert.Convert;
import org.redkale.mq.MessageAgent;
import org.redkale.net.Cryptor;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -100,6 +101,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected boolean permessageDeflate = false;
protected MessageAgent messageAgent;
@Resource(name = "jsonconvert")
protected Convert jsonConvert;
@@ -149,6 +152,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert;
if (this.messageAgent != null) this.node.messageAgent = this.messageAgent;
{
AnyValue props = conf;
if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties");

View File

@@ -20,6 +20,7 @@ import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.Type;
import org.redkale.mq.MessageAgent;
import org.redkale.net.TransportFactory;
import org.redkale.net.http.WebSocketNode;
import org.redkale.net.sncp.SncpClient.SncpAction;
import org.redkale.service.*;
import org.redkale.util.*;
@@ -516,6 +517,11 @@ public abstract class Sncp {
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf");
c.setAccessible(true);
c.set(service, conf);
if (service instanceof WebSocketNode) {
c = WebSocketNode.class.getDeclaredField("messageAgent");
c.setAccessible(true);
c.set(service, messageAgent);
}
}
return service;
} catch (RuntimeException rex) {
@@ -611,6 +617,11 @@ public abstract class Sncp {
Field m = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent");
m.setAccessible(true);
m.set(service, messageAgent);
if (service instanceof WebSocketNode) {
c = WebSocketNode.class.getDeclaredField("messageAgent");
c.setAccessible(true);
c.set(service, messageAgent);
}
}
if (transportFactory != null) transportFactory.addSncpService(service);
return service;
@@ -799,10 +810,15 @@ public abstract class Sncp {
c.setAccessible(true);
c.set(service, client);
}
{
if (messageAgent != null) {
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_messageagent");
c.setAccessible(true);
c.set(service, messageAgent);
if (service instanceof WebSocketNode) {
c = WebSocketNode.class.getDeclaredField("messageAgent");
c.setAccessible(true);
c.set(service, messageAgent);
}
}
{
Field c = newClazz.getDeclaredField(FIELDPREFIX + "_conf");