优化net.client

This commit is contained in:
Redkale
2023-01-04 11:32:48 +08:00
parent 80c04d9645
commit 7680b4e165
7 changed files with 49 additions and 34 deletions

View File

@@ -16,6 +16,8 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import javax.net.ssl.*;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
import static javax.net.ssl.SSLEngineResult.Status.*;
import org.redkale.util.*;
/**
@@ -671,7 +673,7 @@ public abstract class AsyncConnection implements ChannelContext, Channel, AutoCl
} catch (Exception io) {
}
}
if (this.readBuffer != null) {
if (this.readBuffer != null && Thread.currentThread() == this.ioReadThread) {
Consumer<ByteBuffer> consumer = this.readBufferConsumer;
if (consumer != null) {
consumer.accept(this.readBuffer);

View File

@@ -13,6 +13,7 @@ import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.redkale.annotation.ResourceType;
import org.redkale.net.client.*;
import org.redkale.util.*;
/**
@@ -82,14 +83,20 @@ public class AsyncIOGroup extends AsyncGroup {
this.ioWriteThreads = new AsyncIOThread[threads];
try {
for (int i = 0; i < threads; i++) {
String postfix = "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
String name = threadPrefixName + "-" + (i >= 9 ? (i + 1) : ("0" + (i + 1)));
if (client) {
this.ioReadThreads[i] = new ClientIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioReadThreads[i] = new ClientIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i];
if (false) {
this.ioReadThreads[i].setName(threadPrefixName + "-Read" + postfix);
ObjectPool<ByteBuffer> unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.ioWriteThreads[i] = new ClientWriteIOThread(threadPrefixName + "-Write" + postfix, i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool);
}
} else {
this.ioReadThreads[i] = new AsyncIOThread(name, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioReadThreads[i] = new AsyncIOThread(threadPrefixName + postfix, i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i];
}
}

View File

@@ -52,6 +52,10 @@ public class AsyncIOThread extends WorkThread {
this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v);
}
protected boolean isClosed() {
return closed;
}
public static AsyncIOThread currAsyncIOThread() {
Thread t = Thread.currentThread();
return t instanceof AsyncIOThread ? (AsyncIOThread) t : null;
@@ -127,7 +131,7 @@ public class AsyncIOThread extends WorkThread {
public void run() {
final Queue<Runnable> commands = this.commandQueue;
final Queue<Consumer<Selector>> registers = this.registerQueue;
while (!this.closed) {
while (!isClosed()) {
try {
Consumer<Selector> register;
while ((register = registers.poll()) != null) {

View File

@@ -8,6 +8,7 @@ package org.redkale.net.client;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.AbstractMap.SimpleEntry;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -50,7 +51,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected final AtomicBoolean writePending = new AtomicBoolean();
protected final Queue<R> requestQueue = new ArrayDeque<>();
protected final Queue<SimpleEntry<R, ClientFuture<R>>> requestQueue = new ArrayDeque<>();
final ArrayDeque<ClientFuture> responseQueue = new ArrayDeque<>();
@@ -85,7 +86,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
protected int maxPipelines; //最大并行处理数
protected R lastHalfRequest;
protected SimpleEntry<R, ClientFuture<R>> lastHalfEntry;
protected ClientConnection setMaxPipelines(int maxPipelines) {
this.maxPipelines = maxPipelines;
@@ -120,37 +121,42 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
if (pw.get()) {
break;
}
R req;
if (lastHalfRequest == null) {
req = requestQueue.poll();
SimpleEntry<R, ClientFuture<R>> entry;
if (lastHalfEntry == null) {
entry = requestQueue.poll();
} else {
req = lastHalfRequest;
lastHalfRequest = null;
entry = lastHalfEntry;
lastHalfEntry = null;
}
if (req == null) {
if (entry == null) {
break;
}
R req = entry.getKey();
writeLastRequest = req;
if (req.getRequestid() == null && req.canMerge(conn)) {
R r;
SimpleEntry<R, ClientFuture<R>> r;
while ((r = requestQueue.poll()) != null) {
i++;
if (!req.merge(conn, r)) {
if (!req.merge(conn, r.getKey())) {
break;
}
req.respFuture.mergeCount++;
ClientFuture<R> f = entry.getValue();
if (f != null) {
f.mergeCount++;
}
//req.respFuture.mergeCount++;
}
req.accept(conn, rw);
if (r != null) {
r.accept(conn, rw);
req = r;
r.getKey().accept(conn, rw);
req = r.getKey();
}
} else {
req.accept(conn, rw);
}
c++;
if (!req.isCompleted()) {
lastHalfRequest = req;
lastHalfEntry = entry;
this.pauseWriting.set(true);
break;
}
@@ -199,7 +205,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
pauseWriting.set(false);
return;
} else { //异常了需要清掉半包
lastHalfRequest = null;
lastHalfEntry = null;
pauseWriting.set(false);
}
}
@@ -308,7 +314,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
channel.read(this);
}
} else { //还有消息需要读取
if ((!requestQueue.isEmpty() || lastHalfRequest != null) && writePending.compareAndSet(false, true)) {
if ((!requestQueue.isEmpty() || lastHalfEntry != null) && writePending.compareAndSet(false, true)) {
//先写后读取
if (!sendWrite(true)) {
writePending.compareAndSet(true, false);
@@ -370,20 +376,22 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
return respFuture;
}
private void writeChannelInThread(R request, ClientFuture respFuture) {
private void writeChannelInThread(R request, ClientFuture<R> respFuture) {
Serializable reqid = request.getRequestid();
//保证顺序一致
if (respFuture.request.isCloseType()) {
ClientFuture future;
if (request.isCloseType()) {
future = null;
responseQueue.offer(ClientFuture.EMPTY);
} else {
request.respFuture = respFuture;
future = respFuture;
if (reqid == null) {
responseQueue.offer(respFuture);
} else {
responseMap.put(reqid, respFuture);
}
}
requestQueue.offer(request);
requestQueue.offer(new SimpleEntry<>(request, future));
if (isAuthenticated() && client.reqWritedCounter != null) {
client.reqWritedCounter.increment();
}

View File

@@ -1,11 +1,12 @@
/*
*
*/
package org.redkale.net;
package org.redkale.net.client;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutorService;
import org.redkale.net.AsyncIOThread;
import org.redkale.util.ObjectPool;
/**

View File

@@ -27,8 +27,6 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
protected String traceid;
ClientFuture respFuture;
public Serializable getRequestid() {
return null;
}
@@ -51,10 +49,6 @@ public abstract class ClientRequest implements BiConsumer<ClientConnection, Byte
return (T) this;
}
public int getMergeCount() {
return respFuture == null ? -1 : respFuture.mergeCount;
}
//是否能合并, requestid=null的情况下值才有效
protected boolean canMerge(ClientConnection conn) {
return false;

View File

@@ -7,7 +7,6 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.util.concurrent.*;
import org.redkale.net.ClientIOThread;
import org.redkale.util.*;
/**
@@ -29,7 +28,7 @@ public class ClientWriteIOThread extends ClientIOThread {
@Override
public void run() {
while (!this.closed) {
while (!isClosed()) {
ClientEntity entity;
try {
while ((entity = requestQueue.take()) != null) {