Request增加getRequestid方法

This commit is contained in:
redkale
2023-01-30 10:33:00 +08:00
parent ae27fffdb0
commit 4b70e19211
13 changed files with 49 additions and 33 deletions

View File

@@ -42,7 +42,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
protected final AsyncIOGroup ioGroup;
protected final boolean client;
protected final boolean clientMode;
protected final int bufferCapacity;
@@ -82,12 +82,12 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
//用于服务端的Socket, 等同于一直存在的readCompletionHandler
ProtocolCodec protocolCodec;
protected AsyncConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
protected AsyncConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
Objects.requireNonNull(ioGroup);
Objects.requireNonNull(ioReadThread);
Objects.requireNonNull(ioWriteThread);
this.client = client;
this.clientMode = clientMode;
this.ioGroup = ioGroup;
this.ioReadThread = ioReadThread;
this.ioWriteThread = ioWriteThread;
@@ -98,17 +98,17 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
this.writeBufferConsumer = ioWriteThread.getBufferConsumer();
this.livingCounter = ioGroup.connLivingCounter;
this.closedCounter = ioGroup.connClosedCounter;
if (client) { //client模式下无SSLBuilder
if (clientMode) { //client模式下无SSLBuilder
if (sslContext != null) {
if (sslBuilder != null) {
this.sslEngine = sslBuilder.createSSLEngine(sslContext, client);
this.sslEngine = sslBuilder.createSSLEngine(sslContext, clientMode);
} else {
this.sslEngine = sslContext.createSSLEngine();
}
}
} else {
if (sslBuilder != null && sslContext != null) {
this.sslEngine = sslBuilder.createSSLEngine(sslContext, client);
this.sslEngine = sslBuilder.createSSLEngine(sslContext, clientMode);
}
}
}

View File

