优化net

This commit is contained in:
redkale
2023-06-30 10:18:00 +08:00
parent 479d2d427a
commit f38adacbe4
2 changed files with 14 additions and 38 deletions

View File

@@ -86,11 +86,6 @@ public class WorkThread extends Thread implements Executor {
} }
} }
//与execute的区别在于子类AsyncIOThread中execute会被重载确保在IO线程中执行
public final void runWork(Runnable command) {
execute(command);
}
public void execute(Runnable... commands) { public void execute(Runnable... commands) {
if (workExecutor == null) { if (workExecutor == null) {
for (Runnable command : commands) { for (Runnable command : commands) {
@@ -118,6 +113,15 @@ public class WorkThread extends Thread implements Executor {
} }
} }
//与execute的区别在于子类AsyncIOThread中execute会被重载确保在IO线程中执行
public final void runWork(Runnable command) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
}
public ExecutorService getWorkExecutor() { public ExecutorService getWorkExecutor() {
return workExecutor; return workExecutor;
} }

View File

@@ -123,21 +123,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.preComplete(message, (R) request, exc); connection.preComplete(message, (R) request, exc);
if (exc != null) { if (exc != null) {
if (workThread == readThread) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
workThread.runWork(() -> { workThread.execute(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc); respFuture.completeExceptionally(exc);
}); });
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else { } else {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
@@ -146,21 +136,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} }
} else { } else {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message); final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread == readThread) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
workThread.runWork(() -> { workThread.execute(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
respFuture.complete(rs); respFuture.complete(rs);
}); });
} else if (workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
});
}
} else { } else {
workThread.runWork(() -> { workThread.runWork(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
@@ -169,15 +149,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
if (workThread == readThread) { if (workThread.inIO() && workThread.getState() == Thread.State.RUNNABLE) { //fullCache时state不是RUNNABLE
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
});
} else if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t);
} else if (workThread.getState() == Thread.State.RUNNABLE) {
workThread.execute(() -> { workThread.execute(() -> {
Traces.currentTraceid(request.traceid); Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(t); respFuture.completeExceptionally(t);