diff --git a/src/main/java/org/redkale/net/http/RestMapping.java b/src/main/java/org/redkale/net/http/RestMapping.java
index d4ec0d766..527ec7979 100644
--- a/src/main/java/org/redkale/net/http/RestMapping.java
+++ b/src/main/java/org/redkale/net/http/RestMapping.java
@@ -14,6 +14,8 @@ import static java.lang.annotation.RetentionPolicy.*;
* value默认为"/" + Service的类名去掉Service字样的小写字符串 (如HelloService,的默认路径为/hello)。
*
* 详情见: https://redkale.org
+ *
+ * @see org.redkale.net.http.RestService
*
* @author zhangjx
*/
diff --git a/src/main/java/org/redkale/net/http/RestOnMessage.java b/src/main/java/org/redkale/net/http/RestOnMessage.java
index 214d0fae9..2016b13c6 100644
--- a/src/main/java/org/redkale/net/http/RestOnMessage.java
+++ b/src/main/java/org/redkale/net/http/RestOnMessage.java
@@ -10,7 +10,7 @@ import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
- * 标记在RestWebSocket的接收消息方法上;
+ * 标记在{@link org.redkale.net.http.RestWebSocket}的接收消息方法上;
* 注意:被标记的方法必须同时符合以下条件:
* 1、必须修饰为public
* 2、不能修饰为final和static
@@ -19,6 +19,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
*
*
* 详情见: https://redkale.org
+ *
+ * @see org.redkale.net.http.RestWebSocket
*
* @author zhangjx
*/
diff --git a/src/main/java/org/redkale/net/http/RestService.java b/src/main/java/org/redkale/net/http/RestService.java
index d6421ac62..1b9a0cbb3 100644
--- a/src/main/java/org/redkale/net/http/RestService.java
+++ b/src/main/java/org/redkale/net/http/RestService.java
@@ -13,6 +13,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
* 只能依附在Service类上,name默认为Service的类名小写并去掉Service字样及后面的字符串 (如HelloService/HelloServiceImpl,的默认路径为 hello)。
*
* 详情见: https://redkale.org
+ *
+ * @see org.redkale.net.http.RestMapping
*
* @author zhangjx
*/
diff --git a/src/main/java/org/redkale/net/http/RestWebSocket.java b/src/main/java/org/redkale/net/http/RestWebSocket.java
index 34209adc9..c56ee3557 100644
--- a/src/main/java/org/redkale/net/http/RestWebSocket.java
+++ b/src/main/java/org/redkale/net/http/RestWebSocket.java
@@ -5,9 +5,9 @@
*/
package org.redkale.net.http;
+import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import java.lang.annotation.*;
import org.redkale.net.Cryptor;
/**
@@ -16,6 +16,8 @@ import org.redkale.net.Cryptor;
* name值支持含{system.property.xxx}模式
*
* 详情见: https://redkale.org
+ *
+ * @see org.redkale.net.http.RestOnMessage
*
* @author zhangjx
*/
diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java
index 948ea98ce..9fffaca51 100644
--- a/src/main/java/org/redkale/net/http/WebSocket.java
+++ b/src/main/java/org/redkale/net/http/WebSocket.java
@@ -225,7 +225,8 @@ public abstract class WebSocket {
public final CompletableFuture send(Convert convert, Object message, boolean last) {
final Convert c = convert == null ? getSendConvert() : convert;
if (message instanceof CompletableFuture) {
- return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(json), last)));
+ return ((CompletableFuture) message).thenCompose(json
+ -> sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(json), last)));
}
return sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(message), last));
}
@@ -326,7 +327,7 @@ public abstract class WebSocket {
* @return 为0表示成功, 其他值表示异常
*/
public final CompletableFuture sendMessage(Object message, boolean last, G... userids) {
- return sendMessage((Convert) null, message, last, userids);
+ return sendMessage((Convert) null, message, last, (Serializable[]) userids);
}
/**
@@ -363,7 +364,7 @@ public abstract class WebSocket {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
if (message instanceof CompletableFuture) {
- return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids));
+ return ((CompletableFuture) message).thenCompose(json -> _engine.node.sendMessage(convert, json, last, userids));
}
CompletableFuture rs = _engine.node.sendMessage(convert, message, last, userids);
if (_engine.logger.isLoggable(Level.FINER)) {
@@ -468,12 +469,12 @@ public abstract class WebSocket {
*
* @return 为0表示成功, 其他值表示部分发送异常
*/
- public final CompletableFuture broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) {
+ public final CompletableFuture broadcastMessage(WebSocketRange wsrange, Convert convert, final Object message, final boolean last) {
if (_engine.node == null) {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
if (message instanceof CompletableFuture) {
- return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last));
+ return ((CompletableFuture) message).thenCompose(json -> _engine.node.broadcastMessage(wsrange, convert, json, last));
}
CompletableFuture rs = _engine.node.broadcastMessage(wsrange, convert, message, last);
if (_engine.logger.isLoggable(Level.FINER)) {
@@ -854,7 +855,7 @@ public abstract class WebSocket {
* @return Future 可以为null, 为null或者Future值为false表示关闭新连接, Future值为true表示关闭旧连接
*/
public CompletableFuture onSingleRepeatConnect() {
- return forceCloseWebSocket(getUserid()).thenApply((r) -> true);
+ return forceCloseWebSocket(getUserid()).thenApply(r -> true);
}
/**
diff --git a/src/main/java/org/redkale/net/http/WebSocketEngine.java b/src/main/java/org/redkale/net/http/WebSocketEngine.java
index 93541f508..b0c0e2ec4 100644
--- a/src/main/java/org/redkale/net/http/WebSocketEngine.java
+++ b/src/main/java/org/redkale/net/http/WebSocketEngine.java
@@ -100,7 +100,8 @@ public class WebSocketEngine {
if (conf != null && conf.getAnyValue("properties") != null) {
props = conf.getAnyValue("properties");
}
- this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval) : props.getIntValue(WEBPARAM_LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval));
+ this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval)
+ : props.getIntValue(WEBPARAM_LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval));
if (liveInterval <= 0) {
return;
}
@@ -121,19 +122,21 @@ public class WebSocketEngine {
t.setDaemon(true);
return t;
});
- this.scheduler.setRemoveOnCancelPolicy(true);
+ this.scheduler.setRemoveOnCancelPolicy(true);
long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5;
final int intervalms = liveInterval * 1000;
scheduler.scheduleWithFixedDelay(() -> {
try {
long now = System.currentTimeMillis();
- getLocalWebSockets().stream().filter(x -> ((now - x.getLastReadTime()) > intervalms && (now - x.getLastSendTime()) > intervalms)).forEach(x -> x.sendPing());
+ getLocalWebSockets().stream().filter(x -> (now - Math.max(x.getLastReadTime(), x.getLastSendTime())) > intervalms)
+ .forEach(x -> x.sendPing());
} catch (Throwable t) {
logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveInterval + "s) ping error", t);
}
}, delay, liveInterval, TimeUnit.SECONDS);
if (logger.isLoggable(Level.FINEST)) {
- logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor");
+ logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")"
+ + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor");
}
}
diff --git a/src/main/java/org/redkale/net/http/WebSocketNode.java b/src/main/java/org/redkale/net/http/WebSocketNode.java
index 323ea14b4..d155c239a 100644
--- a/src/main/java/org/redkale/net/http/WebSocketNode.java
+++ b/src/main/java/org/redkale/net/http/WebSocketNode.java
@@ -126,17 +126,23 @@ public abstract class WebSocketNode implements Service {
}
}
- protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
+ protected abstract CompletableFuture> getWebSocketAddresses(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
- protected abstract CompletableFuture sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
+ protected abstract CompletableFuture sendMessage(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
- protected abstract CompletableFuture broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
+ protected abstract CompletableFuture broadcastMessage(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
- protected abstract CompletableFuture sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
+ protected abstract CompletableFuture sendAction(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
- protected abstract CompletableFuture broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
+ protected abstract CompletableFuture broadcastAction(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
- protected abstract CompletableFuture getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
+ protected abstract CompletableFuture getUserSize(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture connect(Serializable userid, WebSocketAddress wsaddr);
@@ -144,28 +150,33 @@ public abstract class WebSocketNode implements Service {
protected abstract CompletableFuture changeUserid(Serializable fromuserid, Serializable touserid, WebSocketAddress wsaddr);
- protected abstract CompletableFuture existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
+ protected abstract CompletableFuture existsWebSocket(Serializable userid,
+ @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
- protected abstract CompletableFuture forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
+ protected abstract CompletableFuture forceCloseWebSocket(Serializable userid,
+ @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
//--------------------------------------------------------------------------------
final CompletableFuture connect(final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) {
- logger.finest(wsNodeAddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
+ logger.finest(wsNodeAddress + " receive websocket connect event (" + userid + " on "
+ + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
}
return connect(userid, wsNodeAddress);
}
final CompletableFuture disconnect(final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) {
- logger.finest(wsNodeAddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
+ logger.finest(wsNodeAddress + " receive websocket disconnect event (" + userid + " on "
+ + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
}
return disconnect(userid, wsNodeAddress);
}
final CompletableFuture changeUserid(Serializable olduserid, final Serializable newuserid) {
if (logger.isLoggable(Level.FINEST)) {
- logger.finest(wsNodeAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
+ logger.finest(wsNodeAddress + " receive websocket changeUserid event (from " + olduserid
+ + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
}
return changeUserid(olduserid, newuserid, wsNodeAddress);
}
@@ -185,7 +196,8 @@ public abstract class WebSocketNode implements Service {
*
* @return 客户端地址列表
*/
- protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
+ protected CompletableFuture> remoteWebSocketAddresses(@RpcTargetTopic String topic,
+ @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
if (remoteNode == null) {
return CompletableFuture.completedFuture(null);
}
@@ -239,8 +251,9 @@ public abstract class WebSocketNode implements Service {
}
CompletableFuture