From eef43e8edc674fe7140f650b14a635f3ffba131b Mon Sep 17 00:00:00 2001 From: redkale Date: Sat, 4 Feb 2023 20:19:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/client/ClientCodec.java | 4 +- .../redkale/net/client/ClientConnection.java | 34 ++---------- .../org/redkale/net/client/ClientFuture.java | 2 + .../net/client/ClientWriteIOThread.java | 55 ++++++++++--------- .../redkale/test/convert/BsonMainTest.java | 26 +++------ 5 files changed, 45 insertions(+), 76 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 41cc5d37a..0c7289f13 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -102,11 +102,11 @@ public abstract class ClientCodec implements Complet try { if (request != null && !request.isCompleted()) { if (exc == null) { - connection.sendHalfWrite(exc); + connection.sendHalfWrite(request, exc); //request没有发送完,respFuture需要再次接收 return; } else { //异常了需要清掉半包 - connection.sendHalfWrite(exc); + connection.sendHalfWrite(request, exc); } } if (request != null) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 000f1d3ef..72f45ae87 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -10,7 +10,6 @@ import java.nio.channels.ClosedChannelException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; import java.util.function.*; import org.redkale.net.*; @@ -34,15 +33,9 @@ public abstract class ClientConnection implements Co protected final LongAdder respWaitingCounter; - protected final AtomicBoolean pauseWriting = new AtomicBoolean(); + final AtomicBoolean pauseWriting = new AtomicBoolean(); - protected final AtomicBoolean pauseResuming = new AtomicBoolean(); - - protected final List pauseRequests = new CopyOnWriteArrayList<>(); - - private final ReentrantLock pauseLock = new ReentrantLock(); - - private final Condition pauseCondition = pauseLock.newCondition(); + final ConcurrentLinkedQueue pauseRequests = new ConcurrentLinkedQueue<>(); protected final AsyncConnection channel; @@ -140,8 +133,8 @@ public abstract class ClientConnection implements Co } } - void sendHalfWrite(Throwable halfRequestExc) { - writeThread.sendHalfWrite(this, halfRequestExc); + void sendHalfWrite(R request, Throwable halfRequestExc) { + writeThread.sendHalfWrite(this, request, halfRequestExc); } //只会在WriteIOThread中调用 @@ -186,25 +179,6 @@ public abstract class ClientConnection implements Co } } - void signalPauseRequest() { - pauseLock.lock(); - try { - pauseCondition.signalAll(); - } finally { - pauseLock.unlock(); - } - } - - void awaitPauseRequest() { - pauseLock.lock(); - try { - pauseCondition.await(3_000, TimeUnit.SECONDS); - } catch (Exception e) { - } finally { - pauseLock.unlock(); - } - } - public boolean isAuthenticated() { return authenticated; } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index 685f2ab76..de2501c0b 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -26,6 +26,8 @@ public class ClientFuture extends CompletableFuture< private ScheduledFuture timeout; + Boolean resumeHalfRequestFlag; + ClientFuture(ClientConnection conn, R request) { super(); Objects.requireNonNull(conn); diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index 1139488bd..78d2bb7a7 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -8,7 +8,6 @@ import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; import org.redkale.net.AsyncIOThread; import org.redkale.util.*; @@ -35,25 +34,13 @@ public class ClientWriteIOThread extends AsyncIOThread { requestQueue.offer(respFuture); } - public void sendHalfWrite(ClientConnection conn, Throwable halfRequestExc) { - if (conn.pauseWriting.get()) { - conn.pauseResuming.set(true); - try { - AtomicBoolean skipFirst = new AtomicBoolean(halfRequestExc != null); - conn.pauseRequests.removeIf(e -> { - if (e != null) { - if (!skipFirst.compareAndSet(true, false)) { - requestQueue.offer((ClientFuture) e); - } - } - return true; - }); - } finally { - conn.pauseResuming.set(false); - conn.pauseWriting.set(false); - conn.signalPauseRequest(); - } + public void sendHalfWrite(ClientConnection conn, ClientRequest request, Throwable halfRequestExc) { + ClientFuture respFuture = conn.createClientFuture(request); + respFuture.resumeHalfRequestFlag = true; + if (halfRequestExc != null) { //halfRequestExc不为null时需要把当前halfRequest移除 + conn.pauseRequests.poll(); } + requestQueue.offer(respFuture); } @Override @@ -71,24 +58,38 @@ public class ClientWriteIOThread extends AsyncIOThread { try { while ((entry = requestQueue.take()) != null) { map.clear(); - if (!entry.isDone()) { + if (entry.resumeHalfRequestFlag != null) { //将暂停的pauseRequests写入list + List cl = map.computeIfAbsent(entry.conn, c -> listPool.get()); + for (ClientFuture f : (List) entry.conn.pauseRequests) { + if (!f.isDone()) { + entry.conn.offerRespFuture(f); + cl.add(f); + } + } + entry.conn.pauseRequests.clear(); + entry.conn.pauseWriting.set(false); + } else if (!entry.isDone()) { entry.conn.offerRespFuture(entry); if (entry.conn.pauseWriting.get()) { - if (entry.conn.pauseResuming.get()) { - entry.conn.awaitPauseRequest(); - } entry.conn.pauseRequests.add(entry); } else { map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry); } } while ((entry = requestQueue.poll()) != null) { - if (!entry.isDone()) { + if (entry.resumeHalfRequestFlag != null) { //将暂停的pauseRequests写入list + List cl = map.computeIfAbsent(entry.conn, c -> listPool.get()); + for (ClientFuture f : (List) entry.conn.pauseRequests) { + if (!f.isDone()) { + entry.conn.offerRespFuture(f); + cl.add(f); + } + } + entry.conn.pauseRequests.clear(); + entry.conn.pauseWriting.set(false); + } else if (!entry.isDone()) { entry.conn.offerRespFuture(entry); if (entry.conn.pauseWriting.get()) { - if (entry.conn.pauseResuming.get()) { - entry.conn.awaitPauseRequest(); - } entry.conn.pauseRequests.add(entry); } else { map.computeIfAbsent(entry.conn, c -> listPool.get()).add(entry); diff --git a/src/test/java/org/redkale/test/convert/BsonMainTest.java b/src/test/java/org/redkale/test/convert/BsonMainTest.java index b7de20e87..089d77d67 100644 --- a/src/test/java/org/redkale/test/convert/BsonMainTest.java +++ b/src/test/java/org/redkale/test/convert/BsonMainTest.java @@ -6,21 +6,13 @@ package org.redkale.test.convert; import java.io.*; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.redkale.annotation.ConstructorParameters; -import org.redkale.convert.bson.BsonByteBufferWriter; -import org.redkale.convert.bson.BsonFactory; -import org.redkale.persistence.Id; -import org.redkale.persistence.Transient; -import org.redkale.util.Utility; -import org.redkale.convert.bson.BsonConvert; - -import java.nio.*; +import java.nio.ByteBuffer; import java.util.*; - -import org.redkale.convert.json.*; +import org.junit.jupiter.api.*; +import org.redkale.annotation.ConstructorParameters; +import org.redkale.convert.bson.*; +import org.redkale.convert.json.JsonConvert; +import org.redkale.persistence.*; import org.redkale.util.*; /** @@ -46,7 +38,7 @@ public class BsonMainTest { SimpleChildEntity entry = SimpleChildEntity.create(); byte[] bytes = convert.convertTo(SimpleEntity.class, entry); System.out.println("长度: " + bytes.length); - Assertions.assertEquals(260, bytes.length); + Assertions.assertEquals(271, bytes.length); BsonByteBufferWriter writer = convert.pollBsonWriter(() -> ByteBuffer.allocate(1)); convert.convertTo(writer, SimpleEntity.class, entry); ByteBuffer[] buffers = writer.toBuffers(); @@ -60,7 +52,7 @@ public class BsonMainTest { b.flip(); } System.out.println("长度: " + len); - Assertions.assertEquals(260, len); + Assertions.assertEquals(271, len); SimpleChildEntity entry2 = convert.convertFrom(SimpleChildEntity.class, buffers); System.out.println(entry); Assertions.assertEquals(entry.toString(), entry2.toString()); @@ -83,7 +75,7 @@ public class BsonMainTest { convert.convertTo(writer, bean); bytes2 = writer.toArray(); System.out.println(convert.convertFrom(ComplextEntity.class, bytes2).toString()); - Assertions.assertEquals("{\"flag\":true,\"userid\":0}", convert.convertFrom(ComplextEntity.class, bytes2).toString()); + Assertions.assertEquals("{\"chname\":\"\",\"flag\":true,\"userid\":0}", convert.convertFrom(ComplextEntity.class, bytes2).toString()); } @Test