WorkThread优化

This commit is contained in:
Redkale
2023-01-14 16:13:46 +08:00
parent e6150b3469
commit 21df7f05dd
7 changed files with 104 additions and 65 deletions

View File

@@ -35,20 +35,18 @@ public class AsyncIOGroup extends AsyncGroup {
private boolean skipClose; private boolean skipClose;
//必须与ioWriteThreads数量相同
final AsyncIOThread[] ioReadThreads; final AsyncIOThread[] ioReadThreads;
//必须与ioReadThreads数量相同
final AsyncIOThread[] ioWriteThreads; final AsyncIOThread[] ioWriteThreads;
final AsyncIOThread connectThread; final AsyncIOThread connectThread;
final int bufferCapacity; final int bufferCapacity;
final AtomicInteger shareCount = new AtomicInteger(1);
private final AtomicInteger readIndex = new AtomicInteger(); private final AtomicInteger readIndex = new AtomicInteger();
private final AtomicInteger writeIndex = new AtomicInteger();
//创建数 //创建数
final LongAdder connCreateCounter = new LongAdder(); final LongAdder connCreateCounter = new LongAdder();
@@ -80,26 +78,27 @@ public class AsyncIOGroup extends AsyncGroup {
final int threads = Utility.cpus(); final int threads = Utility.cpus();
this.ioReadThreads = new AsyncIOThread[threads]; this.ioReadThreads = new AsyncIOThread[threads];
this.ioWriteThreads = new AsyncIOThread[threads]; this.ioWriteThreads = new AsyncIOThread[threads];
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
try { try {
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
String indexfix = WorkThread.formatIndex(threads, i + 1); String indexfix = WorkThread.formatIndex(threads, i + 1);
ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
if (client) { if (client) {
this.ioReadThreads[i] = new ClientReadIOThread(String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
ObjectPool<ByteBuffer> unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.ioWriteThreads[i] = new ClientWriteIOThread(String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool); this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool);
} else { } else {
this.ioReadThreads[i] = new AsyncIOThread(String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i]; this.ioWriteThreads[i] = this.ioReadThreads[i];
} }
} }
if (client) { if (client) {
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler()); safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.connectThread = client ? new ClientReadIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool) this.connectThread = client ? new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
: new AsyncIOThread(String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool); : new AsyncIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
} else { } else {
this.connectThread = null; this.connectThread = null;
} }
@@ -148,18 +147,15 @@ public class AsyncIOGroup extends AsyncGroup {
return this; return this;
} }
public AsyncIOGroup dispose() { public synchronized AsyncIOGroup dispose() {
if (shareCount.decrementAndGet() > 0) {
return this;
}
if (closed) { if (closed) {
return this; return this;
} }
for (int i = 0; i < this.ioReadThreads.length; i++) { for (AsyncIOThread t : this.ioReadThreads) {
this.ioReadThreads[i].close(); t.close();
if (this.ioWriteThreads[i] != this.ioReadThreads[i]) { }
this.ioWriteThreads[i].close(); for (AsyncIOThread t : this.ioWriteThreads) {
} t.close();
} }
if (connectThread != null) { if (connectThread != null) {
connectThread.close(); connectThread.close();
@@ -181,9 +177,14 @@ public class AsyncIOGroup extends AsyncGroup {
return connClosedCounter; return connClosedCounter;
} }
public AsyncIOThread[] nextIOThreads() { public AsyncIOThread nextReadIOThread() {
int i = Math.abs(readIndex.getAndIncrement()) % ioReadThreads.length; int i = Math.abs(readIndex.getAndIncrement()) % ioReadThreads.length;
return new AsyncIOThread[]{ioReadThreads[i], ioWriteThreads[i]}; return ioReadThreads[i];
}
public AsyncIOThread nextWriteIOThread() {
int i = Math.abs(writeIndex.getAndIncrement()) % ioWriteThreads.length;
return ioWriteThreads[i];
} }
public AsyncIOThread connectThread() { public AsyncIOThread connectThread() {
@@ -235,20 +236,34 @@ public class AsyncIOGroup extends AsyncGroup {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
AsyncIOThread[] ioThreads = null; AsyncIOThread readThread = null;
Thread currThread = Thread.currentThread(); AsyncIOThread writeThread = null;
if (currThread instanceof AsyncIOThread) { AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread();
for (int i = 0; i < this.ioReadThreads.length; i++) { if (currThread != null) {
if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) { if (this.ioReadThreads[0].getThreadGroup() == currThread.getThreadGroup()) {
ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]}; for (int i = 0; i < this.ioReadThreads.length; i++) {
break; if (this.ioReadThreads[i].index() == currThread.index()) {
readThread = this.ioReadThreads[i];
break;
}
}
}
if (this.ioWriteThreads[0].getThreadGroup() == currThread.getThreadGroup()) {
for (int i = 0; i < this.ioWriteThreads.length; i++) {
if (this.ioWriteThreads[i].index() == currThread.index()) {
writeThread = this.ioWriteThreads[i];
break;
}
} }
} }
} }
if (ioThreads == null) { if (readThread == null) {
ioThreads = nextIOThreads(); readThread = nextReadIOThread();
} }
return new AsyncNioTcpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address); if (writeThread == null) {
writeThread = nextWriteIOThread();
}
return new AsyncNioTcpConnection(true, this, readThread, writeThread, channel, null, null, address);
} }
@Override @Override
@@ -304,20 +319,30 @@ public class AsyncIOGroup extends AsyncGroup {
private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException { private AsyncNioUdpConnection newUDPClientConnection(final SocketAddress address) throws IOException {
DatagramChannel channel = DatagramChannel.open(); DatagramChannel channel = DatagramChannel.open();
AsyncIOThread[] ioThreads = null; AsyncIOThread readThread = null;
Thread currThread = Thread.currentThread(); AsyncIOThread writeThread = null;
if (currThread instanceof AsyncIOThread) { AsyncIOThread currThread = AsyncIOThread.currAsyncIOThread();
if (currThread != null) {
for (int i = 0; i < this.ioReadThreads.length; i++) { for (int i = 0; i < this.ioReadThreads.length; i++) {
if (this.ioReadThreads[i] == currThread || this.ioWriteThreads[i] == currThread) { if (this.ioReadThreads[i].index() == currThread.index()) {
ioThreads = new AsyncIOThread[]{this.ioReadThreads[i], this.ioWriteThreads[i]}; readThread = this.ioReadThreads[i];
break;
}
}
for (int i = 0; i < this.ioWriteThreads.length; i++) {
if (this.ioWriteThreads[i].index() == currThread.index()) {
writeThread = this.ioWriteThreads[i];
break; break;
} }
} }
} }
if (ioThreads == null) { if (readThread == null) {
ioThreads = nextIOThreads(); readThread = nextReadIOThread();
} }
return new AsyncNioUdpConnection(true, this, ioThreads[0], ioThreads[1], channel, null, null, address); if (writeThread == null) {
writeThread = nextWriteIOThread();
}
return new AsyncNioUdpConnection(true, this, readThread, writeThread, channel, null, null, address);
} }
@Override @Override

View File

@@ -43,9 +43,9 @@ public class AsyncIOThread extends WorkThread {
private boolean closed; private boolean closed;
public AsyncIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) { ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
super(name, index, threads, workExecutor, null); super(g, name, index, threads, workExecutor, null);
this.selector = selector; this.selector = selector;
this.setDaemon(true); this.setDaemon(true);
this.bufferSupplier = () -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).get(); this.bufferSupplier = () -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).get();
@@ -201,13 +201,15 @@ public class AsyncIOThread extends WorkThread {
} }
} }
public void close() { public synchronized void close() {
this.closed = true; if (!this.closed) {
this.interrupt(); this.interrupt();
try { try {
this.selector.close(); this.selector.close();
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.FINE, getName() + " selector close failed", e); logger.log(Level.FINE, getName() + " selector close failed", e);
}
this.closed = true;
} }
} }
} }

