diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 8a7374be9..982eebcc7 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -92,7 +92,7 @@ public class AsyncIOThread extends WorkThread { * @param command 操作 */ @Override - public void execute(Runnable command) { + public final void execute(Runnable command) { commandQueue.offer(command); selector.wakeup(); } @@ -103,7 +103,7 @@ public class AsyncIOThread extends WorkThread { * @param commands 操作 */ @Override - public void execute(Runnable... commands) { + public final void execute(Runnable... commands) { for (Runnable command : commands) { commandQueue.offer(command); } @@ -116,7 +116,7 @@ public class AsyncIOThread extends WorkThread { * @param commands 操作 */ @Override - public void execute(Collection commands) { + public final void execute(Collection commands) { if (commands != null) { for (Runnable command : commands) { commandQueue.offer(command); diff --git a/src/main/java/org/redkale/net/WorkThread.java b/src/main/java/org/redkale/net/WorkThread.java index 5a2189068..76fd36849 100644 --- a/src/main/java/org/redkale/net/WorkThread.java +++ b/src/main/java/org/redkale/net/WorkThread.java @@ -86,6 +86,11 @@ public class WorkThread extends Thread implements Executor { } } + //与execute的区别在于子类AsyncIOThread中execute会被重载,确保在IO线程中执行 + public final void runWork(Runnable command) { + execute(command); + } + public void execute(Runnable... commands) { if (workExecutor == null) { for (Runnable command : commands) { @@ -113,22 +118,6 @@ public class WorkThread extends Thread implements Executor { } } - public void runWork(Runnable command) { - if (workExecutor == null) { - command.run(); - } else { - workExecutor.execute(command); - } - } - - public void runAsync(Runnable command) { - if (workExecutor == null) { - ForkJoinPool.commonPool().execute(command); - } else { - workExecutor.execute(command); - } - } - public ExecutorService getWorkExecutor() { return workExecutor; } diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index afa36c759..0bbcdba5f 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -123,21 +123,11 @@ public abstract class ClientCodec implements Complet connection.preComplete(message, (R) request, exc); if (exc != null) { - if (workThread == readThread) { - workThread.runWork(() -> { + if (workThread.inIO()) { + workThread.execute(() -> { Traces.currentTraceid(request.traceid); respFuture.completeExceptionally(exc); }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { - if (workThread.inIO()) { - Traces.currentTraceid(request.traceid); - respFuture.completeExceptionally(exc); - } else { - workThread.execute(() -> { - Traces.currentTraceid(request.traceid); - respFuture.completeExceptionally(exc); - }); - } } else { workThread.runWork(() -> { Traces.currentTraceid(request.traceid); @@ -146,21 +136,11 @@ public abstract class ClientCodec implements Complet } } else { final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); - if (workThread == readThread) { - workThread.runWork(() -> { + if (workThread.inIO()) { + workThread.execute(() -> { Traces.currentTraceid(request.traceid); respFuture.complete(rs); }); - } else if (workThread.getState() == Thread.State.RUNNABLE) { - if (workThread.inIO()) { - Traces.currentTraceid(request.traceid); - respFuture.complete(rs); - } else { - workThread.execute(() -> { - Traces.currentTraceid(request.traceid); - respFuture.complete(rs); - }); - } } else { workThread.runWork(() -> { Traces.currentTraceid(request.traceid); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index d993f7614..57fc44d24 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -191,9 +191,11 @@ public abstract class ClientConnection implements Co } else { channel.write(writeArray, this, writeHandler); } + } else { + writePending.compareAndSet(true, false); } } else { - requestQueue.add(respFuture); + requestQueue.offer(respFuture); } } diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index dd683558b..42f65450b 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -79,7 +79,7 @@ public abstract class WebSocket { public static final int RETCODE_WSOFFLINE = 1 << 8; //256 @Comment("WebSocket将延迟发送") - public static final int RETCODE_DEAYSEND = 1 << 9; //512 + public static final int RETCODE_DELAYSEND = 1 << 9; //512 WebSocketEngine _engine; //不可能为空 @@ -242,7 +242,7 @@ public abstract class WebSocket { delayPackets = new ArrayList<>(); } delayPackets.add(packet); - return CompletableFuture.completedFuture(RETCODE_DEAYSEND); + return CompletableFuture.completedFuture(RETCODE_DELAYSEND); } CompletableFuture rs = this._writeHandler.send(packet); //this._writeIOThread.send(this, packet); if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) { diff --git a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java index 2f345545e..493bb0aaf 100644 --- a/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java +++ b/src/main/java/org/redkale/net/http/WebSocketWriteHandler.java @@ -84,6 +84,7 @@ public class WebSocketWriteHandler implements CompletionHandler { @Override public void failed(Throwable exc, Void attachment) { + writePending.set(false); WebSocketFuture req; try { while ((req = requestQueue.poll()) != null) { diff --git a/src/main/java/org/redkale/service/AbstractService.java b/src/main/java/org/redkale/service/AbstractService.java index 9a7e88ad2..e4593d161 100644 --- a/src/main/java/org/redkale/service/AbstractService.java +++ b/src/main/java/org/redkale/service/AbstractService.java @@ -47,7 +47,7 @@ public abstract class AbstractService implements Service { } else { Thread thread = Thread.currentThread(); if (thread instanceof WorkThread) { - ((WorkThread) thread).runAsync(command); + ((WorkThread) thread).runWork(command); } else { ForkJoinPool.commonPool().execute(command); } diff --git a/src/main/java/org/redkale/util/Utility.java b/src/main/java/org/redkale/util/Utility.java index 5503cc95f..c4a99981d 100644 --- a/src/main/java/org/redkale/util/Utility.java +++ b/src/main/java/org/redkale/util/Utility.java @@ -300,6 +300,7 @@ public final class Utility { if (methodName != null) { return readFieldName(methodName); } else { + //native-image环境下获取不到methodName throw new RedkaleException("cannot found method-name from lambda " + func); } } catch (IOException e) {