@@ -30,12 +30,12 @@ public abstract class AsyncGroup {
return new AsyncIOGroup(true, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
}
public static AsyncGroup create(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
public static AsyncGroup create(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, bufferPoolSize);
}
public static AsyncGroup create(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(client, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
public static AsyncGroup create(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
return new AsyncIOGroup(clientMode, threadNameFormat, workExecutor, bufferCapacity, safeBufferPool);
}
public CompletableFuture<AsyncConnection> createTCPClient(final SocketAddress address) {

View File

@@ -62,8 +62,8 @@ public class AsyncIOGroup extends AsyncGroup {
this(true, "Redkale-AnonymousClient-IOThread-%s", null, bufferCapacity, bufferPoolSize);
}
public AsyncIOGroup(boolean client, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(client, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize,
public AsyncIOGroup(boolean clientMode, String threadNameFormat, final ExecutorService workExecutor, final int bufferCapacity, final int bufferPoolSize) {
this(clientMode, threadNameFormat, workExecutor, bufferCapacity, ObjectPool.createSafePool(null, null, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) {
return false;
@@ -74,7 +74,7 @@ public class AsyncIOGroup extends AsyncGroup {
}
@SuppressWarnings("OverridableMethodCallInConstructor")
public AsyncIOGroup(boolean client, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
public AsyncIOGroup(boolean clientMode, String threadNameFormat, ExecutorService workExecutor, final int bufferCapacity, ObjectPool<ByteBuffer> safeBufferPool) {
this.bufferCapacity = bufferCapacity;
final int threads = Utility.cpus();
this.ioReadThreads = new AsyncIOThread[threads];
@@ -89,7 +89,7 @@ public class AsyncIOGroup extends AsyncGroup {
try {
for (int i = 0; i < threads; i++) {
String indexfix = WorkThread.formatIndex(threads, i + 1);
if (client) {
if (clientMode) {
this.ioReadThreads[i] = createClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = createClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool);
} else {
@@ -97,7 +97,7 @@ public class AsyncIOGroup extends AsyncGroup {
this.ioWriteThreads[i] = this.ioReadThreads[i];
}
}
if (client) {
if (clientMode) {
this.connectThread = createClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);
} else {
this.connectThread = null;

View File

@@ -177,7 +177,7 @@ public class AsyncIOThread extends WorkThread {
continue;
}
AsyncNioConnection conn = (AsyncNioConnection) key.attachment();
if (conn.client) {
if (conn.clientMode) {
if (key.isConnectable()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
conn.doConnect();

View File

@@ -34,9 +34,9 @@ class AsyncNioCompletionHandler<A> implements CompletionHandler<Integer, A>, Run
private ByteBuffer buffer;
public AsyncNioCompletionHandler(boolean readFlag, AsyncNioConnection conn) {
public AsyncNioCompletionHandler(boolean readMode, AsyncNioConnection conn) {
this.conn = conn;
this.readMode = readFlag;
this.readMode = readMode;
}
public void handler(CompletionHandler<Integer, A> handler, A attachment) {

View File

@@ -92,9 +92,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey;
public AsyncNioConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
public AsyncNioConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, final int bufferCapacity, SSLBuilder sslBuilder, SSLContext sslContext) {
super(client, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
super(clientMode, ioGroup, ioReadThread, ioWriteThread, bufferCapacity, sslBuilder, sslContext);
this.connectThread = ioGroup.connectThread;
}

View File

@@ -26,9 +26,9 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
private final SocketChannel channel;
public AsyncNioTcpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
public AsyncNioTcpConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, SocketChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) {
super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
super(clientMode, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
this.channel = ch;
SocketAddress addr = address;
if (addr == null) {

View File

@@ -23,9 +23,9 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
private final DatagramChannel channel;
public AsyncNioUdpConnection(boolean client, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
public AsyncNioUdpConnection(boolean clientMode, AsyncIOGroup ioGroup, AsyncIOThread ioReadThread,
AsyncIOThread ioWriteThread, DatagramChannel ch, SSLBuilder sslBuilder, SSLContext sslContext, final SocketAddress address) {
super(client, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
super(clientMode, ioGroup, ioReadThread, ioWriteThread, ioGroup.bufferCapacity, sslBuilder, sslContext);
this.channel = ch;
SocketAddress addr = address;
if (addr == null) {
@@ -100,7 +100,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
@Override
public boolean isConnected() {
if (!client) {
if (!clientMode) {
return true;
}
return this.channel.isConnected();
@@ -153,7 +153,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
@Override
public final void close() throws IOException {
super.close();
if (client) {
if (clientMode) {
channel.close(); //不能关闭channel
}
if (this.connectKey != null) {

View File

@@ -162,12 +162,16 @@ class ProtocolCodec implements CompletionHandler<Integer, ByteBuffer> {
if (pindex == 0) {
pindex++;
}
request.pipeline(pindex, pindex + 1);
if (request.getRequestid() == null) { //存在requestid则无视pipeline模式
request.pipeline(pindex, pindex + 1);
}
if (hreq == null) {
hreq = request.copyHeader();
}
} else {
request.pipeline(pindex, pindex);
if (request.getRequestid() == null) { //存在requestid则无视pipeline模式
request.pipeline(pindex, pindex);
}
channel.setReadBuffer(buffer.clear());
}
context.executeDispatch(request, response);

View File

@@ -5,7 +5,7 @@
*/
package org.redkale.net;
import java.io.InputStream;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import org.redkale.convert.ConvertDisabled;
@@ -77,7 +77,6 @@ public abstract class Request<C extends Context> {
return null;
}
//重载此方法不设置pipelineIndex值可以将协议改成无pipeline模式
protected Request pipeline(int pipelineIndex, int pipelineCount) {
this.pipelineIndex = pipelineIndex;
this.pipelineCount = pipelineCount;
@@ -94,6 +93,8 @@ public abstract class Request<C extends Context> {
*/
protected abstract int readHeader(ByteBuffer buffer, Request last);
protected abstract Serializable getRequestid();
protected abstract void prepare();
protected void recycle() {

View File

@@ -5,7 +5,7 @@
*/
package org.redkale.net;
import java.io.*;
import java.io.FileInputStream;
import java.security.*;
import java.security.cert.*;
import java.util.*;
@@ -151,7 +151,7 @@ public class SSLBuilder {
return sslContext;
}
public SSLEngine createSSLEngine(SSLContext sslContext, boolean client) {
public SSLEngine createSSLEngine(SSLContext sslContext, boolean clientMode) {
SSLEngine engine = sslContext.createSSLEngine();
if (protocols != null) {
engine.setEnabledProtocols(protocols);
@@ -159,7 +159,7 @@ public class SSLBuilder {
if (ciphers != null) {
engine.setEnabledCipherSuites(ciphers);
}
engine.setUseClientMode(client);
engine.setUseClientMode(clientMode);
if (wantClientAuth) {
engine.setWantClientAuth(true);
} else if (needClientAuth) {

View File

@@ -859,6 +859,11 @@ public class HttpRequest extends Request<HttpContext> {
return req;
}
@Override
protected Serializable getRequestid() {
return null;
}
@Override
protected void prepare() {
this.keepAlive = true; //默认HTTP/1.1

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.net.sncp;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.logging.*;
@@ -111,6 +112,11 @@ public class SncpRequest extends Request<SncpContext> {
return 0;
}
@Override
protected Serializable getRequestid() {
return seqid;
}
@Override
protected void prepare() {
this.keepAlive = true;