优化Client
This commit is contained in:
@@ -11,6 +11,8 @@ import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.function.*;
|
||||
import java.util.logging.Logger;
|
||||
import org.redkale.annotation.Nonnull;
|
||||
import org.redkale.annotation.Nullable;
|
||||
import org.redkale.net.*;
|
||||
import org.redkale.util.*;
|
||||
|
||||
@@ -40,8 +42,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
|
||||
protected final boolean tcp; //是否TCP协议
|
||||
|
||||
protected final ClientAddress address; //连接的地址
|
||||
|
||||
protected final ScheduledThreadPoolExecutor timeoutScheduler;
|
||||
|
||||
//结合ClientRequest.isCompleted()使用
|
||||
@@ -53,26 +53,14 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
|
||||
//不可protected、public
|
||||
private final ClientAddress address; //连接的地址
|
||||
|
||||
protected ScheduledFuture timeoutFuture;
|
||||
|
||||
//连随机地址模式
|
||||
final AtomicLong connIndexSeq = new AtomicLong();
|
||||
|
||||
//连随机地址模式
|
||||
final ClientConnection<R, P>[] connArray; //连接池
|
||||
|
||||
//连随机地址模式
|
||||
final LongAdder[] connRespWaitings; //连接当前处理数
|
||||
|
||||
//连随机地址模式
|
||||
final AtomicBoolean[] connOpenStates; //conns的标记组,当conn不存在或closed状态,标记为false
|
||||
|
||||
//连随机地址模式
|
||||
final int connLimit; //最大连接数
|
||||
|
||||
//连随机地址模式
|
||||
private final Queue<CompletableFuture<C>>[] connAcquireWaitings; //连接等待池
|
||||
|
||||
//连指定地址模式
|
||||
final ConcurrentHashMap<SocketAddress, AddressConnEntry> connAddrEntrys = new ConcurrentHashMap<>();
|
||||
|
||||
@@ -122,36 +110,28 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
|
||||
@SuppressWarnings("OverridableMethodCallInConstructor")
|
||||
protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns,
|
||||
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier, Function<String, Function<C, CompletableFuture<C>>> authenticate) {
|
||||
int maxPipelines, Supplier<R> pingRequestSupplier, Supplier<R> closeRequestSupplier,
|
||||
Function<String, Function<C, CompletableFuture<C>>> authenticate) {
|
||||
if (maxPipelines < 1) {
|
||||
throw new IllegalArgumentException("maxPipelines must bigger 0");
|
||||
}
|
||||
address.checkValid();
|
||||
this.name = name;
|
||||
this.group = group;
|
||||
this.tcp = tcp;
|
||||
this.address = address;
|
||||
this.address = Objects.requireNonNull(address);
|
||||
this.connLimit = maxConns;
|
||||
this.maxPipelines = maxPipelines;
|
||||
this.pingRequestSupplier = pingRequestSupplier;
|
||||
this.closeRequestSupplier = closeRequestSupplier;
|
||||
this.authenticate = authenticate;
|
||||
this.connArray = new ClientConnection[connLimit];
|
||||
this.connOpenStates = new AtomicBoolean[connLimit];
|
||||
this.connRespWaitings = new LongAdder[connLimit];
|
||||
this.connAcquireWaitings = new Queue[connLimit];
|
||||
for (int i = 0; i < connOpenStates.length; i++) {
|
||||
this.connOpenStates[i] = new AtomicBoolean();
|
||||
this.connRespWaitings[i] = new LongAdder();
|
||||
this.connAcquireWaitings[i] = new ConcurrentLinkedDeque();
|
||||
}
|
||||
//timeoutScheduler 不仅仅给超时用, 还给write用
|
||||
this.timeoutScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
|
||||
final Thread t = new Thread(r, "Redkale-" + Client.this.getClass().getSimpleName() + "-" + resourceName() + "-Timeout-Thread");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
if (pingRequestSupplier != null && this.timeoutFuture == null) {
|
||||
int pingSeconds = pingIntervalSeconds();
|
||||
if (pingRequestSupplier != null && pingSeconds > 0 && this.timeoutFuture == null) {
|
||||
this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
R req = pingRequestSupplier.get();
|
||||
@@ -161,7 +141,8 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
return;
|
||||
}
|
||||
long now = System.currentTimeMillis();
|
||||
for (ClientConnection<R, P> conn : this.connArray) {
|
||||
for (AddressConnEntry<ClientConnection<R, P>> entry : this.connAddrEntrys.values()) {
|
||||
ClientConnection<R, P> conn = entry.connection;
|
||||
if (conn == null) {
|
||||
continue;
|
||||
}
|
||||
@@ -173,11 +154,11 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
} catch (Throwable t) {
|
||||
//do nothing
|
||||
}
|
||||
}, pingIntervalSeconds(), pingIntervalSeconds(), TimeUnit.SECONDS);
|
||||
}, pingSeconds, pingSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract C createClientConnection(final int index, AsyncConnection channel);
|
||||
protected abstract C createClientConnection(AsyncConnection channel);
|
||||
|
||||
//创建连接后会立马从服务器拉取数据构建的虚拟请求,返回null表示连上服务器后不会立马读取数据
|
||||
protected R createVirtualRequestAfterConnect() {
|
||||
@@ -194,9 +175,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
this.timeoutScheduler.shutdownNow();
|
||||
for (ClientConnection conn : this.connArray) {
|
||||
closeConnection(conn);
|
||||
}
|
||||
for (AddressConnEntry<C> entry : this.connAddrEntrys.values()) {
|
||||
closeConnection(entry.connection);
|
||||
}
|
||||
@@ -223,34 +201,20 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
}
|
||||
|
||||
public final CompletableFuture<P> sendAsync(R request) {
|
||||
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
return connect(request.workThread).thenCompose(conn -> writeChannel(conn, request));
|
||||
return sendAsync(getAddress(request), request, (Function) null);
|
||||
}
|
||||
|
||||
public final <T> CompletableFuture<T> sendAsync(R request, Function<P, T> respTransfer) {
|
||||
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
return connect(request.workThread).thenCompose(conn -> writeChannel(conn, request, respTransfer));
|
||||
return sendAsync(getAddress(request), request, respTransfer);
|
||||
}
|
||||
|
||||
public final CompletableFuture<P> sendAsync(SocketAddress addr, R request) {
|
||||
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, request));
|
||||
return sendAsync(addr, request, (Function) null);
|
||||
}
|
||||
|
||||
public final <T> CompletableFuture<T> sendAsync(SocketAddress addr, R request, Function<P, T> respTransfer) {
|
||||
request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid());
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
request.computeWorkThreadIfAbsent();
|
||||
return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, request, respTransfer));
|
||||
}
|
||||
|
||||
@@ -263,45 +227,22 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
}
|
||||
|
||||
public final CompletableFuture<List<P>> sendAsync(R[] requests) {
|
||||
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
|
||||
for (R request : requests) {
|
||||
request.traceid = traceid;
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
}
|
||||
return connect(requests[0].workThread).thenCompose(conn -> writeChannel(conn, requests));
|
||||
return sendAsync(getAddress(requests[0]), requests, (Function) null);
|
||||
}
|
||||
|
||||
public final <T> CompletableFuture<List<T>> sendAsync(R[] requests, Function<P, T> respTransfer) {
|
||||
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
|
||||
for (R request : requests) {
|
||||
request.traceid = traceid;
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
}
|
||||
return connect(requests[0].workThread).thenCompose(conn -> writeChannel(conn, requests, respTransfer));
|
||||
return sendAsync(getAddress(requests[0]), requests, respTransfer);
|
||||
}
|
||||
|
||||
public final CompletableFuture<List<P>> sendAsync(SocketAddress addr, R[] requests) {
|
||||
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
|
||||
for (R request : requests) {
|
||||
request.traceid = traceid;
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
}
|
||||
return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, requests));
|
||||
return sendAsync(addr, requests, (Function) null);
|
||||
}
|
||||
|
||||
public final <T> CompletableFuture<List<T>> sendAsync(SocketAddress addr, R[] requests, Function<P, T> respTransfer) {
|
||||
String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid());
|
||||
for (R request : requests) {
|
||||
request.traceid = traceid;
|
||||
if (request.workThread == null) {
|
||||
request.workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
request.computeWorkThreadIfAbsent();
|
||||
}
|
||||
return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, requests, respTransfer));
|
||||
}
|
||||
@@ -319,118 +260,44 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
return conn.writeChannel(requests, respTransfer);
|
||||
}
|
||||
|
||||
public final CompletableFuture<C> connect() {
|
||||
return connect(WorkThread.currentWorkThread(), true);
|
||||
//根据请求获取地址
|
||||
protected SocketAddress getAddress(@Nullable R request) {
|
||||
return address.randomAddress();
|
||||
}
|
||||
|
||||
public final CompletableFuture<C> newConnection() {
|
||||
return connect(WorkThread.currentWorkThread(), false);
|
||||
}
|
||||
|
||||
protected CompletableFuture<C> connect(WorkThread workThread) {
|
||||
return connect(workThread, true);
|
||||
}
|
||||
|
||||
private CompletableFuture<C> connect(final WorkThread workThread, final boolean pool) {
|
||||
final String traceid = Traces.currentTraceid();
|
||||
final int size = this.connArray.length;
|
||||
final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size;
|
||||
C cc = (C) this.connArray[connIndex];
|
||||
if (pool && cc != null && cc.isOpen()) {
|
||||
return CompletableFuture.completedFuture(cc);
|
||||
}
|
||||
long s = System.currentTimeMillis();
|
||||
final Queue<CompletableFuture<C>> waitQueue = this.connAcquireWaitings[connIndex];
|
||||
if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) {
|
||||
CompletableFuture<C> future = group.createClient(tcp, this.address.randomAddress(), connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds)
|
||||
.thenApply(c -> {
|
||||
Traces.currentTraceid(traceid);
|
||||
C rs = (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines);
|
||||
// if (debug) {
|
||||
// logger.log(Level.FINEST, Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + rs
|
||||
// + ", " + rs.channel + ", 创建TCP连接耗时: (" + (System.currentTimeMillis() - s) + "ms)");
|
||||
// }
|
||||
return rs;
|
||||
});
|
||||
R virtualReq = createVirtualRequestAfterConnect();
|
||||
if (virtualReq != null) {
|
||||
virtualReq.traceid = traceid;
|
||||
future = future.thenCompose(conn -> {
|
||||
Traces.currentTraceid(traceid);
|
||||
return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn);
|
||||
});
|
||||
} else {
|
||||
future = future.thenApply(conn -> {
|
||||
Traces.currentTraceid(traceid);
|
||||
conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步
|
||||
return conn;
|
||||
});
|
||||
}
|
||||
if (authenticate != null) {
|
||||
future = future.thenCompose(authenticate.apply(traceid));
|
||||
}
|
||||
return future.thenApply(c -> {
|
||||
Traces.currentTraceid(traceid);
|
||||
c.setAuthenticated(true);
|
||||
if (pool) {
|
||||
this.connArray[connIndex] = c;
|
||||
CompletableFuture<C> f;
|
||||
while ((f = waitQueue.poll()) != null) {
|
||||
if (!f.isDone()) {
|
||||
if (workThread != null) {
|
||||
CompletableFuture<C> fs = f;
|
||||
workThread.runWork(() -> {
|
||||
Traces.currentTraceid(traceid);
|
||||
if (!fs.isDone()) {
|
||||
fs.complete(c);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
CompletableFuture<C> fs = f;
|
||||
Utility.execute(() -> {
|
||||
if (!fs.isDone()) {
|
||||
fs.complete(c);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return c;
|
||||
}
|
||||
).whenComplete((r, t) -> {
|
||||
if (pool && t != null) {
|
||||
this.connOpenStates[connIndex].set(false);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6;
|
||||
CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS);
|
||||
waitQueue.offer(rs);
|
||||
return rs;
|
||||
}
|
||||
}
|
||||
//指定地址获取连接
|
||||
|
||||
public final CompletableFuture<C> connect(final SocketAddress addr) {
|
||||
return connect(WorkThread.currentWorkThread(), true, addr);
|
||||
return connect(getAddress(null), WorkThread.currentWorkThread(), false);
|
||||
}
|
||||
|
||||
//指定地址获取连接
|
||||
public final CompletableFuture<C> newConnection(final SocketAddress addr) {
|
||||
return connect(WorkThread.currentWorkThread(), false, addr);
|
||||
return connect(addr, WorkThread.currentWorkThread(), false);
|
||||
}
|
||||
|
||||
protected CompletableFuture<C> connect(WorkThread workThread, final SocketAddress addr) {
|
||||
return connect(workThread, true, addr);
|
||||
public final CompletableFuture<C> connect() {
|
||||
return connect(getAddress(null), WorkThread.currentWorkThread(), true);
|
||||
}
|
||||
|
||||
protected CompletableFuture<C> connect(R request) {
|
||||
return connect(getAddress(request), request.workThread, true);
|
||||
}
|
||||
|
||||
//指定地址获取连接
|
||||
private CompletableFuture<C> connect(final WorkThread workThread, final boolean pool, final SocketAddress addr) {
|
||||
final String traceid = Traces.currentTraceid();
|
||||
public final CompletableFuture<C> connect(final SocketAddress addr) {
|
||||
return connect(addr, WorkThread.currentWorkThread(), true);
|
||||
}
|
||||
|
||||
//指定地址获取连接
|
||||
protected CompletableFuture<C> connect(WorkThread workThread, final SocketAddress addr) {
|
||||
return connect(addr, workThread, true);
|
||||
}
|
||||
|
||||
//指定地址获取连接
|
||||
private CompletableFuture<C> connect(@Nonnull final SocketAddress addr, @Nullable final WorkThread workThread, final boolean pool) {
|
||||
if (addr == null) {
|
||||
return connect();
|
||||
return CompletableFuture.failedFuture(new NullPointerException("address is empty"));
|
||||
}
|
||||
final String traceid = Traces.currentTraceid();
|
||||
final AddressConnEntry<C> entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry());
|
||||
C ec = entry.connection;
|
||||
if (pool && ec != null && ec.isOpen()) {
|
||||
@@ -438,9 +305,8 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
}
|
||||
final Queue<CompletableFuture<C>> waitQueue = entry.connAcquireWaitings;
|
||||
if (!pool || entry.connOpenState.compareAndSet(false, true)) {
|
||||
long s = System.currentTimeMillis();
|
||||
CompletableFuture<C> future = group.createClient(tcp, addr, connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds)
|
||||
.thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines));
|
||||
.thenApply(c -> (C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines));
|
||||
R virtualReq = createVirtualRequestAfterConnect();
|
||||
if (virtualReq != null) {
|
||||
virtualReq.traceid = traceid;
|
||||
@@ -450,10 +316,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
});
|
||||
} else {
|
||||
future = future.thenApply(conn -> {
|
||||
// if (debug) {
|
||||
// logger.log(Level.FINEST, Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + conn
|
||||
// + ", 注册读操作: (" + (System.currentTimeMillis() - s) + "ms) " + conn.channel);
|
||||
// }
|
||||
conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步
|
||||
return conn;
|
||||
});
|
||||
@@ -496,14 +358,6 @@ public abstract class Client<C extends ClientConnection<R, P>, R extends ClientR
|
||||
}
|
||||
}
|
||||
|
||||
protected long getRespWaitingCount() {
|
||||
long s = 0;
|
||||
for (LongAdder a : connRespWaitings) {
|
||||
s += a.longValue();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
protected void incrReqWritedCounter() {
|
||||
reqWritedCounter.increment();
|
||||
}
|
||||
|
||||
@@ -19,60 +19,41 @@ import org.redkale.convert.json.JsonConvert;
|
||||
*/
|
||||
public class ClientAddress implements java.io.Serializable {
|
||||
|
||||
private SocketAddress address;
|
||||
|
||||
private List<WeightAddress> weights;
|
||||
|
||||
private SocketAddress[] addresses;
|
||||
|
||||
public ClientAddress() {
|
||||
}
|
||||
|
||||
public ClientAddress(SocketAddress address) {
|
||||
this.address = address;
|
||||
public ClientAddress(SocketAddress... addresses) {
|
||||
if (addresses == null || addresses.length == 0) {
|
||||
throw new NullPointerException("addresses is empty");
|
||||
}
|
||||
for (SocketAddress addr : addresses) {
|
||||
Objects.requireNonNull(addr);
|
||||
}
|
||||
this.addresses = addresses;
|
||||
}
|
||||
|
||||
public ClientAddress(List<WeightAddress> addrs) {
|
||||
this.weights = new ArrayList<>(addrs);
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
throw new NullPointerException("addresses is empty");
|
||||
}
|
||||
this.addresses = createAddressArray(addrs);
|
||||
}
|
||||
|
||||
public ClientAddress addWeightAddress(WeightAddress... addrs) {
|
||||
if (this.weights == null) {
|
||||
this.weights = new ArrayList<>();
|
||||
}
|
||||
this.weights.addAll(Arrays.asList(addrs));
|
||||
return this;
|
||||
}
|
||||
|
||||
public void updateAddress(SocketAddress addr, List<WeightAddress> addrs) {
|
||||
if (addr == null && (addrs == null || addrs.isEmpty())) {
|
||||
throw new NullPointerException("address is empty");
|
||||
}
|
||||
setWeights(addrs);
|
||||
setAddress(addr);
|
||||
}
|
||||
|
||||
public void checkValid() {
|
||||
if (address == null && (weights == null || weights.isEmpty())) {
|
||||
throw new NullPointerException("address is empty");
|
||||
public void updateAddress(List<WeightAddress> addrs) {
|
||||
if (addrs == null || addrs.isEmpty()) {
|
||||
throw new NullPointerException("addresses is empty");
|
||||
}
|
||||
this.addresses = createAddressArray(addrs);
|
||||
}
|
||||
|
||||
public SocketAddress randomAddress() {
|
||||
SocketAddress addr = address;
|
||||
if (addr == null) {
|
||||
SocketAddress[] addrs = this.addresses;
|
||||
if (addrs == null) {
|
||||
this.addresses = createAddressArray(this.weights);
|
||||
addrs = this.addresses;
|
||||
}
|
||||
if (addrs.length == 1) {
|
||||
addr = addrs[0];
|
||||
} else {
|
||||
addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)];
|
||||
}
|
||||
SocketAddress[] addrs = this.addresses;
|
||||
if (addrs.length == 1) {
|
||||
return addrs[0];
|
||||
}
|
||||
return addr;
|
||||
return addrs[ThreadLocalRandom.current().nextInt(addrs.length)];
|
||||
}
|
||||
|
||||
private static SocketAddress[] createAddressArray(List<WeightAddress> ws) {
|
||||
@@ -110,23 +91,6 @@ public class ClientAddress implements java.io.Serializable {
|
||||
return newAddrs;
|
||||
}
|
||||
|
||||
public SocketAddress getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public void setAddress(SocketAddress address) {
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
public List<WeightAddress> getWeights() {
|
||||
return weights;
|
||||
}
|
||||
|
||||
public void setWeights(List<WeightAddress> weights) {
|
||||
this.weights = weights == null ? null : new ArrayList<>(weights);
|
||||
this.addresses = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JsonConvert.root().convertTo(this);
|
||||
|
||||
@@ -32,14 +32,10 @@ import org.redkale.util.*;
|
||||
*/
|
||||
public abstract class ClientConnection<R extends ClientRequest, P extends ClientResult> implements Consumer<AsyncConnection> {
|
||||
|
||||
//=-1 表示连接放在connAddrEntrys存储
|
||||
//>=0 表示connArray的下坐标,从0开始
|
||||
protected final int index;
|
||||
|
||||
protected final Client client;
|
||||
|
||||
@Nonnull
|
||||
protected final LongAdder respWaitingCounter;
|
||||
protected LongAdder respWaitingCounter;
|
||||
|
||||
protected final LongAdder doneRequestCounter = new LongAdder();
|
||||
|
||||
@@ -76,8 +72,8 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
|
||||
//pauseWriting=true,此字段才会有值; pauseWriting=false,此字段值为null
|
||||
ClientFuture currHalfWriteFuture;
|
||||
|
||||
@Nullable
|
||||
private final Client.AddressConnEntry connEntry;
|
||||
@Nonnull
|
||||
private Client.AddressConnEntry connEntry;
|
||||
|
||||
protected final AsyncConnection channel;
|
||||
|
||||
@@ -96,16 +92,19 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
|
||||
private boolean authenticated;
|
||||
|
||||
@SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"})
|
||||
public ClientConnection(Client<? extends ClientConnection<R, P>, R, P> client, int index, AsyncConnection channel) {
|
||||
public ClientConnection(Client<? extends ClientConnection<R, P>, R, P> client, AsyncConnection channel) {
|
||||
this.client = client;
|
||||
this.codec = createCodec();
|
||||
this.index = index;
|
||||
this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress());
|
||||
this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting;
|
||||
this.channel = channel.beforeCloseListener(this); //.fastHandler(writeHandler);
|
||||
this.writeBuffer = channel.pollWriteBuffer();
|
||||
}
|
||||
|
||||
ClientConnection setConnEntry(Client.AddressConnEntry entry) {
|
||||
this.connEntry = entry;
|
||||
this.respWaitingCounter = entry.connRespWaiting;
|
||||
return this;
|
||||
}
|
||||
|
||||
protected abstract ClientCodec createCodec();
|
||||
|
||||
protected final CompletableFuture<P> writeChannel(R request) {
|
||||
@@ -312,10 +311,7 @@ public abstract class ClientConnection<R extends ClientRequest, P extends Client
|
||||
@Override //AsyncConnection.beforeCloseListener
|
||||
public void accept(AsyncConnection t) {
|
||||
respWaitingCounter.reset();
|
||||
if (index >= 0) {
|
||||
client.connOpenStates[index].set(false);
|
||||
client.connArray[index] = null; //必须connOpenStates之后
|
||||
} else if (connEntry != null) { //index=-1
|
||||
if (connEntry != null) { //index=-1
|
||||
connEntry.connOpenState.set(false);
|
||||
}
|
||||
ClientMessageListener listener = getCodec().getMessageListener();
|
||||
|
||||
@@ -35,6 +35,13 @@ public abstract class ClientRequest {
|
||||
|
||||
public abstract void writeTo(ClientConnection conn, ByteArray array);
|
||||
|
||||
<T extends ClientRequest> T computeWorkThreadIfAbsent() {
|
||||
if (workThread == null) {
|
||||
workThread = WorkThread.currentWorkThread();
|
||||
}
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
public Serializable getRequestid() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -66,8 +66,8 @@ public class HttpSimpleClient extends Client<HttpSimpleConnection, HttpSimpleReq
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpSimpleConnection createClientConnection(int index, AsyncConnection channel) {
|
||||
return new HttpSimpleConnection(this, index, channel);
|
||||
protected HttpSimpleConnection createClientConnection(AsyncConnection channel) {
|
||||
return new HttpSimpleConnection(this, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -19,8 +19,8 @@ import org.redkale.net.client.ClientConnection;
|
||||
*/
|
||||
class HttpSimpleConnection extends ClientConnection<HttpSimpleRequest, HttpSimpleResult> {
|
||||
|
||||
public HttpSimpleConnection(HttpSimpleClient client, int index, AsyncConnection channel) {
|
||||
super(client, index, channel);
|
||||
public HttpSimpleConnection(HttpSimpleClient client, AsyncConnection channel) {
|
||||
super(client, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -38,8 +38,8 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
|
||||
}
|
||||
|
||||
@Override
|
||||
public SncpClientConnection createClientConnection(int index, AsyncConnection channel) {
|
||||
return new SncpClientConnection(this, index, channel);
|
||||
public SncpClientConnection createClientConnection(AsyncConnection channel) {
|
||||
return new SncpClientConnection(this, channel);
|
||||
}
|
||||
|
||||
public InetSocketAddress getClientSncpAddress() {
|
||||
|
||||
@@ -21,8 +21,8 @@ public class SncpClientConnection extends ClientConnection<SncpClientRequest, Sn
|
||||
|
||||
private final ObjectPool<SncpClientRequest> requestPool;
|
||||
|
||||
public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) {
|
||||
super(client, index, channel);
|
||||
public SncpClientConnection(SncpClient client, AsyncConnection channel) {
|
||||
super(client, channel);
|
||||
requestPool = ObjectPool.createUnsafePool(Thread.currentThread(), 256,
|
||||
ObjectPool.createSafePool(256, t -> new SncpClientRequest(), SncpClientRequest::prepare, SncpClientRequest::recycle)
|
||||
);
|
||||
|
||||
@@ -225,7 +225,7 @@ public class SncpRemoteInfo<T extends Service> {
|
||||
|
||||
protected InetSocketAddress nextRemoteAddress() {
|
||||
InetSocketAddress addr = sncpRpcGroups.nextRemoteAddress(resourceid);
|
||||
if (addr == null) {
|
||||
if (addr != null) {
|
||||
return addr;
|
||||
}
|
||||
SncpRpcGroup srg = sncpRpcGroups.getSncpRpcGroup(remoteGroup);
|
||||
|
||||
@@ -33,7 +33,7 @@ public class SncpClientCodecTest {
|
||||
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
|
||||
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
|
||||
SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
|
||||
SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection());
|
||||
SncpClientConnection conn = client.createClientConnection(asyncGroup.newTCPClientConnection());
|
||||
SncpClientCodec codec = new SncpClientCodec(conn);
|
||||
List respResults = new ArrayList();
|
||||
try {
|
||||
|
||||
@@ -33,7 +33,7 @@ public class SncpRequestParseTest {
|
||||
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
|
||||
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
|
||||
SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
|
||||
SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection());
|
||||
SncpClientConnection conn = client.createClientConnection(asyncGroup.newTCPClientConnection());
|
||||
|
||||
SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig();
|
||||
config.logger = Logger.getLogger(SncpRequestParseTest.class.getSimpleName());
|
||||
|
||||
@@ -5,6 +5,7 @@ package org.redkale.test.sncp;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.redkale.service.AbstractService;
|
||||
import org.redkale.util.Times;
|
||||
import org.redkale.util.Utility;
|
||||
|
||||
/**
|
||||
@@ -14,26 +15,29 @@ import org.redkale.util.Utility;
|
||||
public class SncpSleepService extends AbstractService {
|
||||
|
||||
public CompletableFuture<String> sleep200() {
|
||||
return (CompletableFuture) CompletableFuture.supplyAsync(() -> {
|
||||
System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 接收sleep200");
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
Utility.sleep(200);
|
||||
System.out.println("执行完sleep200");
|
||||
System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 执行完sleep200");
|
||||
return "ok200";
|
||||
});
|
||||
}, getExecutor());
|
||||
}
|
||||
|
||||
public CompletableFuture<String> sleep300() {
|
||||
return (CompletableFuture) CompletableFuture.supplyAsync(() -> {
|
||||
System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 接收sleep300");
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
Utility.sleep(300);
|
||||
System.out.println("执行完sleep300");
|
||||
System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 执行完sleep300");
|
||||
return "ok300";
|
||||
});
|
||||
}, getExecutor());
|
||||
}
|
||||
|
||||
public CompletableFuture<String> sleep500() {
|
||||
return (CompletableFuture) CompletableFuture.supplyAsync(() -> {
|
||||
System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 接收sleep500");
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
Utility.sleep(500);
|
||||
System.out.println("执行完sleep500");
|
||||
System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 执行完sleep500");
|
||||
return "ok500";
|
||||
});
|
||||
}, getExecutor());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,13 @@ package org.redkale.test.sncp;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.redkale.boot.Application;
|
||||
import org.redkale.convert.bson.BsonConvert;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.net.AsyncIOGroup;
|
||||
import org.redkale.net.WorkThread;
|
||||
import org.redkale.net.client.ClientAddress;
|
||||
import org.redkale.net.sncp.*;
|
||||
import org.redkale.util.*;
|
||||
@@ -29,14 +31,17 @@ public class SncpSleepTest {
|
||||
public void run() throws Exception {
|
||||
System.out.println("------------------- 并发调用 -----------------------------------");
|
||||
final Application application = Application.create(true);
|
||||
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
|
||||
final ExecutorService workExecutor = WorkThread.createWorkExecutor(10, "Thread-Work-%s");
|
||||
final AsyncIOGroup asyncGroup = new AsyncIOGroup("Redkale-TestClient-IOThread-%s", workExecutor, 8192, 16);
|
||||
asyncGroup.start();
|
||||
final ResourceFactory resFactory = ResourceFactory.create();
|
||||
resFactory.register(Application.RESNAME_APP_EXECUTOR, ExecutorService.class, workExecutor);
|
||||
resFactory.register(JsonConvert.root());
|
||||
resFactory.register(BsonConvert.root());
|
||||
|
||||
//------------------------ 初始化 CService ------------------------------------
|
||||
SncpSleepService service = Sncp.createSimpleLocalService(SncpSleepService.class, resFactory);
|
||||
resFactory.inject(service);
|
||||
SncpServer server = new SncpServer(application, System.currentTimeMillis(), null, resFactory);
|
||||
server.getResourceFactory().register(application);
|
||||
server.addSncpServlet(service);
|
||||
@@ -60,6 +65,7 @@ public class SncpSleepTest {
|
||||
long e = System.currentTimeMillis() - s;
|
||||
System.out.println("耗时: " + e + " ms");
|
||||
server.shutdown();
|
||||
Assertions.assertTrue(e < 600);
|
||||
workExecutor.shutdown();
|
||||
Assertions.assertTrue(e < 900);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user