diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index f6f27b0ed..d117481fd 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -16,6 +16,8 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; import static javax.net.ssl.SSLEngineResult.Status.*; import javax.net.ssl.*; +import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; +import static javax.net.ssl.SSLEngineResult.Status.*; import org.redkale.util.*; /** @@ -671,7 +673,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl } catch (Exception io) { } } - if (this.readBuffer != null) { + if (this.readBuffer != null && Thread.currentThread() == this.ioReadThread) { Consumer consumer = this.readBufferConsumer; if (consumer != null) { consumer.accept(this.readBuffer); diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 7b192402f..8325eacaf 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -13,6 +13,7 @@ import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.redkale.annotation.ResourceType; +import org.redkale.net.client.*; import org.redkale.util.*; /** @@ -82,14 +83,20 @@ public class AsyncIOGroup extends AsyncGroup { this.ioWriteThreads = new AsyncIOThread[threads]; try { for (int i = 0; i < threads; i++) { + String postfix = "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); ObjectPool unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); - String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1))); if (client) { - this.ioReadThreads[i] = new ClientIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new ClientIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; + if (false) { + this.ioReadThreads[i].setName(threadPrefixName + "-Read" + postfix); + ObjectPool unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), + safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); + this.ioWriteThreads[i] = new ClientWriteIOThread(threadPrefixName + "-Write" + postfix, i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); + } } else { - this.ioReadThreads[i] = new AsyncIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); + this.ioReadThreads[i] = new AsyncIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioWriteThreads[i] = this.ioReadThreads[i]; } } diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index 9f93b3a74..f4a692120 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -52,6 +52,10 @@ public class AsyncIOThread extends WorkThread { this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v); } + protected boolean isClosed() { + return closed; + } + public static AsyncIOThread currAsyncIOThread() { Thread t = Thread.currentThread(); return t instanceof AsyncIOThread ? (AsyncIOThread) t : null; @@ -127,7 +131,7 @@ public class AsyncIOThread extends WorkThread { public void run() { final Queue commands = this.commandQueue; final Queue> registers = this.registerQueue; - while (!this.closed) { + while (!isClosed()) { try { Consumer register; while ((register = registers.poll()) != null) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 213d304b5..d6b900ead 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -8,6 +8,7 @@ package org.redkale.net.client; import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.AbstractMap.SimpleEntry; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -50,7 +51,7 @@ public abstract class ClientConnection implements Co protected final AtomicBoolean writePending = new AtomicBoolean(); - protected final Queue requestQueue = new ArrayDeque<>(); + protected final Queue>> requestQueue = new ArrayDeque<>(); final ArrayDeque responseQueue = new ArrayDeque<>(); @@ -85,7 +86,7 @@ public abstract class ClientConnection implements Co protected int maxPipelines; //最大并行处理数 - protected R lastHalfRequest; + protected SimpleEntry> lastHalfEntry; protected ClientConnection setMaxPipelines(int maxPipelines) { this.maxPipelines = maxPipelines; @@ -120,37 +121,42 @@ public abstract class ClientConnection implements Co if (pw.get()) { break; } - R req; - if (lastHalfRequest == null) { - req = requestQueue.poll(); + SimpleEntry> entry; + if (lastHalfEntry == null) { + entry = requestQueue.poll(); } else { - req = lastHalfRequest; - lastHalfRequest = null; + entry = lastHalfEntry; + lastHalfEntry = null; } - if (req == null) { + if (entry == null) { break; } + R req = entry.getKey(); writeLastRequest = req; if (req.getRequestid() == null && req.canMerge(conn)) { - R r; + SimpleEntry> r; while ((r = requestQueue.poll()) != null) { i++; - if (!req.merge(conn, r)) { + if (!req.merge(conn, r.getKey())) { break; } - req.respFuture.mergeCount++; + ClientFuture f = entry.getValue(); + if (f != null) { + f.mergeCount++; + } + //req.respFuture.mergeCount++; } req.accept(conn, rw); if (r != null) { - r.accept(conn, rw); - req = r; + r.getKey().accept(conn, rw); + req = r.getKey(); } } else { req.accept(conn, rw); } c++; if (!req.isCompleted()) { - lastHalfRequest = req; + lastHalfEntry = entry; this.pauseWriting.set(true); break; } @@ -199,7 +205,7 @@ public abstract class ClientConnection implements Co pauseWriting.set(false); return; } else { //异常了需要清掉半包 - lastHalfRequest = null; + lastHalfEntry = null; pauseWriting.set(false); } } @@ -308,7 +314,7 @@ public abstract class ClientConnection implements Co channel.read(this); } } else { //还有消息需要读取 - if ((!requestQueue.isEmpty() || lastHalfRequest != null) && writePending.compareAndSet(false, true)) { + if ((!requestQueue.isEmpty() || lastHalfEntry != null) && writePending.compareAndSet(false, true)) { //先写后读取 if (!sendWrite(true)) { writePending.compareAndSet(true, false); @@ -370,20 +376,22 @@ public abstract class ClientConnection implements Co return respFuture; } - private void writeChannelInThread(R request, ClientFuture respFuture) { + private void writeChannelInThread(R request, ClientFuture respFuture) { Serializable reqid = request.getRequestid(); //保证顺序一致 - if (respFuture.request.isCloseType()) { + ClientFuture future; + if (request.isCloseType()) { + future = null; responseQueue.offer(ClientFuture.EMPTY); } else { - request.respFuture = respFuture; + future = respFuture; if (reqid == null) { responseQueue.offer(respFuture); } else { responseMap.put(reqid, respFuture); } } - requestQueue.offer(request); + requestQueue.offer(new SimpleEntry<>(request, future)); if (isAuthenticated() && client.reqWritedCounter != null) { client.reqWritedCounter.increment(); } diff --git a/src/main/java/org/redkale/net/ClientIOThread.java b/src/main/java/org/redkale/net/client/ClientIOThread.java similarity index 90% rename from src/main/java/org/redkale/net/ClientIOThread.java rename to src/main/java/org/redkale/net/client/ClientIOThread.java index 34d077cfb..f80f19f94 100644 --- a/src/main/java/org/redkale/net/ClientIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientIOThread.java @@ -1,11 +1,12 @@ /* * */ -package org.redkale.net; +package org.redkale.net.client; import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.util.concurrent.ExecutorService; +import org.redkale.net.AsyncIOThread; import org.redkale.util.ObjectPool; /** diff --git a/src/main/java/org/redkale/net/client/ClientRequest.java b/src/main/java/org/redkale/net/client/ClientRequest.java index 389d7287e..2aae2c13e 100644 --- a/src/main/java/org/redkale/net/client/ClientRequest.java +++ b/src/main/java/org/redkale/net/client/ClientRequest.java @@ -27,8 +27,6 @@ public abstract class ClientRequest implements BiConsumer