Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
44507a97a6 | ||
|
|
f4a7f1cff6 | ||
|
|
a5fcb45a88 | ||
|
|
bc8b68526d | ||
|
|
180f201dc0 | ||
|
|
9ab315a405 | ||
|
|
27b4742b6d | ||
|
|
702220d18e | ||
|
|
414489da8e | ||
|
|
77057df25d | ||
|
|
2f98cd1ab5 | ||
|
|
8809fe8ec9 | ||
|
|
f9702a9517 | ||
|
|
29e46b9b68 | ||
|
|
f838e35413 | ||
|
|
f3bb77c49b | ||
|
|
12fa033e15 | ||
|
|
f4abfafea2 | ||
|
|
0918af71d2 | ||
|
|
275befa330 | ||
|
|
ab4cd8bcb6 | ||
|
|
36c109b32f | ||
|
|
73a915665d | ||
|
|
bd6d71c94a | ||
|
|
842e93507c | ||
|
|
76df1108d7 | ||
|
|
941d09cde2 | ||
|
|
9dd3e1da07 | ||
|
|
2bf73245ec |
@@ -18,7 +18,7 @@ java.util.logging.FileHandler.limit = 10485760
|
|||||||
java.util.logging.FileHandler.count = 10000
|
java.util.logging.FileHandler.count = 10000
|
||||||
java.util.logging.FileHandler.encoding = UTF-8
|
java.util.logging.FileHandler.encoding = UTF-8
|
||||||
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
|
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
|
||||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%u.log
|
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
|
||||||
java.util.logging.FileHandler.append = true
|
java.util.logging.FileHandler.append = true
|
||||||
|
|
||||||
java.util.logging.ConsoleHandler.level = FINER
|
java.util.logging.ConsoleHandler.level = FINER
|
||||||
|
|||||||
@@ -37,6 +37,7 @@
|
|||||||
threads: 线程总数, 默认: <group>节点数*CPU核数*8
|
threads: 线程总数, 默认: <group>节点数*CPU核数*8
|
||||||
bufferCapacity: ByteBuffer的初始化大小, 默认: 8K;
|
bufferCapacity: ByteBuffer的初始化大小, 默认: 8K;
|
||||||
bufferPoolSize: ByteBuffer池的大小,默认: <group>节点数*CPU核数*8
|
bufferPoolSize: ByteBuffer池的大小,默认: <group>节点数*CPU核数*8
|
||||||
|
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
|
||||||
-->
|
-->
|
||||||
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
|
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>
|
||||||
|
|
||||||
|
|||||||
@@ -15,9 +15,9 @@ com.sun.level = INFO
|
|||||||
java.util.logging.FileHandler.limit = 10485760
|
java.util.logging.FileHandler.limit = 10485760
|
||||||
java.util.logging.FileHandler.count = 100
|
java.util.logging.FileHandler.count = 100
|
||||||
java.util.logging.FileHandler.encoding = UTF-8
|
java.util.logging.FileHandler.encoding = UTF-8
|
||||||
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%u.log
|
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
|
||||||
#java.util.logging.FileHandler.unusual \u5c5e\u6027\u8868\u793a\u5c06 WARNING\u3001SEVERE \u7ea7\u522b\u7684\u65e5\u5fd7\u590d\u5236\u5199\u5165\u5355\u72ec\u7684\u6587\u4ef6\u4e2d
|
#java.util.logging.FileHandler.unusual \u5c5e\u6027\u8868\u793a\u5c06 WARNING\u3001SEVERE \u7ea7\u522b\u7684\u65e5\u5fd7\u590d\u5236\u5199\u5165\u5355\u72ec\u7684\u6587\u4ef6\u4e2d
|
||||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%u.log
|
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
|
||||||
java.util.logging.FileHandler.append = true
|
java.util.logging.FileHandler.append = true
|
||||||
|
|
||||||
#java.util.logging.ConsoleHandler.level = FINE
|
#java.util.logging.ConsoleHandler.level = FINE
|
||||||
|
|||||||
@@ -232,12 +232,14 @@ public final class Application {
|
|||||||
}
|
}
|
||||||
this.logger = Logger.getLogger(this.getClass().getSimpleName());
|
this.logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
|
||||||
|
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
|
||||||
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
|
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
|
||||||
//------------------配置 <transport> 节点 ------------------
|
//------------------配置 <transport> 节点 ------------------
|
||||||
ObjectPool<ByteBuffer> transportPool = null;
|
ObjectPool<ByteBuffer> transportPool = null;
|
||||||
ExecutorService transportExec = null;
|
ExecutorService transportExec = null;
|
||||||
AsynchronousChannelGroup transportGroup = null;
|
AsynchronousChannelGroup transportGroup = null;
|
||||||
final AnyValue resources = config.getAnyValue("resources");
|
final AnyValue resources = config.getAnyValue("resources");
|
||||||
|
TransportStrategy strategy = null;
|
||||||
if (resources != null) {
|
if (resources != null) {
|
||||||
AnyValue transportConf = resources.getAnyValue("transport");
|
AnyValue transportConf = resources.getAnyValue("transport");
|
||||||
int groupsize = resources.getAnyValues("group").length;
|
int groupsize = resources.getAnyValues("group").length;
|
||||||
@@ -257,6 +259,10 @@ public final class Application {
|
|||||||
});
|
});
|
||||||
//-----------transportChannelGroup--------------
|
//-----------transportChannelGroup--------------
|
||||||
try {
|
try {
|
||||||
|
final String strategyClass = transportConf.getValue("strategy");
|
||||||
|
if (strategyClass != null && !strategyClass.isEmpty()) {
|
||||||
|
strategy = (TransportStrategy) classLoader.loadClass(strategyClass).newInstance();
|
||||||
|
}
|
||||||
final AtomicInteger counter = new AtomicInteger();
|
final AtomicInteger counter = new AtomicInteger();
|
||||||
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
|
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
|
||||||
Thread t = new Thread(r);
|
Thread t = new Thread(r);
|
||||||
@@ -271,8 +277,7 @@ public final class Application {
|
|||||||
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup);
|
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy);
|
||||||
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
|
|
||||||
Thread.currentThread().setContextClassLoader(this.classLoader);
|
Thread.currentThread().setContextClassLoader(this.classLoader);
|
||||||
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
|
||||||
}
|
}
|
||||||
@@ -379,7 +384,7 @@ public final class Application {
|
|||||||
try {
|
try {
|
||||||
Resource res = field.getAnnotation(Resource.class);
|
Resource res = field.getAnnotation(Resource.class);
|
||||||
if (res == null) return;
|
if (res == null) return;
|
||||||
if (!(src instanceof WatchService) || Sncp.isRemote((Service) src)) return; //远程模式不得注入
|
if (Sncp.isRemote((Service) src)) return; //远程模式不得注入
|
||||||
Class type = field.getType();
|
Class type = field.getType();
|
||||||
if (type == Application.class) {
|
if (type == Application.class) {
|
||||||
field.set(src, application);
|
field.set(src, application);
|
||||||
@@ -431,7 +436,7 @@ public final class Application {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}, Application.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
|
}, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
initResources();
|
initResources();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -99,7 +99,11 @@ public final class ClassFilter<T> {
|
|||||||
* @return Set<FilterEntry<T>>
|
* @return Set<FilterEntry<T>>
|
||||||
*/
|
*/
|
||||||
public final Set<FilterEntry<T>> getFilterEntrys() {
|
public final Set<FilterEntry<T>> getFilterEntrys() {
|
||||||
return entrys;
|
HashSet<FilterEntry<T>> set = new HashSet<>();
|
||||||
|
set.addAll(entrys);
|
||||||
|
if (ors != null) ors.forEach(f -> set.addAll(f.getFilterEntrys()));
|
||||||
|
if (ands != null) ands.forEach(f -> set.addAll(f.getFilterEntrys()));
|
||||||
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -108,7 +112,11 @@ public final class ClassFilter<T> {
|
|||||||
* @return Set<FilterEntry<T>>
|
* @return Set<FilterEntry<T>>
|
||||||
*/
|
*/
|
||||||
public final Set<FilterEntry<T>> getFilterExpectEntrys() {
|
public final Set<FilterEntry<T>> getFilterExpectEntrys() {
|
||||||
return expectEntrys;
|
HashSet<FilterEntry<T>> set = new HashSet<>();
|
||||||
|
set.addAll(entrys);
|
||||||
|
if (ors != null) ors.forEach(f -> set.addAll(f.getFilterExpectEntrys()));
|
||||||
|
if (ands != null) ands.forEach(f -> set.addAll(f.getFilterExpectEntrys()));
|
||||||
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -118,8 +126,8 @@ public final class ClassFilter<T> {
|
|||||||
*/
|
*/
|
||||||
public final Set<FilterEntry<T>> getAllFilterEntrys() {
|
public final Set<FilterEntry<T>> getAllFilterEntrys() {
|
||||||
HashSet<FilterEntry<T>> rs = new HashSet<>();
|
HashSet<FilterEntry<T>> rs = new HashSet<>();
|
||||||
rs.addAll(entrys);
|
rs.addAll(getFilterEntrys());
|
||||||
rs.addAll(expectEntrys);
|
rs.addAll(getFilterExpectEntrys());
|
||||||
return rs;
|
return rs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,7 +191,7 @@ public final class ClassFilter<T> {
|
|||||||
} catch (Throwable cfe) {
|
} catch (Throwable cfe) {
|
||||||
if (finer && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
|
if (finer && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
|
||||||
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.")) {
|
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.")) {
|
||||||
//logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
|
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -435,7 +435,7 @@ public abstract class NodeServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected ClassFilter<Service> createServiceClassFilter() {
|
protected ClassFilter<Service> createServiceClassFilter() {
|
||||||
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() || application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
|
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,
|
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ public class NodeSncpServer extends NodeServer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ClassFilter<Filter> createFilterClassFilter() {
|
protected ClassFilter<Filter> createFilterClassFilter() {
|
||||||
return createClassFilter(null, null, SncpFilter.class, null, null, "filters", "filter");
|
return createClassFilter(null, null, SncpFilter.class, new Class[]{org.redkale.watch.WatchFilter.class}, null, "filters", "filter");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ import java.nio.channels.*;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import org.redkale.convert.*;
|
||||||
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -52,19 +54,24 @@ public final class Transport {
|
|||||||
|
|
||||||
protected final InetSocketAddress clientAddress;
|
protected final InetSocketAddress clientAddress;
|
||||||
|
|
||||||
protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0];
|
protected TransportAddress[] transportAddres = new TransportAddress[0];
|
||||||
|
|
||||||
protected final ObjectPool<ByteBuffer> bufferPool;
|
protected final ObjectPool<ByteBuffer> bufferPool;
|
||||||
|
|
||||||
|
//负载均衡策略
|
||||||
|
protected final TransportStrategy strategy;
|
||||||
|
|
||||||
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
|
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
||||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
|
||||||
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses);
|
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
|
||||||
|
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
|
||||||
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
|
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
|
||||||
|
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
|
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
|
||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
@@ -72,32 +79,38 @@ public final class Transport {
|
|||||||
this.group = transportChannelGroup;
|
this.group = transportChannelGroup;
|
||||||
this.bufferPool = transportBufferPool;
|
this.bufferPool = transportBufferPool;
|
||||||
this.clientAddress = clientAddress;
|
this.clientAddress = clientAddress;
|
||||||
|
this.strategy = strategy;
|
||||||
updateRemoteAddresses(addresses);
|
updateRemoteAddresses(addresses);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
|
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
|
||||||
InetSocketAddress[] oldAddresses = this.remoteAddres;
|
TransportAddress[] oldAddresses = this.transportAddres;
|
||||||
List<InetSocketAddress> list = new ArrayList<>();
|
List<TransportAddress> list = new ArrayList<>();
|
||||||
if (addresses != null) {
|
if (addresses != null) {
|
||||||
for (InetSocketAddress addr : addresses) {
|
for (InetSocketAddress addr : addresses) {
|
||||||
if (clientAddress != null && clientAddress.equals(addr)) continue;
|
if (clientAddress != null && clientAddress.equals(addr)) continue;
|
||||||
list.add(addr);
|
list.add(new TransportAddress(addr));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.remoteAddres = list.toArray(new InetSocketAddress[list.size()]);
|
this.transportAddres = list.toArray(new TransportAddress[list.size()]);
|
||||||
return oldAddresses;
|
|
||||||
|
InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length];
|
||||||
|
for (int i = 0; i < rs.length; i++) {
|
||||||
|
rs[i] = oldAddresses[i].getAddress();
|
||||||
|
}
|
||||||
|
return rs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
|
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
|
||||||
if (addr == null) return false;
|
if (addr == null) return false;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (this.remoteAddres == null) {
|
if (this.transportAddres == null) {
|
||||||
this.remoteAddres = new InetSocketAddress[]{addr};
|
this.transportAddres = new TransportAddress[]{new TransportAddress(addr)};
|
||||||
} else {
|
} else {
|
||||||
for (InetSocketAddress i : this.remoteAddres) {
|
for (TransportAddress i : this.transportAddres) {
|
||||||
if (addr.equals(i)) return false;
|
if (addr.equals(i.address)) return false;
|
||||||
}
|
}
|
||||||
this.remoteAddres = Utility.append(remoteAddres, addr);
|
this.transportAddres = Utility.append(transportAddres, new TransportAddress(addr));
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -105,9 +118,9 @@ public final class Transport {
|
|||||||
|
|
||||||
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
|
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
|
||||||
if (addr == null) return false;
|
if (addr == null) return false;
|
||||||
if (this.remoteAddres == null) return false;
|
if (this.transportAddres == null) return false;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
this.remoteAddres = Utility.remove(remoteAddres, addr);
|
this.transportAddres = Utility.remove(transportAddres, new TransportAddress(addr));
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -128,13 +141,25 @@ public final class Transport {
|
|||||||
return clientAddress;
|
return clientAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TransportAddress[] getTransportAddresses() {
|
||||||
|
return transportAddres;
|
||||||
|
}
|
||||||
|
|
||||||
public InetSocketAddress[] getRemoteAddresses() {
|
public InetSocketAddress[] getRemoteAddresses() {
|
||||||
return remoteAddres;
|
InetSocketAddress[] rs = new InetSocketAddress[transportAddres.length];
|
||||||
|
for (int i = 0; i < rs.length; i++) {
|
||||||
|
rs[i] = transportAddres[i].getAddress();
|
||||||
|
}
|
||||||
|
return rs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> getAsyncConnectionPool() {
|
||||||
|
return connPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}";
|
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddres) + "}";
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer pollBuffer() {
|
public ByteBuffer pollBuffer() {
|
||||||
@@ -158,32 +183,57 @@ public final class Transport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public AsyncConnection pollConnection(SocketAddress addr) {
|
public AsyncConnection pollConnection(SocketAddress addr) {
|
||||||
if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0];
|
if (this.strategy != null) return strategy.pollConnection(addr, this);
|
||||||
|
if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address;
|
||||||
final boolean rand = addr == null;
|
final boolean rand = addr == null;
|
||||||
if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
|
if (rand && this.transportAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
|
||||||
try {
|
try {
|
||||||
if (tcp) {
|
if (tcp) {
|
||||||
AsynchronousSocketChannel channel = null;
|
AsynchronousSocketChannel channel = null;
|
||||||
if (rand) { //取地址
|
if (rand) { //取地址
|
||||||
for (int i = 0; i < remoteAddres.length; i++) {
|
TransportAddress transportAddr;
|
||||||
addr = remoteAddres[i];
|
boolean tryed = false;
|
||||||
BlockingQueue<AsyncConnection> queue = connPool.get(addr);
|
for (int i = 0; i < transportAddres.length; i++) {
|
||||||
if (queue != null && !queue.isEmpty()) {
|
transportAddr = transportAddres[i];
|
||||||
|
addr = transportAddr.address;
|
||||||
|
if (!transportAddr.enable) continue;
|
||||||
|
final BlockingQueue<AsyncConnection> queue = transportAddr.conns;
|
||||||
|
if (!queue.isEmpty()) {
|
||||||
AsyncConnection conn;
|
AsyncConnection conn;
|
||||||
while ((conn = queue.poll()) != null) {
|
while ((conn = queue.poll()) != null) {
|
||||||
if (conn.isOpen()) return conn;
|
if (conn.isOpen()) return conn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tryed = true;
|
||||||
if (channel == null) {
|
if (channel == null) {
|
||||||
channel = AsynchronousSocketChannel.open(group);
|
channel = AsynchronousSocketChannel.open(group);
|
||||||
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
channel.connect(addr).get(2, TimeUnit.SECONDS);
|
channel.connect(addr).get(2, TimeUnit.SECONDS);
|
||||||
|
transportAddr.enable = true;
|
||||||
break;
|
break;
|
||||||
} catch (Exception iex) {
|
} catch (Exception iex) {
|
||||||
iex.printStackTrace();
|
transportAddr.enable = false;
|
||||||
if (i == remoteAddres.length - 1) channel = null;
|
channel = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (channel == null && !tryed) {
|
||||||
|
for (int i = 0; i < transportAddres.length; i++) {
|
||||||
|
transportAddr = transportAddres[i];
|
||||||
|
addr = transportAddr.address;
|
||||||
|
if (channel == null) {
|
||||||
|
channel = AsynchronousSocketChannel.open(group);
|
||||||
|
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
channel.connect(addr).get(2, TimeUnit.SECONDS);
|
||||||
|
transportAddr.enable = true;
|
||||||
|
break;
|
||||||
|
} catch (Exception iex) {
|
||||||
|
transportAddr.enable = false;
|
||||||
|
channel = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -194,7 +244,7 @@ public final class Transport {
|
|||||||
if (channel == null) return null;
|
if (channel == null) return null;
|
||||||
return AsyncConnection.create(channel, addr, 3000, 3000);
|
return AsyncConnection.create(channel, addr, 3000, 3000);
|
||||||
} else { // UDP
|
} else { // UDP
|
||||||
if (rand) addr = remoteAddres[0];
|
if (rand) addr = this.transportAddres[0].address;
|
||||||
DatagramChannel channel = DatagramChannel.open();
|
DatagramChannel channel = DatagramChannel.open();
|
||||||
channel.configureBlocking(true);
|
channel.configureBlocking(true);
|
||||||
channel.connect(addr);
|
channel.connect(addr);
|
||||||
@@ -256,4 +306,54 @@ public final class Transport {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TransportAddress {
|
||||||
|
|
||||||
|
protected InetSocketAddress address;
|
||||||
|
|
||||||
|
protected volatile boolean enable;
|
||||||
|
|
||||||
|
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
|
||||||
|
|
||||||
|
public TransportAddress(InetSocketAddress address) {
|
||||||
|
this.address = address;
|
||||||
|
this.enable = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@java.beans.ConstructorProperties({"address", "enable"})
|
||||||
|
public TransportAddress(InetSocketAddress address, boolean enable) {
|
||||||
|
this.address = address;
|
||||||
|
this.enable = enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public InetSocketAddress getAddress() {
|
||||||
|
return address;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnable() {
|
||||||
|
return enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@ConvertColumn(ignore = true)
|
||||||
|
public BlockingQueue<AsyncConnection> getConns() {
|
||||||
|
return conns;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return this.address.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj) return true;
|
||||||
|
if (obj == null) return false;
|
||||||
|
if (getClass() != obj.getClass()) return false;
|
||||||
|
final TransportAddress other = (TransportAddress) obj;
|
||||||
|
return this.address.equals(other.address);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return JsonConvert.root().convertTo(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,10 +43,19 @@ public class TransportFactory {
|
|||||||
|
|
||||||
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
|
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
//负载均衡策略
|
||||||
|
protected final TransportStrategy strategy;
|
||||||
|
|
||||||
|
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
|
||||||
|
final TransportStrategy strategy) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.bufferPool = bufferPool;
|
this.bufferPool = bufferPool;
|
||||||
this.channelGroup = channelGroup;
|
this.channelGroup = channelGroup;
|
||||||
|
this.strategy = strategy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
|
||||||
|
this(executor, bufferPool, channelGroup, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String findGroupName(InetSocketAddress addr) {
|
public String findGroupName(InetSocketAddress addr) {
|
||||||
@@ -127,14 +136,14 @@ public class TransportFactory {
|
|||||||
}
|
}
|
||||||
if (info == null) return null;
|
if (info == null) return null;
|
||||||
if (sncpAddress != null) addresses.remove(sncpAddress);
|
if (sncpAddress != null) addresses.remove(sncpAddress);
|
||||||
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses);
|
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
|
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
|
||||||
if (groupName == null) return null;
|
if (groupName == null) return null;
|
||||||
TransportGroupInfo info = groupInfos.get(groupName);
|
TransportGroupInfo info = groupInfos.get(groupName);
|
||||||
if (info == null) return null;
|
if (info == null) return null;
|
||||||
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses);
|
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ExecutorService getExecutor() {
|
public ExecutorService getExecutor() {
|
||||||
|
|||||||
21
src/org/redkale/net/TransportStrategy.java
Normal file
21
src/org/redkale/net/TransportStrategy.java
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
/*
|
||||||
|
* To change this license header, choose License Headers in Project Properties.
|
||||||
|
* To change this template file, choose Tools | Templates
|
||||||
|
* and open the template in the editor.
|
||||||
|
*/
|
||||||
|
package org.redkale.net;
|
||||||
|
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 远程请求的负载均衡策略
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* 详情见: https://redkale.org
|
||||||
|
*
|
||||||
|
* @author zhangjx
|
||||||
|
*/
|
||||||
|
public interface TransportStrategy {
|
||||||
|
|
||||||
|
public AsyncConnection pollConnection(SocketAddress addr, Transport transport);
|
||||||
|
}
|
||||||
@@ -121,7 +121,6 @@ public class HttpRequest extends Request<HttpContext> {
|
|||||||
} else {
|
} else {
|
||||||
this.requestURI = array.toDecodeString(index, offset - index, charset).trim();
|
this.requestURI = array.toDecodeString(index, offset - index, charset).trim();
|
||||||
}
|
}
|
||||||
if (this.requestURI.contains("../")) return -1;
|
|
||||||
index = ++offset;
|
index = ++offset;
|
||||||
this.protocol = array.toString(index, array.size() - index, charset).trim();
|
this.protocol = array.toString(index, array.size() - index, charset).trim();
|
||||||
while (readLine(buffer, array)) {
|
while (readLine(buffer, array)) {
|
||||||
|
|||||||
@@ -192,6 +192,11 @@ public class HttpResourceServlet extends HttpServlet {
|
|||||||
@Override
|
@Override
|
||||||
public void execute(HttpRequest request, HttpResponse response) throws IOException {
|
public void execute(HttpRequest request, HttpResponse response) throws IOException {
|
||||||
String uri = request.getRequestURI();
|
String uri = request.getRequestURI();
|
||||||
|
if (uri.contains("../")) {
|
||||||
|
if (finest) logger.log(Level.FINEST, "Not found resource (404) be " + uri + ", request = " + request);
|
||||||
|
response.finish404();
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (locationRewrites != null) {
|
if (locationRewrites != null) {
|
||||||
for (SimpleEntry<Pattern, String> entry : locationRewrites) {
|
for (SimpleEntry<Pattern, String> entry : locationRewrites) {
|
||||||
Matcher matcher = entry.getKey().matcher(uri);
|
Matcher matcher = entry.getKey().matcher(uri);
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers);
|
public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
||||||
|
|
||||||
private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
|
||||||
|
|
||||||
@@ -704,8 +704,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
final String match = request.getHeader("If-None-Match");
|
final String match = request.getHeader("If-None-Match");
|
||||||
final String etag = (file == null ? 0L : file.lastModified()) + "-" + length;
|
final String etag = (file == null ? 0L : file.lastModified()) + "-" + length;
|
||||||
if (match != null && etag.equals(match)) {
|
if (match != null && etag.equals(match)) {
|
||||||
finish304();
|
//finish304();
|
||||||
return;
|
//return;
|
||||||
}
|
}
|
||||||
this.contentLength = length;
|
this.contentLength = length;
|
||||||
if (filename != null && !filename.isEmpty() && file != null) {
|
if (filename != null && !filename.isEmpty() && file != null) {
|
||||||
@@ -744,7 +744,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void finishFile(ByteBuffer hbuffer, File file, long offset, long length) throws IOException {
|
private void finishFile(ByteBuffer hbuffer, File file, long offset, long length) throws IOException {
|
||||||
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length));
|
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(file, offset, length));
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuffer createHeader() {
|
private ByteBuffer createHeader() {
|
||||||
@@ -976,6 +976,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
|
|
||||||
protected final class TransferFileHandler implements AsyncHandler<Integer, ByteBuffer> {
|
protected final class TransferFileHandler implements AsyncHandler<Integer, ByteBuffer> {
|
||||||
|
|
||||||
|
private final File file;
|
||||||
|
|
||||||
private final AsynchronousFileChannel filechannel;
|
private final AsynchronousFileChannel filechannel;
|
||||||
|
|
||||||
private final long max; //需要读取的字节数, -1表示读到文件结尾
|
private final long max; //需要读取的字节数, -1表示读到文件结尾
|
||||||
@@ -988,23 +990,36 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
|
|
||||||
private boolean read = true;
|
private boolean read = true;
|
||||||
|
|
||||||
public TransferFileHandler(AsynchronousFileChannel channel) {
|
public TransferFileHandler(File file) throws IOException {
|
||||||
this.filechannel = channel;
|
this.file = file;
|
||||||
this.max = -1;
|
this.filechannel = AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor());
|
||||||
|
this.position = 0;
|
||||||
|
this.max = file.length();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransferFileHandler(AsynchronousFileChannel channel, long offset, long len) {
|
public TransferFileHandler(File file, long offset, long len) throws IOException {
|
||||||
this.filechannel = channel;
|
this.file = file;
|
||||||
|
this.filechannel = AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor());
|
||||||
this.position = offset <= 0 ? 0 : offset;
|
this.position = offset <= 0 ? 0 : offset;
|
||||||
this.max = len;
|
this.max = len <= 0 ? file.length() : len;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(Integer result, ByteBuffer attachment) {
|
public void completed(Integer result, ByteBuffer attachment) {
|
||||||
if (result < 0 || (max > 0 && count >= max)) {
|
//(Thread.currentThread().getName() + "-----------" + file + "-------------------result: " + result + ", max = " + max + ", count = " + count);
|
||||||
|
if (result < 0 || count >= max) {
|
||||||
failed(null, attachment);
|
failed(null, attachment);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (!next && attachment.hasRemaining()) { //Header还没写完
|
||||||
|
channel.write(attachment, attachment, this);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (next && read && attachment.hasRemaining()) { //Buffer还没写完
|
||||||
|
channel.write(attachment, attachment, this);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (read) {
|
if (read) {
|
||||||
read = false;
|
read = false;
|
||||||
if (next) {
|
if (next) {
|
||||||
@@ -1016,14 +1031,16 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
|
|||||||
filechannel.read(attachment, position, attachment, this);
|
filechannel.read(attachment, position, attachment, this);
|
||||||
} else {
|
} else {
|
||||||
read = true;
|
read = true;
|
||||||
if (max > 0) {
|
count += result;
|
||||||
count += result;
|
if (count > max) {
|
||||||
if (count > max) {
|
attachment.limit((int) (attachment.position() + max - count));
|
||||||
attachment.limit((int) (attachment.position() + max - count));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
attachment.flip();
|
attachment.flip();
|
||||||
channel.write(attachment, attachment, this);
|
if (attachment.hasRemaining()) {
|
||||||
|
channel.write(attachment, attachment, this);
|
||||||
|
} else {
|
||||||
|
failed(null, attachment);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -328,6 +328,13 @@ public final class Rest {
|
|||||||
mv.visitMaxs(2, 1);
|
mv.visitMaxs(2, 1);
|
||||||
mv.visitEnd();
|
mv.visitEnd();
|
||||||
}
|
}
|
||||||
|
{ //resourceName
|
||||||
|
mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "resourceName", "()Ljava/lang/String;", null, null));
|
||||||
|
mv.visitLdcInsn(rws.name());
|
||||||
|
mv.visitInsn(ARETURN);
|
||||||
|
mv.visitMaxs(1, 1);
|
||||||
|
mv.visitEnd();
|
||||||
|
}
|
||||||
|
|
||||||
RestClassLoader newLoader = new RestClassLoader(loader);
|
RestClassLoader newLoader = new RestClassLoader(loader);
|
||||||
|
|
||||||
|
|||||||
@@ -82,6 +82,8 @@ public abstract class WebSocket<G extends Serializable, T> {
|
|||||||
|
|
||||||
private long createtime = System.currentTimeMillis();
|
private long createtime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
private long pingtime;
|
||||||
|
|
||||||
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
|
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
|
||||||
|
|
||||||
protected WebSocket() {
|
protected WebSocket() {
|
||||||
@@ -89,11 +91,13 @@ public abstract class WebSocket<G extends Serializable, T> {
|
|||||||
|
|
||||||
//----------------------------------------------------------------
|
//----------------------------------------------------------------
|
||||||
public final CompletableFuture<Integer> sendPing() {
|
public final CompletableFuture<Integer> sendPing() {
|
||||||
|
this.pingtime = System.currentTimeMillis();
|
||||||
//if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping...");
|
//if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping...");
|
||||||
return sendPacket(WebSocketPacket.DEFAULT_PING_PACKET);
|
return sendPacket(WebSocketPacket.DEFAULT_PING_PACKET);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final CompletableFuture<Integer> sendPing(byte[] data) {
|
public final CompletableFuture<Integer> sendPing(byte[] data) {
|
||||||
|
this.pingtime = System.currentTimeMillis();
|
||||||
return sendPacket(new WebSocketPacket(FrameType.PING, data));
|
return sendPacket(new WebSocketPacket(FrameType.PING, data));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -487,6 +491,15 @@ public abstract class WebSocket<G extends Serializable, T> {
|
|||||||
return this._runner == null ? 0 : this._runner.lastSendTime;
|
return this._runner == null ? 0 : this._runner.lastSendTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取最后一次发送PING消息的时间
|
||||||
|
*
|
||||||
|
* @return long
|
||||||
|
*/
|
||||||
|
public long getLastPingTime() {
|
||||||
|
return this.pingtime;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 显式地关闭WebSocket
|
* 显式地关闭WebSocket
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import java.io.*;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.*;
|
||||||
|
import java.util.function.Predicate;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import java.util.stream.*;
|
import java.util.stream.*;
|
||||||
import org.redkale.convert.Convert;
|
import org.redkale.convert.Convert;
|
||||||
@@ -23,18 +24,18 @@ import org.redkale.util.*;
|
|||||||
*
|
*
|
||||||
* @author zhangjx
|
* @author zhangjx
|
||||||
*/
|
*/
|
||||||
public final class WebSocketEngine {
|
public class WebSocketEngine {
|
||||||
|
|
||||||
//全局自增长ID
|
@Comment("全局自增长ID, 为了确保在一个进程里多个WebSocketEngine定时发送ping时不会同时进行")
|
||||||
private static final AtomicInteger sequence = new AtomicInteger();
|
private static final AtomicInteger sequence = new AtomicInteger();
|
||||||
|
|
||||||
//Engine自增长序号ID
|
@Comment("Engine自增长序号ID")
|
||||||
private final int index;
|
private final int index;
|
||||||
|
|
||||||
//当前WebSocket对应的Engine
|
@Comment("当前WebSocket对应的Engine")
|
||||||
private final String engineid;
|
private final String engineid;
|
||||||
|
|
||||||
//当前WebSocket对应的Node
|
@Comment("当前WebSocket对应的Node")
|
||||||
protected final WebSocketNode node;
|
protected final WebSocketNode node;
|
||||||
|
|
||||||
//HttpContext
|
//HttpContext
|
||||||
@@ -43,23 +44,25 @@ public final class WebSocketEngine {
|
|||||||
//Convert
|
//Convert
|
||||||
protected final Convert sendConvert;
|
protected final Convert sendConvert;
|
||||||
|
|
||||||
protected final boolean single; //是否单用户单连接
|
@Comment("是否单用户单连接")
|
||||||
|
protected final boolean single;
|
||||||
|
|
||||||
//在线用户ID对应的WebSocket组,用于单用户单连接模式
|
@Comment("在线用户ID对应的WebSocket组,用于单用户单连接模式")
|
||||||
private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>();
|
private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
//在线用户ID对应的WebSocket组,用于单用户多连接模式
|
@Comment("在线用户ID对应的WebSocket组,用于单用户多连接模式")
|
||||||
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
|
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
//用于PING的定时器
|
@Comment("用于PING的定时器")
|
||||||
private ScheduledThreadPoolExecutor scheduler;
|
private ScheduledThreadPoolExecutor scheduler;
|
||||||
|
|
||||||
//日志
|
@Comment("日志")
|
||||||
protected final Logger logger;
|
protected final Logger logger;
|
||||||
|
|
||||||
//FINEST日志级别
|
@Comment("日志级别")
|
||||||
protected final boolean finest;
|
protected final boolean finest;
|
||||||
|
|
||||||
|
@Comment("PING的间隔秒数")
|
||||||
private int liveinterval;
|
private int liveinterval;
|
||||||
|
|
||||||
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
|
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
|
||||||
@@ -70,8 +73,8 @@ public final class WebSocketEngine {
|
|||||||
this.node = node;
|
this.node = node;
|
||||||
this.liveinterval = liveinterval;
|
this.liveinterval = liveinterval;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.index = sequence.getAndIncrement();
|
|
||||||
this.finest = logger.isLoggable(Level.FINEST);
|
this.finest = logger.isLoggable(Level.FINEST);
|
||||||
|
this.index = sequence.getAndIncrement();
|
||||||
}
|
}
|
||||||
|
|
||||||
void init(AnyValue conf) {
|
void init(AnyValue conf) {
|
||||||
@@ -96,6 +99,7 @@ public final class WebSocketEngine {
|
|||||||
if (scheduler != null) scheduler.shutdownNow();
|
if (scheduler != null) scheduler.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Comment("添加WebSocket")
|
||||||
void add(WebSocket socket) {
|
void add(WebSocket socket) {
|
||||||
if (single) {
|
if (single) {
|
||||||
websockets.put(socket._userid, socket);
|
websockets.put(socket._userid, socket);
|
||||||
@@ -110,6 +114,7 @@ public final class WebSocketEngine {
|
|||||||
if (node != null) node.connect(socket._userid);
|
if (node != null) node.connect(socket._userid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Comment("从WebSocketEngine删除指定WebSocket")
|
||||||
void remove(WebSocket socket) {
|
void remove(WebSocket socket) {
|
||||||
Serializable userid = socket._userid;
|
Serializable userid = socket._userid;
|
||||||
if (single) {
|
if (single) {
|
||||||
@@ -127,9 +132,15 @@ public final class WebSocketEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Comment("给所有连接用户发送消息")
|
||||||
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
|
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
|
||||||
|
return broadcastMessage(null, message, last);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Comment("给指定WebSocket连接用户发送消息")
|
||||||
|
public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
|
||||||
if (message instanceof CompletableFuture) {
|
if (message instanceof CompletableFuture) {
|
||||||
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last));
|
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last));
|
||||||
}
|
}
|
||||||
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
|
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
|
||||||
if (more) {
|
if (more) {
|
||||||
@@ -140,11 +151,13 @@ public final class WebSocketEngine {
|
|||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
if (single) {
|
if (single) {
|
||||||
for (WebSocket websocket : websockets.values()) {
|
for (WebSocket websocket : websockets.values()) {
|
||||||
|
if (predicate != null && !predicate.test(websocket)) continue;
|
||||||
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
|
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (List<WebSocket> list : websockets2.values()) {
|
for (List<WebSocket> list : websockets2.values()) {
|
||||||
for (WebSocket websocket : list) {
|
for (WebSocket websocket : list) {
|
||||||
|
if (predicate != null && !predicate.test(websocket)) continue;
|
||||||
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
|
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -155,11 +168,13 @@ public final class WebSocketEngine {
|
|||||||
CompletableFuture<Integer> future = null;
|
CompletableFuture<Integer> future = null;
|
||||||
if (single) {
|
if (single) {
|
||||||
for (WebSocket websocket : websockets.values()) {
|
for (WebSocket websocket : websockets.values()) {
|
||||||
|
if (predicate != null && !predicate.test(websocket)) continue;
|
||||||
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
|
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (List<WebSocket> list : websockets2.values()) {
|
for (List<WebSocket> list : websockets2.values()) {
|
||||||
for (WebSocket websocket : list) {
|
for (WebSocket websocket : list) {
|
||||||
|
if (predicate != null && !predicate.test(websocket)) continue;
|
||||||
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
|
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -168,6 +183,7 @@ public final class WebSocketEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Comment("给指定用户组发送消息")
|
||||||
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||||
if (message instanceof CompletableFuture) {
|
if (message instanceof CompletableFuture) {
|
||||||
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
|
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
|
||||||
@@ -217,21 +233,33 @@ public final class WebSocketEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<WebSocket> getLocalWebSockets() {
|
@Comment("获取所有连接")
|
||||||
|
public Collection<WebSocket> getLocalWebSockets() {
|
||||||
if (single) return websockets.values();
|
if (single) return websockets.values();
|
||||||
List<WebSocket> list = new ArrayList<>();
|
List<WebSocket> list = new ArrayList<>();
|
||||||
websockets2.values().forEach(x -> list.addAll(x));
|
websockets2.values().forEach(x -> list.addAll(x));
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
//适用于单用户单连接模式
|
@Comment("获取当前连接总数")
|
||||||
|
public int getLocalWebSocketSize() {
|
||||||
|
if (single) return websockets.size();
|
||||||
|
return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Comment("获取当前用户总数")
|
||||||
|
public int getLocalUserSize() {
|
||||||
|
return single ? websockets.size() : websockets2.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Comment("适用于单用户单连接模式")
|
||||||
public WebSocket findLocalWebSocket(Serializable userid) {
|
public WebSocket findLocalWebSocket(Serializable userid) {
|
||||||
if (single) return websockets.get(userid);
|
if (single) return websockets.get(userid);
|
||||||
List<WebSocket> list = websockets2.get(userid);
|
List<WebSocket> list = websockets2.get(userid);
|
||||||
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
|
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
//适用于单用户多连接模式
|
@Comment("适用于单用户多连接模式")
|
||||||
public Stream<WebSocket> getLocalWebSockets(Serializable userid) {
|
public Stream<WebSocket> getLocalWebSockets(Serializable userid) {
|
||||||
if (single) {
|
if (single) {
|
||||||
WebSocket websocket = websockets.get(userid);
|
WebSocket websocket = websockets.get(userid);
|
||||||
|
|||||||
@@ -26,6 +26,12 @@ import org.redkale.util.*;
|
|||||||
*/
|
*/
|
||||||
public abstract class WebSocketNode {
|
public abstract class WebSocketNode {
|
||||||
|
|
||||||
|
@Comment("存储当前SNCP节点列表的key")
|
||||||
|
public static final String SOURCE_SNCP_NODES_KEY = "redkale_sncpnodes";
|
||||||
|
|
||||||
|
@Comment("存储当前用户数量的key")
|
||||||
|
public static final String SOURCE_USER_COUNT_KEY = "redkale_usercount";
|
||||||
|
|
||||||
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
|
||||||
|
|
||||||
protected final boolean finest = logger.isLoggable(Level.FINEST);
|
protected final boolean finest = logger.isLoggable(Level.FINEST);
|
||||||
@@ -57,7 +63,9 @@ public abstract class WebSocketNode {
|
|||||||
if (this.localEngine == null) return;
|
if (this.localEngine == null) return;
|
||||||
//关掉所有本地本地WebSocket
|
//关掉所有本地本地WebSocket
|
||||||
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()));
|
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()));
|
||||||
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress);
|
if (sncpNodeAddresses != null && localSncpAddress != null) {
|
||||||
|
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_NODES_KEY, localSncpAddress);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
|
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
|
||||||
@@ -140,7 +148,55 @@ public abstract class WebSocketNode {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断指定用户是否WebSocket在线
|
||||||
|
*
|
||||||
|
* @param userid
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
|
||||||
|
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||||
|
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
|
||||||
|
}
|
||||||
|
return this.sncpNodeAddresses.existsAsync(userid);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取在线用户总数
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public CompletableFuture<Integer> getUserSize() {
|
||||||
|
if (this.localEngine != null && this.sncpNodeAddresses == null) {
|
||||||
|
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
|
||||||
|
}
|
||||||
|
return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> {
|
||||||
|
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_NODES_KEY).thenApply(exists -> exists ? (count - 1) : count);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
//--------------------------------------------------------------------------------
|
//--------------------------------------------------------------------------------
|
||||||
|
/**
|
||||||
|
* 获取本地的WebSocketEngine,没有则返回null
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @return WebSocketEngine
|
||||||
|
*/
|
||||||
|
public final WebSocketEngine getLocalWebSocketEngine() {
|
||||||
|
return this.localEngine;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
|
||||||
|
* 如果当前WebSocketNode是远程模式,此方法只发送远程连接
|
||||||
|
*
|
||||||
|
* @param message 消息内容
|
||||||
|
* @param userids Serializable[]
|
||||||
|
*
|
||||||
|
* @return 为0表示成功, 其他值表示部分发送异常
|
||||||
|
*/
|
||||||
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
|
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
|
||||||
return sendMessage(message, true, userids);
|
return sendMessage(message, true, userids);
|
||||||
}
|
}
|
||||||
@@ -155,7 +211,6 @@ public abstract class WebSocketNode {
|
|||||||
*
|
*
|
||||||
* @return 为0表示成功, 其他值表示部分发送异常
|
* @return 为0表示成功, 其他值表示部分发送异常
|
||||||
*/
|
*/
|
||||||
//最近连接发送逻辑还没有理清楚
|
|
||||||
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
|
||||||
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||||
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
|||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
|
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
|
||||||
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr);
|
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr);
|
||||||
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr));
|
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr));
|
||||||
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
|
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -348,13 +348,13 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getCollectionSize(final K key) {
|
public int getCollectionSize(final K key) {
|
||||||
Collection<V> collection = (Collection<V>) get(key);
|
Collection<V> collection = (Collection<V>) get(key);
|
||||||
return collection == null ? 0 : collection.size();
|
return collection == null ? 0 : collection.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Long> getCollectionSizeAsync(final K key) {
|
public CompletableFuture<Integer> getCollectionSizeAsync(final K key) {
|
||||||
return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor());
|
return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -443,6 +443,11 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
|||||||
return new ArrayList<>(container.keySet());
|
return new ArrayList<>(container.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getKeySize() {
|
||||||
|
return container.size();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync() {
|
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync() {
|
||||||
return CompletableFuture.completedFuture(new ArrayList<>(container.values()));
|
return CompletableFuture.completedFuture(new ArrayList<>(container.values()));
|
||||||
@@ -458,4 +463,8 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
|||||||
return CompletableFuture.completedFuture(new ArrayList<>(container.keySet()));
|
return CompletableFuture.completedFuture(new ArrayList<>(container.keySet()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Integer> getKeySizeAsync() {
|
||||||
|
return CompletableFuture.completedFuture(container.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
|||||||
|
|
||||||
public Collection<V> getCollection(final K key);
|
public Collection<V> getCollection(final K key);
|
||||||
|
|
||||||
public long getCollectionSize(final K key);
|
public int getCollectionSize(final K key);
|
||||||
|
|
||||||
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds);
|
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds);
|
||||||
|
|
||||||
@@ -59,6 +59,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
|||||||
|
|
||||||
public List<K> queryKeys();
|
public List<K> queryKeys();
|
||||||
|
|
||||||
|
public int getKeySize();
|
||||||
|
|
||||||
public List<CacheEntry<K, Object>> queryList();
|
public List<CacheEntry<K, Object>> queryList();
|
||||||
|
|
||||||
//---------------------- CompletableFuture 异步版 ---------------------------------
|
//---------------------- CompletableFuture 异步版 ---------------------------------
|
||||||
@@ -80,7 +82,7 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
|||||||
|
|
||||||
public CompletableFuture<Collection<V>> getCollectionAsync(final K key);
|
public CompletableFuture<Collection<V>> getCollectionAsync(final K key);
|
||||||
|
|
||||||
public CompletableFuture<Long> getCollectionSizeAsync(final K key);
|
public CompletableFuture<Integer> getCollectionSizeAsync(final K key);
|
||||||
|
|
||||||
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds);
|
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds);
|
||||||
|
|
||||||
@@ -94,6 +96,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
|||||||
|
|
||||||
public CompletableFuture<List<K>> queryKeysAsync();
|
public CompletableFuture<List<K>> queryKeysAsync();
|
||||||
|
|
||||||
|
public CompletableFuture<Integer> getKeySizeAsync();
|
||||||
|
|
||||||
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync();
|
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync();
|
||||||
|
|
||||||
default CompletableFuture<Boolean> isOpenAsync() {
|
default CompletableFuture<Boolean> isOpenAsync() {
|
||||||
|
|||||||
@@ -31,26 +31,26 @@ import org.redkale.util.*;
|
|||||||
@ResourceType(DataSource.class)
|
@ResourceType(DataSource.class)
|
||||||
public class DataJdbcSource extends AbstractService implements DataSource, Service, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
|
public class DataJdbcSource extends AbstractService implements DataSource, Service, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
|
||||||
|
|
||||||
private static final Flipper FLIPPER_ONE = new Flipper(1);
|
protected static final Flipper FLIPPER_ONE = new Flipper(1);
|
||||||
|
|
||||||
final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
|
protected final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
|
||||||
|
|
||||||
final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
|
protected final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
|
||||||
|
|
||||||
final String name;
|
protected final String name;
|
||||||
|
|
||||||
final URL conf;
|
protected final URL conf;
|
||||||
|
|
||||||
final boolean cacheForbidden;
|
protected final boolean cacheForbidden;
|
||||||
|
|
||||||
private final PoolJdbcSource readPool;
|
protected final PoolJdbcSource readPool;
|
||||||
|
|
||||||
private final PoolJdbcSource writePool;
|
protected final PoolJdbcSource writePool;
|
||||||
|
|
||||||
@Resource(name = "$")
|
@Resource(name = "$")
|
||||||
private DataCacheListener cacheListener;
|
protected DataCacheListener cacheListener;
|
||||||
|
|
||||||
private final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
|
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
|
||||||
|
|
||||||
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
|
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
|
||||||
this.name = unitName;
|
this.name = unitName;
|
||||||
@@ -71,15 +71,15 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
writePool.close();
|
writePool.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Connection createReadSQLConnection() {
|
protected Connection createReadSQLConnection() {
|
||||||
return readPool.poll();
|
return readPool.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> Connection createWriteSQLConnection() {
|
protected <T> Connection createWriteSQLConnection() {
|
||||||
return writePool.poll();
|
return writePool.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeSQLConnection(final Connection sqlconn) {
|
protected void closeSQLConnection(final Connection sqlconn) {
|
||||||
if (sqlconn == null) return;
|
if (sqlconn == null) return;
|
||||||
try {
|
try {
|
||||||
sqlconn.close();
|
sqlconn.close();
|
||||||
@@ -93,7 +93,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return loadEntityInfo(t);
|
return loadEntityInfo(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
|
protected <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
|
||||||
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
|
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,7 +150,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.runAsync(() -> insert(values), getExecutor());
|
return CompletableFuture.runAsync(() -> insert(values), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
|
protected <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||||
if (values.length == 0) return;
|
if (values.length == 0) return;
|
||||||
try {
|
try {
|
||||||
if (!info.isVirtualEntity()) {
|
if (!info.isVirtualEntity()) {
|
||||||
@@ -251,7 +251,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
|
protected <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
|
||||||
final EntityInfo<T> info, T... values) throws SQLException {
|
final EntityInfo<T> info, T... values) throws SQLException {
|
||||||
Attribute<T, Serializable>[] attrs = info.insertAttributes;
|
Attribute<T, Serializable>[] attrs = info.insertAttributes;
|
||||||
final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
|
final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
|
||||||
@@ -328,7 +328,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> delete(values), getExecutor());
|
return CompletableFuture.supplyAsync(() -> delete(values), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
|
protected <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||||
if (values.length == 0) return -1;
|
if (values.length == 0) return -1;
|
||||||
final Attribute primary = info.getPrimary();
|
final Attribute primary = info.getPrimary();
|
||||||
Serializable[] ids = new Serializable[values.length];
|
Serializable[] ids = new Serializable[values.length];
|
||||||
@@ -358,7 +358,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor());
|
return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
|
protected <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
|
||||||
if (keys.length == 0) return -1;
|
if (keys.length == 0) return -1;
|
||||||
int c = -1;
|
int c = -1;
|
||||||
int c2 = 0;
|
int c2 = 0;
|
||||||
@@ -430,7 +430,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor());
|
return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
|
protected <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
|
||||||
int c = -1;
|
int c = -1;
|
||||||
try {
|
try {
|
||||||
if (!info.isVirtualEntity()) {
|
if (!info.isVirtualEntity()) {
|
||||||
@@ -521,7 +521,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> update(values), getExecutor());
|
return CompletableFuture.supplyAsync(() -> update(values), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
|
protected <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
|
||||||
try {
|
try {
|
||||||
Class clazz = info.getType();
|
Class clazz = info.getType();
|
||||||
int c = -1;
|
int c = -1;
|
||||||
@@ -617,7 +617,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor());
|
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
|
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
|
||||||
try {
|
try {
|
||||||
int c = -1;
|
int c = -1;
|
||||||
if (!info.isVirtualEntity()) {
|
if (!info.isVirtualEntity()) {
|
||||||
@@ -684,7 +684,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor());
|
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
|
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
|
||||||
try {
|
try {
|
||||||
int c = -1;
|
int c = -1;
|
||||||
if (!info.isVirtualEntity()) {
|
if (!info.isVirtualEntity()) {
|
||||||
@@ -766,7 +766,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor());
|
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
|
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
|
||||||
if (values == null || values.length < 1) return -1;
|
if (values == null || values.length < 1) return -1;
|
||||||
try {
|
try {
|
||||||
StringBuilder setsql = new StringBuilder();
|
StringBuilder setsql = new StringBuilder();
|
||||||
@@ -882,7 +882,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor());
|
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||||
if (values == null || values.length < 1) return -1;
|
if (values == null || values.length < 1) return -1;
|
||||||
try {
|
try {
|
||||||
StringBuilder setsql = new StringBuilder();
|
StringBuilder setsql = new StringBuilder();
|
||||||
@@ -992,7 +992,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor());
|
return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
|
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
|
||||||
if (bean == null || selects == null) return -1;
|
if (bean == null || selects == null) return -1;
|
||||||
try {
|
try {
|
||||||
final Class<T> clazz = (Class<T>) bean.getClass();
|
final Class<T> clazz = (Class<T>) bean.getClass();
|
||||||
@@ -1068,7 +1068,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor());
|
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {
|
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {
|
||||||
if (bean == null || node == null || selects == null) return -1;
|
if (bean == null || node == null || selects == null) return -1;
|
||||||
try {
|
try {
|
||||||
final Class<T> clazz = (Class<T>) bean.getClass();
|
final Class<T> clazz = (Class<T>) bean.getClass();
|
||||||
@@ -1877,7 +1877,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> queryColumnSheet(selectedColumn, clazz, flipper, node), getExecutor());
|
return CompletableFuture.supplyAsync(() -> queryColumnSheet(selectedColumn, clazz, flipper, node), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T, V extends Serializable> Sheet<V> queryColumnSheet(final boolean needtotal, final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
protected <T, V extends Serializable> Sheet<V> queryColumnSheet(final boolean needtotal, final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||||
Sheet<T> sheet = querySheet(true, needtotal, clazz, SelectColumn.createIncludes(selectedColumn), flipper, node);
|
Sheet<T> sheet = querySheet(true, needtotal, clazz, SelectColumn.createIncludes(selectedColumn), flipper, node);
|
||||||
final Sheet<V> rs = new Sheet<>();
|
final Sheet<V> rs = new Sheet<>();
|
||||||
if (sheet.isEmpty()) return rs;
|
if (sheet.isEmpty()) return rs;
|
||||||
@@ -2083,7 +2083,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return CompletableFuture.supplyAsync(() -> querySheet(clazz, selects, flipper, node), getExecutor());
|
return CompletableFuture.supplyAsync(() -> querySheet(clazz, selects, flipper, node), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
|
protected <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
|
||||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||||
final EntityCache<T> cache = info.getCache();
|
final EntityCache<T> cache = info.getCache();
|
||||||
if (readcache && cache != null && cache.isFullLoaded()) {
|
if (readcache && cache != null && cache.isFullLoaded()) {
|
||||||
@@ -2134,7 +2134,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
|
protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
|
||||||
if (str == null) return sb;
|
if (str == null) return sb;
|
||||||
int pos1 = str.indexOf(ch1, from);
|
int pos1 = str.indexOf(ch1, from);
|
||||||
if (pos1 < 0) return sb;
|
if (pos1 < 0) return sb;
|
||||||
@@ -2145,7 +2145,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
|
|||||||
return multisplit(ch1, ch2, split, sb, str, pos2 + 1);
|
return multisplit(ch1, ch2, split, sb, str, pos2 + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int[] directExecute(final Connection conn, String... sqls) {
|
protected int[] directExecute(final Connection conn, String... sqls) {
|
||||||
if (sqls.length == 0) return new int[0];
|
if (sqls.length == 0) return new int[0];
|
||||||
try {
|
try {
|
||||||
conn.setReadOnly(false);
|
conn.setReadOnly(false);
|
||||||
|
|||||||
@@ -5,14 +5,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.redkale.source;
|
package org.redkale.source;
|
||||||
|
|
||||||
import com.sun.istack.internal.logging.Logger;
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.lang.reflect.*;
|
import java.lang.reflect.*;
|
||||||
import java.sql.*;
|
import java.sql.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.*;
|
||||||
import javax.persistence.*;
|
import javax.persistence.*;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
|
|
||||||
@@ -32,7 +31,7 @@ public final class EntityInfo<T> {
|
|||||||
private static final ConcurrentHashMap<Class, EntityInfo> entityInfos = new ConcurrentHashMap<>();
|
private static final ConcurrentHashMap<Class, EntityInfo> entityInfos = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
//日志
|
//日志
|
||||||
private static final Logger logger = Logger.getLogger(EntityInfo.class);
|
private static final Logger logger = Logger.getLogger(EntityInfo.class.getSimpleName());
|
||||||
|
|
||||||
//Entity类名
|
//Entity类名
|
||||||
private final Class<T> type;
|
private final Class<T> type;
|
||||||
@@ -194,7 +193,7 @@ public final class EntityInfo<T> {
|
|||||||
try {
|
try {
|
||||||
loader = type.getAnnotation(VirtualEntity.class).loader().newInstance();
|
loader = type.getAnnotation(VirtualEntity.class).loader().newInstance();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.severe(type + " init @VirtualEntity.loader error", e);
|
logger.log(Level.SEVERE, type + " init @VirtualEntity.loader error", e);
|
||||||
}
|
}
|
||||||
this.fullloader = loader;
|
this.fullloader = loader;
|
||||||
} else {
|
} else {
|
||||||
@@ -207,7 +206,7 @@ public final class EntityInfo<T> {
|
|||||||
try {
|
try {
|
||||||
dts = (dt == null) ? null : dt.strategy().newInstance();
|
dts = (dt == null) ? null : dt.strategy().newInstance();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.severe(type + " init DistributeTableStrategy error", e);
|
logger.log(Level.SEVERE, type + " init DistributeTableStrategy error", e);
|
||||||
}
|
}
|
||||||
this.tableStrategy = dts;
|
this.tableStrategy = dts;
|
||||||
|
|
||||||
@@ -216,7 +215,7 @@ public final class EntityInfo<T> {
|
|||||||
try {
|
try {
|
||||||
cp = this.creator.getClass().getMethod("create", Object[].class).getAnnotation(Creator.ConstructorParameters.class);
|
cp = this.creator.getClass().getMethod("create", Object[].class).getAnnotation(Creator.ConstructorParameters.class);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.severe(type + " cannot find ConstructorParameters Creator", e);
|
logger.log(Level.SEVERE, type + " cannot find ConstructorParameters Creator", e);
|
||||||
}
|
}
|
||||||
this.constructorParameters = (cp == null || cp.value().length < 1) ? null : cp.value();
|
this.constructorParameters = (cp == null || cp.value().length < 1) ? null : cp.value();
|
||||||
Attribute idAttr0 = null;
|
Attribute idAttr0 = null;
|
||||||
|
|||||||
@@ -85,6 +85,13 @@ public class PoolJdbcSource {
|
|||||||
if (this.isOracle()) {
|
if (this.isOracle()) {
|
||||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0");
|
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0");
|
||||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0");
|
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0");
|
||||||
|
if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) {
|
||||||
|
this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02");
|
||||||
|
}
|
||||||
|
if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) {
|
||||||
|
//注意:此语句复制表结构会导致默认值和主键信息的丢失
|
||||||
|
this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2");
|
||||||
|
}
|
||||||
} else if (this.isSqlserver()) {
|
} else if (this.isSqlserver()) {
|
||||||
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0");
|
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0");
|
||||||
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0");
|
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0");
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ public final class Redkale {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static String getDotedVersion() {
|
public static String getDotedVersion() {
|
||||||
return "1.8.0";
|
return "1.8.1";
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int getMajorVersion() {
|
public static int getMajorVersion() {
|
||||||
|
|||||||
Reference in New Issue
Block a user