优化WebSocket.IOThread
This commit is contained in:
@@ -567,21 +567,23 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
||||
}
|
||||
}
|
||||
HttpContext rs = new HttpContext(contextConfig);
|
||||
rs.webSocketWriterIOThreadFunc = ws -> {
|
||||
if (asyncGroup == null) {
|
||||
groupLock.lock();
|
||||
try {
|
||||
if (asyncGroup == null) {
|
||||
WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool);
|
||||
g.start();
|
||||
asyncGroup = g;
|
||||
if (false) { //暂不使用WebSocketAsyncGroup模式
|
||||
rs.webSocketWriterIOThreadFunc = ws -> {
|
||||
if (asyncGroup == null) {
|
||||
groupLock.lock();
|
||||
try {
|
||||
if (asyncGroup == null) {
|
||||
WebSocketAsyncGroup g = new WebSocketAsyncGroup("Redkale-HTTP:" + address.getPort() + "-WebSocketWriteIOThread-%s", workExecutor, safeBufferPool);
|
||||
g.start();
|
||||
asyncGroup = g;
|
||||
}
|
||||
} finally {
|
||||
groupLock.unlock();
|
||||
}
|
||||
} finally {
|
||||
groupLock.unlock();
|
||||
}
|
||||
}
|
||||
return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread();
|
||||
};
|
||||
return (WebSocketWriteIOThread) asyncGroup.nextWriteIOThread();
|
||||
};
|
||||
}
|
||||
return rs;
|
||||
}
|
||||
|
||||
|
||||
@@ -86,7 +86,8 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
|
||||
WebSocketReadHandler _readHandler;
|
||||
|
||||
//WebSocketWriteHandler _writeHandler;
|
||||
WebSocketWriteHandler _writeHandler;
|
||||
|
||||
WebSocketWriteIOThread _writeIOThread;
|
||||
|
||||
InetSocketAddress _sncpAddress; //分布式下不可为空
|
||||
@@ -239,14 +240,14 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
* @return 0表示成功, 非0表示错误码
|
||||
*/
|
||||
CompletableFuture<Integer> sendPacket(WebSocketPacket packet) {
|
||||
if (this._writeIOThread == null) {
|
||||
if (this._writeHandler == null) { //if (this._writeIOThread == null) {
|
||||
if (delayPackets == null) {
|
||||
delayPackets = new ArrayList<>();
|
||||
}
|
||||
delayPackets.add(packet);
|
||||
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
|
||||
}
|
||||
CompletableFuture<Integer> rs = this._writeIOThread.send(this, packet);
|
||||
CompletableFuture<Integer> rs = this._writeHandler.send(packet); //this._writeIOThread.send(this, packet);
|
||||
if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {
|
||||
_engine.logger.finer("userid:" + getUserid() + " send websocket message(" + packet + ")" + " on " + this);
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import org.redkale.util.ByteBufferPool;
|
||||
*
|
||||
* @since 2.8.0
|
||||
*/
|
||||
@Deprecated(since = "2.8.0")
|
||||
class WebSocketAsyncGroup extends AsyncIOGroup {
|
||||
|
||||
public WebSocketAsyncGroup(String threadNameFormat, ExecutorService workExecutor, ByteBufferPool safeBufferPool) {
|
||||
|
||||
@@ -285,7 +285,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
@Override
|
||||
public void completed(Integer result, Void attachment) {
|
||||
webSocket._readHandler = new WebSocketReadHandler(response.getContext(), webSocket, restMessageConsumer);
|
||||
//webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket);
|
||||
webSocket._writeHandler = new WebSocketWriteHandler(response.getContext(), webSocket);
|
||||
response.getContext().updateWebSocketWriteIOThread(webSocket);
|
||||
|
||||
Runnable createUseridHandler = () -> {
|
||||
@@ -349,7 +349,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
if (webSocket.delayPackets != null) { //存在待发送的消息
|
||||
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
|
||||
webSocket.delayPackets = null;
|
||||
CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
|
||||
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
|
||||
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
|
||||
cf.whenComplete((Integer v, Throwable t) -> {
|
||||
if (userid == null || t != null) {
|
||||
if (t != null) {
|
||||
@@ -368,7 +369,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
|
||||
if (webSocket.delayPackets != null) { //存在待发送的消息
|
||||
List<WebSocketPacket> delayPackets = webSocket.delayPackets;
|
||||
webSocket.delayPackets = null;
|
||||
CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
|
||||
//CompletableFuture<Integer> cf = webSocket._writeIOThread.send(webSocket, delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
|
||||
CompletableFuture<Integer> cf = webSocket._writeHandler.send(delayPackets.toArray(new WebSocketPacket[delayPackets.size()]));
|
||||
cf.whenComplete((Integer v, Throwable t) -> {
|
||||
if (sessionid == null || t != null) {
|
||||
if (t != null) {
|
||||
|
||||
@@ -17,7 +17,6 @@ import org.redkale.util.ByteArray;
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
@Deprecated(since = "2.8.0")
|
||||
public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
|
||||
|
||||
protected final HttpContext context;
|
||||
|
||||
@@ -21,6 +21,7 @@ import org.redkale.util.*;
|
||||
*
|
||||
* @since 2.8.0
|
||||
*/
|
||||
@Deprecated(since = "2.8.0")
|
||||
public class WebSocketWriteIOThread extends AsyncIOThread {
|
||||
|
||||
private final ScheduledThreadPoolExecutor timeoutExecutor;
|
||||
|
||||
Reference in New Issue
Block a user