This commit is contained in:
Redkale
2017-05-29 17:22:12 +08:00
parent 2291beb5e7
commit f6c617574c
9 changed files with 174 additions and 150 deletions

View File

@@ -189,6 +189,18 @@ public abstract class WebSocket<G extends Serializable, T> {
}
//----------------------------------------------------------------
/**
* 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
* @param message 不可为空
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(Object message, G... userids) {
return sendMessage(message, true, userids);
}
/**
* 给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息
*
@@ -345,10 +357,10 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return WebSocket集合
*/
protected final Stream<WebSocket> getWebSockets(G userid) {
return _engine.getWebSockets(userid);
protected final Stream<WebSocket> getLocalWebSockets(G userid) {
return _engine.getLocalWebSockets(userid);
}
/**
* 获取指定userid的WebSocket数组, 没有返回null<br>
* 此方法用于单用户单连接模式
@@ -357,17 +369,17 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return WebSocket
*/
protected final WebSocket findWebSocket(G userid) {
return _engine.findWebSocket(userid);
protected final WebSocket findLocalWebSocket(G userid) {
return _engine.findLocalWebSocket(userid);
}
/**
* 获取当前进程节点所有在线的WebSocket
*
* @return WebSocketGroup列表
*/
protected final Collection<WebSocket> getWebSockets() {
return _engine.getWebSockets();
protected final Collection<WebSocket> getLocalWebSockets() {
return _engine.getLocalWebSockets();
}
//-------------------------------------------------------------------

View File

@@ -85,7 +85,7 @@ public final class WebSocketEngine {
});
long delay = (interval - System.currentTimeMillis() / 1000 % interval) + index * 5;
scheduler.scheduleWithFixedDelay(() -> {
getWebSockets().forEach(x -> x.sendPing());
getLocalWebSockets().forEach(x -> x.sendPing());
}, delay, interval, TimeUnit.SECONDS);
if (finest) logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(delay:" + delay + ", interval:" + interval + "s) scheduler executor");
}
@@ -215,7 +215,7 @@ public final class WebSocketEngine {
}
}
Collection<WebSocket> getWebSockets() {
Collection<WebSocket> getLocalWebSockets() {
if (single) return websockets.values();
List<WebSocket> list = new ArrayList<>();
websockets2.values().forEach(x -> list.addAll(x));
@@ -223,14 +223,14 @@ public final class WebSocketEngine {
}
//适用于单用户单连接模式
public WebSocket findWebSocket(Serializable userid) {
public WebSocket findLocalWebSocket(Serializable userid) {
if (single) return websockets.get(userid);
List<WebSocket> list = websockets2.get(userid);
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
}
//适用于单用户多连接模式
public Stream<WebSocket> getWebSockets(Serializable userid) {
public Stream<WebSocket> getLocalWebSockets(Serializable userid) {
if (single) {
WebSocket websocket = websockets.get(userid);
return websocket == null ? Stream.empty() : Stream.of(websocket);
@@ -240,7 +240,7 @@ public final class WebSocketEngine {
}
}
public boolean existsWebSocket(Serializable userid) {
public boolean existsLocalWebSocket(Serializable userid) {
return single ? websockets.containsKey(userid) : websockets2.containsKey(userid);
}

View File

@@ -56,7 +56,7 @@ public abstract class WebSocketNode {
public final void postDestroy(AnyValue conf) {
if (this.localEngine == null) return;
//关掉所有本地本地WebSocket
this.localEngine.getWebSockets().forEach(g -> disconnect(g.userid()));
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.userid()));
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress);
}

View File

@@ -208,7 +208,6 @@ class WebSocketRunner implements Runnable {
boolean debug = true;
//System.out.println("推送消息");
if (debug) context.getLogger().log(Level.FINEST, "send web socket message: " + packet);
this.lastSendTime = System.currentTimeMillis();
final CompletableFuture<Integer> futureResult = new CompletableFuture<>();
if (writing.getAndSet(true)) {
queue.add(new QueueEntry(futureResult, packet));
@@ -216,6 +215,7 @@ class WebSocketRunner implements Runnable {
}
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier());
try {
this.lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
private CompletableFuture<Integer> future = futureResult;
@@ -259,6 +259,7 @@ class WebSocketRunner implements Runnable {
if (entry != null) {
future = entry.future;
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier());
lastSendTime = System.currentTimeMillis();
channel.write(buffers, buffers, this);
}
} catch (Exception e) {

View File

@@ -40,7 +40,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
if (this.localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>();
this.localEngine.getWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
return rs;
});
}

View File

@@ -34,13 +34,28 @@ public interface HttpRequestDesc {
//获取请求内容的byte[]
public byte[] getBody();
//获取请求内容的JavaBean对象
public <T> T getBodyJson(java.lang.reflect.Type type);
//获取请求内容的JavaBean对象
public <T> T getBodyJson(JsonConvert convert, java.lang.reflect.Type type);
//获取文件上传对象
public MultiContext getMultiContext();
//获取文件上传信息列表 等价于 getMultiContext().parts();
public Iterable<MultiPart> multiParts() throws IOException;
//获取当前用户信息 数据类型由@HttpUserType指定
public <T> T currentUser();
//获取模块ID来自@HttpServlet.moduleid()
public int getModuleid();
//获取操作ID来自@HttpMapping.actionid()
public int getActionid();
//获取sessionid
public String getSessionid(boolean autoCreate);
@@ -292,13 +307,13 @@ public interface HttpRequestDesc {
//获取所有属性值, servlet执行完后会被清空
public Map<String, Object> getAttributes();
//获取指定属性值
//获取指定属性值, servlet执行完后会被清空
public <T> T getAttribute(String name);
//删除指定属性
public void removeAttribute(String name);
//设置属性值
//设置属性值, servlet执行完后会被清空
public void setAttribute(String name, Object value);
//获取request创建时间

View File

@@ -28,6 +28,12 @@ public interface HttpResponseDesc {
//增加Cookie值
public HttpResponse addCookie(Collection<HttpCookie> cookies);
//创建AsyncHandler实例将非字符串对象以JSON格式输出字符串以文本输出
public AsyncHandler createAsyncHandler();
//传入的AsyncHandler子类必须是public且保证其子类可被继承且completed、failed可被重载且包含空参数的构造函数
public <H extends AsyncHandler> H createAsyncHandler(Class<H> handlerClass);
//设置状态码
public void setStatus(int status);
@@ -62,9 +68,9 @@ public interface HttpResponseDesc {
//异步输出指定内容
public <A> void sendBody(ByteBuffer buffer, A attachment, AsyncHandler<Integer, A> handler);
//创建AsyncHandler实例将非字符串对象以JSON格式输出字符串以文本输出
public AsyncHandler createAsyncHandler();
//异步输出指定内容
public <A> void sendBody(ByteBuffer[] buffers, A attachment, AsyncHandler<Integer, A> handler);
//关闭HTTP连接如果是keep-alive则不强制关闭
public void finish();
@@ -100,6 +106,12 @@ public interface HttpResponseDesc {
//将CompletableFuture的结果对象以JSON格式输出
public void finishJson(final JsonConvert convert, final Type type, final CompletableFuture future);
//将HttpResult的结果对象以JSON格式输出
public void finishJson(final HttpResult result);
//将HttpResult的结果对象以JSON格式输出
public void finishJson(final JsonConvert convert, final HttpResult result) ;
//将指定字符串以响应结果输出
public void finish(String obj);
@@ -137,6 +149,6 @@ public interface HttpResponseDesc {
public void finish(final String filename, File file) throws IOException;
//HttpResponse回收时回调的监听方法
public void setRecycleListener(BiConsumer<HttpRequest, HttpResponse> recycleListener);
public void recycleListener(BiConsumer<HttpRequest, HttpResponse> recycleListener);
}

View File

@@ -8,6 +8,9 @@ package org.redkale.test.http;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
import org.redkale.net.http.*;
@@ -15,70 +18,99 @@ import org.redkale.net.http.*;
*
* @author zhangjx
*/
public interface WebSocketDesc {
public interface WebSocketDesc<G, T> {
//发送消息, 包含二进制/文本 返回结果0表示成功非0表示错误码
public int send(WebSocketPacket packet);
//给自身发送消息, 消息类型是String或byte[]或可JavaBean对象 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> send(Object message);
//发送单一的文本消息 返回结果0表示成功非0表示错误码
public int send(String text);
//给自身发送消息, 消息类型是String或byte[]或可JavaBean对象 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> send(Object message, boolean last);
//发送文本消息 返回结果0表示成功非0表示错误码
public int send(String text, boolean last);
//给自身发送消息, 消息类型是JavaBean对象 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> send(JsonConvert convert, Object message);
//发送单一的二进制消息 返回结果0表示成功非0表示错误码
public int send(byte[] data);
//给自身发送消息, 消息类型是JavaBean对象 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> send(JsonConvert convert, Object message, boolean last);
//发送单一的二进制消息 返回结果0表示成功非0表示错误码
public int send(byte[] data, boolean last);
//给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> sendMessage(Object message, G... userids);
//发送消息, 消息类型是String或byte[] 返回结果0表示成功非0表示错误码
public int send(Serializable message, boolean last);
//给指定userid的WebSocket节点发送 二进制消息/文本消息/JavaBean对象消息 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> sendMessage(Object message, boolean last, G... userids);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息
public int sendEachMessage(Serializable groupid, String text);
//给所有人广播消息, 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> broadcastMessage(final Object message);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送文本消息
public int sendEachMessage(Serializable groupid, String text, boolean last);
//给所有人广播消息, 返回结果0表示成功非0表示错误码
public CompletableFuture<Integer> broadcastMessage(final Object message, boolean last);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消息
public int sendEachMessage(Serializable groupid, byte[] data);
//获取用户在线的SNCP节点地址列表不是分布式则返回元素数量为1且元素值为null的列表
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid);
//给指定groupid的WebSocketGroup下所有WebSocket节点发送二进制消
public int sendEachMessage(Serializable groupid, byte[] data, boolean last);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息
public int sendRecentMessage(Serializable groupid, String text);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送文本消息
public int sendRecentMessage(Serializable groupid, String text, boolean last);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息
public int sendRecentMessage(Serializable groupid, byte[] data);
//给指定groupid的WebSocketGroup下最近接入的WebSocket节点发送二进制消息
public int sendRecentMessage(Serializable groupid, byte[] data, boolean last);
//获取在线用户的详细连接信
public CompletableFuture<Map<InetSocketAddress, List<String>>> getRpcNodeWebSocketAddresses(final Serializable userid);
//发送PING消息 返回结果0表示成功非0表示错误码
public int sendPing();
public CompletableFuture<Integer> sendPing();
//发送PING消息附带其他信息 返回结果0表示成功非0表示错误码
public int sendPing(byte[] data);
public CompletableFuture<Integer> sendPing(byte[] data);
//发送PONG消息附带其他信息 返回结果0表示成功非0表示错误码
public int sendPong(byte[] data);
public CompletableFuture<Integer> sendPong(byte[] data);
//获取指定userid的WebSocket数组, 没有返回null 此方法用于单用户多连接模式
/* protected */ Stream<WebSocket> getLocalWebSockets(G userid);
//获取指定userid的WebSocket数组, 没有返回null 此方法用于单用户单连接模式
/* protected */ WebSocket findLocalWebSocket(G userid);
//获取当前进程节点所有在线的WebSocket
/* protected */ Collection<WebSocket> getLocalWebSockets();
//返回sessionid, null表示连接不合法或异常,默认实现是request.sessionid(true),通常需要重写该方法
/* protected */ CompletableFuture<String> onOpen(final HttpRequest request);
//创建userid null表示异常 必须实现该方法
/* protected abstract */ G createUserid();
//标记为@WebSocketBinary才需要重写此方法
public void onRead(AsyncConnection channel);
//WebSokcet连接成功后的回调方法
public void onConnected();
//ping后的回调方法
public void onPing(byte[] bytes);
//pong后的回调方法
public void onPong(byte[] bytes);
//接收到消息的回调方法
public void onMessage(T message, boolean last);
//接收到二进制消息的回调方法
public void onMessage(byte[] bytes, boolean last);
//关闭的回调方法调用此方法时WebSocket已经被关闭
public void onClose(int code, String reason);
//获取当前WebSocket下的属性
public <T> T getAttribute(String name);
public T getAttribute(String name);
//移出当前WebSocket下的属性
public <T> T removeAttribute(String name);
public T removeAttribute(String name);
//给当前WebSocket下的增加属性
public void setAttribute(String name, Object value);
//获取当前WebSocket所属的groupid
public Serializable getGroupid();
//获取当前WebSocket所属的userid
public G userid();
//获取当前WebSocket的会话ID 不会为null
public Serializable getSessionid();
@@ -92,51 +124,9 @@ public interface WebSocketDesc {
//获取WebSocket创建时间
public long getCreatetime();
//获取最后一次发送消息的时间
public long getLastSendTime();
//显式地关闭WebSocket
public void close();
//获取在线用户的节点地址列表
/* protected */ Collection<InetSocketAddress> getOnlineNodes(Serializable groupid);
//获取在线用户的详细连接信息
/* protected */ Map<InetSocketAddress, List<String>> getOnlineRemoteAddress(Serializable groupid);
//返回sessionid, null表示连接不合法或异常,默认实现是request.getSessionid(false),通常需要重写该方法
public Serializable onOpen(final HttpRequest request);
//创建groupid null表示异常 必须实现该方法, 通常为用户ID为groupid
/* protected abstract */ Serializable createGroupid();
//标记为@WebSocketBinary才需要重写此方法
default void onRead(AsyncConnection channel) {
}
default void onConnected() {
}
//接收文本消息响应事件,可能会接收到文本消息需要重写该方法
default void onMessage(String text) {
}
default void onPing(byte[] bytes) {
}
default void onPong(byte[] bytes) {
}
//接收二进制消息响应事件,可能会接收到二进制消息需要重写该方法
default void onMessage(byte[] bytes) {
}
default void onFragment(String text, boolean last) {
}
default void onFragment(byte[] bytes, boolean last) {
}
default void onClose(int code, String reason) {
}
}

View File

@@ -9,9 +9,10 @@ import org.redkale.net.http.WebServlet;
import org.redkale.net.http.WebSocketServlet;
import org.redkale.net.http.WebSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility;
import javax.annotation.Resource;
import org.redkale.net.http.*;
import org.redkale.test.rest.*;
import org.redkale.util.*;
/**
*
@@ -20,51 +21,43 @@ import org.redkale.util.Utility;
@WebServlet("/ws/chat")
public class ChatWebSocketServlet extends WebSocketServlet {
private final AtomicLong counter = new AtomicLong();
@Resource
private UserService userService;
private final AtomicLong icounter = new AtomicLong();
private final boolean debug;
public ChatWebSocketServlet() {
debug = "true".equalsIgnoreCase(System.getProperty("debug", "true"));
Thread t = new Thread() {
{
setName("Debug-ChatWebSocket-ShowCount-Thread");
}
@Override
public void run() {
while (true) {
try {
sleep(300 * 1000);
} catch (Exception e) {
return;
}
System.out.println(Utility.now() + ": 消息总数: " + counter.get() + ",间隔消息数: " + icounter.getAndSet(0));
}
}
};
t.start();
@Override
public void init(HttpContext context, AnyValue conf) {
System.out.println("本实例的WebSocketNode: " + super.node);
}
@Override
protected WebSocket<String, ChatMessage> createWebSocket() {
public void destroy(HttpContext context, AnyValue conf) {
System.out.println("关闭了ChatWebSocketServlet");
}
return new WebSocket<String, ChatMessage>() {
@Override
protected WebSocket<Integer, ChatMessage> createWebSocket() {
return new WebSocket<Integer, ChatMessage>() {
private UserInfo user;
@Override
public void onMessage(ChatMessage message, boolean last) {
icounter.incrementAndGet();
counter.incrementAndGet();
if (debug) System.out.println("收到消息: " + message);
super.getWebSockets().forEach(x -> x.send(message));
public void onMessage(ChatMessage message, boolean last) { // text 接收的格式: {"receiveid":200000001, "content":"Hi Redkale!"}
message.sendid = user.getUserid(); //将当前用户设为消息的发送方
message.sendtime = System.currentTimeMillis(); //设置消息发送时间
//给接收方发送消息, 即使接收方在其他WebSocket进程节点上有链接Redkale也会自动发送到其他链接进程节点上。
super.sendMessage(message, message.receiveid);
}
@Override
protected CompletableFuture<String> createUserid() {
return CompletableFuture.completedFuture("2");
protected CompletableFuture<Integer> createUserid() { //创建用户ID
this.user = userService.current(String.valueOf(super.getSessionid()));
return CompletableFuture.completedFuture(this.user == null ? null : this.user.getUserid());
}
@Override
public CompletableFuture<String> onOpen(HttpRequest request) {
return CompletableFuture.completedFuture(request.getSessionid(false)); //以request中的sessionid字符串作为WebSocket的sessionid
}
};
@@ -72,11 +65,12 @@ public class ChatWebSocketServlet extends WebSocketServlet {
public static class ChatMessage {
public String message;
public int sendid; //发送方用户ID
@Override
public String toString() {
return JsonConvert.root().convertTo(this);
}
public int receiveid; //接收方用户ID
public String content; //文本消息内容
public long sendtime; //消息发送时间
}
}