client泛型优化

This commit is contained in:
Redkale
2023-01-13 11:05:04 +08:00
parent 41028384af
commit ad059eb3d6
2 changed files with 22 additions and 21 deletions

View File

@@ -21,10 +21,11 @@ import org.redkale.util.*;
* @author zhangjx * @author zhangjx
* @since 2.3.0 * @since 2.3.0
* *
* @param <C> 连接对象
* @param <R> 请求对象 * @param <R> 请求对象
* @param <P> 响应对象 * @param <P> 响应对象
*/ */
public abstract class Client<R extends ClientRequest, P> implements Resourcable { public abstract class Client<C extends ClientConnection<R, P>, R extends ClientRequest, P> implements Resourcable {
public static final int DEFAULT_MAX_PIPELINES = 128; public static final int DEFAULT_MAX_PIPELINES = 128;
@@ -59,7 +60,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
protected AtomicBoolean[] connOpenStates; //conns的标记组当conn不存在或closed状态标记为false protected AtomicBoolean[] connOpenStates; //conns的标记组当conn不存在或closed状态标记为false
protected final Queue<CompletableFuture<ClientConnection>>[] connAcquireWaitings; //连接等待池 protected final Queue<CompletableFuture<C>>[] connAcquireWaitings; //连接等待池
protected int connLimit = Utility.cpus(); //最大连接数 protected int connLimit = Utility.cpus(); //最大连接数
@@ -79,7 +80,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
protected Supplier<R> closeRequestSupplier; protected Supplier<R> closeRequestSupplier;
//创建连接后进行的登录鉴权操作 //创建连接后进行的登录鉴权操作
protected Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate; protected Function<CompletableFuture<C>, CompletableFuture<C>> authenticate;
protected Client(String name, AsyncGroup group, ClientAddress address) { protected Client(String name, AsyncGroup group, ClientAddress address) {
this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null); this(name, group, true, address, Utility.cpus(), DEFAULT_MAX_PIPELINES, null, null, null);
@@ -98,18 +99,18 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
} }
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { Function<CompletableFuture<C>, CompletableFuture<C>> authenticate) {
this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate); this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, null, authenticate);
} }
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { Supplier<R> closeRequestSupplier, Function<CompletableFuture<C>, CompletableFuture<C>> authenticate) {
this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate); this(name, group, tcp, address, maxConns, DEFAULT_MAX_PIPELINES, null, closeRequestSupplier, authenticate);
} }
@SuppressWarnings("OverridableMethodCallInConstructor") @SuppressWarnings("OverridableMethodCallInConstructor")
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<CompletableFuture<ClientConnection>, CompletableFuture<ClientConnection>> authenticate) { int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<CompletableFuture<C>, CompletableFuture<C>> authenticate) {
if (maxPipelines < 1) { if (maxPipelines < 1) {
throw new IllegalArgumentException("maxPipelines must bigger 0"); throw new IllegalArgumentException("maxPipelines must bigger 0");
} }
@@ -155,7 +156,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
if (now - conn.getLastWriteTime() < 10_000) { if (now - conn.getLastWriteTime() < 10_000) {
continue; continue;
} }
conn.writeChannel(req).thenAccept(p -> handlePingResult(conn, p)); conn.writeChannel(req).thenAccept(p -> handlePingResult((C) conn, p));
} }
} catch (Throwable t) { } catch (Throwable t) {
} }
@@ -163,7 +164,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
} }
} }
protected abstract ClientConnection createClientConnection(final int index, AsyncConnection channel); protected abstract C createClientConnection(final int index, AsyncConnection channel);
//创建连接后先从服务器拉取数据构建的虚拟请求返回null表示连上服务器后不读取数据 //创建连接后先从服务器拉取数据构建的虚拟请求返回null表示连上服务器后不读取数据
protected R createVirtualRequestAfterConnect() { protected R createVirtualRequestAfterConnect() {
@@ -174,7 +175,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
return 30; return 30;
} }
protected void handlePingResult(ClientConnection conn, P result) { protected void handlePingResult(C conn, P result) {
} }
public synchronized void close() { public synchronized void close() {
@@ -219,14 +220,14 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
return conn.writeChannel(request); return conn.writeChannel(request);
} }
protected CompletableFuture<ClientConnection> connect() { protected CompletableFuture<C> connect() {
return connect(null); return connect(null);
} }
protected CompletableFuture<ClientConnection> connect(final ChannelContext context) { protected CompletableFuture<C> connect(final ChannelContext context) {
final boolean cflag = context != null && connectionContextName != null; final boolean cflag = context != null && connectionContextName != null;
if (cflag) { if (cflag) {
ClientConnection cc = context.getAttribute(connectionContextName); C cc = context.getAttribute(connectionContextName);
if (cc != null && cc.isOpen()) { if (cc != null && cc.isOpen()) {
return CompletableFuture.completedFuture(cc); return CompletableFuture.completedFuture(cc);
} }
@@ -240,7 +241,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
connIndex = (int) Math.abs(Thread.currentThread().getId() % size); connIndex = (int) Math.abs(Thread.currentThread().getId() % size);
} }
// if (connIndex >= 0) { // if (connIndex >= 0) {
ClientConnection cc = this.connArray[connIndex]; C cc = (C) this.connArray[connIndex];
if (cc != null && cc.isOpen()) { if (cc != null && cc.isOpen()) {
if (cflag) { if (cflag) {
context.setAttribute(connectionContextName, cc); context.setAttribute(connectionContextName, cc);
@@ -248,20 +249,20 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
return CompletableFuture.completedFuture(cc); return CompletableFuture.completedFuture(cc);
} }
final int index = connIndex; final int index = connIndex;
final Queue<CompletableFuture<ClientConnection>> waitQueue = this.connAcquireWaitings[index]; final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[index];
if (this.connOpenStates[index].compareAndSet(false, true)) { if (this.connOpenStates[index].compareAndSet(false, true)) {
CompletableFuture<ClientConnection> future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds) CompletableFuture<C> future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds)
.thenApply(c -> createClientConnection(index, c).setMaxPipelines(maxPipelines)); .thenApply(c -> (C) createClientConnection(index, c).setMaxPipelines(maxPipelines));
R virtualReq = createVirtualRequestAfterConnect(); R virtualReq = createVirtualRequestAfterConnect();
if (virtualReq != null) { if (virtualReq != null) {
future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn)); future = future.thenCompose(conn -> conn.writeVirtualRequest(virtualReq).thenApply(v -> conn));
} else { } else {
future = future.thenApply(conn -> conn.readChannel()); future = future.thenApply(conn -> (C) conn.readChannel());
} }
return (authenticate == null ? future : authenticate.apply(future)).thenApply(c -> { return (authenticate == null ? future : authenticate.apply(future)).thenApply(c -> {
c.setAuthenticated(true); c.setAuthenticated(true);
this.connArray[index] = c; this.connArray[index] = c;
CompletableFuture<ClientConnection> f; CompletableFuture<C> f;
if (cflag) { if (cflag) {
context.setAttribute(connectionContextName, c); context.setAttribute(connectionContextName, c);
} }
@@ -308,7 +309,7 @@ public abstract class Client<R extends ClientRequest, P> implements Resourcable
// return waitClientConnection(); // return waitClientConnection();
} }
protected CompletableFuture<ClientConnection> waitClientConnection() { protected CompletableFuture<C> waitClientConnection() {
CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS); CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), 6, TimeUnit.SECONDS);
connAcquireWaitings[connSeqno.getAndIncrement() % this.connLimit].offer(rs); connAcquireWaitings[connSeqno.getAndIncrement() % this.connLimit].offer(rs);
return rs; return rs;

View File

@@ -29,7 +29,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final int index; //从0开始 connArray的下坐标 protected final int index; //从0开始 connArray的下坐标
protected final Client<R, P> client; protected final Client client;
protected final LongAdder respWaitingCounter; protected final LongAdder respWaitingCounter;
@@ -56,7 +56,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private boolean authenticated; private boolean authenticated;
@SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"}) @SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"})
public ClientConnection(Client client, int index, AsyncConnection channel) { public ClientConnection(Client<? extends ClientConnection<R, P>, R, P> client, int index, AsyncConnection channel) {
this.client = client; this.client = client;
this.codec = createCodec(); this.codec = createCodec();
this.index = index; this.index = index;