代码格式化

This commit is contained in:
redkale
2024-01-17 22:57:23 +08:00
parent 49f03f9701
commit 636927e4eb
13 changed files with 168 additions and 74 deletions

View File

@@ -14,6 +14,8 @@ import static java.lang.annotation.RetentionPolicy.*;
* value默认为"/" + Service的类名去掉Service字样的小写字符串 (如HelloService的默认路径为/hello)。 <br>
* <p>
* 详情见: https://redkale.org
*
* @see org.redkale.net.http.RestService
*
* @author zhangjx
*/

View File

@@ -10,7 +10,7 @@ import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 标记在RestWebSocket的接收消息方法上; <br>
* 标记在{@link org.redkale.net.http.RestWebSocket}的接收消息方法上; <br>
* 注意:被标记的方法必须同时符合以下条件: <br>
* 1、必须修饰为public
* 2、不能修饰为final和static
@@ -19,6 +19,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
*
* <br><p>
* 详情见: https://redkale.org
*
* @see org.redkale.net.http.RestWebSocket
*
* @author zhangjx
*/

View File

@@ -13,6 +13,8 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
* 只能依附在Service类上name默认为Service的类名小写并去掉Service字样及后面的字符串 (如HelloService/HelloServiceImpl的默认路径为 hello)。
* <p>
* 详情见: https://redkale.org
*
* @see org.redkale.net.http.RestMapping
*
* @author zhangjx
*/

View File

@@ -5,9 +5,9 @@
*/
package org.redkale.net.http;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.*;
import org.redkale.net.Cryptor;
/**
@@ -16,6 +16,8 @@ import org.redkale.net.Cryptor;
* name值支持含{system.property.xxx}模式
* <p>
* 详情见: https://redkale.org
*
* @see org.redkale.net.http.RestOnMessage
*
* @author zhangjx
*/

View File

