优化ByteBuffer对象池

This commit is contained in:
Redkale
2023-01-14 19:45:57 +08:00
parent 9192cb4606
commit dc0849d82a
10 changed files with 63 additions and 43 deletions

View File

@@ -82,23 +82,16 @@ public class AsyncIOGroup extends AsyncGroup {
try { try {
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
String indexfix = WorkThread.formatIndex(threads, i + 1); String indexfix = WorkThread.formatIndex(threads, i + 1);
ObjectPool<ByteBuffer> unsafeReadBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
if (client) { if (client) {
this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioReadThreads[i] = new ClientReadIOThread(g, String.format(threadNameFormat, "Read-" + indexfix), i, threads, workExecutor, safeBufferPool);
ObjectPool<ByteBuffer> unsafeWriteBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, safeBufferPool);
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.ioWriteThreads[i] = new ClientWriteIOThread(g, String.format(threadNameFormat, "Write-" + indexfix), i, threads, workExecutor, Selector.open(), unsafeWriteBufferPool, safeBufferPool);
} else { } else {
this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, Selector.open(), unsafeReadBufferPool, safeBufferPool); this.ioReadThreads[i] = new AsyncIOThread(g, String.format(threadNameFormat, indexfix), i, threads, workExecutor, safeBufferPool);
this.ioWriteThreads[i] = this.ioReadThreads[i]; this.ioWriteThreads[i] = this.ioReadThreads[i];
} }
} }
if (client) { if (client) {
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(), this.connectThread = new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, safeBufferPool);
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
this.connectThread = client ? new ClientReadIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool)
: new AsyncIOThread(g, String.format(threadNameFormat, "Connect"), 0, 0, workExecutor, Selector.open(), unsafeBufferPool, safeBufferPool);
} else { } else {
this.connectThread = null; this.connectThread = null;
} }

View File

