From f6c617574c57f6cc0cfe4ec0a9e8aac37ff242d1 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 29 May 2017 17:22:12 +0800 Subject: [PATCH] --- src/org/redkale/net/http/WebSocket.java | 28 ++- src/org/redkale/net/http/WebSocketEngine.java | 10 +- src/org/redkale/net/http/WebSocketNode.java | 2 +- src/org/redkale/net/http/WebSocketRunner.java | 3 +- .../redkale/service/WebSocketNodeService.java | 2 +- .../redkale/test/http/HttpRequestDesc.java | 19 ++- .../redkale/test/http/HttpResponseDesc.java | 20 ++- test/org/redkale/test/http/WebSocketDesc.java | 160 ++++++++---------- .../test/websocket/ChatWebSocketServlet.java | 80 ++++----- 9 files changed, 174 insertions(+), 150 deletions(-) diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 78214c124..8f3c3ac19 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -189,6 +189,18 @@ public abstract class WebSocket { } //---------------------------------------------------------------- + /** + * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 + * + * @param message 不可为空 + * @param userids Serializable[] + * + * @return 为0表示成功, 其他值表示异常 + */ + public final CompletableFuture sendMessage(Object message, G... userids) { + return sendMessage(message, true, userids); + } + /** * 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 * @@ -345,10 +357,10 @@ public abstract class WebSocket { * * @return WebSocket集合 */ - protected final Stream getWebSockets(G userid) { - return _engine.getWebSockets(userid); + protected final Stream getLocalWebSockets(G userid) { + return _engine.getLocalWebSockets(userid); } - + /** * 获取指定userid的WebSocket数组, 没有返回null
* 此方法用于单用户单连接模式 @@ -357,17 +369,17 @@ public abstract class WebSocket { * * @return WebSocket */ - protected final WebSocket findWebSocket(G userid) { - return _engine.findWebSocket(userid); + protected final WebSocket findLocalWebSocket(G userid) { + return _engine.findLocalWebSocket(userid); } - + /** * 获取当前进程节点所有在线的WebSocket * * @return WebSocketGroup列表 */ - protected final Collection getWebSockets() { - return _engine.getWebSockets(); + protected final Collection getLocalWebSockets() { + return _engine.getLocalWebSockets(); } //------------------------------------------------------------------- diff --git a/src/org/redkale/net/http/WebSocketEngine.java b/src/org/redkale/net/http/WebSocketEngine.java index c31cea401..970e71b4a 100644 --- a/src/org/redkale/net/http/WebSocketEngine.java +++ b/src/org/redkale/net/http/WebSocketEngine.java @@ -85,7 +85,7 @@ public final class WebSocketEngine { }); long delay = (interval - System.currentTimeMillis() / 1000 % interval) + index * 5; scheduler.scheduleWithFixedDelay(() -> { - getWebSockets().forEach(x -> x.sendPing()); + getLocalWebSockets().forEach(x -> x.sendPing()); }, delay, interval, TimeUnit.SECONDS); if (finest) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", interval:" + interval + "s) scheduler executor"); } @@ -215,7 +215,7 @@ public final class WebSocketEngine { } } - Collection getWebSockets() { + Collection getLocalWebSockets() { if (single) return websockets.values(); List list = new ArrayList<>(); websockets2.values().forEach(x -> list.addAll(x)); @@ -223,14 +223,14 @@ public final class WebSocketEngine { } //适用于单用户单连接模式 - public WebSocket findWebSocket(Serializable userid) { + public WebSocket findLocalWebSocket(Serializable userid) { if (single) return websockets.get(userid); List list = websockets2.get(userid); return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1); } //适用于单用户多连接模式 - public Stream getWebSockets(Serializable userid) { + public Stream getLocalWebSockets(Serializable userid) { if (single) { WebSocket websocket = websockets.get(userid); return websocket == null ? Stream.empty() : Stream.of(websocket); @@ -240,7 +240,7 @@ public final class WebSocketEngine { } } - public boolean existsWebSocket(Serializable userid) { + public boolean existsLocalWebSocket(Serializable userid) { return single ? websockets.containsKey(userid) : websockets2.containsKey(userid); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index 7d04227aa..e137cebbc 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -56,7 +56,7 @@ public abstract class WebSocketNode { public final void postDestroy(AnyValue conf) { if (this.localEngine == null) return; //关掉所有本地本地WebSocket - this.localEngine.getWebSockets().forEach(g -> disconnect(g.userid())); + this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.userid())); if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress); } diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index 4256249a4..d8e7c4955 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -208,7 +208,6 @@ class WebSocketRunner implements Runnable { boolean debug = true; //System.out.println("推送消息"); if (debug) context.getLogger().log(Level.FINEST, "send web socket message: " + packet); - this.lastSendTime = System.currentTimeMillis(); final CompletableFuture futureResult = new CompletableFuture<>(); if (writing.getAndSet(true)) { queue.add(new QueueEntry(futureResult, packet)); @@ -216,6 +215,7 @@ class WebSocketRunner implements Runnable { } ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier()); try { + this.lastSendTime = System.currentTimeMillis(); channel.write(buffers, buffers, new CompletionHandler() { private CompletableFuture future = futureResult; @@ -259,6 +259,7 @@ class WebSocketRunner implements Runnable { if (entry != null) { future = entry.future; ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier()); + lastSendTime = System.currentTimeMillis(); channel.write(buffers, buffers, this); } } catch (Exception e) { diff --git a/src/org/redkale/service/WebSocketNodeService.java b/src/org/redkale/service/WebSocketNodeService.java index bf8b85c23..902b2c2c9 100644 --- a/src/org/redkale/service/WebSocketNodeService.java +++ b/src/org/redkale/service/WebSocketNodeService.java @@ -40,7 +40,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>()); return CompletableFuture.supplyAsync(() -> { final List rs = new ArrayList<>(); - this.localEngine.getWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); + this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr())); return rs; }); } diff --git a/test/org/redkale/test/http/HttpRequestDesc.java b/test/org/redkale/test/http/HttpRequestDesc.java index 01eec373d..3bbc75596 100644 --- a/test/org/redkale/test/http/HttpRequestDesc.java +++ b/test/org/redkale/test/http/HttpRequestDesc.java @@ -34,13 +34,28 @@ public interface HttpRequestDesc { //获取请求内容的byte[] public byte[] getBody(); + + //获取请求内容的JavaBean对象 + public T getBodyJson(java.lang.reflect.Type type); + //获取请求内容的JavaBean对象 + public T getBodyJson(JsonConvert convert, java.lang.reflect.Type type); + //获取文件上传对象 public MultiContext getMultiContext(); //获取文件上传信息列表 等价于 getMultiContext().parts(); public Iterable multiParts() throws IOException; + //获取当前用户信息 数据类型由@HttpUserType指定 + public T currentUser(); + + //获取模块ID,来自@HttpServlet.moduleid() + public int getModuleid(); + + //获取操作ID,来自@HttpMapping.actionid() + public int getActionid(); + //获取sessionid public String getSessionid(boolean autoCreate); @@ -292,13 +307,13 @@ public interface HttpRequestDesc { //获取所有属性值, servlet执行完后会被清空 public Map getAttributes(); - //获取指定属性值 + //获取指定属性值, servlet执行完后会被清空 public T getAttribute(String name); //删除指定属性 public void removeAttribute(String name); - //设置属性值 + //设置属性值, servlet执行完后会被清空 public void setAttribute(String name, Object value); //获取request创建时间 diff --git a/test/org/redkale/test/http/HttpResponseDesc.java b/test/org/redkale/test/http/HttpResponseDesc.java index 56a1962c8..93e4439d5 100644 --- a/test/org/redkale/test/http/HttpResponseDesc.java +++ b/test/org/redkale/test/http/HttpResponseDesc.java @@ -28,6 +28,12 @@ public interface HttpResponseDesc { //增加Cookie值 public HttpResponse addCookie(Collection cookies); + //创建AsyncHandler实例,将非字符串对象以JSON格式输出,字符串以文本输出 + public AsyncHandler createAsyncHandler(); + + //传入的AsyncHandler子类必须是public,且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数 + public H createAsyncHandler(Class handlerClass); + //设置状态码 public void setStatus(int status); @@ -62,9 +68,9 @@ public interface HttpResponseDesc { //异步输出指定内容 public void sendBody(ByteBuffer buffer, A attachment, AsyncHandler handler); - //创建AsyncHandler实例,将非字符串对象以JSON格式输出,字符串以文本输出 - public AsyncHandler createAsyncHandler(); - + //异步输出指定内容 + public void sendBody(ByteBuffer[] buffers, A attachment, AsyncHandler handler); + //关闭HTTP连接,如果是keep-alive则不强制关闭 public void finish(); @@ -100,6 +106,12 @@ public interface HttpResponseDesc { //将CompletableFuture的结果对象以JSON格式输出 public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future); + + //将HttpResult的结果对象以JSON格式输出 + public void finishJson(final HttpResult result); + + //将HttpResult的结果对象以JSON格式输出 + public void finishJson(final JsonConvert convert, final HttpResult result) ; //将指定字符串以响应结果输出 public void finish(String obj); @@ -137,6 +149,6 @@ public interface HttpResponseDesc { public void finish(final String filename, File file) throws IOException; //HttpResponse回收时回调的监听方法 - public void setRecycleListener(BiConsumer recycleListener); + public void recycleListener(BiConsumer recycleListener); } diff --git a/test/org/redkale/test/http/WebSocketDesc.java b/test/org/redkale/test/http/WebSocketDesc.java index 71aa26820..a6103cc76 100644 --- a/test/org/redkale/test/http/WebSocketDesc.java +++ b/test/org/redkale/test/http/WebSocketDesc.java @@ -8,6 +8,9 @@ package org.redkale.test.http; import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; +import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; import org.redkale.net.http.*; @@ -15,70 +18,99 @@ import org.redkale.net.http.*; * * @author zhangjx */ -public interface WebSocketDesc { +public interface WebSocketDesc { - //发送消息体, 包含二进制/文本 返回结果0表示成功,非0表示错误码 - public int send(WebSocketPacket packet); + //给自身发送消息, 消息类型是String或byte[]或可JavaBean对象 返回结果0表示成功,非0表示错误码 + public CompletableFuture send(Object message); - //发送单一的文本消息 返回结果0表示成功,非0表示错误码 - public int send(String text); + //给自身发送消息, 消息类型是String或byte[]或可JavaBean对象 返回结果0表示成功,非0表示错误码 + public CompletableFuture send(Object message, boolean last); - //发送文本消息 返回结果0表示成功,非0表示错误码 - public int send(String text, boolean last); + //给自身发送消息, 消息类型是JavaBean对象 返回结果0表示成功,非0表示错误码 + public CompletableFuture send(JsonConvert convert, Object message); - //发送单一的二进制消息 返回结果0表示成功,非0表示错误码 - public int send(byte[] data); + //给自身发送消息, 消息类型是JavaBean对象 返回结果0表示成功,非0表示错误码 + public CompletableFuture send(JsonConvert convert, Object message, boolean last); - //发送单一的二进制消息 返回结果0表示成功,非0表示错误码 - public int send(byte[] data, boolean last); + //给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 返回结果0表示成功,非0表示错误码 + public CompletableFuture sendMessage(Object message, G... userids); - //发送消息, 消息类型是String或byte[] 返回结果0表示成功,非0表示错误码 - public int send(Serializable message, boolean last); + //给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 返回结果0表示成功,非0表示错误码 + public CompletableFuture sendMessage(Object message, boolean last, G... userids); - //给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 - public int sendEachMessage(Serializable groupid, String text); + //给所有人广播消息, 返回结果0表示成功,非0表示错误码 + public CompletableFuture broadcastMessage(final Object message); - //给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息 - public int sendEachMessage(Serializable groupid, String text, boolean last); + //给所有人广播消息, 返回结果0表示成功,非0表示错误码 + public CompletableFuture broadcastMessage(final Object message, boolean last); - //给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息 - public int sendEachMessage(Serializable groupid, byte[] data); + //获取用户在线的SNCP节点地址列表,不是分布式则返回元素数量为1,且元素值为null的列表 + public CompletableFuture> getRpcNodeAddresses(final Serializable userid); - //给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息 - public int sendEachMessage(Serializable groupid, byte[] data, boolean last); - - //给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 - public int sendRecentMessage(Serializable groupid, String text); - - //给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息 - public int sendRecentMessage(Serializable groupid, String text, boolean last); - - //给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息 - public int sendRecentMessage(Serializable groupid, byte[] data); - - //给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息 - public int sendRecentMessage(Serializable groupid, byte[] data, boolean last); + //获取在线用户的详细连接信息 + public CompletableFuture>> getRpcNodeWebSocketAddresses(final Serializable userid); //发送PING消息 返回结果0表示成功,非0表示错误码 - public int sendPing(); + public CompletableFuture sendPing(); //发送PING消息,附带其他信息 返回结果0表示成功,非0表示错误码 - public int sendPing(byte[] data); + public CompletableFuture sendPing(byte[] data); //发送PONG消息,附带其他信息 返回结果0表示成功,非0表示错误码 - public int sendPong(byte[] data); + public CompletableFuture sendPong(byte[] data); + + + //获取指定userid的WebSocket数组, 没有返回null 此方法用于单用户多连接模式 + /* protected */ Stream getLocalWebSockets(G userid); + + + //获取指定userid的WebSocket数组, 没有返回null 此方法用于单用户单连接模式 + /* protected */ WebSocket findLocalWebSocket(G userid); + + + //获取当前进程节点所有在线的WebSocket + /* protected */ Collection getLocalWebSockets(); + + + //返回sessionid, null表示连接不合法或异常,默认实现是request.sessionid(true),通常需要重写该方法 + /* protected */ CompletableFuture onOpen(final HttpRequest request); + + + //创建userid, null表示异常, 必须实现该方法 + /* protected abstract */ G createUserid(); + + //标记为@WebSocketBinary才需要重写此方法 + public void onRead(AsyncConnection channel); + + //WebSokcet连接成功后的回调方法 + public void onConnected(); + + //ping后的回调方法 + public void onPing(byte[] bytes); + + //pong后的回调方法 + public void onPong(byte[] bytes); + + //接收到消息的回调方法 + public void onMessage(T message, boolean last); + + //接收到二进制消息的回调方法 + public void onMessage(byte[] bytes, boolean last); + + //关闭的回调方法,调用此方法时WebSocket已经被关闭 + public void onClose(int code, String reason); //获取当前WebSocket下的属性 - public T getAttribute(String name); + public T getAttribute(String name); //移出当前WebSocket下的属性 - public T removeAttribute(String name); + public T removeAttribute(String name); //给当前WebSocket下的增加属性 public void setAttribute(String name, Object value); - //获取当前WebSocket所属的groupid - public Serializable getGroupid(); + //获取当前WebSocket所属的userid + public G userid(); //获取当前WebSocket的会话ID, 不会为null public Serializable getSessionid(); @@ -92,51 +124,9 @@ public interface WebSocketDesc { //获取WebSocket创建时间 public long getCreatetime(); + //获取最后一次发送消息的时间 + public long getLastSendTime(); + //显式地关闭WebSocket public void close(); - - - //获取在线用户的节点地址列表 - /* protected */ Collection getOnlineNodes(Serializable groupid); - - - //获取在线用户的详细连接信息 - /* protected */ Map> getOnlineRemoteAddress(Serializable groupid); - - //返回sessionid, null表示连接不合法或异常,默认实现是request.getSessionid(false),通常需要重写该方法 - public Serializable onOpen(final HttpRequest request); - - - //创建groupid, null表示异常, 必须实现该方法, 通常为用户ID为groupid - /* protected abstract */ Serializable createGroupid(); - - //标记为@WebSocketBinary才需要重写此方法 - default void onRead(AsyncConnection channel) { - } - - default void onConnected() { - } - - //接收文本消息响应事件,可能会接收到文本消息需要重写该方法 - default void onMessage(String text) { - } - - default void onPing(byte[] bytes) { - } - - default void onPong(byte[] bytes) { - } - - //接收二进制消息响应事件,可能会接收到二进制消息需要重写该方法 - default void onMessage(byte[] bytes) { - } - - default void onFragment(String text, boolean last) { - } - - default void onFragment(byte[] bytes, boolean last) { - } - - default void onClose(int code, String reason) { - } } diff --git a/test/org/redkale/test/websocket/ChatWebSocketServlet.java b/test/org/redkale/test/websocket/ChatWebSocketServlet.java index 93ec0cb19..7d74fc3fe 100644 --- a/test/org/redkale/test/websocket/ChatWebSocketServlet.java +++ b/test/org/redkale/test/websocket/ChatWebSocketServlet.java @@ -9,9 +9,10 @@ import org.redkale.net.http.WebServlet; import org.redkale.net.http.WebSocketServlet; import org.redkale.net.http.WebSocket; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.*; -import org.redkale.convert.json.JsonConvert; -import org.redkale.util.Utility; +import javax.annotation.Resource; +import org.redkale.net.http.*; +import org.redkale.test.rest.*; +import org.redkale.util.*; /** * @@ -20,51 +21,43 @@ import org.redkale.util.Utility; @WebServlet("/ws/chat") public class ChatWebSocketServlet extends WebSocketServlet { - private final AtomicLong counter = new AtomicLong(); + @Resource + private UserService userService; - private final AtomicLong icounter = new AtomicLong(); - - private final boolean debug; - - public ChatWebSocketServlet() { - debug = "true".equalsIgnoreCase(System.getProperty("debug", "true")); - Thread t = new Thread() { - - { - setName("Debug-ChatWebSocket-ShowCount-Thread"); - } - - @Override - public void run() { - while (true) { - try { - sleep(300 * 1000); - } catch (Exception e) { - return; - } - System.out.println(Utility.now() + ": 消息总数: " + counter.get() + ",间隔消息数: " + icounter.getAndSet(0)); - } - } - }; - t.start(); + @Override + public void init(HttpContext context, AnyValue conf) { + System.out.println("本实例的WebSocketNode: " + super.node); } @Override - protected WebSocket createWebSocket() { + public void destroy(HttpContext context, AnyValue conf) { + System.out.println("关闭了ChatWebSocketServlet"); + } - return new WebSocket() { + @Override + protected WebSocket createWebSocket() { + + return new WebSocket() { + + private UserInfo user; @Override - public void onMessage(ChatMessage message, boolean last) { - icounter.incrementAndGet(); - counter.incrementAndGet(); - if (debug) System.out.println("收到消息: " + message); - super.getWebSockets().forEach(x -> x.send(message)); + public void onMessage(ChatMessage message, boolean last) { // text 接收的格式: {"receiveid":200000001, "content":"Hi Redkale!"} + message.sendid = user.getUserid(); //将当前用户设为消息的发送方 + message.sendtime = System.currentTimeMillis(); //设置消息发送时间 + //给接收方发送消息, 即使接收方在其他WebSocket进程节点上有链接,Redkale也会自动发送到其他链接进程节点上。 + super.sendMessage(message, message.receiveid); } @Override - protected CompletableFuture createUserid() { - return CompletableFuture.completedFuture("2"); + protected CompletableFuture createUserid() { //创建用户ID + this.user = userService.current(String.valueOf(super.getSessionid())); + return CompletableFuture.completedFuture(this.user == null ? null : this.user.getUserid()); + } + + @Override + public CompletableFuture onOpen(HttpRequest request) { + return CompletableFuture.completedFuture(request.getSessionid(false)); //以request中的sessionid字符串作为WebSocket的sessionid } }; @@ -72,11 +65,12 @@ public class ChatWebSocketServlet extends WebSocketServlet { public static class ChatMessage { - public String message; + public int sendid; //发送方用户ID - @Override - public String toString() { - return JsonConvert.root().convertTo(this); - } + public int receiveid; //接收方用户ID + + public String content; //文本消息内容 + + public long sendtime; //消息发送时间 } }