This commit is contained in:
Redkale
2017-05-25 17:43:22 +08:00
parent 03bcea30df
commit 39d4e6405f
4 changed files with 30 additions and 5 deletions

View File

@@ -127,6 +127,13 @@ public class Context {
bufferPool.offer(buffer);
}
public void offerBuffer(ByteBuffer... buffers) {
if (buffers == null) return;
for (ByteBuffer buffer : buffers) {
bufferPool.offer(buffer);
}
}
public Logger getLogger() {
return logger;
}

View File

@@ -35,6 +35,9 @@ public final class WebSocketEngine {
//当前WebSocket对应的Node
protected final WebSocketNode node;
//HttpContext
protected final HttpContext context;
//在线用户ID对应的WebSocket组当WebSocketGroup内没有WebSocket会从containers删掉
private final Map<Serializable, WebSocketGroup> containers = new ConcurrentHashMap<>();
@@ -47,8 +50,9 @@ public final class WebSocketEngine {
//FINEST日志级别
protected final boolean finest;
protected WebSocketEngine(String engineid, WebSocketNode node, Logger logger) {
protected WebSocketEngine(String engineid, HttpContext context, WebSocketNode node, Logger logger) {
this.engineid = engineid;
this.context = context;
this.node = node;
this.logger = logger;
this.index = sequence.getAndIncrement();
@@ -78,7 +82,7 @@ public final class WebSocketEngine {
void add(WebSocket socket) { //非线程安全, 在常规场景中无需锁
WebSocketGroup group = containers.get(socket._groupid);
if (group == null) {
group = new WebSocketGroup(socket._groupid);
group = new WebSocketGroup(context, socket._groupid);
containers.putIfAbsent(socket._groupid, group);
if (node != null) node.connect(socket._groupid);
}

View File

@@ -21,13 +21,16 @@ public final class WebSocketGroup {
private final Serializable groupid;
private final HttpContext context;
private WebSocket recentWebSocket;
private final List<WebSocket> list = new CopyOnWriteArrayList<>();
private final Map<String, Object> attributes = new HashMap<>();
WebSocketGroup(Serializable groupid) {
WebSocketGroup(HttpContext context, Serializable groupid) {
this.context = context;
this.groupid = groupid;
}
@@ -91,12 +94,22 @@ public final class WebSocketGroup {
}
}
final CompletableFuture<Integer> send(boolean recent, final WebSocketPacket packet) {
if (recent) {
return recentWebSocket.send(packet);
} else {
return sendEach(packet);
}
}
public final CompletableFuture<Integer> sendEach(Object message) {
return sendEach(message, true);
}
public final CompletableFuture<Integer> sendEach(WebSocketPacket packet) {
public final CompletableFuture<Integer> sendEach(final WebSocketPacket packet) {
CompletableFuture<Integer> future = null;
final boolean more = packet.sendBuffers == null && list.size() > 1;
if (more) packet.setSendBuffers(packet.encode(context.getBufferSupplier()));
for (WebSocket s : list) {
if (future == null) {
future = s.sendPacket(packet);
@@ -104,6 +117,7 @@ public final class WebSocketGroup {
future = future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b);
}
}
if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(0) : future;
}

View File

@@ -96,7 +96,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.node, logger);
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", context, this.node, logger);
this.node.init(conf);
this.node.localEngine.init(conf);
}