临时优化Client runWork

This commit is contained in:
Redkale
2022-12-30 10:08:50 +08:00
parent 76a85a6e3d
commit 2542eb959a
5 changed files with 61 additions and 39 deletions

View File

@@ -16,8 +16,6 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import javax.net.ssl.*;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import org.redkale.util.*;
/**

View File

@@ -25,8 +25,4 @@ public class ClientIOThread extends AsyncIOThread {
super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool);
}
@Override
public final boolean inClient() {
return true;
}
}

View File

@@ -80,6 +80,14 @@ public class WorkThread extends Thread implements Executor {
}
}
public void runWork(Runnable command) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
}
public void runAsync(Runnable command) {
if (workExecutor == null) {
ForkJoinPool.commonPool().execute(command);
@@ -114,16 +122,6 @@ public class WorkThread extends Thread implements Executor {
return false;
}
/**
* 是否客户端的IO线程
*
* @since 2.8.0
* @return boolean
*/
public boolean inClient() {
return false;
}
public boolean inCurrThread() {
return this == Thread.currentThread();
}

View File

@@ -203,28 +203,54 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
workThread = request.workThread;
request.workThread = null;
}
// if (rs.exc != null) {
// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
// || workThread.getState() != Thread.State.RUNNABLE) {
// if (request != null) {
// Traces.currTraceid(request.traceid);
// }
// respFuture.completeExceptionally(rs.exc);
// } else {
// workThread.execute(() -> {
// if (request != null) {
// Traces.currTraceid(request.traceid);
// }
// respFuture.completeExceptionally(rs.exc);
// });
// }
// } else {
// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
// || workThread.getState() != Thread.State.RUNNABLE) {
// if (request != null) {
// Traces.currTraceid(request.traceid);
// }
// respFuture.complete(rs.result);
// } else {
// workThread.execute(() -> {
// if (request != null) {
// Traces.currTraceid(request.traceid);
// }
// respFuture.complete(rs.result);
// });
// }
// }
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = channel.getAsyncIOThread();
}
if (rs.exc != null) {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
Traces.currTraceid(request.traceid);
workThread.runWork(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
}
respFuture.completeExceptionally(rs.exc);
} else {
workThread.execute(() -> {
Traces.currTraceid(request.traceid);
respFuture.completeExceptionally(rs.exc);
});
}
});
} else {
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
Traces.currTraceid(request.traceid);
respFuture.complete(rs.result);
} else {
workThread.execute(() -> {
workThread.runWork(() -> {
if (request != null) {
Traces.currTraceid(request.traceid);
respFuture.complete(rs.result);
});
}
}
respFuture.complete(rs.result);
});
}
} catch (Throwable t) {
client.logger.log(Level.INFO, "Complete result error, request: " + respFuture.request, t);

View File

@@ -82,11 +82,15 @@ public class ClientFuture<T> extends CompletableFuture<T> implements Runnable {
workThread = request.workThread;
request.workThread = null;
}
if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
|| workThread.getState() != Thread.State.RUNNABLE) {
this.completeExceptionally(ex);
} else {
workThread.execute(() -> completeExceptionally(ex));
// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO()
// || workThread.getState() != Thread.State.RUNNABLE) {
// this.completeExceptionally(ex);
// } else {
// workThread.execute(() -> completeExceptionally(ex));
// }
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = conn.getChannel().getAsyncIOThread();
}
workThread.runWork(() -> completeExceptionally(ex));
}
}