diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 2bb459d03..5f72f4e88 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -71,7 +71,7 @@ public abstract class WebSocket { WebSocketGroup _group; //不可能为空 - Serializable _sessionid; //不可能为空 + String _sessionid; //不可能为空 G _groupid; //不可能为空 @@ -83,11 +83,11 @@ public abstract class WebSocket { java.lang.reflect.Type _messageTextType; //不可能为空 - private final long createtime = System.currentTimeMillis(); + private long createtime = System.currentTimeMillis(); private Map attributes = new HashMap<>(); //非线程安全 - protected final long websocketid = Math.abs(System.nanoTime()); //唯一ID + protected long websocketid = Math.abs(System.nanoTime()); //唯一ID protected WebSocket() { } @@ -308,7 +308,7 @@ public abstract class WebSocket { * * @return sessionid */ - public final Serializable getSessionid() { + public final String getSessionid() { return _sessionid; } @@ -368,7 +368,7 @@ public abstract class WebSocket { * * @return sessionid */ - protected CompletableFuture onOpen(final HttpRequest request) { + protected CompletableFuture onOpen(final HttpRequest request) { return CompletableFuture.completedFuture(request.getSessionid(true)); } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 1a9da59ff..4ac7e5e51 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -14,6 +14,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.logging.*; import org.redkale.convert.json.JsonConvert; @@ -25,7 +26,7 @@ import org.redkale.convert.json.JsonConvert; * * @author zhangjx */ -public class WebSocketRunner implements Runnable { +class WebSocketRunner implements Runnable { private final WebSocketEngine engine; @@ -47,14 +48,17 @@ public class WebSocketRunner implements Runnable { private final boolean wsbinary; + private final BiConsumer restMessageConsumer; //主要供RestWebSocket使用 + protected long lastSendTime; protected final JsonConvert convert; - public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) { + WebSocketRunner(Context context, WebSocket webSocket, BiConsumer messageConsumer, AsyncConnection channel, final boolean wsbinary) { this.context = context; this.engine = webSocket._engine; this.webSocket = webSocket; + this.restMessageConsumer = messageConsumer; this.channel = channel; this.wsbinary = wsbinary; this.readBuffer = context.pollBuffer(); @@ -123,7 +127,11 @@ public class WebSocketRunner implements Runnable { channel.read(readBuffer, null, this); } try { - webSocket.onMessage(message, packet.last); + if (restMessageConsumer != null) { //主要供RestWebSocket使用 + restMessageConsumer.accept(webSocket, message); + } else { + webSocket.onMessage(message, packet.last); + } } catch (Exception e) { context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 2f4db4035..8430f2e5b 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -12,6 +12,7 @@ import java.nio.*; import java.security.*; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import java.util.logging.*; import javax.annotation.*; import org.redkale.convert.json.JsonConvert; @@ -57,14 +58,16 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Comment("是否用于二进制流传输") protected final boolean wsbinary = getClass().getAnnotation(WebSocketBinary.class) != null; + private final BiConsumer restMessageConsumer = createRestOnMessageConsumer(); + + Type messageTextType; //RestWebSocket时会被修改 + @Resource protected JsonConvert jsonConvert; @Resource(name = "$") protected WebSocketNode node; - protected final Type messageTextType; - protected WebSocketServlet() { Type msgtype = String.class; try { @@ -73,7 +76,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (method.getParameterCount() > 0) continue; Type rt = method.getGenericReturnType(); if (rt instanceof ParameterizedType) { - msgtype = ((ParameterizedType) rt).getActualTypeArguments()[0]; + msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1]; } if (msgtype == Object.class) msgtype = String.class; break; @@ -130,8 +133,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl webSocket._jsonConvert = jsonConvert; webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddr = request.getRemoteAddr(); - initWebSocket(webSocket); - CompletableFuture sessionFuture = webSocket.onOpen(request); + initRestWebSocket(webSocket); + CompletableFuture sessionFuture = webSocket.onOpen(request); if (sessionFuture == null) { if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); response.finish(true); @@ -172,7 +175,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl } webSocket._groupid = groupid; WebSocketServlet.this.node.localEngine.add(webSocket); - WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary); + WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel(), wsbinary); webSocket._runner = runner; context.runAsync(runner); response.finish(true); @@ -188,15 +191,18 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl }); } - protected void initWebSocket(WebSocket websocket) { - - } + protected abstract WebSocket createWebSocket(); protected WebSocketNode createWebSocketNode() { return null; } - protected abstract WebSocket createWebSocket(); + protected void initRestWebSocket(WebSocket websocket) { //RestWebSocket设置@Resource资源 + } + + protected BiConsumer createRestOnMessageConsumer() { + return null; + } private static MessageDigest getMessageDigest() { try {