diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index dda8988b0..73197fb4f 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -11,6 +11,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.logging.Logger; +import org.redkale.annotation.Nonnull; +import org.redkale.annotation.Nullable; import org.redkale.net.*; import org.redkale.util.*; @@ -40,8 +42,6 @@ public abstract class Client, R extends ClientR protected final boolean tcp; //是否TCP协议 - protected final ClientAddress address; //连接的地址 - protected final ScheduledThreadPoolExecutor timeoutScheduler; //结合ClientRequest.isCompleted()使用 @@ -53,26 +53,14 @@ public abstract class Client, R extends ClientR private final AtomicBoolean closed = new AtomicBoolean(); + //不可protected、public + private final ClientAddress address; //连接的地址 + protected ScheduledFuture timeoutFuture; - //连随机地址模式 - final AtomicLong connIndexSeq = new AtomicLong(); - - //连随机地址模式 - final ClientConnection[] connArray; //连接池 - - //连随机地址模式 - final LongAdder[] connRespWaitings; //连接当前处理数 - - //连随机地址模式 - final AtomicBoolean[] connOpenStates; //conns的标记组,当conn不存在或closed状态,标记为false - //连随机地址模式 final int connLimit; //最大连接数 - //连随机地址模式 - private final Queue>[] connAcquireWaitings; //连接等待池 - //连指定地址模式 final ConcurrentHashMap connAddrEntrys = new ConcurrentHashMap<>(); @@ -122,36 +110,28 @@ public abstract class Client, R extends ClientR @SuppressWarnings("OverridableMethodCallInConstructor") protected Client(String name, AsyncGroup group, boolean tcp, ClientAddress address, int maxConns, - int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, Function>> authenticate) { + int maxPipelines, Supplier pingRequestSupplier, Supplier closeRequestSupplier, + Function>> authenticate) { if (maxPipelines < 1) { throw new IllegalArgumentException("maxPipelines must bigger 0"); } - address.checkValid(); this.name = name; this.group = group; this.tcp = tcp; - this.address = address; + this.address = Objects.requireNonNull(address); this.connLimit = maxConns; this.maxPipelines = maxPipelines; this.pingRequestSupplier = pingRequestSupplier; this.closeRequestSupplier = closeRequestSupplier; this.authenticate = authenticate; - this.connArray = new ClientConnection[connLimit]; - this.connOpenStates = new AtomicBoolean[connLimit]; - this.connRespWaitings = new LongAdder[connLimit]; - this.connAcquireWaitings = new Queue[connLimit]; - for (int i = 0; i < connOpenStates.length; i++) { - this.connOpenStates[i] = new AtomicBoolean(); - this.connRespWaitings[i] = new LongAdder(); - this.connAcquireWaitings[i] = new ConcurrentLinkedDeque(); - } //timeoutScheduler 不仅仅给超时用, 还给write用 this.timeoutScheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> { final Thread t = new Thread(r, "Redkale-" + Client.this.getClass().getSimpleName() + "-" + resourceName() + "-Timeout-Thread"); t.setDaemon(true); return t; }); - if (pingRequestSupplier != null && this.timeoutFuture == null) { + int pingSeconds = pingIntervalSeconds(); + if (pingRequestSupplier != null && pingSeconds > 0 && this.timeoutFuture == null) { this.timeoutFuture = this.timeoutScheduler.scheduleAtFixedRate(() -> { try { R req = pingRequestSupplier.get(); @@ -161,7 +141,8 @@ public abstract class Client, R extends ClientR return; } long now = System.currentTimeMillis(); - for (ClientConnection conn : this.connArray) { + for (AddressConnEntry> entry : this.connAddrEntrys.values()) { + ClientConnection conn = entry.connection; if (conn == null) { continue; } @@ -173,11 +154,11 @@ public abstract class Client, R extends ClientR } catch (Throwable t) { //do nothing } - }, pingIntervalSeconds(), pingIntervalSeconds(), TimeUnit.SECONDS); + }, pingSeconds, pingSeconds, TimeUnit.SECONDS); } } - protected abstract C createClientConnection(final int index, AsyncConnection channel); + protected abstract C createClientConnection(AsyncConnection channel); //创建连接后会立马从服务器拉取数据构建的虚拟请求,返回null表示连上服务器后不会立马读取数据 protected R createVirtualRequestAfterConnect() { @@ -194,9 +175,6 @@ public abstract class Client, R extends ClientR public void close() { if (closed.compareAndSet(false, true)) { this.timeoutScheduler.shutdownNow(); - for (ClientConnection conn : this.connArray) { - closeConnection(conn); - } for (AddressConnEntry entry : this.connAddrEntrys.values()) { closeConnection(entry.connection); } @@ -223,34 +201,20 @@ public abstract class Client, R extends ClientR } public final CompletableFuture

