优化Client的pingRequest和closeRequest
This commit is contained in:
@@ -8,7 +8,7 @@ package org.redkale.net.client;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.*;
|
||||
import java.util.logging.Logger;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.util.*;
|
||||
@@ -68,10 +68,10 @@ public abstract class Client<R extends ClientRequest, P> {
|
||||
|
||||
//------------------ 可选项 ------------------
|
||||
//PING心跳的请求数据,为null且pingInterval<1表示不需要定时ping
|
||||
protected R pingRequest;
|
||||
protected Supplier<R> pingRequestSupplier;
|
||||
|
||||
//关闭请求的数据, 为null表示直接关闭
|
||||
protected R closeRequest;
|
||||
protected Supplier<R> closeRequestSupplier;
|
||||
|
||||
//创建连接后进行的登录鉴权操作
|
||||
protected Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate;
|
||||
@@ -98,13 +98,13 @@ public abstract class Client<R extends ClientRequest, P> {
|
||||
}
|
||||
|
||||
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
|
||||
R closeRequest, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
|
||||
this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequest, authenticate);
|
||||
Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
|
||||
this(group, tcp, address, maxconns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate);
|
||||
}
|
||||
|
||||
@SuppressWarnings("OverridableMethodCallInConstructor")
|
||||
protected Client(AsyncGroup group, boolean tcp, ClientAddress address, int maxconns,
|
||||
int maxPipelines, R pingRequest, R closeRequest, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
|
||||
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) {
|
||||
if (maxPipelines < 1) {
|
||||
throw new IllegalArgumentException("maxPipelines must bigger 0");
|
||||
}
|
||||
@@ -113,8 +113,8 @@ public abstract class Client<R extends ClientRequest, P> {
|
||||
this.address = address;
|
||||
this.connLimit = maxconns;
|
||||
this.maxPipelines = maxPipelines;
|
||||
this.pingRequest = pingRequest;
|
||||
this.closeRequest = closeRequest;
|
||||
this.pingRequestSupplier = pingRequestSupplier;
|
||||
this.closeRequestSupplier = closeRequestSupplier;
|
||||
this.authenticate = authenticate;
|
||||
this.connArray = new ClientConnection[connLimit];
|
||||
this.connOpenStates = new AtomicBoolean[connLimit];
|
||||
@@ -131,10 +131,10 @@ public abstract class Client<R extends ClientRequest, P> {
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
if (pingRequest != null && this.timeoutFuture == null) {
|
||||
if (pingRequestSupplier != null && this.timeoutFuture == null) {
|
||||
this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
R req = pingRequest;
|
||||
R req = pingRequestSupplier.get();
|
||||
if (req == null) { //可能运行中进行重新赋值
|
||||
timeoutFuture.cancel(true);
|
||||
timeoutFuture = null;
|
||||
@@ -170,7 +170,7 @@ public abstract class Client<R extends ClientRequest, P> {
|
||||
return;
|
||||
}
|
||||
this.timeoutScheduler.shutdownNow();
|
||||
final R closereq = closeRequest;
|
||||
final R closereq = closeRequestSupplier == null ? null : closeRequestSupplier.get();
|
||||
for (ClientConnection conn : this.connArray) {
|
||||
if (conn == null) {
|
||||
continue;
|
||||
|
||||
@@ -61,7 +61,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
||||
|
||||
@Override
|
||||
public void completed(Integer result, Void attachment) {
|
||||
if (writeLastRequest != null && writeLastRequest == client.closeRequest) {
|
||||
if (writeLastRequest != null && writeLastRequest.isCloseType()) {
|
||||
if (closeFuture != null) {
|
||||
channel.getWriteIOThread().runWork(() -> {
|
||||
closeFuture.complete(null);
|
||||
@@ -350,7 +350,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
||||
|
||||
protected final CompletableFuture<P> writeChannel(R request) {
|
||||
ClientFuture respFuture;
|
||||
if (request == client.closeRequest) {
|
||||
if (request.isCloseType()) {
|
||||
respFuture = createClientFuture(null);
|
||||
closeFuture = respFuture;
|
||||
} else {
|
||||
@@ -373,7 +373,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
|
||||
private void writeChannelInThread(R request, ClientFuture respFuture) {
|
||||
Serializable reqid = request.getRequestid();
|
||||
//保证顺序一致
|
||||
if (client.closeRequest != null && respFuture.request == client.closeRequest) {
|
||||
if (respFuture.request.isCloseType()) {
|
||||
responseQueue.offer(ClientFuture.EMPTY);
|
||||
} else {
|
||||
request.respFuture = respFuture;
|
||||
|
||||
@@ -33,6 +33,11 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
|
||||
return null;
|
||||
}
|
||||
|
||||
//关闭请求一定要返回false
|
||||
public boolean isCloseType() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public long getCreateTime() {
|
||||
return createTime;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
*
|
||||
*/
|
||||
package org.redkale.net.client;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.concurrent.*;
|
||||
import org.redkale.net.ClientIOThread;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
public class ClientWriteIOThread extends ClientIOThread {
|
||||
|
||||
private final BlockingQueue<ClientEntity> requestQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
public ClientWriteIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector,
|
||||
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
|
||||
super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool);
|
||||
}
|
||||
|
||||
public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) {
|
||||
requestQueue.offer(new ClientEntity(conn, request, respFuture));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!this.closed) {
|
||||
ClientEntity entity;
|
||||
try {
|
||||
while ((entity = requestQueue.take()) != null) {
|
||||
ClientConnection conn = entity.conn;
|
||||
ClientRequest request = entity.request;
|
||||
ClientFuture respFuture = entity.respFuture;
|
||||
Serializable reqid = request.getRequestid();
|
||||
if (reqid == null) {
|
||||
conn.responseQueue.offer(respFuture);
|
||||
} else {
|
||||
conn.responseMap.put(reqid, respFuture);
|
||||
}
|
||||
ByteArray rw = conn.writeArray;
|
||||
rw.clear();
|
||||
request.accept(conn, rw);
|
||||
conn.channel.write(rw, conn.writeHandler);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static class ClientEntity {
|
||||
|
||||
ClientConnection conn;
|
||||
|
||||
ClientRequest request;
|
||||
|
||||
ClientFuture respFuture;
|
||||
|
||||
public ClientEntity(ClientConnection conn, ClientRequest request, ClientFuture respFuture) {
|
||||
this.conn = conn;
|
||||
this.request = request;
|
||||
this.respFuture = respFuture;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user