View File

@@ -131,8 +131,10 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
public void run() { public void run() {
final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads;
final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads;
int threads = ioReadThreads.length; final int reads = ioReadThreads.length;
int threadIndex = -1; final int writes = ioWriteThreads.length;
int readIndex = -1;
int writeIndex = -1;
Set<SelectionKey> keys = null; Set<SelectionKey> keys = null;
while (!closed) { while (!closed) {
try { try {
@@ -145,10 +147,13 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
} }
for (SelectionKey key : keys) { for (SelectionKey key : keys) {
if (key.isAcceptable()) { if (key.isAcceptable()) {
if (++threadIndex >= threads) { if (++readIndex >= reads) {
threadIndex = 0; readIndex = 0;
} }
accept(key, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]); if (++writeIndex >= writes) {
writeIndex = 0;
}
accept(key, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
} }
} }
keys.clear(); keys.clear();

View File

@@ -123,17 +123,22 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
public void run() { public void run() {
final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads; final AsyncIOThread[] ioReadThreads = ioGroup.ioReadThreads;
final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads; final AsyncIOThread[] ioWriteThreads = ioGroup.ioWriteThreads;
int threads = ioReadThreads.length; final int reads = ioReadThreads.length;
int threadIndex = -1; final int writes = ioWriteThreads.length;
int readIndex = -1;
int writeIndex = -1;
while (!closed) { while (!closed) {
final ByteBuffer buffer = unsafeBufferPool.get(); final ByteBuffer buffer = unsafeBufferPool.get();
try { try {
SocketAddress address = serverChannel.receive(buffer); SocketAddress address = serverChannel.receive(buffer);
buffer.flip(); buffer.flip();
if (++threadIndex >= threads) { if (++readIndex >= reads) {
threadIndex = 0; readIndex = 0;
} }
accept(address, buffer, ioReadThreads[threadIndex], ioWriteThreads[threadIndex]); if (++writeIndex >= writes) {
writeIndex = 0;
}
accept(address, buffer, ioReadThreads[readIndex], ioWriteThreads[writeIndex]);
} catch (Throwable t) { } catch (Throwable t) {
unsafeBufferPool.accept(buffer); unsafeBufferPool.accept(buffer);
} }

View File

@@ -28,8 +28,8 @@ public class WorkThread extends Thread implements Executor {
private final int threads; //WorkThread个数 private final int threads; //WorkThread个数
public WorkThread(String name, int index, int threads, ExecutorService workExecutor, Runnable target) { public WorkThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Runnable target) {
super(target); super(g, target);
if (name != null) { if (name != null) {
setName(name); setName(name);
} }
@@ -48,11 +48,12 @@ public class WorkThread extends Thread implements Executor {
public static ExecutorService createHashExecutor(final int threads, final String threadNameFormat) { public static ExecutorService createHashExecutor(final int threads, final String threadNameFormat) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>(); final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
return new ThreadHashExecutor(threads, (Runnable r) -> { return new ThreadHashExecutor(threads, (Runnable r) -> {
int i = counter.get(); int i = counter.get();
int c = counter.incrementAndGet(); int c = counter.incrementAndGet();
String threadName = String.format(threadNameFormat, formatIndex(threads, c)); String threadName = String.format(threadNameFormat, formatIndex(threads, c));
Thread t = new WorkThread(threadName, i, threads, ref.get(), r); Thread t = new WorkThread(g, threadName, i, threads, ref.get(), r);
return t; return t;
}); });
} }
@@ -60,11 +61,12 @@ public class WorkThread extends Thread implements Executor {
public static ExecutorService createExecutor(final int threads, final String threadNameFormat) { public static ExecutorService createExecutor(final int threads, final String threadNameFormat) {
final AtomicReference<ExecutorService> ref = new AtomicReference<>(); final AtomicReference<ExecutorService> ref = new AtomicReference<>();
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
final ThreadGroup g = new ThreadGroup(String.format(threadNameFormat, "Group"));
return Executors.newFixedThreadPool(threads, (Runnable r) -> { return Executors.newFixedThreadPool(threads, (Runnable r) -> {
int i = counter.get(); int i = counter.get();
int c = counter.incrementAndGet(); int c = counter.incrementAndGet();
String threadName = String.format(threadNameFormat, formatIndex(threads, c)); String threadName = String.format(threadNameFormat, formatIndex(threads, c));
Thread t = new WorkThread(threadName, i, threads, ref.get(), r); Thread t = new WorkThread(g, threadName, i, threads, ref.get(), r);
return t; return t;
}); });
} }

View File

@@ -21,9 +21,9 @@ import org.redkale.util.ObjectPool;
*/ */
public class ClientReadIOThread extends AsyncIOThread { public class ClientReadIOThread extends AsyncIOThread {
public ClientReadIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) { ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool);
} }
} }

View File

@@ -26,9 +26,9 @@ public class ClientWriteIOThread extends AsyncIOThread {
private final BlockingDeque<ClientFuture> requestQueue = new LinkedBlockingDeque<>(); private final BlockingDeque<ClientFuture> requestQueue = new LinkedBlockingDeque<>();
public ClientWriteIOThread(String name, int index, int threads, ExecutorService workExecutor, Selector selector, public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) { ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
super(name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool);
} }
public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) { public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) {