diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 620bf7432..cf7c15b06 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -83,7 +83,7 @@ public abstract class WebSocket { private Map attributes = new HashMap<>(); //非线程安全 - protected final long websocketid = System.nanoTime(); //唯一ID + protected final long websocketid = Math.abs(System.nanoTime()); //唯一ID protected WebSocket() { } @@ -96,11 +96,11 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(WebSocketPacket packet) { - int rs = RETCODE_WSOCKET_CLOSED; + public final CompletableFuture send(WebSocketPacket packet) { + CompletableFuture rs = null; if (this._runner != null) rs = this._runner.sendMessage(packet); if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")"); - return rs; + return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs; } /** @@ -110,7 +110,7 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(String text) { + public final CompletableFuture send(String text) { return send(text, true); } @@ -122,20 +122,20 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(String text, boolean last) { + public final CompletableFuture send(String text, boolean last) { return send(new WebSocketPacket(text, last)); } - public final int sendPing() { + public final CompletableFuture sendPing() { //if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping..."); return send(WebSocketPacket.DEFAULT_PING_PACKET); } - public final int sendPing(byte[] data) { + public final CompletableFuture sendPing(byte[] data) { return send(new WebSocketPacket(FrameType.PING, data)); } - public final int sendPong(byte[] data) { + public final CompletableFuture sendPong(byte[] data) { return send(new WebSocketPacket(FrameType.PONG, data)); } @@ -150,7 +150,7 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(byte[] data) { + public final CompletableFuture send(byte[] data) { return send(data, true); } @@ -162,7 +162,7 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(byte[] data, boolean last) { + public final CompletableFuture send(byte[] data, boolean last) { return send(new WebSocketPacket(data, last)); } @@ -173,7 +173,7 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(Object message) { + public final CompletableFuture send(Object message) { return send(message, true); } @@ -185,7 +185,7 @@ public abstract class WebSocket { * * @return 0表示成功, 非0表示错误码 */ - public final int send(Object message, boolean last) { + public final CompletableFuture send(Object message, boolean last) { if (message == null || message instanceof CharSequence || message instanceof byte[]) { return send(new WebSocketPacket((Serializable) message, last)); } else { @@ -527,6 +527,6 @@ public abstract class WebSocket { @Override public String toString() { - return "ws" + Objects.hashCode(this) + "@" + _remoteAddr; + return this.websocketid + "@" + _remoteAddr; } } diff --git a/src/org/redkale/net/http/WebSocketGroup.java b/src/org/redkale/net/http/WebSocketGroup.java index 8cb110076..23889dc51 100644 --- a/src/org/redkale/net/http/WebSocketGroup.java +++ b/src/org/redkale/net/http/WebSocketGroup.java @@ -7,7 +7,7 @@ package org.redkale.net.http; import java.io.*; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.*; import java.util.stream.Stream; /** @@ -83,7 +83,7 @@ public final class WebSocketGroup { attributes.put(name, value); } - public final int send(boolean recent, Object message, boolean last) { + public final CompletableFuture send(boolean recent, Object message, boolean last) { if (recent) { return recentWebSocket.send(message, last); } else { @@ -91,46 +91,42 @@ public final class WebSocketGroup { } } - public final int sendEach(Object message) { + public final CompletableFuture sendEach(Object message) { return sendEach(message, true); } - public final int sendEach(WebSocketPacket packet) { - int rs = 0; + public final CompletableFuture sendEach(WebSocketPacket packet) { + CompletableFuture future = null; for (WebSocket s : list) { - rs |= s.send(packet); + if (future == null) { + future = s.send(packet); + } else { + future.thenCombine(s.send(packet), (a, b) -> a | b); + } } - return rs; + return future == null ? CompletableFuture.completedFuture(0) : future; } - public final int sendEachPing() { - int rs = 0; - for (WebSocket s : list) { - rs |= s.sendPing(); - } - return rs; + public final CompletableFuture sendEachPing() { + return sendEach(WebSocketPacket.DEFAULT_PING_PACKET); } - public final int sendRecent(Object message) { + public final CompletableFuture sendRecent(Object message) { return sendRecent(message, true); } - public final int sendRecent(WebSocketPacket packet) { + public final CompletableFuture sendRecent(WebSocketPacket packet) { return recentWebSocket.send(packet); } - public final int sendEach(Object message, boolean last) { + public final CompletableFuture sendEach(Object message, boolean last) { if (message != null && !(message instanceof byte[]) && !(message instanceof CharSequence)) { message = recentWebSocket._jsonConvert.convertTo(message); } - int rs = 0; - for (WebSocket s : list) { - rs |= s.send(message, last); - } - return rs; + return sendEach(new WebSocketPacket((Serializable) message, last)); } - public final int sendRecent(Object message, boolean last) { + public final CompletableFuture sendRecent(Object message, boolean last) { return recentWebSocket.send(message, last); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 45d4161c0..c4fb1eb84 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -158,7 +158,7 @@ public abstract class WebSocketNode { if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine"); int rscode = RETCODE_GROUP_EMPTY; WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid); - if (group != null) rscode = group.send(recent, message, last); + if (group != null) rscode = group.send(recent, message, last).join(); //临时, 要改 if (recent && rscode == 0) { //已经给最近连接发送的消息 if (finest) logger.finest("websocket want send recent message success"); return rscode; diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index e32cdf32e..a9ecfbd96 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -48,6 +48,8 @@ public class WebSocketRunner implements Runnable { private final BlockingQueue queue = new ArrayBlockingQueue(1024); private final boolean wsbinary; + + private long lastSendTime; public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) { this.context = context; @@ -158,19 +160,20 @@ public class WebSocketRunner implements Runnable { } } - public int sendMessage(WebSocketPacket packet) { - if (packet == null) return RETCODE_SEND_ILLPACKET; - if (closed) return RETCODE_WSOCKET_CLOSED; + public CompletableFuture sendMessage(WebSocketPacket packet) { + if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET); + if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED); final boolean debug = this.coder.debugable; //System.out.println("推送消息"); final byte[] bytes = coder.encode(packet); if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length); + this.lastSendTime = System.currentTimeMillis(); if (writing.getAndSet(true)) { queue.add(bytes); - return 0; + return CompletableFuture.completedFuture(0); } - if (writeBuffer == null) return RETCODE_ILLEGALBUFFER; - ByteBuffer sendBuffer = null; + if (writeBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER); + ByteBuffer sendBuffer; if (bytes.length <= writeBuffer.capacity()) { writeBuffer.clear(); writeBuffer.put(bytes); @@ -222,12 +225,12 @@ public class WebSocketRunner implements Runnable { } } }); - return 0; + return CompletableFuture.completedFuture(0); } catch (Exception t) { writing.set(false); closeRunner(); context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t); - return RETCODE_SENDEXCEPTION; + return CompletableFuture.completedFuture(RETCODE_SENDEXCEPTION); } } diff --git a/src/org/redkale/net/http/WebSocketServlet.java b/src/org/redkale/net/http/WebSocketServlet.java index 08ee020e5..0cb721248 100644 --- a/src/org/redkale/net/http/WebSocketServlet.java +++ b/src/org/redkale/net/http/WebSocketServlet.java @@ -70,7 +70,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); } //存在WebSocketServlet,则此WebSocketNode必须是本地模式Service - this.node.localEngine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger); + this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger); this.node.init(conf); this.node.localEngine.init(conf); } @@ -84,7 +84,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl @Override public String resourceName() { - return this.getClass().getSimpleName().replace("Servlet", "").replace("WebSocket", "").replace("_Dyn", "").toLowerCase(); + return this.getClass().getSimpleName().replace("_Dyn", "").toLowerCase().replaceAll("websocket.*$", "").replaceAll("servlet.*$", ""); } @Override diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index c98cf6c06..7e758a9a4 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -48,17 +48,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service { @Override public CompletableFuture sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) { - return CompletableFuture.supplyAsync(() -> { - if (this.localEngine == null) return RETCODE_GROUP_EMPTY; - final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); - if (group == null || group.isEmpty()) { - if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); - return RETCODE_GROUP_EMPTY; - } - int code = group.send(recent, message, last); - if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code); - return code; - }); + if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) { + if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY); + return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY); + } + return group.send(recent, message, last); } /**