增加 TransportStrategy 功能
This commit is contained in:
@@ -37,6 +37,7 @@
|
|||||||
threads: 线程总数, 默认: <group>节点数*CPU核数*8
|
threads: 线程总数, 默认: <group>节点数*CPU核数*8
|
||||||
bufferCapacity: ByteBuffer的初始化大小, 默认: 8K;
|
bufferCapacity: ByteBuffer的初始化大小, 默认: 8K;
|
||||||
bufferPoolSize: ByteBuffer池的大小,默认: <group>节点数*CPU核数*8
|
bufferPoolSize: ByteBuffer池的大小,默认: <group>节点数*CPU核数*8
|
||||||
|
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
|
||||||
-->
|
-->
|
||||||
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
|
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
|
||||||
|
|
||||||
|
|||||||
@@ -232,12 +232,14 @@ public final class Application {
|
|||||||
}
|
}
|
||||||
this.logger = Logger.getLogger(this.getClass().getSimpleName());
|
this.logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
||||||
|
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
|
||||||
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
|
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
|
||||||
//------------------配置 <transport> 节点 ------------------
|
//------------------配置 <transport> 节点 ------------------
|
||||||
ObjectPool<ByteBuffer> transportPool = null;
|
ObjectPool<ByteBuffer> transportPool = null;
|
||||||
ExecutorService transportExec = null;
|
ExecutorService transportExec = null;
|
||||||
AsynchronousChannelGroup transportGroup = null;
|
AsynchronousChannelGroup transportGroup = null;
|
||||||
final AnyValue resources = config.getAnyValue("resources");
|
final AnyValue resources = config.getAnyValue("resources");
|
||||||
|
TransportStrategy strategy = null;
|
||||||
if (resources != null) {
|
if (resources != null) {
|
||||||
AnyValue transportConf = resources.getAnyValue("transport");
|
AnyValue transportConf = resources.getAnyValue("transport");
|
||||||
int groupsize = resources.getAnyValues("group").length;
|
int groupsize = resources.getAnyValues("group").length;
|
||||||
@@ -257,6 +259,10 @@ public final class Application {
|
|||||||
});
|
});
|
||||||
//-----------transportChannelGroup--------------
|
//-----------transportChannelGroup--------------
|
||||||
try {
|
try {
|
||||||
|
final String strategyClass = transportConf.getValue("strategy");
|
||||||
|
if (strategyClass != null && !strategyClass.isEmpty()) {
|
||||||
|
strategy = (TransportStrategy) classLoader.loadClass(strategyClass).newInstance();
|
||||||
|
}
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
|
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
|
||||||
Thread t = new Thread(r);
|
Thread t = new Thread(r);
|
||||||
@@ -271,8 +277,7 @@ public final class Application {
|
|||||||
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup);
|
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy);
|
||||||
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
|
|
||||||
Thread.currentThread().setContextClassLoader(this.classLoader);
|
Thread.currentThread().setContextClassLoader(this.classLoader);
|
||||||
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,15 +56,20 @@ public final class Transport {
|
|||||||
|
|
||||||
protected final ObjectPool<ByteBuffer> bufferPool;
|
protected final ObjectPool<ByteBuffer> bufferPool;
|
||||||
|
|
||||||
|
//负载均衡策略
|
||||||
|
protected final TransportStrategy strategy;
|
||||||
|
|
||||||
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
|
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
||||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
|
||||||
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses);
|
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
|
||||||
|
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
||||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
|
||||||
|
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
|
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
|
||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
@@ -72,6 +77,7 @@ public final class Transport {
|
|||||||
this.group = transportChannelGroup;
|
this.group = transportChannelGroup;
|
||||||
this.bufferPool = transportBufferPool;
|
this.bufferPool = transportBufferPool;
|
||||||
this.clientAddress = clientAddress;
|
this.clientAddress = clientAddress;
|
||||||
|
this.strategy = strategy;
|
||||||
updateRemoteAddresses(addresses);
|
updateRemoteAddresses(addresses);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,6 +138,10 @@ public final class Transport {
|
|||||||
return remoteAddres;
|
return remoteAddres;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> getAsyncConnectionPool() {
|
||||||
|
return connPool;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}";
|
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}";
|
||||||
@@ -158,6 +168,7 @@ public final class Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public AsyncConnection pollConnection(SocketAddress addr) {
|
public AsyncConnection pollConnection(SocketAddress addr) {
|
||||||
|
if (this.strategy != null) return strategy.pollConnection(addr, this);
|
||||||
if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0];
|
if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0];
|
||||||
final boolean rand = addr == null;
|
final boolean rand = addr == null;
|
||||||
if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
|
if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
|
||||||
|
|||||||
@@ -43,10 +43,19 @@ public class TransportFactory {
|
|||||||
|
|
||||||
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
|
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
//负载均衡策略
|
||||||
|
protected final TransportStrategy strategy;
|
||||||
|
|
||||||
|
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
|
final TransportStrategy strategy) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.bufferPool = bufferPool;
|
this.bufferPool = bufferPool;
|
||||||
this.channelGroup = channelGroup;
|
this.channelGroup = channelGroup;
|
||||||
|
this.strategy = strategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||||
|
this(executor, bufferPool, channelGroup, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String findGroupName(InetSocketAddress addr) {
|
public String findGroupName(InetSocketAddress addr) {
|
||||||
@@ -127,14 +136,14 @@ public class TransportFactory {
|
|||||||
}
|
}
|
||||||
if (info == null) return null;
|
if (info == null) return null;
|
||||||
if (sncpAddress != null) addresses.remove(sncpAddress);
|
if (sncpAddress != null) addresses.remove(sncpAddress);
|
||||||
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses);
|
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
|
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
|
||||||
if (groupName == null) return null;
|
if (groupName == null) return null;
|
||||||
TransportGroupInfo info = groupInfos.get(groupName);
|
TransportGroupInfo info = groupInfos.get(groupName);
|
||||||
if (info == null) return null;
|
if (info == null) return null;
|
||||||
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses);
|
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ExecutorService getExecutor() {
|
public ExecutorService getExecutor() {
|
||||||
|
|||||||
21
src/org/redkale/net/TransportStrategy.java
Normal file
21
src/org/redkale/net/TransportStrategy.java
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
/*
|
||||||
|
* To change this license header, choose License Headers in Project Properties.
|
||||||
|
* To change this template file, choose Tools | Templates
|
||||||
|
* and open the template in the editor.
|
||||||
|
*/
|
||||||
|
package org.redkale.net;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 远程请求的负载均衡策略
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* 详情见: https://redkale.org
|
||||||
|
*
|
||||||
|
* @author zhangjx
|
||||||
|
*/
|
||||||
|
public interface TransportStrategy {
|
||||||
|
|
||||||
|
public AsyncConnection pollConnection(SocketAddress addr, Transport transport);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user