diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index ceb64d52c..165deabd2 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -9,7 +9,6 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; @@ -30,11 +29,11 @@ public abstract class ClientCodec implements Complet protected final ClientConnection connection; - private final List> respResults = new ArrayList<>(); + private final List> respResults = new ArrayList<>(); private final ByteArray readArray = new ByteArray(); - private final ObjectPool respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); + private final ObjectPool> respPool = ObjectPool.createUnsafePool(256, t -> new ClientResponse(), ClientResponse::prepare, ClientResponse::recycle); public ClientCodec(ClientConnection connection) { Objects.requireNonNull(connection); @@ -63,13 +62,12 @@ public abstract class ClientCodec implements Complet private void decodeResponse(ByteBuffer buffer) { AsyncConnection channel = connection.channel; - ConcurrentLinkedQueue responseQueue = connection.responseQueue; - Map responseMap = connection.responseMap; + connection.currRespIterator = null; if (decodeMessages(buffer, readArray)) { //成功了 + connection.currRespIterator = null; readArray.clear(); - for (ClientResponse

cr : respResults) { - Serializable reqid = cr.getRequestid(); - ClientFuture respFuture = reqid == null ? responseQueue.poll() : responseMap.remove(reqid); + for (ClientResponse cr : respResults) { + ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); if (respFuture != null) { responseComplete(respFuture, cr.message, cr.exc); } @@ -85,15 +83,16 @@ public abstract class ClientCodec implements Complet channel.read(this); } } else { //数据不全, 继续读 + connection.currRespIterator = null; buffer.clear(); channel.setReadBuffer(buffer); channel.read(this); } } - private void responseComplete(ClientFuture respFuture, P message, Throwable exc) { + private void responseComplete(ClientFuture respFuture, P message, Throwable exc) { if (respFuture != null) { - ClientRequest request = respFuture.request; + R request = respFuture.request; WorkThread workThread = null; try { if (!request.isCompleted()) { @@ -127,7 +126,7 @@ public abstract class ClientCodec implements Complet final Object rs = request.respTransfer == null ? message : request.respTransfer.apply(message); workThread.runWork(() -> { Traces.currTraceid(request.traceid); - respFuture.complete(rs); + ((ClientFuture) respFuture).complete(rs); }); } } catch (Throwable t) { @@ -154,18 +153,8 @@ public abstract class ClientCodec implements Complet connection.dispose(t); } - protected Iterator responseIterator() { - return connection.responseQueue.iterator(); - } - - protected ClientFuture responseByRequestid(Serializable requestid) { - return (ClientFuture) connection.responseMap.get(requestid); - } - - protected List> pollMessages() { - List> rs = new ArrayList<>(respResults); - this.respResults.clear(); - return rs; + protected R findRequest(Serializable requestid) { + return (R) connection.findRequest(requestid); } public void addMessage(R request, P result) { diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 4d51789eb..3f649803b 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -7,7 +7,7 @@ package org.redkale.net.client; import java.io.Serializable; import java.nio.channels.ClosedChannelException; -import java.util.List; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; @@ -45,11 +45,13 @@ public abstract class ClientConnection implements Co private final ClientWriteIOThread writeThread; - //responseQueue、responseMap二选一 - final ConcurrentLinkedQueue responseQueue = new ConcurrentLinkedQueue<>(); + //respFutureQueue、respFutureMap二选一, SPSC队列模式 + private final Queue> respFutureQueue = new ConcurrentLinkedQueue<>(); //Utility.unsafe() != null ? new MpscGrowableArrayQueue<>(16, 1 << 16) : new ConcurrentLinkedQueue<>(); - //responseQueue、responseMap二选一, key: requestid - final ConcurrentHashMap responseMap = new ConcurrentHashMap<>(); + //respFutureQueue、respFutureMap二选一, key: requestid, SPSC模式 + private final Map> respFutureMap = new ConcurrentHashMap<>(); + + Iterator> currRespIterator; //必须在调用decodeMessages之前重置为null private int maxPipelines; //最大并行处理数 @@ -84,12 +86,12 @@ public abstract class ClientConnection implements Co return respFuture; } - CompletableFuture writeVirtualRequest(R request) { + CompletableFuture

writeVirtualRequest(R request) { if (!request.isVirtualType()) { return CompletableFuture.failedFuture(new RuntimeException("ClientVirtualRequest must be virtualType = true")); } - ClientFuture respFuture = createClientFuture(request); - responseQueue.offer(respFuture); + ClientFuture respFuture = createClientFuture(request); + respFutureQueue.offer(respFuture); readChannel(); return respFuture; } @@ -97,7 +99,7 @@ public abstract class ClientConnection implements Co protected void preComplete(P resp, R req, Throwable exc) { } - protected ClientFuture createClientFuture(R request) { + protected ClientFuture createClientFuture(R request) { return new ClientFuture(this, request); } @@ -119,15 +121,15 @@ public abstract class ClientConnection implements Co CompletableFuture f; respWaitingCounter.reset(); WorkThread thread = channel.getReadIOThread(); - if (!responseQueue.isEmpty()) { - while ((f = responseQueue.poll()) != null) { + if (!respFutureQueue.isEmpty()) { + while ((f = respFutureQueue.poll()) != null) { CompletableFuture future = f; thread.runWork(() -> future.completeExceptionally(e)); } } - if (!responseMap.isEmpty()) { - responseMap.forEach((key, future) -> { - responseMap.remove(key); + if (!respFutureMap.isEmpty()) { + respFutureMap.forEach((key, future) -> { + respFutureMap.remove(key); thread.runWork(() -> future.completeExceptionally(e)); }); } @@ -137,6 +139,48 @@ public abstract class ClientConnection implements Co writeThread.sendHalfWrite(this, halfRequestExc); } + //只会在WriteIOThread中调用 + void offerRespFuture(ClientFuture respFuture) { + Serializable requestid = respFuture.request.getRequestid(); + if (requestid == null) { + respFutureQueue.offer(respFuture); + } else { + respFutureMap.put(requestid, respFuture); + } + } + + //只会被Timeout在ReadIOThread中调用 + void removeRespFuture(Serializable requestid, ClientFuture respFuture) { + if (requestid == null) { + respFutureQueue.remove(respFuture); + } else { + respFutureMap.remove(requestid); + } + } + + //只会被ClientCodec在ReadIOThread中调用 + ClientFuture pollRespFuture(Serializable requestid) { + if (requestid == null) { + return respFutureQueue.poll(); + } else { + return respFutureMap.remove(requestid); + } + } + + //只会被ClientCodec在ReadIOThread中调用 + R findRequest(Serializable requestid) { + if (requestid == null) { + if (currRespIterator == null) { + currRespIterator = respFutureQueue.iterator(); + } + ClientFuture future = currRespIterator.hasNext() ? currRespIterator.next() : null; + return future == null ? null : future.request; + } else { + ClientFuture future = respFutureMap.get(requestid); + return future == null ? null : future.request; + } + } + public boolean isAuthenticated() { return authenticated; } diff --git a/src/main/java/org/redkale/net/client/ClientFuture.java b/src/main/java/org/redkale/net/client/ClientFuture.java index efd1e9cf2..685f2ab76 100644 --- a/src/main/java/org/redkale/net/client/ClientFuture.java +++ b/src/main/java/org/redkale/net/client/ClientFuture.java @@ -5,28 +5,31 @@ */ package org.redkale.net.client; -import java.util.*; +import java.util.Objects; import java.util.concurrent.*; import org.redkale.net.*; /** * * @author zhangjx - * + * * @since 2.3.0 - * + * + * @param 泛型 * @param 泛型 */ -public class ClientFuture extends CompletableFuture implements Runnable { +public class ClientFuture extends CompletableFuture implements Runnable { - protected final ClientRequest request; + protected final R request; protected final ClientConnection conn; private ScheduledFuture timeout; - public ClientFuture(ClientConnection conn, ClientRequest request) { + ClientFuture(ClientConnection conn, R request) { super(); + Objects.requireNonNull(conn); + Objects.requireNonNull(request); this.conn = conn; this.request = request; } @@ -42,14 +45,14 @@ public class ClientFuture extends CompletableFuture implements Runnable { } @Override //JDK9+ - public ClientFuture newIncompleteFuture() { + public ClientFuture newIncompleteFuture() { ClientFuture future = new ClientFuture<>(conn, request); future.timeout = timeout; return future; } - public R getRequest() { - return (R) request; + public R getRequest() { + return request; } @Override @@ -66,14 +69,7 @@ public class ClientFuture extends CompletableFuture implements Runnable { } private void runTimeout() { - Queue responseQueue = conn.responseQueue; - if (responseQueue != null) { - responseQueue.remove(this); - } - if (request.getRequestid() != null) { - conn.responseMap.remove(request.getRequestid()); - } - + conn.removeRespFuture(request.getRequestid(), this); TimeoutException ex = new TimeoutException(); WorkThread workThread = null; if (request != null) { diff --git a/src/main/java/org/redkale/net/client/ClientResponse.java b/src/main/java/org/redkale/net/client/ClientResponse.java index 650bb49d7..829ef7a69 100644 --- a/src/main/java/org/redkale/net/client/ClientResponse.java +++ b/src/main/java/org/redkale/net/client/ClientResponse.java @@ -13,14 +13,15 @@ import java.io.Serializable; * 详情见: https://redkale.org * * @author zhangjx - * + * * @since 2.3.0 * + * @param 请求对象 * @param

message */ -public class ClientResponse

{ +public class ClientResponse { - protected ClientRequest request; + protected R request; protected P message; @@ -29,12 +30,12 @@ public class ClientResponse

{ public ClientResponse() { } - public ClientResponse(ClientRequest request, P message) { + public ClientResponse(R request, P message) { this.request = request; this.message = message; } - public ClientResponse(ClientRequest request, Throwable exc) { + public ClientResponse(R request, Throwable exc) { this.request = request; this.exc = exc; } @@ -43,13 +44,13 @@ public class ClientResponse

{ return request == null ? null : request.getRequestid(); } - public ClientResponse

set(ClientRequest request, P message) { + public ClientResponse set(R request, P message) { this.request = request; this.message = message; return this; } - public ClientResponse

set(ClientRequest request, Throwable exc) { + public ClientResponse set(R request, Throwable exc) { this.request = request; this.exc = exc; return this; @@ -68,11 +69,11 @@ public class ClientResponse

{ return true; } - public ClientRequest getRequest() { + public R getRequest() { return request; } - public void setRequest(ClientRequest request) { + public void setRequest(R request) { this.request = request; } diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index dc8eab2df..ff8e6936d 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -3,7 +3,7 @@ */ package org.redkale.net.client; -import java.io.*; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import java.util.*; @@ -74,13 +74,7 @@ public class ClientWriteIOThread extends AsyncIOThread { while ((entry = requestQueue.take()) != null) { map.clear(); if (!entry.isDone()) { - Serializable reqid = entry.request.getRequestid(); - if (reqid == null) { - entry.conn.responseQueue.offer(entry); - } else { - entry.conn.responseMap.put(reqid, entry); - } - + entry.conn.offerRespFuture(entry); if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseResuming.get()) { try { @@ -97,12 +91,7 @@ public class ClientWriteIOThread extends AsyncIOThread { } while ((entry = requestQueue.poll()) != null) { if (!entry.isDone()) { - Serializable reqid = entry.request.getRequestid(); - if (reqid == null) { - entry.conn.responseQueue.offer(entry); - } else { - entry.conn.responseMap.put(reqid, entry); - } + entry.conn.offerRespFuture(entry); if (entry.conn.pauseWriting.get()) { if (entry.conn.pauseResuming.get()) { try {