diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 90c476f96..1b06b2e43 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -5,12 +5,14 @@ */ package org.redkale.net; +import java.io.IOException; import java.lang.ref.WeakReference; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import java.util.logging.*; import java.util.stream.Collectors; import org.redkale.service.Service; @@ -58,6 +60,33 @@ public class TransportFactory { this(executor, bufferPool, channelGroup, null); } + public static TransportFactory create(int threads) { + return create(threads, threads * 2, 8 * 1024); + } + + public static TransportFactory create(int threads, int bufferPoolSize, int bufferCapacity) { + final ObjectPool transportPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; + e.clear(); + return true; + }); + final AtomicInteger counter = new AtomicInteger(); + ExecutorService transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("Transport-Thread-" + counter.incrementAndGet()); + return t; + }); + AsynchronousChannelGroup transportGroup = null; + try { + transportGroup = AsynchronousChannelGroup.withCachedThreadPool(transportExec, 1); + } catch (IOException e) { + throw new RuntimeException(e); + } + return create(transportExec, transportPool, transportGroup); + } + public static TransportFactory create(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { return new TransportFactory(executor, bufferPool, channelGroup, null); }