优化net模块

This commit is contained in:
redkale
2023-06-30 08:55:57 +08:00
parent 2fa3b1c7b2
commit 93524d515a
8 changed files with 20 additions and 47 deletions

View File

@@ -92,7 +92,7 @@ public class AsyncIOThread extends WorkThread {
* @param command 操作
*/
@Override
public void execute(Runnable command) {
public final void execute(Runnable command) {
commandQueue.offer(command);
selector.wakeup();
}
@@ -103,7 +103,7 @@ public class AsyncIOThread extends WorkThread {
* @param commands 操作
*/
@Override
public void execute(Runnable... commands) {
public final void execute(Runnable... commands) {
for (Runnable command : commands) {
commandQueue.offer(command);
}
@@ -116,7 +116,7 @@ public class AsyncIOThread extends WorkThread {
* @param commands 操作
*/
@Override
public void execute(Collection<Runnable> commands) {
public final void execute(Collection<Runnable> commands) {
if (commands != null) {
for (Runnable command : commands) {
commandQueue.offer(command);

View File

@@ -86,6 +86,11 @@ public class WorkThread extends Thread implements Executor {
}
}
//与execute的区别在于子类AsyncIOThread中execute会被重载确保在IO线程中执行
public final void runWork(Runnable command) {
execute(command);
}
public void execute(Runnable... commands) {
if (workExecutor == null) {
for (Runnable command : commands) {
@@ -113,22 +118,6 @@ public class WorkThread extends Thread implements Executor {
}
}
public void runWork(Runnable command) {
if (workExecutor == null) {
command.run();
} else {
workExecutor.execute(command);
}
}
public void runAsync(Runnable command) {
if (workExecutor == null) {
ForkJoinPool.commonPool().execute(command);
} else {
workExecutor.execute(command);
}
}
public ExecutorService getWorkExecutor() {
return workExecutor;
}

View File

@@ -123,21 +123,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
connection.preComplete(message, (R) request, exc);
if (exc != null) {
if (workThread == readThread) {
workThread.runWork(() -> {
if (workThread.inIO()) {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) {
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.completeExceptionally(exc);
});
}
} else {
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);
@@ -146,21 +136,11 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
}
} else {
final P rs = request.respTransfer == null ? message : (P) request.respTransfer.apply(message);
if (workThread == readThread) {
workThread.runWork(() -> {
if (workThread.inIO()) {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
});
} else if (workThread.getState() == Thread.State.RUNNABLE) {
if (workThread.inIO()) {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
} else {
workThread.execute(() -> {
Traces.currentTraceid(request.traceid);
respFuture.complete(rs);
});
}
} else {
workThread.runWork(() -> {
Traces.currentTraceid(request.traceid);

View File

@@ -191,9 +191,11 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
} else {
channel.write(writeArray, this, writeHandler);
}
} else {
writePending.compareAndSet(true, false);
}
} else {
requestQueue.add(respFuture);
requestQueue.offer(respFuture);
}
}

View File

@@ -79,7 +79,7 @@ public abstract class WebSocket<G extends Serializable, T> {
public static final int RETCODE_WSOFFLINE = 1 << 8; //256
@Comment("WebSocket将延迟发送")
public static final int RETCODE_DEAYSEND = 1 << 9; //512
public static final int RETCODE_DELAYSEND = 1 << 9; //512
WebSocketEngine _engine; //不可能为空
@@ -242,7 +242,7 @@ public abstract class WebSocket<G extends Serializable, T> {
delayPackets = new ArrayList<>();
}
delayPackets.add(packet);
return CompletableFuture.completedFuture(RETCODE_DEAYSEND);
return CompletableFuture.completedFuture(RETCODE_DELAYSEND);
}
CompletableFuture<Integer> rs = this._writeHandler.send(packet); //this._writeIOThread.send(this, packet);
if (_engine.logger.isLoggable(Level.FINER) && packet != WebSocketPacket.DEFAULT_PING_PACKET) {

View File

@@ -84,6 +84,7 @@ public class WebSocketWriteHandler implements CompletionHandler<Integer, Void> {
@Override
public void failed(Throwable exc, Void attachment) {
writePending.set(false);
WebSocketFuture req;
try {
while ((req = requestQueue.poll()) != null) {

View File

@@ -47,7 +47,7 @@ public abstract class AbstractService implements Service {
} else {
Thread thread = Thread.currentThread();
if (thread instanceof WorkThread) {
((WorkThread) thread).runAsync(command);
((WorkThread) thread).runWork(command);
} else {
ForkJoinPool.commonPool().execute(command);
}

View File

@@ -300,6 +300,7 @@ public final class Utility {
if (methodName != null) {
return readFieldName(methodName);
} else {
//native-image环境下获取不到methodName
throw new RedkaleException("cannot found method-name from lambda " + func);
}
} catch (IOException e) {