sendAsync(R request) { - request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } - return connect(request.workThread).thenCompose(conn -> writeChannel(conn, request)); + return sendAsync(getAddress(request), request, (Function) null); } public final CompletableFuture sendAsync(R request, Function respTransfer) { - request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } - return connect(request.workThread).thenCompose(conn -> writeChannel(conn, request, respTransfer)); + return sendAsync(getAddress(request), request, respTransfer); } public final CompletableFuture

sendAsync(SocketAddress addr, R request) { - request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } - return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, request)); + return sendAsync(addr, request, (Function) null); } public final CompletableFuture sendAsync(SocketAddress addr, R request, Function respTransfer) { request.traceid = Traces.computeIfAbsent(request.traceid, Traces.currentTraceid()); - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } + request.computeWorkThreadIfAbsent(); return connect(request.workThread, addr).thenCompose(conn -> writeChannel(conn, request, respTransfer)); } @@ -263,45 +227,22 @@ public abstract class Client, R extends ClientR } public final CompletableFuture> sendAsync(R[] requests) { - String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); - for (R request : requests) { - request.traceid = traceid; - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } - } - return connect(requests[0].workThread).thenCompose(conn -> writeChannel(conn, requests)); + return sendAsync(getAddress(requests[0]), requests, (Function) null); } public final CompletableFuture> sendAsync(R[] requests, Function respTransfer) { - String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); - for (R request : requests) { - request.traceid = traceid; - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } - } - return connect(requests[0].workThread).thenCompose(conn -> writeChannel(conn, requests, respTransfer)); + return sendAsync(getAddress(requests[0]), requests, respTransfer); } public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests) { - String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); - for (R request : requests) { - request.traceid = traceid; - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } - } - return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, requests)); + return sendAsync(addr, requests, (Function) null); } public final CompletableFuture> sendAsync(SocketAddress addr, R[] requests, Function respTransfer) { String traceid = Traces.computeIfAbsent(requests[0].traceid, Traces.currentTraceid()); for (R request : requests) { request.traceid = traceid; - if (request.workThread == null) { - request.workThread = WorkThread.currentWorkThread(); - } + request.computeWorkThreadIfAbsent(); } return connect(requests[0].workThread, addr).thenCompose(conn -> writeChannel(conn, requests, respTransfer)); } @@ -319,118 +260,44 @@ public abstract class Client, R extends ClientR return conn.writeChannel(requests, respTransfer); } - public final CompletableFuture connect() { - return connect(WorkThread.currentWorkThread(), true); + //根据请求获取地址 + protected SocketAddress getAddress(@Nullable R request) { + return address.randomAddress(); } public final CompletableFuture newConnection() { - return connect(WorkThread.currentWorkThread(), false); - } - - protected CompletableFuture connect(WorkThread workThread) { - return connect(workThread, true); - } - - private CompletableFuture connect(final WorkThread workThread, final boolean pool) { - final String traceid = Traces.currentTraceid(); - final int size = this.connArray.length; - final int connIndex = (workThread != null && workThread.threads() == size) ? workThread.index() : (int) Math.abs(connIndexSeq.getAndIncrement()) % size; - C cc = (C) this.connArray[connIndex]; - if (pool && cc != null && cc.isOpen()) { - return CompletableFuture.completedFuture(cc); - } - long s = System.currentTimeMillis(); - final Queue> waitQueue = this.connAcquireWaitings[connIndex]; - if (!pool || this.connOpenStates[connIndex].compareAndSet(false, true)) { - CompletableFuture future = group.createClient(tcp, this.address.randomAddress(), connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> { - Traces.currentTraceid(traceid); - C rs = (C) createClientConnection(connIndex, c).setMaxPipelines(maxPipelines); -// if (debug) { -// logger.log(Level.FINEST, Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + rs -// + ", " + rs.channel + ", 创建TCP连接耗时: (" + (System.currentTimeMillis() - s) + "ms)"); -// } - return rs; - }); - R virtualReq = createVirtualRequestAfterConnect(); - if (virtualReq != null) { - virtualReq.traceid = traceid; - future = future.thenCompose(conn -> { - Traces.currentTraceid(traceid); - return conn.writeVirtualRequest(virtualReq).thenApply(v -> conn); - }); - } else { - future = future.thenApply(conn -> { - Traces.currentTraceid(traceid); - conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 - return conn; - }); - } - if (authenticate != null) { - future = future.thenCompose(authenticate.apply(traceid)); - } - return future.thenApply(c -> { - Traces.currentTraceid(traceid); - c.setAuthenticated(true); - if (pool) { - this.connArray[connIndex] = c; - CompletableFuture f; - while ((f = waitQueue.poll()) != null) { - if (!f.isDone()) { - if (workThread != null) { - CompletableFuture fs = f; - workThread.runWork(() -> { - Traces.currentTraceid(traceid); - if (!fs.isDone()) { - fs.complete(c); - } - }); - } else { - CompletableFuture fs = f; - Utility.execute(() -> { - if (!fs.isDone()) { - fs.complete(c); - } - }); - } - } - } - } - return c; - } - ).whenComplete((r, t) -> { - if (pool && t != null) { - this.connOpenStates[connIndex].set(false); - } - }); - } else { - int seconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 6; - CompletableFuture rs = Utility.orTimeout(new CompletableFuture(), seconds, TimeUnit.SECONDS); - waitQueue.offer(rs); - return rs; - } - } - //指定地址获取连接 - - public final CompletableFuture connect(final SocketAddress addr) { - return connect(WorkThread.currentWorkThread(), true, addr); + return connect(getAddress(null), WorkThread.currentWorkThread(), false); } //指定地址获取连接 public final CompletableFuture newConnection(final SocketAddress addr) { - return connect(WorkThread.currentWorkThread(), false, addr); + return connect(addr, WorkThread.currentWorkThread(), false); } - protected CompletableFuture connect(WorkThread workThread, final SocketAddress addr) { - return connect(workThread, true, addr); + public final CompletableFuture connect() { + return connect(getAddress(null), WorkThread.currentWorkThread(), true); + } + + protected CompletableFuture connect(R request) { + return connect(getAddress(request), request.workThread, true); } //指定地址获取连接 - private CompletableFuture connect(final WorkThread workThread, final boolean pool, final SocketAddress addr) { - final String traceid = Traces.currentTraceid(); + public final CompletableFuture connect(final SocketAddress addr) { + return connect(addr, WorkThread.currentWorkThread(), true); + } + + //指定地址获取连接 + protected CompletableFuture connect(WorkThread workThread, final SocketAddress addr) { + return connect(addr, workThread, true); + } + + //指定地址获取连接 + private CompletableFuture connect(@Nonnull final SocketAddress addr, @Nullable final WorkThread workThread, final boolean pool) { if (addr == null) { - return connect(); + return CompletableFuture.failedFuture(new NullPointerException("address is empty")); } + final String traceid = Traces.currentTraceid(); final AddressConnEntry entry = connAddrEntrys.computeIfAbsent(addr, a -> new AddressConnEntry()); C ec = entry.connection; if (pool && ec != null && ec.isOpen()) { @@ -438,9 +305,8 @@ public abstract class Client, R extends ClientR } final Queue> waitQueue = entry.connAcquireWaitings; if (!pool || entry.connOpenState.compareAndSet(false, true)) { - long s = System.currentTimeMillis(); CompletableFuture future = group.createClient(tcp, addr, connectTimeoutSeconds, readTimeoutSeconds, writeTimeoutSeconds) - .thenApply(c -> (C) createClientConnection(-1, c).setMaxPipelines(maxPipelines)); + .thenApply(c -> (C) createClientConnection(c).setConnEntry(entry).setMaxPipelines(maxPipelines)); R virtualReq = createVirtualRequestAfterConnect(); if (virtualReq != null) { virtualReq.traceid = traceid; @@ -450,10 +316,6 @@ public abstract class Client, R extends ClientR }); } else { future = future.thenApply(conn -> { -// if (debug) { -// logger.log(Level.FINEST, Times.nowMillis() + ": " + Thread.currentThread().getName() + ": " + conn -// + ", 注册读操作: (" + (System.currentTimeMillis() - s) + "ms) " + conn.channel); -// } conn.channel.readRegister(conn.getCodec()); //不用readRegisterInIOThread,因executeRead可能会异步 return conn; }); @@ -496,14 +358,6 @@ public abstract class Client, R extends ClientR } } - protected long getRespWaitingCount() { - long s = 0; - for (LongAdder a : connRespWaitings) { - s += a.longValue(); - } - return s; - } - protected void incrReqWritedCounter() { reqWritedCounter.increment(); } diff --git a/src/main/java/org/redkale/net/client/ClientAddress.java b/src/main/java/org/redkale/net/client/ClientAddress.java index 53a5b46e1..c6de32d3c 100644 --- a/src/main/java/org/redkale/net/client/ClientAddress.java +++ b/src/main/java/org/redkale/net/client/ClientAddress.java @@ -19,60 +19,41 @@ import org.redkale.convert.json.JsonConvert; */ public class ClientAddress implements java.io.Serializable { - private SocketAddress address; - - private List weights; - private SocketAddress[] addresses; public ClientAddress() { } - public ClientAddress(SocketAddress address) { - this.address = address; + public ClientAddress(SocketAddress... addresses) { + if (addresses == null || addresses.length == 0) { + throw new NullPointerException("addresses is empty"); + } + for (SocketAddress addr : addresses) { + Objects.requireNonNull(addr); + } + this.addresses = addresses; } public ClientAddress(List addrs) { - this.weights = new ArrayList<>(addrs); + if (addrs == null || addrs.isEmpty()) { + throw new NullPointerException("addresses is empty"); + } + this.addresses = createAddressArray(addrs); } - public ClientAddress addWeightAddress(WeightAddress... addrs) { - if (this.weights == null) { - this.weights = new ArrayList<>(); - } - this.weights.addAll(Arrays.asList(addrs)); - return this; - } - - public void updateAddress(SocketAddress addr, List addrs) { - if (addr == null && (addrs == null || addrs.isEmpty())) { - throw new NullPointerException("address is empty"); - } - setWeights(addrs); - setAddress(addr); - } - - public void checkValid() { - if (address == null && (weights == null || weights.isEmpty())) { - throw new NullPointerException("address is empty"); + public void updateAddress(List addrs) { + if (addrs == null || addrs.isEmpty()) { + throw new NullPointerException("addresses is empty"); } + this.addresses = createAddressArray(addrs); } public SocketAddress randomAddress() { - SocketAddress addr = address; - if (addr == null) { - SocketAddress[] addrs = this.addresses; - if (addrs == null) { - this.addresses = createAddressArray(this.weights); - addrs = this.addresses; - } - if (addrs.length == 1) { - addr = addrs[0]; - } else { - addr = addrs[ThreadLocalRandom.current().nextInt(addrs.length)]; - } + SocketAddress[] addrs = this.addresses; + if (addrs.length == 1) { + return addrs[0]; } - return addr; + return addrs[ThreadLocalRandom.current().nextInt(addrs.length)]; } private static SocketAddress[] createAddressArray(List ws) { @@ -110,23 +91,6 @@ public class ClientAddress implements java.io.Serializable { return newAddrs; } - public SocketAddress getAddress() { - return address; - } - - public void setAddress(SocketAddress address) { - this.address = address; - } - - public List getWeights() { - return weights; - } - - public void setWeights(List weights) { - this.weights = weights == null ? null : new ArrayList<>(weights); - this.addresses = null; - } - @Override public String toString() { return JsonConvert.root().convertTo(this); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 7250265a5..0a947a46e 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -32,14 +32,10 @@ import org.redkale.util.*; */ public abstract class ClientConnection implements Consumer { - //=-1 表示连接放在connAddrEntrys存储 - //>=0 表示connArray的下坐标,从0开始 - protected final int index; - protected final Client client; @Nonnull - protected final LongAdder respWaitingCounter; + protected LongAdder respWaitingCounter; protected final LongAdder doneRequestCounter = new LongAdder(); @@ -76,8 +72,8 @@ public abstract class ClientConnection, R, P> client, int index, AsyncConnection channel) { + public ClientConnection(Client, R, P> client, AsyncConnection channel) { this.client = client; this.codec = createCodec(); - this.index = index; - this.connEntry = index >= 0 ? null : client.connAddrEntrys.get(channel.getRemoteAddress()); - this.respWaitingCounter = index >= 0 ? client.connRespWaitings[index] : this.connEntry.connRespWaiting; this.channel = channel.beforeCloseListener(this); //.fastHandler(writeHandler); this.writeBuffer = channel.pollWriteBuffer(); } + ClientConnection setConnEntry(Client.AddressConnEntry entry) { + this.connEntry = entry; + this.respWaitingCounter = entry.connRespWaiting; + return this; + } + protected abstract ClientCodec createCodec(); protected final CompletableFuture

writeChannel(R request) { @@ -312,10 +311,7 @@ public abstract class ClientConnection= 0) { - client.connOpenStates[index].set(false); - client.connArray[index] = null; //必须connOpenStates之后 - } else if (connEntry != null) { //index=-1 + if (connEntry != null) { //index=-1 connEntry.connOpenState.set(false); } ClientMessageListener listener = getCodec().getMessageListener(); diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index d237228d6..87ac5cdc8 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -35,6 +35,13 @@ public abstract class ClientRequest { public abstract void writeTo(ClientConnection conn, ByteArray array); + T computeWorkThreadIfAbsent() { + if (workThread == null) { + workThread = WorkThread.currentWorkThread(); + } + return (T) this; + } + public Serializable getRequestid() { return null; } diff --git a/src/main/java/org/redkale/net/http/HttpSimpleClient.java b/src/main/java/org/redkale/net/http/HttpSimpleClient.java index 800381cb4..bdb5ae61e 100644 --- a/src/main/java/org/redkale/net/http/HttpSimpleClient.java +++ b/src/main/java/org/redkale/net/http/HttpSimpleClient.java @@ -66,8 +66,8 @@ public class HttpSimpleClient extends Client { - public HttpSimpleConnection(HttpSimpleClient client, int index, AsyncConnection channel) { - super(client, index, channel); + public HttpSimpleConnection(HttpSimpleClient client, AsyncConnection channel) { + super(client, channel); } @Override diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index b996ac4a9..6992ebd6d 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -38,8 +38,8 @@ public class SncpClient extends Client requestPool; - public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) { - super(client, index, channel); + public SncpClientConnection(SncpClient client, AsyncConnection channel) { + super(client, channel); requestPool = ObjectPool.createUnsafePool(Thread.currentThread(), 256, ObjectPool.createSafePool(256, t -> new SncpClientRequest(), SncpClientRequest::prepare, SncpClientRequest::recycle) ); diff --git a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java index e86418a0e..e8d13e6af 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java +++ b/src/main/java/org/redkale/net/sncp/SncpRemoteInfo.java @@ -225,7 +225,7 @@ public class SncpRemoteInfo { protected InetSocketAddress nextRemoteAddress() { InetSocketAddress addr = sncpRpcGroups.nextRemoteAddress(resourceid); - if (addr == null) { + if (addr != null) { return addr; } SncpRpcGroup srg = sncpRpcGroups.getSncpRpcGroup(remoteGroup); diff --git a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java index 4ef29138c..526108c16 100644 --- a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java @@ -33,7 +33,7 @@ public class SncpClientCodecTest { InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); - SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); + SncpClientConnection conn = client.createClientConnection(asyncGroup.newTCPClientConnection()); SncpClientCodec codec = new SncpClientCodec(conn); List respResults = new ArrayList(); try { diff --git a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java index 3fb97eb14..2634c1ca3 100644 --- a/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpRequestParseTest.java @@ -33,7 +33,7 @@ public class SncpRequestParseTest { InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); - SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); + SncpClientConnection conn = client.createClientConnection(asyncGroup.newTCPClientConnection()); SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig(); config.logger = Logger.getLogger(SncpRequestParseTest.class.getSimpleName()); diff --git a/src/test/java/org/redkale/test/sncp/SncpSleepService.java b/src/test/java/org/redkale/test/sncp/SncpSleepService.java index c618ed6e2..f2126c5ae 100644 --- a/src/test/java/org/redkale/test/sncp/SncpSleepService.java +++ b/src/test/java/org/redkale/test/sncp/SncpSleepService.java @@ -5,6 +5,7 @@ package org.redkale.test.sncp; import java.util.concurrent.CompletableFuture; import org.redkale.service.AbstractService; +import org.redkale.util.Times; import org.redkale.util.Utility; /** @@ -14,26 +15,29 @@ import org.redkale.util.Utility; public class SncpSleepService extends AbstractService { public CompletableFuture sleep200() { - return (CompletableFuture) CompletableFuture.supplyAsync(() -> { + System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 接收sleep200"); + return CompletableFuture.supplyAsync(() -> { Utility.sleep(200); - System.out.println("执行完sleep200"); + System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 执行完sleep200"); return "ok200"; - }); + }, getExecutor()); } public CompletableFuture sleep300() { - return (CompletableFuture) CompletableFuture.supplyAsync(() -> { + System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 接收sleep300"); + return CompletableFuture.supplyAsync(() -> { Utility.sleep(300); - System.out.println("执行完sleep300"); + System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 执行完sleep300"); return "ok300"; - }); + }, getExecutor()); } public CompletableFuture sleep500() { - return (CompletableFuture) CompletableFuture.supplyAsync(() -> { + System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 接收sleep500"); + return CompletableFuture.supplyAsync(() -> { Utility.sleep(500); - System.out.println("执行完sleep500"); + System.out.println(Times.nowMillis() + " " + Thread.currentThread().getName() + " 执行完sleep500"); return "ok500"; - }); + }, getExecutor()); } } diff --git a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java index 5cf2ee655..d2155f39d 100644 --- a/src/test/java/org/redkale/test/sncp/SncpSleepTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpSleepTest.java @@ -2,11 +2,13 @@ package org.redkale.test.sncp; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import org.junit.jupiter.api.*; import org.redkale.boot.Application; import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.json.JsonConvert; import org.redkale.net.AsyncIOGroup; +import org.redkale.net.WorkThread; import org.redkale.net.client.ClientAddress; import org.redkale.net.sncp.*; import org.redkale.util.*; @@ -29,14 +31,17 @@ public class SncpSleepTest { public void run() throws Exception { System.out.println("------------------- 并发调用 -----------------------------------"); final Application application = Application.create(true); - final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + final ExecutorService workExecutor = WorkThread.createWorkExecutor(10, "Thread-Work-%s"); + final AsyncIOGroup asyncGroup = new AsyncIOGroup("Redkale-TestClient-IOThread-%s", workExecutor, 8192, 16); asyncGroup.start(); final ResourceFactory resFactory = ResourceFactory.create(); + resFactory.register(Application.RESNAME_APP_EXECUTOR, ExecutorService.class, workExecutor); resFactory.register(JsonConvert.root()); resFactory.register(BsonConvert.root()); //------------------------ 初始化 CService ------------------------------------ SncpSleepService service = Sncp.createSimpleLocalService(SncpSleepService.class, resFactory); + resFactory.inject(service); SncpServer server = new SncpServer(application, System.currentTimeMillis(), null, resFactory); server.getResourceFactory().register(application); server.addSncpServlet(service); @@ -60,6 +65,7 @@ public class SncpSleepTest { long e = System.currentTimeMillis() - s; System.out.println("耗时: " + e + " ms"); server.shutdown(); - Assertions.assertTrue(e < 600); + workExecutor.shutdown(); + Assertions.assertTrue(e < 900); } }