This commit is contained in:
Redkale
2017-05-20 13:01:43 +08:00
parent cee2c47d9a
commit e939241a8c
6 changed files with 53 additions and 58 deletions

View File

@@ -83,7 +83,7 @@ public abstract class WebSocket {
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
protected final long websocketid = System.nanoTime(); //唯一ID
protected final long websocketid = Math.abs(System.nanoTime()); //唯一ID
protected WebSocket() {
}
@@ -96,11 +96,11 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(WebSocketPacket packet) {
int rs = RETCODE_WSOCKET_CLOSED;
public final CompletableFuture<Integer> send(WebSocketPacket packet) {
CompletableFuture<Integer> rs = null;
if (this._runner != null) rs = this._runner.sendMessage(packet);
if (_engine.finest) _engine.logger.finest("wsgroupid:" + getGroupid() + " send websocket result is " + rs + " on " + this + " by message(" + packet + ")");
return rs;
return rs == null ? CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED) : rs;
}
/**
@@ -110,7 +110,7 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(String text) {
public final CompletableFuture<Integer> send(String text) {
return send(text, true);
}
@@ -122,20 +122,20 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(String text, boolean last) {
public final CompletableFuture<Integer> send(String text, boolean last) {
return send(new WebSocketPacket(text, last));
}
public final int sendPing() {
public final CompletableFuture<Integer> sendPing() {
//if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping...");
return send(WebSocketPacket.DEFAULT_PING_PACKET);
}
public final int sendPing(byte[] data) {
public final CompletableFuture<Integer> sendPing(byte[] data) {
return send(new WebSocketPacket(FrameType.PING, data));
}
public final int sendPong(byte[] data) {
public final CompletableFuture<Integer> sendPong(byte[] data) {
return send(new WebSocketPacket(FrameType.PONG, data));
}
@@ -150,7 +150,7 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(byte[] data) {
public final CompletableFuture<Integer> send(byte[] data) {
return send(data, true);
}
@@ -162,7 +162,7 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(byte[] data, boolean last) {
public final CompletableFuture<Integer> send(byte[] data, boolean last) {
return send(new WebSocketPacket(data, last));
}
@@ -173,7 +173,7 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(Object message) {
public final CompletableFuture<Integer> send(Object message) {
return send(message, true);
}
@@ -185,7 +185,7 @@ public abstract class WebSocket {
*
* @return 0表示成功 非0表示错误码
*/
public final int send(Object message, boolean last) {
public final CompletableFuture<Integer> send(Object message, boolean last) {
if (message == null || message instanceof CharSequence || message instanceof byte[]) {
return send(new WebSocketPacket((Serializable) message, last));
} else {
@@ -527,6 +527,6 @@ public abstract class WebSocket {
@Override
public String toString() {
return "ws" + Objects.hashCode(this) + "@" + _remoteAddr;
return this.websocketid + "@" + _remoteAddr;
}
}

View File

@@ -7,7 +7,7 @@ package org.redkale.net.http;
import java.io.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.*;
import java.util.stream.Stream;
/**
@@ -83,7 +83,7 @@ public final class WebSocketGroup {
attributes.put(name, value);
}
public final int send(boolean recent, Object message, boolean last) {
public final CompletableFuture<Integer> send(boolean recent, Object message, boolean last) {
if (recent) {
return recentWebSocket.send(message, last);
} else {
@@ -91,46 +91,42 @@ public final class WebSocketGroup {
}
}
public final int sendEach(Object message) {
public final CompletableFuture<Integer> sendEach(Object message) {
return sendEach(message, true);
}
public final int sendEach(WebSocketPacket packet) {
int rs = 0;
public final CompletableFuture<Integer> sendEach(WebSocketPacket packet) {
CompletableFuture<Integer> future = null;
for (WebSocket s : list) {
rs |= s.send(packet);
if (future == null) {
future = s.send(packet);
} else {
future.thenCombine(s.send(packet), (a, b) -> a | b);
}
}
return rs;
return future == null ? CompletableFuture.completedFuture(0) : future;
}
public final int sendEachPing() {
int rs = 0;
for (WebSocket s : list) {
rs |= s.sendPing();
}
return rs;
public final CompletableFuture<Integer> sendEachPing() {
return sendEach(WebSocketPacket.DEFAULT_PING_PACKET);
}
public final int sendRecent(Object message) {
public final CompletableFuture<Integer> sendRecent(Object message) {
return sendRecent(message, true);
}
public final int sendRecent(WebSocketPacket packet) {
public final CompletableFuture<Integer> sendRecent(WebSocketPacket packet) {
return recentWebSocket.send(packet);
}
public final int sendEach(Object message, boolean last) {
public final CompletableFuture<Integer> sendEach(Object message, boolean last) {
if (message != null && !(message instanceof byte[]) && !(message instanceof CharSequence)) {
message = recentWebSocket._jsonConvert.convertTo(message);
}
int rs = 0;
for (WebSocket s : list) {
rs |= s.send(message, last);
}
return rs;
return sendEach(new WebSocketPacket((Serializable) message, last));
}
public final int sendRecent(Object message, boolean last) {
public final CompletableFuture<Integer> sendRecent(Object message, boolean last) {
return recentWebSocket.send(message, last);
}

View File

@@ -158,7 +158,7 @@ public abstract class WebSocketNode {
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine");
int rscode = RETCODE_GROUP_EMPTY;
WebSocketGroup group = this.localEngine == null ? null : this.localEngine.getWebSocketGroup(groupid);
if (group != null) rscode = group.send(recent, message, last);
if (group != null) rscode = group.send(recent, message, last).join(); //临时, 要改
if (recent && rscode == 0) { //已经给最近连接发送的消息
if (finest) logger.finest("websocket want send recent message success");
return rscode;

View File

@@ -48,6 +48,8 @@ public class WebSocketRunner implements Runnable {
private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue(1024);
private final boolean wsbinary;
private long lastSendTime;
public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) {
this.context = context;
@@ -158,19 +160,20 @@ public class WebSocketRunner implements Runnable {
}
}
public int sendMessage(WebSocketPacket packet) {
if (packet == null) return RETCODE_SEND_ILLPACKET;
if (closed) return RETCODE_WSOCKET_CLOSED;
public CompletableFuture<Integer> sendMessage(WebSocketPacket packet) {
if (packet == null) return CompletableFuture.completedFuture(RETCODE_SEND_ILLPACKET);
if (closed) return CompletableFuture.completedFuture(RETCODE_WSOCKET_CLOSED);
final boolean debug = this.coder.debugable;
//System.out.println("推送消息");
final byte[] bytes = coder.encode(packet);
if (debug) context.getLogger().log(Level.FINEST, "send web socket message's length = " + bytes.length);
this.lastSendTime = System.currentTimeMillis();
if (writing.getAndSet(true)) {
queue.add(bytes);
return 0;
return CompletableFuture.completedFuture(0);
}
if (writeBuffer == null) return RETCODE_ILLEGALBUFFER;
ByteBuffer sendBuffer = null;
if (writeBuffer == null) return CompletableFuture.completedFuture(RETCODE_ILLEGALBUFFER);
ByteBuffer sendBuffer;
if (bytes.length <= writeBuffer.capacity()) {
writeBuffer.clear();
writeBuffer.put(bytes);
@@ -222,12 +225,12 @@ public class WebSocketRunner implements Runnable {
}
}
});
return 0;
return CompletableFuture.completedFuture(0);
} catch (Exception t) {
writing.set(false);
closeRunner();
context.getLogger().log(Level.FINE, "WebSocket sendMessage abort, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", t);
return RETCODE_SENDEXCEPTION;
return CompletableFuture.completedFuture(RETCODE_SENDEXCEPTION);
}
}

View File

@@ -70,7 +70,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger);
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger);
this.node.init(conf);
this.node.localEngine.init(conf);
}
@@ -84,7 +84,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override
public String resourceName() {
return this.getClass().getSimpleName().replace("Servlet", "").replace("WebSocket", "").replace("_Dyn", "").toLowerCase();
return this.getClass().getSimpleName().replace("_Dyn", "").toLowerCase().replaceAll("websocket.*$", "").replaceAll("servlet.*$", "");
}
@Override

View File

@@ -48,17 +48,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
return CompletableFuture.supplyAsync(() -> {
if (this.localEngine == null) return RETCODE_GROUP_EMPTY;
final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
int code = group.send(recent, message, last);
if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code);
return code;
});
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
final WebSocketGroup group = this.localEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + this.localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
return group.send(recent, message, last);
}
/**