From 4b70e19211839c3b6645a2ab87cdc75ba0446e25 Mon Sep 17 00:00:00 2001 From: redkale Date: Mon, 30 Jan 2023 10:33:00 +0800 Subject: [PATCH] =?UTF-8?q?Request=E5=A2=9E=E5=8A=A0getRequestid=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/org/redkale/net/AsyncConnection.java | 12 ++++++------ src/main/java/org/redkale/net/AsyncGroup.java | 8 ++++---- src/main/java/org/redkale/net/AsyncIOGroup.java | 10 +++++----- src/main/java/org/redkale/net/AsyncIOThread.java | 2 +- .../org/redkale/net/AsyncNioCompletionHandler.java | 4 ++-- .../java/org/redkale/net/AsyncNioConnection.java | 4 ++-- .../java/org/redkale/net/AsyncNioTcpConnection.java | 4 ++-- .../java/org/redkale/net/AsyncNioUdpConnection.java | 8 ++++---- src/main/java/org/redkale/net/ProtocolCodec.java | 8 ++++++-- src/main/java/org/redkale/net/Request.java | 5 +++-- src/main/java/org/redkale/net/SSLBuilder.java | 6 +++--- src/main/java/org/redkale/net/http/HttpRequest.java | 5 +++++ src/main/java/org/redkale/net/sncp/SncpRequest.java | 6 ++++++ 13 files changed, 49 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index 0255f535d..156b08adb 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -42,7 +42,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl protected final AsyncIOGroup ioGroup; - protected final boolean client; + protected final boolean clientMode; protected final int bufferCapacity; @@ -82,12 +82,12 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl //用于服务端的Socket, 等同于一直存在的readCompletionHandler ProtocolCodec protocolCodec; - protected AsyncConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + protected AsyncConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { Objects.requireNonNull(ioGroup); Objects.requireNonNull(ioReadThread); Objects.requireNonNull(ioWriteThread); - this.client = client; + this.clientMode = clientMode; this.ioGroup = ioGroup; this.ioReadThread = ioReadThread; this.ioWriteThread = ioWriteThread; @@ -98,17 +98,17 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl this.writeBufferConsumer = ioWriteThread.getBufferConsumer(); this.livingCounter = ioGroup.connLivingCounter; this.closedCounter = ioGroup.connClosedCounter; - if (client) { //client模式下无SSLBuilder + if (clientMode) { //client模式下无SSLBuilder if (sslContext != null) { if (sslBuilder != null) { - this.sslEngine = sslBuilder.createSSLEngine(sslContext, client); + this.sslEngine = sslBuilder.createSSLEngine(sslContext, clientMode); } else { this.sslEngine = sslContext.createSSLEngine(); } } } else { if (sslBuilder != null && sslContext != null) { - this.sslEngine = sslBuilder.createSSLEngine(sslContext, client); + this.sslEngine = sslBuilder.createSSLEngine(sslContext, clientMode); } } } diff --git a/src/main/java/org/redkale/net/AsyncGroup.java b/src/main/java/org/redkale/net/AsyncGroup.java index 1739540ab..83ce40121 100644 --- a/src/main/java/org/redkale/net/AsyncGroup.java +++ b/src/main/java/org/redkale/net/AsyncGroup.java @@ -30,12 +30,12 @@ public abstract class AsyncGroup { return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); } - public static AsyncGroup create(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); + public static AsyncGroup create(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize); } - public static AsyncGroup create(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { - return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); + public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool); } public CompletableFuture createTCPClient(final SocketAddress address) { diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 125bed1ff..af8c2c0bb 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -62,8 +62,8 @@ public class AsyncIOGroup extends AsyncGroup { this(true, "Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize); } - public AsyncIOGroup(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { - this(client, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, + public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) { + this(clientMode, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize, (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) { return false; @@ -74,7 +74,7 @@ public class AsyncIOGroup extends AsyncGroup { } @SuppressWarnings("OverridableMethodCallInConstructor") - public AsyncIOGroup(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { + public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool safeBufferPool) { this.bufferCapacity = bufferCapacity; final int threads = Utility.cpus(); this.ioReadThreads = new AsyncIOThread[threads]; @@ -89,7 +89,7 @@ public class AsyncIOGroup extends AsyncGroup { try { for (int i = 0; i < threads; i++) { String indexfix = WorkThread.formatIndex(threads, i + 1); - if (client) { + if (clientMode) { this.ioReadThreads[i] = createClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool); this.ioWriteThreads[i] = createClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool); } else { @@ -97,7 +97,7 @@ public class AsyncIOGroup extends AsyncGroup { this.ioWriteThreads[i] = this.ioReadThreads[i]; } } - if (client) { + if (clientMode) { this.connectThread = createClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool); } else { this.connectThread = null; diff --git a/src/main/java/org/redkale/net/AsyncIOThread.java b/src/main/java/org/redkale/net/AsyncIOThread.java index db654e9ab..1e54f2c90 100644 --- a/src/main/java/org/redkale/net/AsyncIOThread.java +++ b/src/main/java/org/redkale/net/AsyncIOThread.java @@ -177,7 +177,7 @@ public class AsyncIOThread extends WorkThread { continue; } AsyncNioConnection conn = (AsyncNioConnection) key.attachment(); - if (conn.client) { + if (conn.clientMode) { if (key.isConnectable()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); conn.doConnect(); diff --git a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java index 9c6004e6d..c1dc37aee 100644 --- a/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java +++ b/src/main/java/org/redkale/net/AsyncNioCompletionHandler.java @@ -34,9 +34,9 @@ class AsyncNioCompletionHandler implements CompletionHandler, Run private ByteBuffer buffer; - public AsyncNioCompletionHandler(boolean readFlag, AsyncNioConnection conn) { + public AsyncNioCompletionHandler(boolean readMode, AsyncNioConnection conn) { this.conn = conn; - this.readMode = readFlag; + this.readMode = readMode; } public void handler(CompletionHandler handler, A attachment) { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index eb7339be8..0e8e3e738 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -92,9 +92,9 @@ abstract class AsyncNioConnection extends AsyncConnection { protected SelectionKey writeKey; - public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) { - super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); + super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext); this.connectThread = ioGroup.connectThread; } diff --git a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java index 555c99276..3f669c881 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpConnection.java @@ -26,9 +26,9 @@ class AsyncNioTcpConnection extends AsyncNioConnection { private final SocketChannel channel; - public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + public AsyncNioTcpConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) { - super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); + super(clientMode, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); this.channel = ch; SocketAddress addr = address; if (addr == null) { diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index 640222b19..a98b7e4e3 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -23,9 +23,9 @@ class AsyncNioUdpConnection extends AsyncNioConnection { private final DatagramChannel channel; - public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, + public AsyncNioUdpConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread, AsyncIOThread ioWriteThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) { - super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); + super(clientMode, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext); this.channel = ch; SocketAddress addr = address; if (addr == null) { @@ -100,7 +100,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection { @Override public boolean isConnected() { - if (!client) { + if (!clientMode) { return true; } return this.channel.isConnected(); @@ -153,7 +153,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection { @Override public final void close() throws IOException { super.close(); - if (client) { + if (clientMode) { channel.close(); //不能关闭channel } if (this.connectKey != null) { diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index 1247945fe..d35e1e335 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -162,12 +162,16 @@ class ProtocolCodec implements CompletionHandler { if (pindex == 0) { pindex++; } - request.pipeline(pindex, pindex + 1); + if (request.getRequestid() == null) { //存在requestid则无视pipeline模式 + request.pipeline(pindex, pindex + 1); + } if (hreq == null) { hreq = request.copyHeader(); } } else { - request.pipeline(pindex, pindex); + if (request.getRequestid() == null) { //存在requestid则无视pipeline模式 + request.pipeline(pindex, pindex); + } channel.setReadBuffer(buffer.clear()); } context.executeDispatch(request, response); diff --git a/src/main/java/org/redkale/net/Request.java b/src/main/java/org/redkale/net/Request.java index 45bba0f2f..b572d2ee6 100644 --- a/src/main/java/org/redkale/net/Request.java +++ b/src/main/java/org/redkale/net/Request.java @@ -5,7 +5,7 @@ */ package org.redkale.net; -import java.io.InputStream; +import java.io.*; import java.nio.ByteBuffer; import java.util.*; import org.redkale.convert.ConvertDisabled; @@ -77,7 +77,6 @@ public abstract class Request { return null; } - //重载此方法,不设置pipelineIndex值可以将协议改成无pipeline模式 protected Request pipeline(int pipelineIndex, int pipelineCount) { this.pipelineIndex = pipelineIndex; this.pipelineCount = pipelineCount; @@ -94,6 +93,8 @@ public abstract class Request { */ protected abstract int readHeader(ByteBuffer buffer, Request last); + protected abstract Serializable getRequestid(); + protected abstract void prepare(); protected void recycle() { diff --git a/src/main/java/org/redkale/net/SSLBuilder.java b/src/main/java/org/redkale/net/SSLBuilder.java index 726247112..82f66e99d 100644 --- a/src/main/java/org/redkale/net/SSLBuilder.java +++ b/src/main/java/org/redkale/net/SSLBuilder.java @@ -5,7 +5,7 @@ */ package org.redkale.net; -import java.io.*; +import java.io.FileInputStream; import java.security.*; import java.security.cert.*; import java.util.*; @@ -151,7 +151,7 @@ public class SSLBuilder { return sslContext; } - public SSLEngine createSSLEngine(SSLContext sslContext, boolean client) { + public SSLEngine createSSLEngine(SSLContext sslContext, boolean clientMode) { SSLEngine engine = sslContext.createSSLEngine(); if (protocols != null) { engine.setEnabledProtocols(protocols); @@ -159,7 +159,7 @@ public class SSLBuilder { if (ciphers != null) { engine.setEnabledCipherSuites(ciphers); } - engine.setUseClientMode(client); + engine.setUseClientMode(clientMode); if (wantClientAuth) { engine.setWantClientAuth(true); } else if (needClientAuth) { diff --git a/src/main/java/org/redkale/net/http/HttpRequest.java b/src/main/java/org/redkale/net/http/HttpRequest.java index 0981eb99a..c3cc6a07c 100644 --- a/src/main/java/org/redkale/net/http/HttpRequest.java +++ b/src/main/java/org/redkale/net/http/HttpRequest.java @@ -859,6 +859,11 @@ public class HttpRequest extends Request { return req; } + @Override + protected Serializable getRequestid() { + return null; + } + @Override protected void prepare() { this.keepAlive = true; //默认HTTP/1.1 diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index 746bb1db4..02be4e29c 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -5,6 +5,7 @@ */ package org.redkale.net.sncp; +import java.io.Serializable; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.logging.*; @@ -111,6 +112,11 @@ public class SncpRequest extends Request { return 0; } + @Override + protected Serializable getRequestid() { + return seqid; + } + @Override protected void prepare() { this.keepAlive = true;