diff --git a/src/META-INF/application-template.xml b/src/META-INF/application-template.xml index a8b9de8a1..5410bacd2 100644 --- a/src/META-INF/application-template.xml +++ b/src/META-INF/application-template.xml @@ -37,6 +37,7 @@ threads: 线程总数, 默认: 节点数*CPU核数*8 bufferCapacity: ByteBuffer的初始化大小, 默认: 8K; bufferPoolSize: ByteBuffer池的大小,默认: 节点数*CPU核数*8 + strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类 --> diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 299d82177..2630b43bf 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -232,12 +232,14 @@ public final class Application { } this.logger = Logger.getLogger(this.getClass().getSimpleName()); this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1); + this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader()); logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------"); //------------------配置 节点 ------------------ ObjectPool transportPool = null; ExecutorService transportExec = null; AsynchronousChannelGroup transportGroup = null; final AnyValue resources = config.getAnyValue("resources"); + TransportStrategy strategy = null; if (resources != null) { AnyValue transportConf = resources.getAnyValue("transport"); int groupsize = resources.getAnyValues("group").length; @@ -257,6 +259,10 @@ public final class Application { }); //-----------transportChannelGroup-------------- try { + final String strategyClass = transportConf.getValue("strategy"); + if (strategyClass != null && !strategyClass.isEmpty()) { + strategy = (TransportStrategy) classLoader.loadClass(strategyClass).newInstance(); + } final AtomicInteger counter = new AtomicInteger(); transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> { Thread t = new Thread(r); @@ -271,8 +277,7 @@ public final class Application { logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";"); } } - this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup); - this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader()); + this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy); Thread.currentThread().setContextClassLoader(this.classLoader); this.serverClassLoader = new RedkaleClassLoader(this.classLoader); } diff --git a/src/org/redkale/net/Transport.java b/src/org/redkale/net/Transport.java index aec11106a..239f67ebc 100644 --- a/src/org/redkale/net/Transport.java +++ b/src/org/redkale/net/Transport.java @@ -56,15 +56,20 @@ public final class Transport { protected final ObjectPool bufferPool; + //负载均衡策略 + protected final TransportStrategy strategy; + protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); public Transport(String name, String subprotocol, final ObjectPool transportBufferPool, - final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses) { - this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses); + final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, + final Collection addresses, final TransportStrategy strategy) { + this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy); } public Transport(String name, String protocol, String subprotocol, final ObjectPool transportBufferPool, - final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection addresses) { + final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, + final Collection addresses, final TransportStrategy strategy) { this.name = name; this.subprotocol = subprotocol == null ? "" : subprotocol.trim(); this.protocol = protocol; @@ -72,6 +77,7 @@ public final class Transport { this.group = transportChannelGroup; this.bufferPool = transportBufferPool; this.clientAddress = clientAddress; + this.strategy = strategy; updateRemoteAddresses(addresses); } @@ -132,6 +138,10 @@ public final class Transport { return remoteAddres; } + public ConcurrentHashMap> getAsyncConnectionPool() { + return connPool; + } + @Override public String toString() { return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}"; @@ -158,6 +168,7 @@ public final class Transport { } public AsyncConnection pollConnection(SocketAddress addr) { + if (this.strategy != null) return strategy.pollConnection(addr, this); if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0]; final boolean rand = addr == null; if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list"); diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index 165174c63..ada2fb1fb 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -43,10 +43,19 @@ public class TransportFactory { protected final List> services = new CopyOnWriteArrayList<>(); - public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { + //负载均衡策略 + protected final TransportStrategy strategy; + + public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup, + final TransportStrategy strategy) { this.executor = executor; this.bufferPool = bufferPool; this.channelGroup = channelGroup; + this.strategy = strategy; + } + + public TransportFactory(ExecutorService executor, ObjectPool bufferPool, AsynchronousChannelGroup channelGroup) { + this(executor, bufferPool, channelGroup, null); } public String findGroupName(InetSocketAddress addr) { @@ -127,14 +136,14 @@ public class TransportFactory { } if (info == null) return null; if (sncpAddress != null) addresses.remove(sncpAddress); - return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses); + return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy); } private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) { if (groupName == null) return null; TransportGroupInfo info = groupInfos.get(groupName); if (info == null) return null; - return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses); + return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy); } public ExecutorService getExecutor() { diff --git a/src/org/redkale/net/TransportStrategy.java b/src/org/redkale/net/TransportStrategy.java new file mode 100644 index 000000000..f086cbd32 --- /dev/null +++ b/src/org/redkale/net/TransportStrategy.java @@ -0,0 +1,21 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.net.SocketAddress; + +/** + * 远程请求的负载均衡策略 + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public interface TransportStrategy { + + public AsyncConnection pollConnection(SocketAddress addr, Transport transport); +}