diff --git a/src/main/java/org/redkale/net/client/Client.java b/src/main/java/org/redkale/net/client/Client.java index 501d10696..268772f3e 100644 --- a/src/main/java/org/redkale/net/client/Client.java +++ b/src/main/java/org/redkale/net/client/Client.java @@ -40,6 +40,9 @@ public abstract class Client implements Resourcable protected final ScheduledThreadPoolExecutor timeoutScheduler; + //结合ClientRequest.isCompleted()使用 + //使用场景:批量request提交时,后面的request需响应上一个request返回值来构建 + //例如: MySQL批量提交PrepareSQL场景 protected final LongAdder reqWritedCounter = new LongAdder(); protected final LongAdder respDoneCounter = new LongAdder(); @@ -174,16 +177,16 @@ public abstract class Client implements Resourcable return; } this.timeoutScheduler.shutdownNow(); - final R closereq = closeRequestSupplier == null ? null : closeRequestSupplier.get(); for (ClientConnection conn : this.connArray) { if (conn == null) { continue; } - if (closereq == null) { + final R closeReq = closeRequestSupplier == null ? null : closeRequestSupplier.get(); + if (closeReq == null) { conn.dispose(null); } else { try { - conn.writeChannel(closereq).get(1, TimeUnit.SECONDS); + conn.writeChannel(closeReq).get(1, TimeUnit.SECONDS); } catch (Exception e) { } conn.dispose(null); @@ -245,7 +248,7 @@ public abstract class Client implements Resourcable CompletableFuture future = address.createClient(tcp, group, readTimeoutSeconds, writeTimeoutSeconds) .thenApply(c -> createClientConnection(index, c).setMaxPipelines(maxPipelines)); return (authenticate == null ? future : authenticate.apply(future)).thenApply(c -> { - c.authenticated = true; + c.setAuthenticated(true); this.connArray[index] = c; CompletableFuture f; if (cflag) { @@ -308,6 +311,10 @@ public abstract class Client implements Resourcable return s; } + protected void incrReqWritedCounter() { + reqWritedCounter.increment(); + } + @Override public String resourceName() { return name; diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 2e08401fa..6eecf6dbc 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -53,126 +53,19 @@ public abstract class ClientConnection implements Co protected final Queue>> requestQueue = new ArrayDeque<>(); + //responseQueue、responseMap二选一 final ArrayDeque responseQueue = new ArrayDeque<>(); - //key: requestid + //responseQueue、responseMap二选一, key: requestid final HashMap responseMap = new LinkedHashMap<>(); - protected final CompletionHandler writeHandler = new CompletionHandler() { + private int maxPipelines; //最大并行处理数 - @Override - public void completed(Integer result, Void attachment) { - if (writeLastRequest != null && writeLastRequest.isCloseType()) { - if (closeFuture != null) { - channel.getWriteIOThread().runWork(() -> { - closeFuture.complete(null); - }); - } - closeFuture = null; - return; - } - if (sendWrite(false)) { - return; - } - writePending.compareAndSet(true, false); - readChannel(); - } + private boolean closed; - @Override - public void failed(Throwable exc, Void attachment) { - dispose(exc); - } - }; + private boolean authenticated; - protected int maxPipelines; //最大并行处理数 - - protected SimpleEntry> lastHalfEntry; - - protected ClientConnection setMaxPipelines(int maxPipelines) { - this.maxPipelines = maxPipelines; - return this; - } - - protected ClientConnection resetMaxPipelines() { - this.maxPipelines = client.maxPipelines; - return this; - } - - protected boolean isWaitingResponseEmpty() { - return responseQueue.isEmpty() && responseMap.isEmpty(); - } - - protected void resumeWrite() { - this.pauseWriting.set(false); - } - - //有写入数据返回true,否则返回false - private boolean sendWrite(boolean must) { - ClientConnection conn = this; - ByteArray rw = conn.writeArray; - rw.clear(); - int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size() - responseMap.size()) : 1; - if (must && pipelines < 1) { - pipelines = 1; - } - int c = 0; - AtomicBoolean pw = conn.pauseWriting; - for (int i = 0; i < pipelines; i++) { - if (pw.get()) { - break; - } - SimpleEntry> entry; - if (lastHalfEntry == null) { - entry = requestQueue.poll(); - } else { - entry = lastHalfEntry; - lastHalfEntry = null; - } - if (entry == null) { - break; - } - R req = entry.getKey(); - writeLastRequest = req; - if (req.getRequestid() == null && req.canMerge(conn)) { - SimpleEntry> r; - while ((r = requestQueue.poll()) != null) { - i++; - if (!req.merge(conn, r.getKey())) { - break; - } - ClientFuture f = entry.getValue(); - if (f != null) { - f.incrMergeCount(); - } - //req.respFuture.mergeCount++; - } - req.accept(conn, rw); - if (r != null) { - r.getKey().accept(conn, rw); - req = r.getKey(); - } - } else { - req.accept(conn, rw); - } - c++; - if (!req.isCompleted()) { - lastHalfEntry = entry; - this.pauseWriting.set(true); - break; - } - } - if (c > 0) { //当Client连接Server后先从Server读取数据时,会先发送一个EMPTY的request,这样writeArray.count就会为0 - channel.write(rw, writeHandler); - return true; - } - if (pw.get()) { - writePending.compareAndSet(true, false); - } - return false; - } - - protected void preComplete(P resp, R req, Throwable exc) { - } + private SimpleEntry> lastHalfEntry; protected final CompletionHandler readHandler = new CompletionHandler() { @@ -192,7 +85,58 @@ public abstract class ClientConnection implements Co } } - protected void completeResponse(ClientResponse

