临时修改

This commit is contained in:
Redkale
2017-05-17 02:32:36 +08:00
parent f76ab977d1
commit bb87774ba9
5 changed files with 78 additions and 97 deletions

View File

@@ -79,21 +79,21 @@ public final class WebSocketEngine {
if (group == null) {
group = new WebSocketGroup(socket._groupid);
containers.putIfAbsent(socket._groupid, group);
if (node != null) node.connect(socket._groupid);
}
group.add(socket);
if (node != null) node.connect(socket._groupid, engineid);
}
void remove(WebSocket socket) { //非线程安全, 在常规场景中无需锁
final WebSocketGroup group = containers.get(socket._groupid);
if (group == null) {
if (node != null) node.disconnect(socket._groupid, engineid);
if (node != null) node.disconnect(socket._groupid);
return;
}
group.remove(socket);
if (group.isEmpty()) {
containers.remove(socket._groupid);
if (node != null) node.disconnect(socket._groupid, engineid);
if (node != null) node.disconnect(socket._groupid);
}
}

View File

@@ -40,13 +40,8 @@ public abstract class WebSocketNode {
@Resource(name = "$")
protected CacheSource<Serializable, InetSocketAddress> sncpAddressNodes;
//protected WebSocketEngine onlyoneEngine;
//存放本地节点上所有WebSocketEngine
protected final ConcurrentHashMap<String, WebSocketEngine> localEngines = new ConcurrentHashMap();
//存放本地节点上所有在线用户的队列信息,Set<String> 为 engineid 的集合, key: groupid
protected final ConcurrentHashMap<Serializable, Set<String>> localEngineids = new ConcurrentHashMap();
//当前节点的本地WebSocketEngine
protected WebSocketEngine _localEngine;
public void init(AnyValue conf) {
@@ -57,12 +52,8 @@ public abstract class WebSocketNode {
}
public final void postDestroy(AnyValue conf) {
HashMap<Serializable, Set<String>> engines = new HashMap<>(localEngineids);
engines.forEach((k, v) -> {
new HashSet<>(v).forEach(e -> {
if (localEngines.containsKey(e)) disconnect(k, e);
});
});
if (this._localEngine == null) return;
this._localEngine.getWebSocketGroups().forEach(g -> disconnect(g.getGroupid()));
}
protected abstract CompletableFuture<List<String>> getOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid);
@@ -74,41 +65,25 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr);
//--------------------------------------------------------------------------------
final void connect(final Serializable groupid, final String engineid) {
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localEngineids.get(groupid);
if (engineids == null) {
engineids = new CopyOnWriteArraySet<>();
localEngineids.putIfAbsent(groupid, engineids);
}
final Set<String> engineids0 = engineids;
if (localSncpAddress != null && engineids.isEmpty()) {
CompletableFuture<Void> future = connect(groupid, localSncpAddress);
if (future != null) {
future.whenComplete((u, e) -> { //成功才记录
if (e != null) engineids0.add(engineid);
});
}
}
final CompletableFuture<Void> connect(final Serializable groupid) {
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + this._localEngine.getEngineid() + ").");
return connect(groupid, localSncpAddress);
}
final void disconnect(Serializable groupid, String engineid) {
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localEngineids.get(groupid);
if (engineids == null || engineids.isEmpty()) return;
engineids.remove(engineid);
if (engineids.isEmpty()) {
localEngineids.remove(groupid);
if (localSncpAddress != null) disconnect(groupid, localSncpAddress);
}
}
final void putWebSocketEngine(WebSocketEngine engine) {
localEngines.put(engine.getEngineid(), engine);
final CompletableFuture<Void> disconnect(Serializable groupid) {
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + this._localEngine.getEngineid() + ").");
return disconnect(groupid, localSncpAddress);
}
//--------------------------------------------------------------------------------
/**
* 获取目标地址
*
* @param targetAddress
* @param groupid
*
* @return
*/
protected CompletableFuture<List<String>> remoteOnlineRemoteAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable groupid) {
if (remoteNode == null) return CompletableFuture.completedFuture(null);
try {
@@ -158,28 +133,25 @@ public abstract class WebSocketNode {
return rs;
}
/**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接
*
* @param groupid String
* @param recent 是否只发送给最近接入的WebSocket节点
* @param message 消息内容
* @param last 是否最后一条
*
* @return 为0表示成功 其他值表示异常
*/
//异步待优化
public final CompletableFuture<Integer> sendMessage(final Serializable groupid, final boolean recent, final Object message, final boolean last) {
return CompletableFuture.supplyAsync(() -> {
final Set<String> engineids = localEngineids.get(groupid);
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to " + engineids);
if (finest) logger.finest("websocket want send message {groupid:" + groupid + ", content:'" + message + "'} from locale node to locale engine");
int rscode = RETCODE_GROUP_EMPTY;
if (engineids != null && !engineids.isEmpty()) {
for (String engineid : engineids) {
final WebSocketEngine engine = localEngines.get(engineid);
if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
engineids.remove(engineid);
if (finest) logger.finest("websocket want send message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but websocket group is empty ");
rscode = RETCODE_GROUP_EMPTY;
break;
}
rscode = group.send(recent, message, last);
}
}
}
if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) {
WebSocketGroup group = this._localEngine == null ? null : this._localEngine.getWebSocketGroup(groupid);
if (group != null) rscode = group.send(recent, message, last);
if ((recent && rscode == 0) || remoteNode == null || sncpAddressNodes == null) { //没有其他远程的WebSocket连接
if (finest) {
if ((recent && rscode == 0)) {
logger.finest("websocket want send recent message success");
@@ -189,8 +161,9 @@ public abstract class WebSocketNode {
}
return rscode;
}
if (this.sncpAddressNodes == null || this.remoteNode == null) return rscode; //没有CacheSource就不会有分布式节点
//-----------------------发送远程的-----------------------------
Collection<InetSocketAddress> addrs = sncpAddressNodes.getCollection(groupid);
Collection<InetSocketAddress> addrs = sncpAddressNodes.getCollectionAsync(groupid).join();
if (finest) logger.finest("websocket found groupid:" + groupid + " on " + addrs);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点(包含本地节点)所以正常情况下addrs不会为空。
if (recent) {

View File

@@ -18,8 +18,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.*;
/**
* WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner
*
* <p>
* 详情见: https://redkale.org
*
* <p> 详情见: https://redkale.org
* @author zhangjx
*/
public class WebSocketRunner implements Runnable {
@@ -386,6 +389,7 @@ public class WebSocketRunner implements Runnable {
*
* @param buffer
* @param exbuffers
*
* @return
*/
public WebSocketPacket decode(final ByteBuffer buffer, ByteBuffer... exbuffers) {

View File

@@ -13,7 +13,7 @@ import java.util.*;
import java.util.logging.*;
import javax.annotation.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.service.WebSocketNodeService;
import org.redkale.service.*;
import org.redkale.util.*;
/**
@@ -76,11 +76,11 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
InetSocketAddress addr = context.getServerAddress();
this.engine = new WebSocketEngine(addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger);
if (this.node == null) this.node = createWebSocketNode();
if (this.node == null) {
if (this.node == null) { //没有部署SNCP即不是分布式
this.node = new WebSocketNodeService();
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
this.node.putWebSocketEngine(engine);
this.node._localEngine = engine; //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.init(conf);
this.engine.init(conf);
}

View File

@@ -37,16 +37,11 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<List<String>> getOnlineRemoteAddresses(final @RpcTargetAddress InetSocketAddress targetAddress, final Serializable groupid) {
if (localSncpAddress == null || !localSncpAddress.equals(targetAddress)) return remoteOnlineRemoteAddresses(targetAddress, groupid);
if (this._localEngine == null) return CompletableFuture.completedFuture(new ArrayList<>());
return CompletableFuture.supplyAsync(() -> {
final Set<String> engineids = localEngineids.get(groupid);
if (engineids == null || engineids.isEmpty()) return null;
final List<String> rs = new ArrayList<>();
for (String engineid : engineids) {
final WebSocketEngine engine = localEngines.get(engineid);
if (engine == null) continue;
final WebSocketGroup group = engine.getWebSocketGroup(groupid);
group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr()));
}
final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid);
if (group != null) group.getWebSockets().forEach(x -> rs.add("ws" + Objects.hashCode(x) + '@' + x.getRemoteAddr()));
return rs;
});
}
@@ -54,36 +49,45 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Serializable groupid, boolean recent, Object message, boolean last) {
return CompletableFuture.supplyAsync(() -> {
final Set<String> engineids = localEngineids.get(groupid);
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
int code = RETCODE_GROUP_EMPTY;
for (String engineid : engineids) {
final WebSocketEngine engine = localEngines.get(engineid);
if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
code = group.send(recent, message, last);
if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code);
}
if (this._localEngine == null) return RETCODE_GROUP_EMPTY;
final WebSocketGroup group = this._localEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + this._localEngine.getEngineid() + "', groupid:" + groupid + ", content:'" + message + "'} from " + addr + " but send result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
int code = group.send(recent, message, last);
if (finest) logger.finest("websocket node send message (" + message + ") from " + addr + " result is " + code);
return code;
});
}
/**
* 当用户连接到节点需要更新到CacheSource
*
* @param groupid String
* @param sncpAddr InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress addr) {
CompletableFuture<Void> future = sncpAddressNodes.appendSetItemAsync(groupid, addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr);
public CompletableFuture<Void> connect(Serializable groupid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpAddressNodes.appendSetItemAsync(groupid, sncpAddr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + sncpAddr);
return future;
}
/**
* 当用户从一个节点断掉了所有的连接需要从CacheSource中删除
*
* @param groupid String
* @param sncpAddr InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress addr) {
CompletableFuture<Void> future = sncpAddressNodes.removeSetItemAsync(groupid, addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
public CompletableFuture<Void> disconnect(Serializable groupid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpAddressNodes.removeSetItemAsync(groupid, sncpAddr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + sncpAddr);
return future;
}
}