@@ -5,6 +5,7 @@
*/ */
package org.redkale.net; package org.redkale.net;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
@@ -43,13 +44,13 @@ public class AsyncIOThread extends WorkThread {
private boolean closed; private boolean closed;
public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, public AsyncIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) {
super(g, name, index, threads, workExecutor, null); super(g, name, index, threads, workExecutor, null);
this.selector = selector; this.selector = Selector.open();
this.setDaemon(true); this.setDaemon(true);
this.bufferSupplier = () -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).get(); ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(this, 512, safeBufferPool);
this.bufferConsumer = (v) -> (inCurrThread() ? unsafeBufferPool : safeBufferPool).accept(v); this.bufferSupplier = unsafeBufferPool;
this.bufferConsumer = unsafeBufferPool;
} }
protected boolean isClosed() { protected boolean isClosed() {

View File

@@ -92,8 +92,8 @@ class AsyncNioTcpProtocolServer extends ProtocolServer {
LongAdder createResponseCounter = new LongAdder(); LongAdder createResponseCounter = new LongAdder();
LongAdder cycleResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder();
ObjectPool<ByteBuffer> safeBufferPool = server.createBufferSafePool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ObjectPool<ByteBuffer> safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
ObjectPool<Response> safeResponsePool = server.createResponseSafePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ObjectPool<Response> safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
final int respPoolMax = server.getResponsePoolSize(); final int respPoolMax = server.getResponsePoolSize();
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> { ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
if (!(Thread.currentThread() instanceof WorkThread)) { if (!(Thread.currentThread() instanceof WorkThread)) {

View File

@@ -89,8 +89,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
LongAdder createResponseCounter = new LongAdder(); LongAdder createResponseCounter = new LongAdder();
LongAdder cycleResponseCounter = new LongAdder(); LongAdder cycleResponseCounter = new LongAdder();
ObjectPool<ByteBuffer> safeBufferPool = server.createBufferSafePool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize); ObjectPool<ByteBuffer> safeBufferPool = server.createSafeBufferPool(createBufferCounter, cycleBufferCounter, server.bufferPoolSize);
ObjectPool<Response> safeResponsePool = server.createResponseSafePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize); ObjectPool<Response> safeResponsePool = server.createSafeResponsePool(createResponseCounter, cycleResponseCounter, server.responsePoolSize);
ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> { ThreadLocal<ObjectPool<Response>> localResponsePool = ThreadLocal.withInitial(() -> {
if (!(Thread.currentThread() instanceof WorkThread)) { if (!(Thread.currentThread() instanceof WorkThread)) {
return null; return null;
@@ -112,9 +112,6 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
this.serverChannel.register(this.selector, SelectionKey.OP_READ); this.serverChannel.register(this.selector, SelectionKey.OP_READ);
this.acceptThread = new Thread() { this.acceptThread = new Thread() {
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(safeBufferPool, safeBufferPool.getCreatCounter(),
safeBufferPool.getCycleCounter(), 512, safeBufferPool.getCreator(), safeBufferPool.getPrepare(), safeBufferPool.getRecycler());
{ {
setName(String.format(threadNameFormat, "Accept")); setName(String.format(threadNameFormat, "Accept"));
} }
@@ -127,6 +124,7 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
final int writes = ioWriteThreads.length; final int writes = ioWriteThreads.length;
int readIndex = -1; int readIndex = -1;
int writeIndex = -1; int writeIndex = -1;
ObjectPool<ByteBuffer> unsafeBufferPool = ObjectPool.createUnsafePool(null, 512, safeBufferPool);
while (!closed) { while (!closed) {
final ByteBuffer buffer = unsafeBufferPool.get(); final ByteBuffer buffer = unsafeBufferPool.get();
try { try {

View File

@@ -423,10 +423,10 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
} }
//必须在 createContext()之后调用 //必须在 createContext()之后调用
protected abstract ObjectPool<ByteBuffer> createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize); protected abstract ObjectPool<ByteBuffer> createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize);
//必须在 createContext()之后调用 //必须在 createContext()之后调用
protected abstract ObjectPool<P> createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize); protected abstract ObjectPool<P> createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize);
public void shutdown() throws IOException { public void shutdown() throws IOException {
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();

View File

@@ -3,8 +3,8 @@
*/ */
package org.redkale.net.client; package org.redkale.net.client;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.redkale.net.AsyncIOThread; import org.redkale.net.AsyncIOThread;
import org.redkale.util.ObjectPool; import org.redkale.util.ObjectPool;
@@ -21,9 +21,9 @@ import org.redkale.util.ObjectPool;
*/ */
public class ClientReadIOThread extends AsyncIOThread { public class ClientReadIOThread extends AsyncIOThread {
public ClientReadIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, public ClientReadIOThread(ThreadGroup g, String name, int index, int threads,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) { ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); super(g, name, index, threads, workExecutor, safeBufferPool);
} }
} }

View File

@@ -3,9 +3,9 @@
*/ */
package org.redkale.net.client; package org.redkale.net.client;
import java.io.Serializable; import java.io.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.CompletionHandler;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,9 +26,9 @@ public class ClientWriteIOThread extends AsyncIOThread {
private final BlockingDeque<ClientFuture> requestQueue = new LinkedBlockingDeque<>(); private final BlockingDeque<ClientFuture> requestQueue = new LinkedBlockingDeque<>();
public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, Selector selector, public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads,
ObjectPool<ByteBuffer> unsafeBufferPool, ObjectPool<ByteBuffer> safeBufferPool) { ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {
super(g, name, index, threads, workExecutor, selector, unsafeBufferPool, safeBufferPool); super(g, name, index, threads, workExecutor, safeBufferPool);
} }
public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) { public void offerRequest(ClientConnection conn, ClientRequest request, ClientFuture respFuture) {

View File

@@ -41,6 +41,8 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
private HttpResponseConfig respConfig; private HttpResponseConfig respConfig;
private ObjectPool<ByteBuffer> safeBufferPool;
public HttpServer() { public HttpServer() {
this(null, System.currentTimeMillis(), ResourceFactory.create()); this(null, System.currentTimeMillis(), ResourceFactory.create());
} }
@@ -541,7 +543,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
} }
@Override @Override
protected ObjectPool<ByteBuffer> createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { protected ObjectPool<ByteBuffer> createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
final int rcapacity = this.bufferCapacity; final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
@@ -551,11 +553,12 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
e.clear(); e.clear();
return true; return true;
}); });
this.safeBufferPool = bufferPool;
return bufferPool; return bufferPool;
} }
@Override @Override
protected ObjectPool<HttpResponse> createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { protected ObjectPool<HttpResponse> createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
Creator<HttpResponse> creator = (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context), this.respConfig); Creator<HttpResponse> creator = (Object... params) -> new HttpResponse(this.context, new HttpRequest(this.context), this.respConfig);
ObjectPool<HttpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, HttpResponse::prepare, HttpResponse::recycle); ObjectPool<HttpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, HttpResponse::prepare, HttpResponse::recycle);
return pool; return pool;

View File

@@ -127,7 +127,7 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
} }
@Override @Override
protected ObjectPool<ByteBuffer> createBufferSafePool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) { protected ObjectPool<ByteBuffer> createSafeBufferPool(LongAdder createCounter, LongAdder cycleCounter, int bufferPoolSize) {
final int rcapacity = this.bufferCapacity; final int rcapacity = this.bufferCapacity;
ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize, ObjectPool<ByteBuffer> bufferPool = ObjectPool.createSafePool(createCounter, cycleCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
@@ -141,7 +141,7 @@ public class SncpServer extends Server<Uint128, SncpContext, SncpRequest, SncpRe
} }
@Override @Override
protected ObjectPool<SncpResponse> createResponseSafePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) { protected ObjectPool<SncpResponse> createSafeResponsePool(LongAdder createCounter, LongAdder cycleCounter, int responsePoolSize) {
Creator<SncpResponse> creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context)); Creator<SncpResponse> creator = (Object... params) -> new SncpResponse(this.context, new SncpRequest(this.context));
ObjectPool<SncpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle); ObjectPool<SncpResponse> pool = ObjectPool.createSafePool(createCounter, cycleCounter, responsePoolSize, creator, SncpResponse::prepare, SncpResponse::recycle);
return pool; return pool;