rs, ClientFuture respFuture) { + private void decodeResponse(ByteBuffer buffer) { + if (codec.decodeMessages(buffer, readArray)) { //成功了 + readArray.clear(); + List> results = codec.pollMessages(); + if (results != null) { + for (ClientResponse

rs : results) { + Serializable reqid = rs.getRequestid(); + ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); + if (respFuture != null) { + int mergeCount = respFuture.getMergeCount(); + completeResponse(rs, respFuture); + if (mergeCount > 0) { + for (int i = 0; i < mergeCount; i++) { + respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); + if (respFuture != null) { + completeResponse(rs, respFuture); + } + } + } + } + } + } + + if (buffer.hasRemaining()) { + decodeResponse(buffer); + } else if (isWaitingResponseEmpty()) { //队列都已处理完了 + buffer.clear(); + channel.setReadBuffer(buffer); + if (readPending.compareAndSet(true, false)) { + //无消息处理 + } else { + channel.read(this); + } + } else { //还有消息需要读取 + if ((!requestQueue.isEmpty() || lastHalfEntry != null) && writePending.compareAndSet(false, true)) { + //先写后读取 + if (sendWrite(true) <= 0) { + writePending.compareAndSet(true, false); + } + } + buffer.clear(); + channel.setReadBuffer(buffer); + channel.read(this); + } + } else { //数据不全, 继续读 + buffer.clear(); + channel.setReadBuffer(buffer); + channel.read(this); + } + } + + private void completeResponse(ClientResponse

rs, ClientFuture respFuture) { if (respFuture != null) { if (!respFuture.request.isCompleted()) { if (rs.exc == null) { @@ -247,68 +191,36 @@ public abstract class ClientConnection implements Co } } - public void decodeResponse(ByteBuffer buffer) { - if (codec.decodeMessages(buffer, readArray)) { //成功了 - readArray.clear(); - List> results = codec.pollMessages(); - if (results != null) { - for (ClientResponse

rs : results) { - Serializable reqid = rs.getRequestid(); - ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); - if (respFuture != null) { - int mergeCount = respFuture.getMergeCount(); - completeResponse(rs, respFuture); - if (mergeCount > 0) { - for (int i = 0; i < mergeCount; i++) { - respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); - if (respFuture != null) { - completeResponse(rs, respFuture); - } - } - } - } - } - } - - if (buffer.hasRemaining()) { - decodeResponse(buffer); - } else if (isWaitingResponseEmpty()) { //队列都已处理完了 - buffer.clear(); - channel.setReadBuffer(buffer); - if (readPending.compareAndSet(true, false)) { - //无消息处理 - } else { - channel.read(this); - } - } else { //还有消息需要读取 - if ((!requestQueue.isEmpty() || lastHalfEntry != null) && writePending.compareAndSet(false, true)) { - //先写后读取 - if (!sendWrite(true)) { - writePending.compareAndSet(true, false); - } - } - buffer.clear(); - channel.setReadBuffer(buffer); - channel.read(this); - } - } else { //数据不全, 继续读 - buffer.clear(); - channel.setReadBuffer(buffer); - channel.read(this); - } - } - @Override public void failed(Throwable t, ByteBuffer attachment) { dispose(t); } }; - protected boolean authenticated; + protected final CompletionHandler writeHandler = new CompletionHandler() { - protected ClientFuture closeFuture; + @Override + public void completed(Integer result, Void attachment) { +// if (writeLastRequest != null && writeLastRequest.isCloseType()) { +// if (closeFuture != null) { +// channel.getWriteIOThread().runWork(() -> { +// closeFuture.complete(null); +// }); +// } +// closeFuture = null; +// return; +// } + if (sendWrite(false) <= 0) { + writePending.compareAndSet(true, false); + readChannel(); + } + } - private R writeLastRequest; + @Override + public void failed(Throwable exc, Void attachment) { + dispose(exc); + } + }; @SuppressWarnings({"LeakingThisInConstructor", "OverridableMethodCallInConstructor"}) public ClientConnection(Client client, int index, AsyncConnection channel) { @@ -322,51 +234,120 @@ public abstract class ClientConnection implements Co protected abstract ClientCodec createCodec(); protected final CompletableFuture

writeChannel(R request) { - ClientFuture respFuture; - if (request.isCloseType()) { - respFuture = createClientFuture(null); - closeFuture = respFuture; - } else { - respFuture = createClientFuture(request); - int rts = this.channel.getReadTimeoutSeconds(); - if (rts > 0 && respFuture.request != null) { - respFuture.setConn(this); - respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); - } + ClientFuture respFuture = createClientFuture(request); + int rts = this.channel.getReadTimeoutSeconds(); + if (rts > 0 && !request.isCloseType()) { + respFuture.setConn(this); + respFuture.setTimeout(client.timeoutScheduler.schedule(respFuture, rts, TimeUnit.SECONDS)); } - respWaitingCounter.increment(); //放在writeChannelInThread计数会延迟,导致不准确 + respWaitingCounter.increment(); //放在writeChannelUnsafe计数会延迟,导致不准确 if (channel.inCurrWriteThread()) { - writeChannelInThread(request, respFuture); + writeChannelUnsafe(request, respFuture); } else { - channel.executeWrite(() -> writeChannelInThread(request, respFuture)); + channel.executeWrite(() -> writeChannelUnsafe(request, respFuture)); } return respFuture; } - private void writeChannelInThread(R request, ClientFuture respFuture) { + private void writeChannelUnsafe(R request, ClientFuture respFuture) { + if (closed) { + WorkThread workThread = request.workThread; + if (workThread == null || workThread.getWorkExecutor() == null) { + workThread = channel.getReadIOThread(); + } + Throwable e = new ClosedChannelException(); + workThread.runWork(() -> { + Traces.currTraceid(request.traceid); + respFuture.completeExceptionally(e); + }); + return; + } Serializable reqid = request.getRequestid(); //保证顺序一致 - ClientFuture future; - if (request.isCloseType()) { - future = null; - responseQueue.offer(ClientFuture.EMPTY); + if (reqid == null) { + responseQueue.offer(respFuture); } else { - future = respFuture; - if (reqid == null) { - responseQueue.offer(respFuture); - } else { - responseMap.put(reqid, respFuture); - } + responseMap.put(reqid, respFuture); } - requestQueue.offer(new SimpleEntry<>(request, future)); - if (isAuthenticated() && client.reqWritedCounter != null) { - client.reqWritedCounter.increment(); + requestQueue.offer(new SimpleEntry<>(request, respFuture)); + if (isAuthenticated()) { + client.incrReqWritedCounter(); } if (writePending.compareAndSet(false, true)) { sendWrite(true); } } + //返回写入数据request的数量,返回0表示没有可写的request + private int sendWrite(boolean must) { + ClientConnection conn = this; + ByteArray rw = conn.writeArray; + rw.clear(); + int pipelines = maxPipelines > 1 ? (maxPipelines - responseQueue.size() - responseMap.size()) : 1; + if (must && pipelines < 1) { + pipelines = 1; + } + int c = 0; + AtomicBoolean pw = conn.pauseWriting; + for (int i = 0; i < pipelines; i++) { + if (pw.get()) { + break; + } + SimpleEntry> entry; + if (lastHalfEntry == null) { + entry = requestQueue.poll(); + } else { + entry = lastHalfEntry; + lastHalfEntry = null; + } + if (entry == null) { + break; + } + R req = entry.getKey(); + if (req.getRequestid() == null && req.canMerge(conn)) { + SimpleEntry> r; + while ((r = requestQueue.poll()) != null) { + i++; + if (!req.merge(conn, r.getKey())) { + break; + } + ClientFuture f = entry.getValue(); + if (f != null) { + f.incrMergeCount(); + } + } + req.accept(conn, rw); + if (r != null) { + req = r.getKey(); + req.accept(conn, rw); + } + } else { + req.accept(conn, rw); + } + c++; + if (req.isCloseType()) { + closed = true; + this.pauseWriting.set(true); + break; + } else if (!req.isCompleted()) { + lastHalfEntry = entry; + this.pauseWriting.set(true); + break; + } + } + if (c > 0) { //当Client连接Server后先从Server读取数据时,会先发送一个EMPTY的request,这样writeArray.count就会为0 + channel.write(rw, writeHandler); + return c; + } + if (pw.get()) { + writePending.compareAndSet(true, false); + } + return 0; + } + + protected void preComplete(P resp, R req, Throwable exc) { + } + protected ClientFuture createClientFuture(R request) { return new ClientFuture(request); } @@ -378,18 +359,6 @@ public abstract class ClientConnection implements Co } } - public boolean isAuthenticated() { - return authenticated; - } - - public AsyncConnection getChannel() { - return channel; - } - - public ClientCodec getCodec() { - return codec; - } - @Override //AsyncConnection.beforeCloseListener public void accept(AsyncConnection t) { respWaitingCounter.reset(); @@ -417,6 +386,45 @@ public abstract class ClientConnection implements Co } } + public boolean isAuthenticated() { + return authenticated; + } + + public AsyncConnection getChannel() { + return channel; + } + + public ClientCodec getCodec() { + return codec; + } + + public int getMaxPipelines() { + return maxPipelines; + } + + protected ClientConnection setAuthenticated(boolean authenticated) { + this.authenticated = authenticated; + return this; + } + + protected ClientConnection setMaxPipelines(int maxPipelines) { + this.maxPipelines = maxPipelines; + return this; + } + + protected ClientConnection resetMaxPipelines() { + this.maxPipelines = client.maxPipelines; + return this; + } + + protected boolean isWaitingResponseEmpty() { + return responseQueue.isEmpty() && responseMap.isEmpty(); + } + + protected void resumeWrite() { + this.pauseWriting.set(false); + } + public int runningCount() { return respWaitingCounter.intValue(); } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index f9272781b..3e019d57c 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -120,12 +120,6 @@ public class ClientFuture extends CompletableFuture implements Runnable { workThread = request.workThread; request.workThread = null; } -// if (workThread == null || workThread == Thread.currentThread() || workThread.inIO() -// || workThread.getState() != Thread.State.RUNNABLE) { -// this.completeExceptionally(ex); -// } else { -// workThread.execute(() -> completeExceptionally(ex)); -// } if (workThread == null || workThread.getWorkExecutor() == null) { workThread = conn.getChannel().getReadIOThread(); } diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 2aae2c13e..603215477 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -59,7 +59,7 @@ public abstract class ClientRequest implements BiConsumer