优化WebSocket

This commit is contained in:
Redkale
2023-01-14 14:44:46 +08:00
parent 68bfa944fe
commit e6150b3469
10 changed files with 202 additions and 125 deletions

View File

@@ -47,7 +47,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected Servlet<C, R, ? extends Response<C, R>> servlet; protected Servlet<C, R, ? extends Response<C, R>> servlet;
protected final ByteBuffer writeBuffer; private final ByteBuffer writeBuffer;
private final CompletionHandler finishBytesHandler = new CompletionHandler<Integer, Void>() { private final CompletionHandler finishBytesHandler = new CompletionHandler<Integer, Void>() {
@@ -377,23 +377,29 @@ public abstract class Response<C extends Context, R extends Request<C>> {
} }
protected <A> void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) { protected <A> void send(final ByteTuple array, final CompletionHandler<Integer, Void> handler) {
this.channel.write(array, new CompletionHandler<Integer, Void>() { ByteBuffer buffer = this.writeBuffer;
if (buffer != null && buffer.capacity() >= array.length()) {
buffer.clear();
buffer.put(array.content(), array.offset(), array.length());
buffer.flip();
this.channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, ByteBuffer attachment) {
if (handler != null) { attachment.clear();
handler.completed(result, attachment); handler.completed(result, null);
} }
}
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
if (handler != null) { attachment.clear();
handler.failed(exc, attachment); handler.failed(exc, null);
} }
}
}); });
} else {
this.channel.write(array, handler);
}
} }
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) { protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {

View File

@@ -129,6 +129,30 @@ public class WorkThread extends Thread implements Executor {
} }
} }
public void runWork(int hash, Runnable command) {
if (hashExecutor == null) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
} else {
hashExecutor.execute(hash, command);
}
}
public void runWork(java.io.Serializable hash, Runnable command) {
if (hashExecutor == null) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
} else {
hashExecutor.execute(hash, command);
}
}
public void runAsync(Runnable command) { public void runAsync(Runnable command) {
if (workExecutor == null) { if (workExecutor == null) {
ForkJoinPool.commonPool().execute(command); ForkJoinPool.commonPool().execute(command);

View File

@@ -9,8 +9,8 @@ import java.nio.channels.CompletionHandler;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.redkale.annotation.ConstructorParameters; import org.redkale.annotation.ConstructorParameters;
import static org.redkale.asm.Opcodes.*;
import org.redkale.asm.*; import org.redkale.asm.*;
import static org.redkale.asm.Opcodes.*;
import org.redkale.net.Context; import org.redkale.net.Context;
import org.redkale.util.*; import org.redkale.util.*;

View File

@@ -1197,7 +1197,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
this.contentLength = buffer == null ? 0 : buffer.remaining(); this.contentLength = buffer == null ? 0 : buffer.remaining();
} }
createHeader(); createHeader();
if (buffer == null) { if (buffer == null) { //只发header
super.send(headerArray, handler); super.send(headerArray, handler);
} else { } else {
ByteBuffer headbuf = channel.pollWriteBuffer(); ByteBuffer headbuf = channel.pollWriteBuffer();

View File

@@ -827,11 +827,11 @@ public abstract class WebSocket<G extends Serializable, T> {
* 接收到消息前的拦截方法, ping/pong不在其内 <br> * 接收到消息前的拦截方法, ping/pong不在其内 <br>
* 注意:处理完后需要调用 messageEvent.run() 才能响应onMessage * 注意:处理完后需要调用 messageEvent.run() 才能响应onMessage
* *
* @param restmapping Rest的方法名没有则为空字符串 * @param restMapping Rest的方法名没有则为空字符串
* @param param onMessage方法的参数 * @param param onMessage方法的参数
* @param messageEvent onMessage事件 * @param messageEvent onMessage事件
*/ */
public void preOnMessage(String restmapping, WebSocketParam param, Runnable messageEvent) { public void preOnMessage(String restMapping, WebSocketParam param, Runnable messageEvent) {
messageEvent.run(); messageEvent.run();
} }

View File

@@ -56,7 +56,7 @@ public class WebSocketEngine {
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>(); private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
@Comment("当前连接数") @Comment("当前连接数")
protected final AtomicInteger currconns = new AtomicInteger(); protected final AtomicInteger currConns = new AtomicInteger();
@Comment("用于PING的定时器") @Comment("用于PING的定时器")
private ScheduledThreadPoolExecutor scheduler; private ScheduledThreadPoolExecutor scheduler;
@@ -65,7 +65,7 @@ public class WebSocketEngine {
protected final Logger logger; protected final Logger logger;
@Comment("PING的间隔秒数") @Comment("PING的间隔秒数")
protected int liveinterval; protected int liveInterval;
@Comment("最大连接数, 为0表示无限制") @Comment("最大连接数, 为0表示无限制")
protected int wsMaxConns; protected int wsMaxConns;
@@ -82,14 +82,14 @@ public class WebSocketEngine {
@Comment("加密解密器") @Comment("加密解密器")
protected Cryptor cryptor; protected Cryptor cryptor;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, int wsMaxConns, protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveInterval, int wsMaxConns,
int wsThreads, int wsMaxBody, boolean mergeMode, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) { int wsThreads, int wsMaxBody, boolean mergeMode, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid; this.engineid = engineid;
this.single = single; this.single = single;
this.context = context; this.context = context;
this.sendConvert = sendConvert; this.sendConvert = sendConvert;
this.node = node; this.node = node;
this.liveinterval = liveinterval; this.liveInterval = liveInterval;
this.wsMaxConns = wsMaxConns; this.wsMaxConns = wsMaxConns;
this.wsThreads = wsThreads; this.wsThreads = wsThreads;
this.wsMaxBody = wsMaxBody; this.wsMaxBody = wsMaxBody;
@@ -104,8 +104,8 @@ public class WebSocketEngine {
if (conf != null && conf.getAnyValue("properties") != null) { if (conf != null && conf.getAnyValue("properties") != null) {
props = conf.getAnyValue("properties"); props = conf.getAnyValue("properties");
} }
this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval)); this.liveInterval = props == null ? (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveInterval < 0 ? DEFAILT_LIVEINTERVAL : liveInterval));
if (liveinterval <= 0) { if (liveInterval <= 0) {
return; return;
} }
if (props != null) { if (props != null) {
@@ -125,18 +125,18 @@ public class WebSocketEngine {
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
long delay = (liveinterval - System.currentTimeMillis() / 1000 % liveinterval) + index * 5; long delay = (liveInterval - System.currentTimeMillis() / 1000 % liveInterval) + index * 5;
final int intervalms = liveinterval * 1000; final int intervalms = liveInterval * 1000;
scheduler.scheduleWithFixedDelay(() -> { scheduler.scheduleWithFixedDelay(() -> {
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
getLocalWebSockets().stream().filter(x -> ((now - x.getLastReadTime()) > intervalms && (now - x.getLastSendTime()) > intervalms)).forEach(x -> x.sendPing()); getLocalWebSockets().stream().filter(x -> ((now - x.getLastReadTime()) > intervalms && (now - x.getLastSendTime()) > intervalms)).forEach(x -> x.sendPing());
} catch (Throwable t) { } catch (Throwable t) {
logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveinterval + "s) ping error", t); logger.log(Level.SEVERE, "WebSocketEngine schedule(interval=" + liveInterval + "s) ping error", t);
} }
}, delay, liveinterval, TimeUnit.SECONDS); }, delay, liveInterval, TimeUnit.SECONDS);
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveinterval + "s) scheduler executor"); logger.finest(this.getClass().getSimpleName() + "(" + engineid + ")" + " start keeplive(wsmaxconns:" + wsMaxConns + ", delay:" + delay + "s, interval:" + liveInterval + "s) scheduler executor");
} }
} }
@@ -149,7 +149,7 @@ public class WebSocketEngine {
@Comment("添加WebSocket") @Comment("添加WebSocket")
CompletableFuture<Void> addLocal(WebSocket socket) { CompletableFuture<Void> addLocal(WebSocket socket) {
if (single) { if (single) {
currconns.incrementAndGet(); currConns.incrementAndGet();
websockets.put(socket._userid, socket); websockets.put(socket._userid, socket);
} else { //非线程安全, 在常规场景中无需锁 } else { //非线程安全, 在常规场景中无需锁
List<WebSocket> list = websockets2.get(socket._userid); List<WebSocket> list = websockets2.get(socket._userid);
@@ -157,7 +157,7 @@ public class WebSocketEngine {
list = new CopyOnWriteArrayList<>(); list = new CopyOnWriteArrayList<>();
websockets2.put(socket._userid, list); websockets2.put(socket._userid, list);
} }
currconns.incrementAndGet(); currConns.incrementAndGet();
list.add(socket); list.add(socket);
} }
if (node != null) { if (node != null) {
@@ -173,7 +173,7 @@ public class WebSocketEngine {
return null; //尚未登录成功 return null; //尚未登录成功
} }
if (single) { if (single) {
currconns.decrementAndGet(); currConns.decrementAndGet();
websockets.remove(userid); websockets.remove(userid);
if (node != null) { if (node != null) {
return node.disconnect(userid); return node.disconnect(userid);
@@ -181,7 +181,7 @@ public class WebSocketEngine {
} else { //非线程安全, 在常规场景中无需锁 } else { //非线程安全, 在常规场景中无需锁
List<WebSocket> list = websockets2.get(userid); List<WebSocket> list = websockets2.get(userid);
if (list != null) { if (list != null) {
currconns.decrementAndGet(); currConns.decrementAndGet();
list.remove(socket); list.remove(socket);
if (list.isEmpty()) { if (list.isEmpty()) {
websockets2.remove(userid); websockets2.remove(userid);
@@ -476,7 +476,7 @@ public class WebSocketEngine {
if (this.wsMaxConns < 1) { if (this.wsMaxConns < 1) {
return false; return false;
} }
return currconns.get() >= this.wsMaxConns; return currConns.get() >= this.wsMaxConns;
} }
@Comment("获取所有连接") @Comment("获取所有连接")

View File

@@ -13,6 +13,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.net.AsyncIOThread;
import static org.redkale.net.http.WebSocket.*; import static org.redkale.net.http.WebSocket.*;
import org.redkale.net.http.WebSocketPacket.FrameType; import org.redkale.net.http.WebSocketPacket.FrameType;
import org.redkale.util.ByteArray; import org.redkale.util.ByteArray;
@@ -31,8 +32,6 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
protected final Logger logger; protected final Logger logger;
protected final boolean debug;
protected final List<WebSocketPacket> currPackets = new ArrayList<>(); protected final List<WebSocketPacket> currPackets = new ArrayList<>();
protected ByteArray currSeriesMergeMessageBytes; protected ByteArray currSeriesMergeMessageBytes;
@@ -51,21 +50,23 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
protected int halfFrameLength = -1; protected int halfFrameLength = -1;
protected AsyncIOThread ioReadThread;
public WebSocketReadHandler(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer) { public WebSocketReadHandler(HttpContext context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer) {
this.context = context; this.context = context;
this.restMessageConsumer = messageConsumer; this.restMessageConsumer = messageConsumer;
this.webSocket = webSocket; this.webSocket = webSocket;
this.ioReadThread = webSocket._channel.getReadIOThread();
this.logger = context.getLogger(); this.logger = context.getLogger();
this.debug = context.getLogger().isLoggable(Level.FINEST);
} }
public void startRead() { public void startRead() {
CompletableFuture connectFuture = webSocket.onConnected(); CompletableFuture connectFuture = webSocket.onConnected();
if (connectFuture == null) { if (connectFuture == null) {
webSocket._channel.read(this); webSocket._channel.readInIOThread(this);
} else { } else {
connectFuture.whenComplete((r, t) -> { connectFuture.whenComplete((r, t) -> {
webSocket._channel.read(this); webSocket._channel.readInIOThread(this);
}); });
} }
} }
@@ -96,6 +97,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
* *
*/ */
protected void readDecode(final ByteBuffer realbuf) { protected void readDecode(final ByteBuffer realbuf) {
boolean debug = context.getLogger().isLoggable(Level.FINEST);
if (debug && realbuf.remaining() > 6) { if (debug && realbuf.remaining() > 6) {
logger.log(Level.FINEST, "read websocket message's length = " + realbuf.remaining()); logger.log(Level.FINEST, "read websocket message's length = " + realbuf.remaining());
} }
@@ -290,6 +292,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
@Override @Override
public void completed(Integer count, ByteBuffer readBuffer) { public void completed(Integer count, ByteBuffer readBuffer) {
boolean debug = context.getLogger().isLoggable(Level.FINEST);
if (count < 1) { if (count < 1) {
if (debug) { if (debug) {
logger.log(Level.FINEST, "WebSocket(" + webSocket + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds"); logger.log(Level.FINEST, "WebSocket(" + webSocket + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds");
@@ -309,40 +312,48 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
//消息处理 //消息处理
for (final WebSocketPacket packet : currPackets) { for (final WebSocketPacket packet : currPackets) {
if (packet.type == FrameType.TEXT) { if (packet.type == FrameType.TEXT) {
try { ioReadThread.runWork(webSocket._userid, () -> {
Convert convert = webSocket.getTextConvert(); try {
if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用 Convert convert = webSocket.getTextConvert();
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload())); if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用
} else { restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload()));
webSocket.onMessage(packet.getPayload() == null ? null : new String(packet.getPayload(), StandardCharsets.UTF_8), packet.last); } else {
webSocket.onMessage(packet.getPayload() == null ? null : new String(packet.getPayload(), StandardCharsets.UTF_8), packet.last);
}
} catch (Throwable e) {
logger.log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
} }
} catch (Throwable e) { });
logger.log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.BINARY) { } else if (packet.type == FrameType.BINARY) {
try { ioReadThread.runWork(webSocket._userid, () -> {
Convert convert = webSocket.getBinaryConvert(); try {
if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用 Convert convert = webSocket.getBinaryConvert();
restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload())); if (restMessageConsumer != null && convert != null) { //主要供RestWebSocket使用
} else { restMessageConsumer.accept(webSocket, convert.convertFrom(webSocket._messageRestType, packet.getPayload()));
webSocket.onMessage(packet.getPayload(), packet.last); } else {
webSocket.onMessage(packet.getPayload(), packet.last);
}
} catch (Throwable e) {
logger.log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
} }
} catch (Throwable e) { });
logger.log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.PING) { } else if (packet.type == FrameType.PING) {
try { ioReadThread.runWork(webSocket._userid, () -> {
webSocket.onPing(packet.getPayload()); try {
} catch (Exception e) { webSocket.onPing(packet.getPayload());
logger.log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e); } catch (Exception e) {
} logger.log(Level.SEVERE, "WebSocket onPing error (" + packet + ")", e);
}
});
} else if (packet.type == FrameType.PONG) { } else if (packet.type == FrameType.PONG) {
try { ioReadThread.runWork(webSocket._userid, () -> {
//if (debug) logger.log(Level.FINEST, "WebSocket onMessage by PONG FrameType : " + packet); try {
webSocket.onPong(packet.getPayload()); //if (debug) logger.log(Level.FINEST, "WebSocket onMessage by PONG FrameType : " + packet);
} catch (Exception e) { webSocket.onPong(packet.getPayload());
logger.log(Level.SEVERE, "WebSocket(" + webSocket + ") onPong error (" + packet + ")", e); } catch (Exception e) {
} logger.log(Level.SEVERE, "WebSocket(" + webSocket + ") onPong error (" + packet + ")", e);
}
});
} else if (packet.type == FrameType.CLOSE) { } else if (packet.type == FrameType.CLOSE) {
webSocket.initiateClosed = true; webSocket.initiateClosed = true;
if (debug) { if (debug) {
@@ -360,7 +371,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
logger.log(Level.WARNING, "WebSocket(" + webSocket + ") onMessage error", t); logger.log(Level.WARNING, "WebSocket(" + webSocket + ") onMessage error", t);
} }
webSocket._channel.read(this); webSocket._channel.read(this);
} catch (Exception e) { } catch (Throwable e) {
logger.log(Level.WARNING, "WebSocket(" + webSocket + ") onMessage by received error", e); logger.log(Level.WARNING, "WebSocket(" + webSocket + ") onMessage by received error", e);
webSocket.kill(CLOSECODE_WSEXCEPTION, "websocket-received error"); webSocket.kill(CLOSECODE_WSEXCEPTION, "websocket-received error");
} }
@@ -372,7 +383,7 @@ public class WebSocketReadHandler implements CompletionHandler<Integer, ByteBuff
return; return;
} }
if (exc != null) { if (exc != null) {
if (debug) { if (context.getLogger().isLoggable(Level.FINEST)) {
context.getLogger().log(Level.FINEST, "WebSocket(" + webSocket + ") read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", exc); context.getLogger().log(Level.FINEST, "WebSocket(" + webSocket + ") read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreateTime()) / 1000 + " seconds", exc);
} }
webSocket.kill(CLOSECODE_WSEXCEPTION, "read websocket-packet failed"); webSocket.kill(CLOSECODE_WSEXCEPTION, "read websocket-packet failed");

View File

@@ -118,7 +118,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
protected Convert sendConvert; protected Convert sendConvert;
@Resource(name = "$") @Resource(name = "$")
protected WebSocketNode node; protected WebSocketNode webSocketNode;
@Resource(name = RESNAME_SERVER_RESFACTORY) @Resource(name = RESNAME_SERVER_RESFACTORY)
protected ResourceFactory resourceFactory; protected ResourceFactory resourceFactory;
@@ -127,13 +127,19 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Type msgtype = String.class; Type msgtype = String.class;
try { try {
for (Method method : this.getClass().getDeclaredMethods()) { for (Method method : this.getClass().getDeclaredMethods()) {
if (!method.getName().equals("createWebSocket")) continue; if (!method.getName().equals("createWebSocket")) {
if (method.getParameterCount() > 0) continue; continue;
}
if (method.getParameterCount() > 0) {
continue;
}
Type rt = TypeToken.getGenericType(method.getGenericReturnType(), this.getClass()); Type rt = TypeToken.getGenericType(method.getGenericReturnType(), this.getClass());
if (rt instanceof ParameterizedType) { if (rt instanceof ParameterizedType) {
msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1]; msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1];
} }
if (msgtype == Object.class) msgtype = String.class; if (msgtype == Object.class) {
msgtype = String.class;
}
break; break;
} }
} catch (Exception e) { } catch (Exception e) {
@@ -144,19 +150,33 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Override @Override
final void preInit(Application application, HttpContext context, AnyValue conf) { final void preInit(Application application, HttpContext context, AnyValue conf) {
if (this.textConvert == null) this.textConvert = jsonConvert; if (this.textConvert == null) {
if (this.sendConvert == null) this.sendConvert = jsonConvert; this.textConvert = jsonConvert;
InetSocketAddress addr = context.getServerAddress(); }
if (this.node == null) this.node = createWebSocketNode(); if (this.sendConvert == null) {
if (this.node == null) { //没有部署SNCP即不是分布式 this.sendConvert = jsonConvert;
this.node = new WebSocketNodeService(); }
if (logger.isLoggable(Level.WARNING)) logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName()); InetSocketAddress addr = context.getServerAddress();
if (this.webSocketNode == null) {
this.webSocketNode = createWebSocketNode();
}
if (this.webSocketNode == null) { //没有部署SNCP即不是分布式
this.webSocketNode = new WebSocketNodeService();
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Not found WebSocketNode, create a default value for " + getClass().getName());
}
}
if (this.webSocketNode.sendConvert == null) {
this.webSocketNode.sendConvert = this.sendConvert;
}
if (this.messageAgent != null) {
this.webSocketNode.messageAgent = this.messageAgent;
} }
if (this.node.sendConvert == null) this.node.sendConvert = this.sendConvert;
if (this.messageAgent != null) this.node.messageAgent = this.messageAgent;
{ {
AnyValue props = conf; AnyValue props = conf;
if (conf != null && conf.getAnyValue("properties") != null) props = conf.getAnyValue("properties"); if (conf != null && conf.getAnyValue("properties") != null) {
props = conf.getAnyValue("properties");
}
if (props != null) { if (props != null) {
String cryptorClass = props.getValue(WEBPARAM__CRYPTOR); String cryptorClass = props.getValue(WEBPARAM__CRYPTOR);
if (cryptorClass != null && !cryptorClass.isEmpty()) { if (cryptorClass != null && !cryptorClass.isEmpty()) {
@@ -164,28 +184,34 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Class clazz = Thread.currentThread().getContextClassLoader().loadClass(cryptorClass); Class clazz = Thread.currentThread().getContextClassLoader().loadClass(cryptorClass);
this.cryptor = (Cryptor) clazz.getDeclaredConstructor().newInstance(); this.cryptor = (Cryptor) clazz.getDeclaredConstructor().newInstance();
RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, cryptorClass); RedkaleClassLoader.putReflectionDeclaredConstructors(clazz, cryptorClass);
if (resourceFactory != null && this.cryptor != null) resourceFactory.inject(this.cryptor); if (resourceFactory != null && this.cryptor != null) {
resourceFactory.inject(this.cryptor);
}
} catch (Exception e) { } catch (Exception e) {
throw new HttpException(e); throw new HttpException(e);
} }
} }
} }
} }
if (application != null && application.isCompileMode()) return; if (application != null && application.isCompileMode()) {
return;
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service //存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]", this.webSocketNode.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.node, this.sendConvert, logger); this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, mergemsg, this.cryptor, this.webSocketNode, this.sendConvert, logger);
this.node.init(conf); this.webSocketNode.init(conf);
this.node.localEngine.init(conf); this.webSocketNode.localEngine.init(conf);
} }
@Override @Override
final void postDestroy(Application application, HttpContext context, AnyValue conf) { final void postDestroy(Application application, HttpContext context, AnyValue conf) {
this.node.postDestroy(conf); this.webSocketNode.postDestroy(conf);
super.destroy(context, conf); super.destroy(context, conf);
if (application != null && application.isCompileMode()) return; if (application != null && application.isCompileMode()) {
this.node.localEngine.destroy(conf); return;
}
this.webSocketNode.localEngine.destroy(conf);
} }
@Override @Override
@@ -193,27 +219,31 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
return this.getClass().getSimpleName().replace("_Dyn", "").toLowerCase().replaceAll("websocket.*$", "").replaceAll("servlet.*$", ""); return this.getClass().getSimpleName().replace("_Dyn", "").toLowerCase().replaceAll("websocket.*$", "").replaceAll("servlet.*$", "");
} }
@Override @Override //在IOThread中执行
public final void execute(final HttpRequest request, final HttpResponse response) throws IOException { public final void execute(final HttpRequest request, final HttpResponse response) throws IOException {
final boolean debug = logger.isLoggable(Level.FINEST); final boolean debug = logger.isLoggable(Level.FINEST);
if (!request.isWebSocket()) { if (!request.isWebSocket()) {
if (debug) logger.finest("WebSocket connect abort, (Not GET Method) or (Connection != Upgrade) or (Upgrade != websocket). request=" + request); if (debug) {
logger.log(Level.FINEST, "WebSocket connect abort, (Not GET Method) or (Connection != Upgrade) or (Upgrade != websocket). request=" + request);
}
response.finish(true); response.finish(true);
return; return;
} }
final String key = request.getHeader("Sec-WebSocket-Key"); final String key = request.getHeader("Sec-WebSocket-Key");
if (key == null) { if (key == null) {
if (debug) logger.finest("WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request); if (debug) {
logger.log(Level.FINEST, "WebSocket connect abort, Not found Sec-WebSocket-Key header. request=" + request);
}
response.finish(true); response.finish(true);
return; return;
} }
if (this.node.localEngine.isLocalConnLimited()) { if (this.webSocketNode.localEngine.isLocalConnLimited()) {
if (debug) logger.finest("WebSocket connections limit, wsmaxconns=" + this.node.localEngine.getLocalWsMaxConns()); logger.log(Level.WARNING, "WebSocket connections limit, wsmaxconns=" + this.webSocketNode.localEngine.getLocalWsMaxConns());
response.finish(true); response.finish(true);
return; return;
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine; webSocket._engine = this.webSocketNode.localEngine;
webSocket._channel = response.getChannel(); webSocket._channel = response.getChannel();
webSocket._messageRestType = this.messageRestType; webSocket._messageRestType = this.messageRestType;
webSocket._textConvert = textConvert; webSocket._textConvert = textConvert;
@@ -221,7 +251,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._sendConvert = sendConvert; webSocket._sendConvert = sendConvert;
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
webSocket._sncpAddress = this.node.localSncpAddress; webSocket._sncpAddress = this.webSocketNode.localSncpAddress;
if (this.permessageDeflate && request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) { if (this.permessageDeflate && request.getHeader("Sec-WebSocket-Extensions", "").contains("permessage-deflate")) {
webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); webSocket.deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
webSocket.inflater = new Inflater(true); webSocket.inflater = new Inflater(true);
@@ -229,13 +259,17 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
initRestWebSocket(webSocket); initRestWebSocket(webSocket);
CompletableFuture<String> sessionFuture = webSocket.onOpen(request); CompletableFuture<String> sessionFuture = webSocket.onOpen(request);
if (sessionFuture == null) { if (sessionFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); if (debug) {
logger.log(Level.FINEST, "WebSocket connect abort, Not found sessionid. request=" + request);
}
response.finish(true); response.finish(true);
return; return;
} }
BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> { BiConsumer<String, Throwable> sessionConsumer = (sessionid, ex) -> {
if ((sessionid == null && webSocket.delayPackets == null) || ex != null) { if ((sessionid == null && webSocket.delayPackets == null) || ex != null) {
if (debug || ex != null) logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex); if (debug || ex != null) {
logger.log(ex == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Not found sessionid or occur error. request=" + request, ex);
}
response.finish(true); response.finish(true);
return; return;
} }
@@ -250,7 +284,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
response.setHeader("Connection", "Upgrade"); response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket"); response.addHeader("Upgrade", "websocket");
response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes)); response.addHeader("Sec-WebSocket-Accept", Base64.getEncoder().encodeToString(bytes));
if (webSocket.deflater != null) response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate"); if (webSocket.deflater != null) {
response.addHeader("Sec-WebSocket-Extensions", "permessage-deflate");
}
response.sendBody((ByteBuffer) null, new CompletionHandler<Integer, Void>() { response.sendBody((ByteBuffer) null, new CompletionHandler<Integer, Void>() {
@@ -261,30 +297,31 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
Runnable createUseridHandler = () -> { Runnable createUseridHandler = () -> {
CompletableFuture<Serializable> userFuture = webSocket.createUserid(); CompletableFuture<Serializable> userFuture = webSocket.createUserid();
if (userFuture == null) { if (userFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Create userid abort. request = " + request); if (debug) {
logger.log(Level.FINEST, "WebSocket connect abort, Create userid abort. request = " + request);
}
response.finish(true); response.finish(true);
return; return;
} }
userFuture.whenComplete((userid, ex2) -> { userFuture.whenComplete((userid, ex2) -> {
if ((userid == null && webSocket.delayPackets == null) || ex2 != null) { if ((userid == null && webSocket.delayPackets == null) || ex2 != null) {
if (debug || ex2 != null) logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2); if (debug || ex2 != null) {
logger.log(ex2 == null ? Level.FINEST : Level.FINE, "WebSocket connect abort, Create userid abort. request = " + request, ex2);
}
response.finish(true); response.finish(true);
return; return;
} }
Runnable runHandler = () -> { Runnable runHandler = () -> {
webSocket._userid = userid; webSocket._userid = userid;
if (single && !anyuser) { if (single && !anyuser) {
WebSocketServlet.this.node.existsWebSocket(userid).whenComplete((rs, nex) -> { webSocketNode.existsWebSocket(userid).whenComplete((rs, nex) -> {
if (rs) { if (rs) {
CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect(); CompletableFuture<Boolean> rcFuture = webSocket.onSingleRepeatConnect();
Consumer<Boolean> task = (oldkilled) -> { Consumer<Boolean> task = (oldkilled) -> {
if (oldkilled) { if (oldkilled) {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
// WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
// webSocket._runner = runner;
// runner.run(); //context.runAsync(runner);
response.finish(true); response.finish(true);
} else { //关闭新连接 } else { //关闭新连接
response.finish(true); response.finish(true);
@@ -302,22 +339,16 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}); });
} }
} else { } else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
// WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
// webSocket._runner = runner;
// runner.run(); //context.runAsync(runner);
response.finish(true); response.finish(true);
} }
}); });
} else { } else {
WebSocketServlet.this.node.localEngine.addLocal(webSocket); webSocketNode.localEngine.addLocal(webSocket);
response.removeChannel(); response.removeChannel();
webSocket._readHandler.startRead(); webSocket._readHandler.startRead();
// WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer);
// webSocket._runner = runner;
// runner.run(); //context.runAsync(runner);
response.finish(true); response.finish(true);
} }
}; };
@@ -327,7 +358,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> { cf.whenComplete((Integer v, Throwable t) -> {
if (userid == null || t != null) { if (userid == null || t != null) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); if (t != null) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
}
response.finish(true); response.finish(true);
} else { } else {
runHandler.run(); runHandler.run();
@@ -344,7 +377,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()])); CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
cf.whenComplete((Integer v, Throwable t) -> { cf.whenComplete((Integer v, Throwable t) -> {
if (sessionid == null || t != null) { if (sessionid == null || t != null) {
if (t != null) logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t); if (t != null) {
logger.log(Level.FINEST, "WebSocket connect abort, Response send delayPackets abort. request = " + request, t);
}
response.finish(true); response.finish(true);
} else { } else {
createUseridHandler.run(); createUseridHandler.run();
@@ -367,7 +402,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (workThread == null || workThread == Thread.currentThread()) { if (workThread == null || workThread == Thread.currentThread()) {
sessionConsumer.accept(sessionid, ex); sessionConsumer.accept(sessionid, ex);
} else { } else {
workThread.execute(() -> sessionConsumer.accept(sessionid, ex)); workThread.runWork(() -> sessionConsumer.accept(sessionid, ex));
} }
}); });
} }

View File

@@ -89,6 +89,10 @@ public class ThreadHashExecutor extends AbstractExecutorService {
hashExecutor(hash).execute(command); hashExecutor(hash).execute(command);
} }
public void execute(java.io.Serializable hash, Runnable command) {
hashExecutor(hash == null ? 0 : hash.hashCode()).execute(command);
}
@Override @Override
public Future<?> submit(Runnable task) { public Future<?> submit(Runnable task) {
return hashExecutor(0).submit(task); return hashExecutor(0).submit(task);

View File

@@ -5,14 +5,11 @@
*/ */
package org.redkale.test.websocket; package org.redkale.test.websocket;
import org.redkale.net.http.WebServlet;
import org.redkale.net.http.WebSocketServlet;
import org.redkale.net.http.WebSocket;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.redkale.annotation.Resource; import org.redkale.annotation.Resource;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.test.rest.*; import org.redkale.test.rest.*;
import org.redkale.util.*; import org.redkale.util.AnyValue;
/** /**
* *
@@ -26,7 +23,7 @@ public class ChatWebSocketServlet extends WebSocketServlet {
@Override @Override
public void init(HttpContext context, AnyValue conf) { public void init(HttpContext context, AnyValue conf) {
System.out.println("本实例的WebSocketNode: " + super.node); System.out.println("本实例的WebSocketNode: " + super.webSocketNode);
} }
@Override @Override