@@ -225,7 +225,8 @@ public abstract class WebSocket<G extends Serializable, T> {
public final CompletableFuture<Integer> send(Convert convert, Object message, boolean last) {
final Convert c = convert == null ? getSendConvert() : convert;
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(json), last)));
return ((CompletableFuture) message).thenCompose(json
-> sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(json), last)));
}
return sendPacket(new WebSocketPacket(c.isBinary() ? FrameType.BINARY : FrameType.TEXT, c.convertToBytes(message), last));
}
@@ -326,7 +327,7 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return 为0表示成功 其他值表示异常
*/
public final CompletableFuture<Integer> sendMessage(Object message, boolean last, G... userids) {
return sendMessage((Convert) null, message, last, userids);
return sendMessage((Convert) null, message, last, (Serializable[]) userids);
}
/**
@@ -363,7 +364,7 @@ public abstract class WebSocket<G extends Serializable, T> {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.sendMessage(convert, json, last, userids));
return ((CompletableFuture) message).thenCompose(json -> _engine.node.sendMessage(convert, json, last, userids));
}
CompletableFuture<Integer> rs = _engine.node.sendMessage(convert, message, last, userids);
if (_engine.logger.isLoggable(Level.FINER)) {
@@ -468,12 +469,12 @@ public abstract class WebSocket<G extends Serializable, T> {
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message, final boolean last) {
public final CompletableFuture<Integer> broadcastMessage(WebSocketRange wsrange, Convert convert, final Object message, final boolean last) {
if (_engine.node == null) {
return CompletableFuture.completedFuture(RETCODE_NODESERVICE_NULL);
}
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> _engine.node.broadcastMessage(wsrange, convert, json, last));
return ((CompletableFuture) message).thenCompose(json -> _engine.node.broadcastMessage(wsrange, convert, json, last));
}
CompletableFuture<Integer> rs = _engine.node.broadcastMessage(wsrange, convert, message, last);
if (_engine.logger.isLoggable(Level.FINER)) {
@@ -854,7 +855,7 @@ public abstract class WebSocket<G extends Serializable, T> {
* @return Future 可以为null, 为null或者Future值为false表示关闭新连接 Future值为true表示关闭旧连接
*/
public CompletableFuture<Boolean> onSingleRepeatConnect() {
return forceCloseWebSocket(getUserid()).thenApply((r) -> true);
return forceCloseWebSocket(getUserid()).thenApply(r -> true);
}
/**

View File

@@ -100,7 +100,8 @@ public class WebSocketEngine {
if (conf != null && conf.getAnyValue("properties") != null) {
props = conf.getAnyValue("properties");
}
this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval) : props.getIntValue(WEBPARAM_LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval));
this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval)
: props.getIntValue(WEBPARAM_LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval));
if (liveInterval <= 0) {
return;
}
@@ -121,19 +122,21 @@ public class WebSocketEngine {
t.setDaemon(true);
return t;
});
this.scheduler.setRemoveOnCancelPolicy(true);
this.scheduler.setRemoveOnCancelPolicy(true);
long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5;
final int intervalms = liveInterval * 1000;
scheduler.scheduleWithFixedDelay(() -> {
try {
long now = System.currentTimeMillis();
getLocalWebSockets().stream().filter(x -> ((now - x.getLastReadTime()) > intervalms && (now - x.getLastSendTime()) > intervalms)).forEach(x -> x.sendPing());
getLocalWebSockets().stream().filter(x -> (now - Math.max(x.getLastReadTime(), x.getLastSendTime())) > intervalms)
.forEach(x -> x.sendPing());
} catch (Throwable t) {
logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveInterval + "s) ping error", t);
}
}, delay, liveInterval, TimeUnit.SECONDS);
if (logger.isLoggable(Level.FINEST)) {
logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor");
logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")"
+ " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor");
}
}

View File

@@ -126,17 +126,23 @@ public abstract class WebSocketNode implements Service {
}
}
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
protected abstract CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
protected abstract CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action, Serializable... userids);
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
protected abstract CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, WebSocketAction action);
protected abstract CompletableFuture<Integer> getUserSize(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> getUserSize(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Void> connect(Serializable userid, WebSocketAddress wsaddr);
@@ -144,28 +150,33 @@ public abstract class WebSocketNode implements Service {
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, WebSocketAddress wsaddr);
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid,
@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid,
@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress);
//--------------------------------------------------------------------------------
final CompletableFuture<Void> connect(final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest(wsNodeAddress + " receive websocket connect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
logger.finest(wsNodeAddress + " receive websocket connect event (" + userid + " on "
+ (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
}
return connect(userid, wsNodeAddress);
}
final CompletableFuture<Void> disconnect(final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest(wsNodeAddress + " receive websocket disconnect event (" + userid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
logger.finest(wsNodeAddress + " receive websocket disconnect event (" + userid + " on "
+ (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
}
return disconnect(userid, wsNodeAddress);
}
final CompletableFuture<Void> changeUserid(Serializable olduserid, final Serializable newuserid) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest(wsNodeAddress + " receive websocket changeUserid event (from " + olduserid + " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
logger.finest(wsNodeAddress + " receive websocket changeUserid event (from " + olduserid
+ " to " + newuserid + " on " + (this.localEngine == null ? null : this.localEngine.getEngineid()) + ").");
}
return changeUserid(olduserid, newuserid, wsNodeAddress);
}
@@ -185,7 +196,8 @@ public abstract class WebSocketNode implements Service {
*
* @return 客户端地址列表
*/
protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
protected CompletableFuture<List<String>> remoteWebSocketAddresses(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid) {
if (remoteNode == null) {
return CompletableFuture.completedFuture(null);
}
@@ -239,8 +251,9 @@ public abstract class WebSocketNode implements Service {
}
CompletableFuture<Map<WebSocketAddress, List<String>>> future = null;
for (final WebSocketAddress nodeAddress : addrs) {
CompletableFuture<Map<WebSocketAddress, List<String>>> mapFuture = getWebSocketAddresses(nodeAddress.getTopic(), nodeAddress.getAddr(), userid)
.thenCompose((List<String> list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list)));
CompletableFuture<Map<WebSocketAddress, List<String>>> mapFuture
= getWebSocketAddresses(nodeAddress.getTopic(), nodeAddress.getAddr(), userid)
.thenCompose((List<String> list) -> CompletableFuture.completedFuture(Utility.ofMap(nodeAddress, list)));
future = future == null ? mapFuture : future.thenCombine(mapFuture, (a, b) -> Utility.merge(a, b));
}
return future == null ? CompletableFuture.completedFuture(new HashMap<>()) : future;
@@ -268,7 +281,8 @@ public abstract class WebSocketNode implements Service {
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
CompletableFuture<Integer> localFuture = this.localEngine == null ? null
: CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
tryAcquireSemaphore();
CompletableFuture<Set<WebSocketAddress>> addrsFuture = source.smembersAsync(WS_SOURCE_KEY_NODES, WebSocketAddress.class);
if (semaphore != null) {
@@ -302,11 +316,13 @@ public abstract class WebSocketNode implements Service {
*/
public CompletableFuture<Set<String>> getUserSet() {
if (this.localEngine != null && this.source == null) {
return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet().stream().map(x -> String.valueOf(x)).collect(Collectors.toList())));
return CompletableFuture.completedFuture(new LinkedHashSet<>(this.localEngine.getLocalUserSet()
.stream().map(x -> String.valueOf(x)).collect(Collectors.toList())));
}
tryAcquireSemaphore();
CompletableFuture<List<String>> listFuture = this.source.keysStartsWithAsync(WS_SOURCE_KEY_USERID_PREFIX);
CompletableFuture<Set<String>> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream().map(x -> x.substring(WS_SOURCE_KEY_USERID_PREFIX.length())).collect(Collectors.toList())));
CompletableFuture<Set<String>> rs = listFuture.thenApply(v -> new LinkedHashSet<>(v.stream()
.map(x -> x.substring(WS_SOURCE_KEY_USERID_PREFIX.length())).collect(Collectors.toList())));
if (semaphore != null) {
rs.whenComplete((r, e) -> releaseSemaphore());
}
@@ -374,7 +390,9 @@ public abstract class WebSocketNode implements Service {
}
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null");
logger.finest("websocket "
+ (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source")
+ " node is null");
}
//没有CacheSource就不会有分布式节点
return CompletableFuture.completedFuture(false);
@@ -434,7 +452,9 @@ public abstract class WebSocketNode implements Service {
}
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null");
logger.finest("websocket "
+ (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source")
+ " node is null");
}
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(0) : localFuture;
@@ -564,7 +584,7 @@ public abstract class WebSocketNode implements Service {
* @return 为0表示成功 其他值表示部分发送异常
*/
@Local
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Stream<? extends Serializable> useridOrAddrs) {
public final CompletableFuture<Integer> sendMessage(Object message, boolean last, Stream<? extends Serializable> useridOrAddrs) {
return sendMessage((Convert) null, message, last, useridOrAddrs);
}
@@ -595,7 +615,7 @@ public abstract class WebSocketNode implements Service {
* @return 为0表示成功 其他值表示部分发送异常
*/
@Local
public final CompletableFuture<Integer> sendMessage(final Convert convert, final Object message0, final boolean last, final Stream<? extends Serializable> userids) {
public final CompletableFuture<Integer> sendMessage(Convert convert, Object message0, boolean last, Stream<? extends Serializable> userids) {
Object[] array = userids.toArray();
Serializable[] ss = new Serializable[array.length];
for (int i = 0; i < array.length; i++) {
@@ -630,7 +650,10 @@ public abstract class WebSocketNode implements Service {
if (message0 instanceof CompletableFuture) {
return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, userids));
}
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(WebSocketPacket.FrameType.TEXT, ((TextConvert) convert).convertToBytes(message0), last) : new WebSocketPacket(WebSocketPacket.FrameType.BINARY, ((BinaryConvert) convert).convertToBytes(message0), last));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0
: ((convert instanceof TextConvert) ? new WebSocketPacket(WebSocketPacket.FrameType.TEXT,
((TextConvert) convert).convertToBytes(message0), last) : new WebSocketPacket(WebSocketPacket.FrameType.BINARY,
((BinaryConvert) convert).convertToBytes(message0), last));
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.sendLocalMessage(message, last, userids);
}
@@ -664,7 +687,8 @@ public abstract class WebSocketNode implements Service {
}
});
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers);
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids="
+ JsonConvert.root().convertTo(userids) + ") found message-addr-userids: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
@@ -690,14 +714,16 @@ public abstract class WebSocketNode implements Service {
* @return 为0表示成功 其他值表示部分发送异常
*/
@Local
public CompletableFuture<Integer> sendMessage(final Convert convert, final Object message0, final boolean last, final WebSocketUserAddress... useraddrs) {
public CompletableFuture<Integer> sendMessage(Convert convert, Object message0, boolean last, WebSocketUserAddress... useraddrs) {
if (useraddrs == null || useraddrs.length < 1) {
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
if (message0 instanceof CompletableFuture) {
return ((CompletableFuture) message0).thenApply(msg -> sendMessage(convert, msg, last, useraddrs));
}
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0
: ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last)
: new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.sendLocalMessage(message, last, userAddressToUserids(useraddrs));
}
@@ -705,7 +731,8 @@ public abstract class WebSocketNode implements Service {
final Object remoteMessage = formatRemoteMessage(message);
final Map<WebSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found message-addr-userids: " + addrUsers);
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs)
+ ") found message-addr-userids: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
@@ -722,7 +749,10 @@ public abstract class WebSocketNode implements Service {
return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid));
}
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket want send message {userid:" + userid + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
logger.finest("websocket want send message {userid:" + userid + ", content:"
+ (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString()
: (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message)))
+ "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) {
@@ -730,7 +760,9 @@ public abstract class WebSocketNode implements Service {
}
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null");
logger.finest("websocket "
+ (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source")
+ " node is null");
}
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
@@ -770,14 +802,21 @@ public abstract class WebSocketNode implements Service {
return ((CompletableFuture) message).thenApply(msg -> sendOneAddrMessage(addr, msg, last, userids));
}
if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的
logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
logger.finest("websocket want send message {userids:" + JsonConvert.root().convertTo(userids)
+ ", sncpaddr:" + addr + ", content:"
+ (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString()
: (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message)))
+ "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
if (Objects.equals(addr, this.wsNodeAddress)) {
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalMessage(message, last, userids);
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY)
: localEngine.sendLocalMessage(message, last, userids);
}
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null");
logger.finest("websocket "
+ (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source")
+ " node is null");
}
//没有CacheSource就不会有分布式节点
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
@@ -918,11 +957,13 @@ public abstract class WebSocketNode implements Service {
* @return 为0表示成功 其他值表示部分发送异常
*/
@Local
public CompletableFuture<Integer> broadcastMessage(final WebSocketRange wsrange, final Convert convert, final Object message0, final boolean last) {
public CompletableFuture<Integer> broadcastMessage(WebSocketRange wsrange, Convert convert, Object message0, final boolean last) {
if (message0 instanceof CompletableFuture) {
return ((CompletableFuture) message0).thenApply(msg -> broadcastMessage(wsrange, convert, msg, last));
}
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0 : ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last) : new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
final Object message = (convert == null || message0 instanceof WebSocketPacket) ? message0
: ((convert instanceof TextConvert) ? new WebSocketPacket(((TextConvert) convert).convertTo(message0), last)
: new WebSocketPacket(((BinaryConvert) convert).convertTo(message0), last));
if (this.localEngine != null && this.source == null) { //本地模式且没有分布式
return this.localEngine.broadcastLocalMessage(wsrange, message, last);
}
@@ -1044,7 +1085,8 @@ public abstract class WebSocketNode implements Service {
}
});
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids) + ") found action-userid-addrs: " + addrUsers);
logger.finest("websocket(localaddr=" + localSncpAddress + ", userids=" + JsonConvert.root().convertTo(userids)
+ ") found action-userid-addrs: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
@@ -1078,7 +1120,8 @@ public abstract class WebSocketNode implements Service {
final Map<WebSocketAddress, List<Serializable>> addrUsers = userAddressToAddrMap(useraddrs);
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket(localaddr=" + localSncpAddress + ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers);
logger.finest("websocket(localaddr=" + localSncpAddress
+ ", useraddrs=" + JsonConvert.root().convertTo(useraddrs) + ") found action-userid-addrs: " + addrUsers);
}
CompletableFuture<Integer> future = null;
for (Map.Entry<WebSocketAddress, List<Serializable>> en : addrUsers.entrySet()) {
@@ -1091,7 +1134,8 @@ public abstract class WebSocketNode implements Service {
protected CompletableFuture<Integer> sendOneUserAction(final WebSocketAction action, final Serializable userid) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket want send action {userid:" + userid + ", action:" + action + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
logger.finest("websocket want send action {userid:" + userid + ", action:" + action
+ "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) {
@@ -1099,7 +1143,9 @@ public abstract class WebSocketNode implements Service {
}
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null");
logger.finest("websocket "
+ (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source")
+ " node is null");
}
//没有CacheSource就不会有分布式节点
return localFuture == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localFuture;
@@ -1133,16 +1179,20 @@ public abstract class WebSocketNode implements Service {
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
protected CompletableFuture<Integer> sendOneAddrAction(final WebSocketAddress addr, final WebSocketAction action, final Serializable... userids) {
protected CompletableFuture<Integer> sendOneAddrAction(WebSocketAddress addr, WebSocketAction action, Serializable... userids) {
if (logger.isLoggable(Level.FINEST) && this.localEngine == null) { //只打印远程模式的
logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids) + ", sncpaddr:" + addr + ", action:" + action + " from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
logger.finest("websocket want send action {userids:" + JsonConvert.root().convertTo(userids)
+ ", sncpaddr:" + addr + ", action:" + action + " from locale node to "
+ ((this.localEngine != null) ? "locale" : "remote") + " engine");
}
if (Objects.equals(addr, this.wsNodeAddress)) {
return this.localEngine == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : localEngine.sendLocalAction(action, userids);
}
if (this.source == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket " + (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source") + " node is null");
logger.finest("websocket "
+ (this.remoteNode == null ? (this.source == null ? "remote and source" : "remote") : "source")
+ " node is null");
}
//没有CacheSource就不会有分布式节点
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);

View File

@@ -7,8 +7,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import org.redkale.annotation.*;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.net.http.WebSocketNodeService;
import org.redkale.service.*;
import org.redkale.service.RpcTargetAddress;
import org.redkale.service.RpcTargetTopic;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;
/**
@@ -37,8 +38,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic, final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic())) && (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) {
public CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetTopic String topic,
final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if ((topic == null || !topic.equals(this.wsNodeAddress.getTopic()))
&& (localSncpAddress == null || !localSncpAddress.equals(targetAddress))) {
return remoteWebSocketAddresses(topic, targetAddress, groupid);
}
if (this.localEngine == null) {
@@ -50,7 +53,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
public CompletableFuture<Integer> sendMessage(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable... userids) {
if (this.localEngine == null) {
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
@@ -58,7 +62,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
public CompletableFuture<Integer> broadcastMessage(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
if (this.localEngine == null) {
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
@@ -66,7 +71,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
public CompletableFuture<Integer> sendAction(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action, Serializable... userids) {
if (this.localEngine == null) {
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
@@ -74,7 +80,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
public CompletableFuture<Integer> broadcastAction(@RpcTargetTopic String topic,
@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketAction action) {
if (this.localEngine == null) {
return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
}
@@ -164,7 +171,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
* @return 无返回值
*/
@Override
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
public CompletableFuture<Boolean> existsWebSocket(Serializable userid,
@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
}
@@ -184,7 +192,8 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
* @return 无返回值
*/
@Override
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid,
@RpcTargetTopic String topic, @RpcTargetAddress InetSocketAddress targetAddress) {
//不能从sncpNodeAddresses中移除因为engine.forceCloseWebSocket 会调用到disconnect
if (logger.isLoggable(Level.FINEST)) {
logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress);

View File

@@ -54,7 +54,8 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
protected AsyncIOThread ioReadThread;
public WebSocketReadHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool, BiConsumer<WebSocket, Object> messageConsumer) {
public WebSocketReadHandler(HttpContext context, WebSocket webSocket, ObjectPool<ByteArray> byteArrayPool,
BiConsumer<WebSocket, Object> messageConsumer) {
this.context = context;
this.webSocket = webSocket;
this.byteArrayPool = byteArrayPool;
@@ -299,7 +300,8 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
boolean debug = context.getLogger().isLoggable(Level.FINEST);
if (count < 1) {
if (debug) {
logger.log(Level.FINEST, "WebSocket(" + webSocket + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds");
logger.log(Level.FINEST, "WebSocket(" + webSocket + ") abort on read buffer count, force to close channel, live "
+ (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds");
}
webSocket.kill(CLOSECODE_ILLPACKET, "read buffer count is " + count);
return;
@@ -322,7 +324,8 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload()));
} else {
webSocket.onMessage(packet.getPayload() == null ? null : new String(packet.getPayload(), StandardCharsets.UTF_8), packet.last);
webSocket.onMessage(packet.getPayload() == null ? null
: new String(packet.getPayload(), StandardCharsets.UTF_8), packet.last);
}
} catch (Throwable e) {
logger.log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
@@ -388,7 +391,8 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
}
if (exc != null) {
if (context.getLogger().isLoggable(Level.FINEST)) {
context.getLogger().log(Level.FINEST, "WebSocket(" + webSocket + ") read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", exc);
context.getLogger().log(Level.FINEST, "WebSocket(" + webSocket + ") read WebSocketPacket failed, force to close channel, live "
+ (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", exc);
}
webSocket.kill(CLOSECODE_WSEXCEPTION, "read websocket-packet failed");
} else {

View File

@@ -194,8 +194,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return;
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.webSocketNode, this.sendConvert, logger);
String id = "WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]";
this.webSocketNode.localEngine = new WebSocketEngine(id, this.single, context,
liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.webSocketNode, this.sendConvert, logger);
this.webSocketNode.init(conf);
this.webSocketNode.localEngine.init(conf);
@@ -222,7 +223,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
final boolean debug = logger.isLoggable(Level.FINEST);
if (!request.isWebSocket()) {
if (debug) {
logger.log(Level.FINEST, "WebSocket connect abort, (Not GET Method) or (Connection != Upgrade) or (Upgrade != websocket). request=" + request);
logger.log(Level.FINEST, "WebSocket connect abort, (Not GET Method)/(Connection!=Upgrade)/(Upgrade!=websocket). request=" + request);
}
response.finish(true);
return;
@@ -305,7 +306,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Traces.currentTraceid(request.getTraceid());
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) {
logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
logger.log(ex2 == null ? Level.FINEST : Level.FINE,
"WebSocket connect abort, Create userid abort. request = " + request, ex2);
}
response.finish(true);
return;
@@ -356,7 +358,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
webSocket.delayPackets = null;
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
CompletableFuture<Integer> cf = webSocket._writeHandler
.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> {
Traces.currentTraceid(request.getTraceid());
if (userid == null || t != null) {

View File

@@ -99,7 +99,8 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
}
webSocket.kill(RETCODE_SENDEXCEPTION, "websocket send message failed on CompletionHandler");
if (exc != null && context.getLogger().isLoggable(Level.FINER)) {
context.getLogger().log(Level.FINER, "WebSocket sendMessage on CompletionHandler failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", exc);
context.getLogger().log(Level.FINER, "WebSocket sendMessage on CompletionHandler failed, force to close channel, live "
+ (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", exc);
}
}

View File

@@ -27,5 +27,18 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
public @interface Sql {
/**
* 原生sql语句
*
* @return sql
*/
String value();
/**
* 备注说明
*
* @return 备注说明
*/
String comment() default "";
}

View File

@@ -11,11 +11,12 @@ import org.redkale.net.http.WebSocketNode;
import org.redkale.util.AnyValue;
/**
* 由 org.redkale.net.http.WebSocketNodeService 代替
* 由 {@link org.redkale.net.http.WebSocketNodeService} 代替
*
* <p>
* 详情见: https://redkale.org
*
* @see org.redkale.net.http.WebSocketNodeService
* @deprecated 2.6.0
* @author zhangjx
*/
@@ -27,6 +28,7 @@ public class WebSocketNodeService extends org.redkale.net.http.WebSocketNodeServ
@Override
public void init(AnyValue conf) {
super.init(conf);
logger.log(Level.WARNING, WebSocketNodeService.class.getName() + "is replaced by " + org.redkale.net.http.WebSocketNodeService.class.getName());
logger.log(Level.WARNING, WebSocketNodeService.class.getName()
+ "is replaced by " + org.redkale.net.http.WebSocketNodeService.class.getName());
}
}