ClientWriteThread

This commit is contained in:
redkale
2024-08-17 11:32:27 +08:00
parent 9402db9fd5
commit cd415699b7
2 changed files with 79 additions and 17 deletions

View File

@@ -8,7 +8,6 @@ package org.redkale.net.client;
import java.util.Objects;
import java.util.concurrent.*;
import org.redkale.annotation.Nonnull;
import org.redkale.annotation.Nullable;
import org.redkale.net.*;
import org.redkale.util.Traces;
@@ -25,7 +24,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
@Nonnull
protected final R request;
@Nullable
@Nonnull
protected final ClientConnection conn;
private ScheduledFuture timeout;
@@ -38,6 +37,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
ClientFuture(ClientConnection conn, R request) {
super();
Objects.requireNonNull(conn);
Objects.requireNonNull(request);
this.conn = conn;
this.request = request;
@@ -79,7 +79,7 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
private void runTimeout() {
String traceid = request != null ? request.getTraceid() : null;
if (request != null && conn != null) {
if (request != null) {
conn.removeRespFuture(request.getRequestid(), this);
}
TimeoutException ex = new TimeoutException("client-request: " + request);
@@ -88,26 +88,17 @@ public class ClientFuture<R extends ClientRequest, T> extends CompletableFuture<
workThread = request.workThread;
request.workThread = null;
}
if (conn != null && (workThread == null || workThread.getWorkExecutor() == null)) {
if (workThread == null || workThread.getWorkExecutor() == null) {
workThread = conn.getChannel().getReadIOThread();
}
if (workThread == null) {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
if (!isDone()) {
completeExceptionally(ex);
}
} else {
workThread.runWork(() -> {
Traces.currentTraceid(traceid);
if (!isDone()) {
completeExceptionally(ex);
}
Traces.removeTraceid();
});
}
if (conn != null) {
conn.dispose(ex);
}
Traces.removeTraceid();
});
conn.dispose(ex);
}
@Override

View File

@@ -0,0 +1,71 @@
/*
*/
package org.redkale.net.client;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 输出队列线程
*
* <p>详情见: https://redkale.org
*
* @author zhangjx
* @since 2.8.0
*/
public class ClientWriteThread extends Thread {
protected final LinkedBlockingQueue<ClientFuture> writeQueue = new LinkedBlockingQueue();
protected ClientWriteThread() {
// do nothing
}
public void offer(ClientFuture respFuture) {
this.writeQueue.add(Objects.requireNonNull(respFuture));
}
@Override
public void run() {
final List<ClientFuture> list = new ArrayList<>();
while (true) {
try {
ClientFuture respFuture = this.writeQueue.take();
if (respFuture == ClientFuture.NIL) {
return;
}
boolean over = false;
list.clear();
list.add(respFuture);
ClientConnection conn = respFuture.conn;
int max = conn.getMaxPipelines();
while (--max > 0 && (respFuture = this.writeQueue.poll()) != null) {
if (respFuture == ClientFuture.NIL) {
over = true;
break;
} else if (respFuture.conn == conn) {
list.add(respFuture);
}
}
conn.sendRequestToChannel(list.toArray(new ClientFuture[list.size()]));
list.clear();
if (over) {
return;
}
} catch (InterruptedException ie) {
break;
} catch (Throwable e) {
// do nothing
}
}
}
public void close() {
this.writeQueue.add(ClientFuture.NIL);
this.interrupt();
}
}