View File

@@ -45,10 +45,14 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
protected Thread unsafeThread; protected Thread unsafeThread;
protected ObjectPool(ObjectPool<T> parent, LongAdder creatCounter, LongAdder cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler, Queue<T> queue) { //true表示unsafeThread不为空且当前为非线程安全版且parent为线程安全版
protected final boolean safeCombine;
protected ObjectPool(ObjectPool<T> parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler, Queue<T> queue) {
this.parent = parent; this.parent = parent;
this.creatCounter = creatCounter; this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter; this.cycleCounter = cycleCounter;
this.unsafeThread = unsafeThread;
this.creator = creator; this.creator = creator;
this.prepare = prepare; this.prepare = prepare;
this.recycler = recycler; this.recycler = recycler;
@@ -56,6 +60,7 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
this.debug = logger.isLoggable(Level.FINEST); this.debug = logger.isLoggable(Level.FINEST);
this.queue = queue; this.queue = queue;
this.unsafeDequeable = queue instanceof ArrayDeque; this.unsafeDequeable = queue instanceof ArrayDeque;
this.safeCombine = unsafeThread != null && unsafeDequeable && parent != null && !parent.unsafeDequeable;
} }
//非线程安全版 //非线程安全版
@@ -125,10 +130,21 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
//非线程安全版 //非线程安全版
public static <T> ObjectPool<T> createUnsafePool(ObjectPool<T> parent, LongAdder creatCounter, LongAdder cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) { public static <T> ObjectPool<T> createUnsafePool(ObjectPool<T> parent, LongAdder creatCounter, LongAdder cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return new ObjectPool(parent, creatCounter, cycleCounter, Math.max(Utility.cpus(), max), return new ObjectPool(parent, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max),
creator, prepare, recycler, new ArrayDeque<>(Math.max(Utility.cpus(), max))); creator, prepare, recycler, new ArrayDeque<>(Math.max(Utility.cpus(), max)));
} }
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(ObjectPool<T> parent, LongAdder creatCounter, LongAdder cycleCounter, Thread unsafeThread, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return new ObjectPool(parent, creatCounter, cycleCounter, unsafeThread, Math.max(Utility.cpus(), max),
creator, prepare, recycler, new ArrayDeque<>(Math.max(Utility.cpus(), max)));
}
//非线程安全版
public static <T> ObjectPool<T> createUnsafePool(Thread unsafeThread, int max, ObjectPool<T> safePool) {
return createUnsafePool(safePool, safePool.getCreatCounter(), safePool.getCycleCounter(), unsafeThread, max, safePool.getCreator(), safePool.getPrepare(), safePool.getRecycler());
}
//线程安全版 //线程安全版
public static <T> ObjectPool<T> createSafePool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) { public static <T> ObjectPool<T> createSafePool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
return createSafePool(2, clazz, prepare, recycler); return createSafePool(2, clazz, prepare, recycler);
@@ -161,7 +177,7 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
//线程安全版 //线程安全版
public static <T> ObjectPool<T> createSafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) { public static <T> ObjectPool<T> createSafePool(LongAdder creatCounter, LongAdder cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
return new ObjectPool(null, creatCounter, cycleCounter, Math.max(Utility.cpus(), max), return new ObjectPool(null, creatCounter, cycleCounter, null, Math.max(Utility.cpus(), max),
creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Utility.cpus(), max))); creator, prepare, recycler, new LinkedBlockingQueue<>(Math.max(Utility.cpus(), max)));
} }
@@ -195,7 +211,11 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
@Override @Override
public T get() { public T get() {
if (unsafeDequeable) { if (safeCombine) {
if (Thread.currentThread() != unsafeThread) {
return parent.get();
}
} else if (unsafeDequeable) {
if (unsafeThread == null) { if (unsafeThread == null) {
unsafeThread = Thread.currentThread(); unsafeThread = Thread.currentThread();
} else if (unsafeThread != Thread.currentThread()) { } else if (unsafeThread != Thread.currentThread()) {
@@ -225,7 +245,12 @@ public class ObjectPool<T> implements Supplier<T>, Consumer<T> {
if (e == null) { if (e == null) {
return; return;
} }
if (unsafeDequeable) { if (safeCombine) {
if (Thread.currentThread() != unsafeThread) {
parent.accept(e);
return;
}
} else if (unsafeDequeable) {
if (unsafeThread == null) { if (unsafeThread == null) {
unsafeThread = Thread.currentThread(); unsafeThread = Thread.currentThread();
} else if (unsafeThread != Thread.currentThread()) { } else if (unsafeThread != Thread.currentThread()) {