29 Commits
1.8.0 ... 1.8.1

Author SHA1 Message Date
Redkale
44507a97a6 兼容Oracle的复制表结构 2017-07-25 11:05:49 +08:00
Redkale
f4a7f1cff6 修复负载均衡无法正确切换节点的BUG 2017-07-24 18:29:40 +08:00
Redkale
a5fcb45a88 2017-07-24 17:34:07 +08:00
Redkale
bc8b68526d 2017-07-24 17:32:45 +08:00
Redkale
180f201dc0 修复多个<services>节点且其下指定service时无法获取所有service实例的BUG 2017-07-24 17:20:16 +08:00
Redkale
9ab315a405 2017-07-24 16:10:13 +08:00
Redkale
27b4742b6d 增加 TransportStrategy 功能 2017-07-21 16:07:13 +08:00
Redkale
702220d18e 2017-07-19 10:51:17 +08:00
Redkale
414489da8e 2017-07-15 22:24:10 +08:00
Redkale
77057df25d 2017-07-11 11:59:12 +08:00
Redkale
2f98cd1ab5 2017-07-11 11:29:57 +08:00
Redkale
8809fe8ec9 2017-07-11 10:09:49 +08:00
Redkale
f9702a9517 WebSocket时request.keepAlive设置为false 2017-07-11 10:07:42 +08:00
Redkale
29e46b9b68 暂时屏蔽304 2017-07-11 09:47:40 +08:00
Redkale
f838e35413 返回304的响应中增加Content-Length值 2017-07-11 09:30:27 +08:00
Redkale
f3bb77c49b 增加获取最后一次ping的时间点 2017-07-10 22:46:16 +08:00
Redkale
12fa033e15 增加判断用户是否在线和获取在线用户总数接口 2017-07-05 15:57:22 +08:00
Redkale
f4abfafea2 2017-07-05 15:42:54 +08:00
Redkale
0918af71d2 CacheSource增加getKeySize方法 2017-07-05 15:18:16 +08:00
Redkale
275befa330 private方法改成protected,方便重载 2017-07-05 10:28:55 +08:00
Redkale
ab4cd8bcb6 2017-07-02 17:30:08 +08:00
Redkale
36c109b32f 2017-07-02 17:29:22 +08:00
Redkale
73a915665d 2017-07-02 17:00:59 +08:00
Redkale
bd6d71c94a 修复@RestWebSocket对应的WebSocketServlet.resourceName没有重载的BUG 2017-07-01 16:31:00 +08:00
Redkale
842e93507c WebSocketEngine.broadcastMessage 增加 Predicate<WebSocket> 参数 2017-07-01 15:46:51 +08:00
Redkale
76df1108d7 WebSocketEngine.broadcastMessage 增加 Predicate<WebSocket> 参数 2017-07-01 15:38:47 +08:00
Redkale
941d09cde2 2017-07-01 15:29:26 +08:00
Redkale
9dd3e1da07 允许本地模式的Service通过@Resource可以获取Application、ResourceFactory等资源 2017-06-30 13:35:44 +08:00
Redkale
2bf73245ec Redkale 1.8.1 开始 2017-06-26 09:40:30 +08:00
24 changed files with 409 additions and 122 deletions

View File

@@ -18,7 +18,7 @@ java.util.logging.FileHandler.limit = 10485760
java.util.logging.FileHandler.count = 10000
java.util.logging.FileHandler.encoding = UTF-8
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%u.log
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
java.util.logging.FileHandler.append = true
java.util.logging.ConsoleHandler.level = FINER

View File

@@ -37,6 +37,7 @@
threads 线程总数, 默认: <group>节点数*CPU核数*8
bufferCapacity: ByteBuffer的初始化大小 默认: 8K;
bufferPoolSize ByteBuffer池的大小默认: <group>节点数*CPU核数*8
strategy: 远程请求的负载均衡策略, 必须是org.redkale.net.TransportStrategy的实现类
-->
<transport bufferCapacity="8K" bufferPoolSize="32" threads="32"/>

View File

@@ -15,9 +15,9 @@ com.sun.level = INFO
java.util.logging.FileHandler.limit = 10485760
java.util.logging.FileHandler.count = 100
java.util.logging.FileHandler.encoding = UTF-8
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%u.log
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
#java.util.logging.FileHandler.unusual \u5c5e\u6027\u8868\u793a\u5c06 WARNING\u3001SEVERE \u7ea7\u522b\u7684\u65e5\u5fd7\u590d\u5236\u5199\u5165\u5355\u72ec\u7684\u6587\u4ef6\u4e2d
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%u.log
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
java.util.logging.FileHandler.append = true
#java.util.logging.ConsoleHandler.level = FINE

View File

@@ -232,12 +232,14 @@ public final class Application {
}
this.logger = Logger.getLogger(this.getClass().getSimpleName());
this.serversLatch = new CountDownLatch(config.getAnyValues("server").length + 1);
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
logger.log(Level.INFO, "------------------------------- Redkale " + Redkale.getDotedVersion() + " -------------------------------");
//------------------配置 <transport> 节点 ------------------
ObjectPool<ByteBuffer> transportPool = null;
ExecutorService transportExec = null;
AsynchronousChannelGroup transportGroup = null;
final AnyValue resources = config.getAnyValue("resources");
TransportStrategy strategy = null;
if (resources != null) {
AnyValue transportConf = resources.getAnyValue("transport");
int groupsize = resources.getAnyValues("group").length;
@@ -257,6 +259,10 @@ public final class Application {
});
//-----------transportChannelGroup--------------
try {
final String strategyClass = transportConf.getValue("strategy");
if (strategyClass != null && !strategyClass.isEmpty()) {
strategy = (TransportStrategy) classLoader.loadClass(strategyClass).newInstance();
}
final AtomicInteger counter = new AtomicInteger();
transportExec = Executors.newFixedThreadPool(threads, (Runnable r) -> {
Thread t = new Thread(r);
@@ -271,8 +277,7 @@ public final class Application {
logger.log(Level.INFO, Transport.class.getSimpleName() + " configure bufferCapacity = " + bufferCapacity + "; bufferPoolSize = " + bufferPoolSize + "; threads = " + threads + ";");
}
}
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup);
this.classLoader = new RedkaleClassLoader(Thread.currentThread().getContextClassLoader());
this.transportFactory = new TransportFactory(transportExec, transportPool, transportGroup, strategy);
Thread.currentThread().setContextClassLoader(this.classLoader);
this.serverClassLoader = new RedkaleClassLoader(this.classLoader);
}
@@ -379,7 +384,7 @@ public final class Application {
try {
Resource res = field.getAnnotation(Resource.class);
if (res == null) return;
if (!(src instanceof WatchService) || Sncp.isRemote((Service) src)) return; //远程模式不得注入
if (Sncp.isRemote((Service) src)) return; //远程模式不得注入
Class type = field.getType();
if (type == Application.class) {
field.set(src, application);
@@ -431,7 +436,7 @@ public final class Application {
return false;
}
}, Application.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
}, Application.class, ResourceFactory.class, TransportFactory.class, NodeSncpServer.class, NodeHttpServer.class, NodeWatchServer.class);
//--------------------------------------------------------------------------
initResources();
}

View File

@@ -99,7 +99,11 @@ public final class ClassFilter<T> {
* @return Set&lt;FilterEntry&lt;T&gt;&gt;
*/
public final Set<FilterEntry<T>> getFilterEntrys() {
return entrys;
HashSet<FilterEntry<T>> set = new HashSet<>();
set.addAll(entrys);
if (ors != null) ors.forEach(f -> set.addAll(f.getFilterEntrys()));
if (ands != null) ands.forEach(f -> set.addAll(f.getFilterEntrys()));
return set;
}
/**
@@ -108,7 +112,11 @@ public final class ClassFilter<T> {
* @return Set&lt;FilterEntry&lt;T&gt;&gt;
*/
public final Set<FilterEntry<T>> getFilterExpectEntrys() {
return expectEntrys;
HashSet<FilterEntry<T>> set = new HashSet<>();
set.addAll(entrys);
if (ors != null) ors.forEach(f -> set.addAll(f.getFilterExpectEntrys()));
if (ands != null) ands.forEach(f -> set.addAll(f.getFilterExpectEntrys()));
return set;
}
/**
@@ -118,8 +126,8 @@ public final class ClassFilter<T> {
*/
public final Set<FilterEntry<T>> getAllFilterEntrys() {
HashSet<FilterEntry<T>> rs = new HashSet<>();
rs.addAll(entrys);
rs.addAll(expectEntrys);
rs.addAll(getFilterEntrys());
rs.addAll(getFilterExpectEntrys());
return rs;
}
@@ -183,7 +191,7 @@ public final class ClassFilter<T> {
} catch (Throwable cfe) {
if (finer && !clazzname.startsWith("sun.") && !clazzname.startsWith("javax.")
&& !clazzname.startsWith("com.sun.") && !clazzname.startsWith("jdk.")) {
//logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
logger.log(Level.FINEST, ClassFilter.class.getSimpleName() + " filter error", cfe);
}
}
}

View File

@@ -435,7 +435,7 @@ public abstract class NodeServer {
}
protected ClassFilter<Service> createServiceClassFilter() {
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() || application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
return createClassFilter(this.sncpGroup, null, Service.class, (!isSNCP() && application.watching) ? null : new Class[]{org.redkale.watch.WatchService.class}, Annotation.class, "services", "service");
}
protected ClassFilter createClassFilter(final String localGroup, Class<? extends Annotation> ref,

View File

@@ -107,7 +107,7 @@ public class NodeSncpServer extends NodeServer {
@Override
protected ClassFilter<Filter> createFilterClassFilter() {
return createClassFilter(null, null, SncpFilter.class, null, null, "filters", "filter");
return createClassFilter(null, null, SncpFilter.class, new Class[]{org.redkale.watch.WatchFilter.class}, null, "filters", "filter");
}
@Override

View File

@@ -11,6 +11,8 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert;
import org.redkale.util.*;
/**
@@ -52,19 +54,24 @@ public final class Transport {
protected final InetSocketAddress clientAddress;
protected InetSocketAddress[] remoteAddres = new InetSocketAddress[0];
protected TransportAddress[] transportAddres = new TransportAddress[0];
protected final ObjectPool<ByteBuffer> bufferPool;
//负载均衡策略
protected final TransportStrategy strategy;
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(String name, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses);
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this(name, DEFAULT_PROTOCOL, subprotocol, transportBufferPool, transportChannelGroup, clientAddress, addresses, strategy);
}
public Transport(String name, String protocol, String subprotocol, final ObjectPool<ByteBuffer> transportBufferPool,
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress, final Collection<InetSocketAddress> addresses) {
final AsynchronousChannelGroup transportChannelGroup, final InetSocketAddress clientAddress,
final Collection<InetSocketAddress> addresses, final TransportStrategy strategy) {
this.name = name;
this.subprotocol = subprotocol == null ? "" : subprotocol.trim();
this.protocol = protocol;
@@ -72,32 +79,38 @@ public final class Transport {
this.group = transportChannelGroup;
this.bufferPool = transportBufferPool;
this.clientAddress = clientAddress;
this.strategy = strategy;
updateRemoteAddresses(addresses);
}
public final InetSocketAddress[] updateRemoteAddresses(final Collection<InetSocketAddress> addresses) {
InetSocketAddress[] oldAddresses = this.remoteAddres;
List<InetSocketAddress> list = new ArrayList<>();
TransportAddress[] oldAddresses = this.transportAddres;
List<TransportAddress> list = new ArrayList<>();
if (addresses != null) {
for (InetSocketAddress addr : addresses) {
if (clientAddress != null && clientAddress.equals(addr)) continue;
list.add(addr);
list.add(new TransportAddress(addr));
}
}
this.remoteAddres = list.toArray(new InetSocketAddress[list.size()]);
return oldAddresses;
this.transportAddres = list.toArray(new TransportAddress[list.size()]);
InetSocketAddress[] rs = new InetSocketAddress[oldAddresses.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = oldAddresses[i].getAddress();
}
return rs;
}
public final boolean addRemoteAddresses(final InetSocketAddress addr) {
if (addr == null) return false;
synchronized (this) {
if (this.remoteAddres == null) {
this.remoteAddres = new InetSocketAddress[]{addr};
if (this.transportAddres == null) {
this.transportAddres = new TransportAddress[]{new TransportAddress(addr)};
} else {
for (InetSocketAddress i : this.remoteAddres) {
if (addr.equals(i)) return false;
for (TransportAddress i : this.transportAddres) {
if (addr.equals(i.address)) return false;
}
this.remoteAddres = Utility.append(remoteAddres, addr);
this.transportAddres = Utility.append(transportAddres, new TransportAddress(addr));
}
return true;
}
@@ -105,9 +118,9 @@ public final class Transport {
public final boolean removeRemoteAddresses(InetSocketAddress addr) {
if (addr == null) return false;
if (this.remoteAddres == null) return false;
if (this.transportAddres == null) return false;
synchronized (this) {
this.remoteAddres = Utility.remove(remoteAddres, addr);
this.transportAddres = Utility.remove(transportAddres, new TransportAddress(addr));
}
return true;
}
@@ -128,13 +141,25 @@ public final class Transport {
return clientAddress;
}
public TransportAddress[] getTransportAddresses() {
return transportAddres;
}
public InetSocketAddress[] getRemoteAddresses() {
return remoteAddres;
InetSocketAddress[] rs = new InetSocketAddress[transportAddres.length];
for (int i = 0; i < rs.length; i++) {
rs[i] = transportAddres[i].getAddress();
}
return rs;
}
public ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> getAsyncConnectionPool() {
return connPool;
}
@Override
public String toString() {
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(remoteAddres) + "}";
return Transport.class.getSimpleName() + "{name = " + name + ", protocol = " + protocol + ", clientAddress = " + clientAddress + ", remoteAddres = " + Arrays.toString(transportAddres) + "}";
}
public ByteBuffer pollBuffer() {
@@ -158,32 +183,57 @@ public final class Transport {
}
public AsyncConnection pollConnection(SocketAddress addr) {
if (addr == null && remoteAddres.length == 1) addr = remoteAddres[0];
if (this.strategy != null) return strategy.pollConnection(addr, this);
if (addr == null && this.transportAddres.length == 1) addr = this.transportAddres[0].address;
final boolean rand = addr == null;
if (rand && remoteAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
if (rand && this.transportAddres.length < 1) throw new RuntimeException("Transport (" + this.name + ") have no remoteAddress list");
try {
if (tcp) {
AsynchronousSocketChannel channel = null;
if (rand) { //取地址
for (int i = 0; i < remoteAddres.length; i++) {
addr = remoteAddres[i];
BlockingQueue<AsyncConnection> queue = connPool.get(addr);
if (queue != null && !queue.isEmpty()) {
TransportAddress transportAddr;
boolean tryed = false;
for (int i = 0; i < transportAddres.length; i++) {
transportAddr = transportAddres[i];
addr = transportAddr.address;
if (!transportAddr.enable) continue;
final BlockingQueue<AsyncConnection> queue = transportAddr.conns;
if (!queue.isEmpty()) {
AsyncConnection conn;
while ((conn = queue.poll()) != null) {
if (conn.isOpen()) return conn;
}
}
tryed = true;
if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try {
channel.connect(addr).get(2, TimeUnit.SECONDS);
transportAddr.enable = true;
break;
} catch (Exception iex) {
iex.printStackTrace();
if (i == remoteAddres.length - 1) channel = null;
transportAddr.enable = false;
channel = null;
}
}
if (channel == null && !tryed) {
for (int i = 0; i < transportAddres.length; i++) {
transportAddr = transportAddres[i];
addr = transportAddr.address;
if (channel == null) {
channel = AsynchronousSocketChannel.open(group);
if (supportTcpNoDelay) channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
}
try {
channel.connect(addr).get(2, TimeUnit.SECONDS);
transportAddr.enable = true;
break;
} catch (Exception iex) {
transportAddr.enable = false;
channel = null;
}
}
}
} else {
@@ -194,7 +244,7 @@ public final class Transport {
if (channel == null) return null;
return AsyncConnection.create(channel, addr, 3000, 3000);
} else { // UDP
if (rand) addr = remoteAddres[0];
if (rand) addr = this.transportAddres[0].address;
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(true);
channel.connect(addr);
@@ -256,4 +306,54 @@ public final class Transport {
});
}
public static class TransportAddress {
protected InetSocketAddress address;
protected volatile boolean enable;
protected final BlockingQueue<AsyncConnection> conns = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
public TransportAddress(InetSocketAddress address) {
this.address = address;
this.enable = true;
}
@java.beans.ConstructorProperties({"address", "enable"})
public TransportAddress(InetSocketAddress address, boolean enable) {
this.address = address;
this.enable = enable;
}
public InetSocketAddress getAddress() {
return address;
}
public boolean isEnable() {
return enable;
}
@ConvertColumn(ignore = true)
public BlockingQueue<AsyncConnection> getConns() {
return conns;
}
@Override
public int hashCode() {
return this.address.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
final TransportAddress other = (TransportAddress) obj;
return this.address.equals(other.address);
}
public String toString() {
return JsonConvert.root().convertTo(this);
}
}
}

View File

@@ -43,10 +43,19 @@ public class TransportFactory {
protected final List<WeakReference<Service>> services = new CopyOnWriteArrayList<>();
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
//负载均衡策略
protected final TransportStrategy strategy;
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup,
final TransportStrategy strategy) {
this.executor = executor;
this.bufferPool = bufferPool;
this.channelGroup = channelGroup;
this.strategy = strategy;
}
public TransportFactory(ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, AsynchronousChannelGroup channelGroup) {
this(executor, bufferPool, channelGroup, null);
}
public String findGroupName(InetSocketAddress addr) {
@@ -127,14 +136,14 @@ public class TransportFactory {
}
if (info == null) return null;
if (sncpAddress != null) addresses.remove(sncpAddress);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy);
}
private Transport loadTransport(final String groupName, InetSocketAddress sncpAddress) {
if (groupName == null) return null;
TransportGroupInfo info = groupInfos.get(groupName);
if (info == null) return null;
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses);
return new Transport(groupName, info.protocol, info.subprotocol, this.bufferPool, this.channelGroup, sncpAddress, info.addresses, this.strategy);
}
public ExecutorService getExecutor() {

View File

@@ -0,0 +1,21 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.net;
import java.net.SocketAddress;
/**
* 远程请求的负载均衡策略
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface TransportStrategy {
public AsyncConnection pollConnection(SocketAddress addr, Transport transport);
}

View File

@@ -121,7 +121,6 @@ public class HttpRequest extends Request<HttpContext> {
} else {
this.requestURI = array.toDecodeString(index, offset - index, charset).trim();
}
if (this.requestURI.contains("../")) return -1;
index = ++offset;
this.protocol = array.toString(index, array.size() - index, charset).trim();
while (readLine(buffer, array)) {

View File

@@ -192,6 +192,11 @@ public class HttpResourceServlet extends HttpServlet {
@Override
public void execute(HttpRequest request, HttpResponse response) throws IOException {
String uri = request.getRequestURI();
if (uri.contains("../")) {
if (finest) logger.log(Level.FINEST, "Not found resource (404) be " + uri + ", request = " + request);
response.finish404();
return;
}
if (locationRewrites != null) {
for (SimpleEntry<Pattern, String> entry : locationRewrites) {
Matcher matcher = entry.getKey().matcher(uri);

View File

@@ -42,7 +42,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
public ByteBuffer[] execute(final HttpResponse response, final ByteBuffer[] buffers);
}
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\n\r\n".getBytes()).asReadOnlyBuffer();
private static final ByteBuffer buffer304 = ByteBuffer.wrap("HTTP/1.1 304 Not Modified\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
private static final ByteBuffer buffer404 = ByteBuffer.wrap("HTTP/1.1 404 Not Found\r\nContent-Length:0\r\n\r\n".getBytes()).asReadOnlyBuffer();
@@ -704,8 +704,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
final String match = request.getHeader("If-None-Match");
final String etag = (file == null ? 0L : file.lastModified()) + "-" + length;
if (match != null && etag.equals(match)) {
finish304();
return;
//finish304();
//return;
}
this.contentLength = length;
if (filename != null && !filename.isEmpty() && file != null) {
@@ -744,7 +744,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
}
private void finishFile(ByteBuffer hbuffer, File file, long offset, long length) throws IOException {
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length));
this.channel.write(hbuffer, hbuffer, new TransferFileHandler(file, offset, length));
}
private ByteBuffer createHeader() {
@@ -976,6 +976,8 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
protected final class TransferFileHandler implements AsyncHandler<Integer, ByteBuffer> {
private final File file;
private final AsynchronousFileChannel filechannel;
private final long max; //需要读取的字节数, -1表示读到文件结尾
@@ -988,23 +990,36 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
private boolean read = true;
public TransferFileHandler(AsynchronousFileChannel channel) {
this.filechannel = channel;
this.max = -1;
public TransferFileHandler(File file) throws IOException {
this.file = file;
this.filechannel = AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor());
this.position = 0;
this.max = file.length();
}
public TransferFileHandler(AsynchronousFileChannel channel, long offset, long len) {
this.filechannel = channel;
public TransferFileHandler(File file, long offset, long len) throws IOException {
this.file = file;
this.filechannel = AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor());
this.position = offset <= 0 ? 0 : offset;
this.max = len;
this.max = len <= 0 ? file.length() : len;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result < 0 || (max > 0 && count >= max)) {
//(Thread.currentThread().getName() + "-----------" + file + "-------------------result: " + result + ", max = " + max + ", count = " + count);
if (result < 0 || count >= max) {
failed(null, attachment);
return;
}
if (!next && attachment.hasRemaining()) { //Header还没写完
channel.write(attachment, attachment, this);
return;
}
if (next && read && attachment.hasRemaining()) { //Buffer还没写完
channel.write(attachment, attachment, this);
return;
}
if (read) {
read = false;
if (next) {
@@ -1016,14 +1031,16 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
filechannel.read(attachment, position, attachment, this);
} else {
read = true;
if (max > 0) {
count += result;
if (count > max) {
attachment.limit((int) (attachment.position() + max - count));
}
count += result;
if (count > max) {
attachment.limit((int) (attachment.position() + max - count));
}
attachment.flip();
channel.write(attachment, attachment, this);
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
} else {
failed(null, attachment);
}
}
}

View File

@@ -328,6 +328,13 @@ public final class Rest {
mv.visitMaxs(2, 1);
mv.visitEnd();
}
{ //resourceName
mv = new AsmMethodVisitor(cw.visitMethod(ACC_PUBLIC, "resourceName", "()Ljava/lang/String;", null, null));
mv.visitLdcInsn(rws.name());
mv.visitInsn(ARETURN);
mv.visitMaxs(1, 1);
mv.visitEnd();
}
RestClassLoader newLoader = new RestClassLoader(loader);

View File

@@ -82,6 +82,8 @@ public abstract class WebSocket<G extends Serializable, T> {
private long createtime = System.currentTimeMillis();
private long pingtime;
private Map<String, Object> attributes = new HashMap<>(); //非线程安全
protected WebSocket() {
@@ -89,11 +91,13 @@ public abstract class WebSocket<G extends Serializable, T> {
//----------------------------------------------------------------
public final CompletableFuture<Integer> sendPing() {
this.pingtime = System.currentTimeMillis();
//if (_engine.finest) _engine.logger.finest(this + " on "+_engine.getEngineid()+" ping...");
return sendPacket(WebSocketPacket.DEFAULT_PING_PACKET);
}
public final CompletableFuture<Integer> sendPing(byte[] data) {
this.pingtime = System.currentTimeMillis();
return sendPacket(new WebSocketPacket(FrameType.PING, data));
}
@@ -487,6 +491,15 @@ public abstract class WebSocket<G extends Serializable, T> {
return this._runner == null ? 0 : this._runner.lastSendTime;
}
/**
* 获取最后一次发送PING消息的时间
*
* @return long
*/
public long getLastPingTime() {
return this.pingtime;
}
/**
* 显式地关闭WebSocket
*/

View File

@@ -10,6 +10,7 @@ import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.Predicate;
import java.util.logging.*;
import java.util.stream.*;
import org.redkale.convert.Convert;
@@ -23,18 +24,18 @@ import org.redkale.util.*;
*
* @author zhangjx
*/
public final class WebSocketEngine {
public class WebSocketEngine {
//全局自增长ID
@Comment("全局自增长ID, 为了确保在一个进程里多个WebSocketEngine定时发送ping时不会同时进行")
private static final AtomicInteger sequence = new AtomicInteger();
//Engine自增长序号ID
@Comment("Engine自增长序号ID")
private final int index;
//当前WebSocket对应的Engine
@Comment("当前WebSocket对应的Engine")
private final String engineid;
//当前WebSocket对应的Node
@Comment("当前WebSocket对应的Node")
protected final WebSocketNode node;
//HttpContext
@@ -43,23 +44,25 @@ public final class WebSocketEngine {
//Convert
protected final Convert sendConvert;
protected final boolean single; //是否单用户单连接
@Comment("是否单用户单连接")
protected final boolean single;
//在线用户ID对应的WebSocket组用于单用户单连接模式
@Comment("在线用户ID对应的WebSocket组用于单用户单连接模式")
private final Map<Serializable, WebSocket> websockets = new ConcurrentHashMap<>();
//在线用户ID对应的WebSocket组用于单用户多连接模式
@Comment("在线用户ID对应的WebSocket组用于单用户多连接模式")
private final Map<Serializable, List<WebSocket>> websockets2 = new ConcurrentHashMap<>();
//用于PING的定时器
@Comment("用于PING的定时器")
private ScheduledThreadPoolExecutor scheduler;
//日志
@Comment("日志")
protected final Logger logger;
//FINEST日志级别
@Comment("日志级别")
protected final boolean finest;
@Comment("PING的间隔秒数")
private int liveinterval;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval, WebSocketNode node, Convert sendConvert, Logger logger) {
@@ -70,8 +73,8 @@ public final class WebSocketEngine {
this.node = node;
this.liveinterval = liveinterval;
this.logger = logger;
this.index = sequence.getAndIncrement();
this.finest = logger.isLoggable(Level.FINEST);
this.index = sequence.getAndIncrement();
}
void init(AnyValue conf) {
@@ -96,6 +99,7 @@ public final class WebSocketEngine {
if (scheduler != null) scheduler.shutdownNow();
}
@Comment("添加WebSocket")
void add(WebSocket socket) {
if (single) {
websockets.put(socket._userid, socket);
@@ -110,6 +114,7 @@ public final class WebSocketEngine {
if (node != null) node.connect(socket._userid);
}
@Comment("从WebSocketEngine删除指定WebSocket")
void remove(WebSocket socket) {
Serializable userid = socket._userid;
if (single) {
@@ -127,9 +132,15 @@ public final class WebSocketEngine {
}
}
@Comment("给所有连接用户发送消息")
public CompletableFuture<Integer> broadcastMessage(final Object message, final boolean last) {
return broadcastMessage(null, message, last);
}
@Comment("给指定WebSocket连接用户发送消息")
public CompletableFuture<Integer> broadcastMessage(final Predicate<WebSocket> predicate, final Object message, final boolean last) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(json, last));
return ((CompletableFuture) message).thenCompose((json) -> broadcastMessage(predicate, json, last));
}
final boolean more = (!(message instanceof WebSocketPacket) || ((WebSocketPacket) message).sendBuffers == null);
if (more) {
@@ -140,11 +151,13 @@ public final class WebSocketEngine {
CompletableFuture<Integer> future = null;
if (single) {
for (WebSocket websocket : websockets.values()) {
if (predicate != null && !predicate.test(websocket)) continue;
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
if (predicate != null && !predicate.test(websocket)) continue;
future = future == null ? websocket.sendPacket(packet) : future.thenCombine(websocket.sendPacket(packet), (a, b) -> a | (Integer) b);
}
}
@@ -155,11 +168,13 @@ public final class WebSocketEngine {
CompletableFuture<Integer> future = null;
if (single) {
for (WebSocket websocket : websockets.values()) {
if (predicate != null && !predicate.test(websocket)) continue;
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
} else {
for (List<WebSocket> list : websockets2.values()) {
for (WebSocket websocket : list) {
if (predicate != null && !predicate.test(websocket)) continue;
future = future == null ? websocket.send(message, last) : future.thenCombine(websocket.send(message, last), (a, b) -> a | (Integer) b);
}
}
@@ -168,6 +183,7 @@ public final class WebSocketEngine {
}
}
@Comment("给指定用户组发送消息")
public CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
if (message instanceof CompletableFuture) {
return ((CompletableFuture) message).thenCompose((json) -> sendMessage(json, last, userids));
@@ -217,21 +233,33 @@ public final class WebSocketEngine {
}
}
Collection<WebSocket> getLocalWebSockets() {
@Comment("获取所有连接")
public Collection<WebSocket> getLocalWebSockets() {
if (single) return websockets.values();
List<WebSocket> list = new ArrayList<>();
websockets2.values().forEach(x -> list.addAll(x));
return list;
}
//适用于单用户单连接模式
@Comment("获取当前连接总数")
public int getLocalWebSocketSize() {
if (single) return websockets.size();
return (int) websockets2.values().stream().mapToInt(sublist -> sublist.size()).count();
}
@Comment("获取当前用户总数")
public int getLocalUserSize() {
return single ? websockets.size() : websockets2.size();
}
@Comment("适用于单用户单连接模式")
public WebSocket findLocalWebSocket(Serializable userid) {
if (single) return websockets.get(userid);
List<WebSocket> list = websockets2.get(userid);
return (list == null || list.isEmpty()) ? null : list.get(list.size() - 1);
}
//适用于单用户多连接模式
@Comment("适用于单用户多连接模式")
public Stream<WebSocket> getLocalWebSockets(Serializable userid) {
if (single) {
WebSocket websocket = websockets.get(userid);

View File

@@ -26,6 +26,12 @@ import org.redkale.util.*;
*/
public abstract class WebSocketNode {
@Comment("存储当前SNCP节点列表的key")
public static final String SOURCE_SNCP_NODES_KEY = "redkale_sncpnodes";
@Comment("存储当前用户数量的key")
public static final String SOURCE_USER_COUNT_KEY = "redkale_usercount";
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean finest = logger.isLoggable(Level.FINEST);
@@ -57,7 +63,9 @@ public abstract class WebSocketNode {
if (this.localEngine == null) return;
//关掉所有本地本地WebSocket
this.localEngine.getLocalWebSockets().forEach(g -> disconnect(g.getUserid()));
if (sncpNodeAddresses != null && localSncpAddress != null) sncpNodeAddresses.removeSetItem("redkale_sncpnodes", localSncpAddress);
if (sncpNodeAddresses != null && localSncpAddress != null) {
sncpNodeAddresses.removeSetItem(SOURCE_SNCP_NODES_KEY, localSncpAddress);
}
}
protected abstract CompletableFuture<List<String>> getWebSocketAddresses(@RpcTargetAddress InetSocketAddress targetAddress, Serializable userid);
@@ -140,7 +148,55 @@ public abstract class WebSocketNode {
});
}
/**
* 判断指定用户是否WebSocket在线
*
* @param userid
*
* @return boolean
*/
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
return this.sncpNodeAddresses.existsAsync(userid);
}
/**
* 获取在线用户总数
*
*
* @return boolean
*/
public CompletableFuture<Integer> getUserSize() {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> {
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_NODES_KEY).thenApply(exists -> exists ? (count - 1) : count);
});
}
//--------------------------------------------------------------------------------
/**
* 获取本地的WebSocketEngine没有则返回null
*
*
* @return WebSocketEngine
*/
public final WebSocketEngine getLocalWebSocketEngine() {
return this.localEngine;
}
/**
* 向指定用户发送消息,先发送本地连接,再发送远程连接 <br>
* 如果当前WebSocketNode是远程模式此方法只发送远程连接
*
* @param message 消息内容
* @param userids Serializable[]
*
* @return 为0表示成功 其他值表示部分发送异常
*/
public final CompletableFuture<Integer> sendMessage(Object message, final Serializable... userids) {
return sendMessage(message, true, userids);
}
@@ -155,7 +211,6 @@ public abstract class WebSocketNode {
*
* @return 为0表示成功 其他值表示部分发送异常
*/
//最近连接发送逻辑还没有理清楚
public final CompletableFuture<Integer> sendMessage(final Object message, final boolean last, final Serializable... userids) {
if (userids == null || userids.length < 1) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
if (this.localEngine != null && this.sncpNodeAddresses == null) { //本地模式且没有分布式

View File

@@ -68,7 +68,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync("redkale_sncpnodes", sncpAddr));
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_NODES_KEY, sncpAddr));
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future;
}

View File

@@ -348,13 +348,13 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
}
@Override
public long getCollectionSize(final K key) {
public int getCollectionSize(final K key) {
Collection<V> collection = (Collection<V>) get(key);
return collection == null ? 0 : collection.size();
}
@Override
public CompletableFuture<Long> getCollectionSizeAsync(final K key) {
public CompletableFuture<Integer> getCollectionSizeAsync(final K key) {
return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor());
}
@@ -443,6 +443,11 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return new ArrayList<>(container.keySet());
}
@Override
public int getKeySize() {
return container.size();
}
@Override
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync() {
return CompletableFuture.completedFuture(new ArrayList<>(container.values()));
@@ -458,4 +463,8 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
return CompletableFuture.completedFuture(new ArrayList<>(container.keySet()));
}
@Override
public CompletableFuture<Integer> getKeySizeAsync() {
return CompletableFuture.completedFuture(container.size());
}
}

View File

@@ -45,7 +45,7 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public Collection<V> getCollection(final K key);
public long getCollectionSize(final K key);
public int getCollectionSize(final K key);
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds);
@@ -59,6 +59,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public List<K> queryKeys();
public int getKeySize();
public List<CacheEntry<K, Object>> queryList();
//---------------------- CompletableFuture 异步版 ---------------------------------
@@ -80,7 +82,7 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public CompletableFuture<Collection<V>> getCollectionAsync(final K key);
public CompletableFuture<Long> getCollectionSizeAsync(final K key);
public CompletableFuture<Integer> getCollectionSizeAsync(final K key);
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds);
@@ -94,6 +96,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
public CompletableFuture<List<K>> queryKeysAsync();
public CompletableFuture<Integer> getKeySizeAsync();
public CompletableFuture<List<CacheEntry<K, Object>>> queryListAsync();
default CompletableFuture<Boolean> isOpenAsync() {

View File

@@ -31,26 +31,26 @@ import org.redkale.util.*;
@ResourceType(DataSource.class)
public class DataJdbcSource extends AbstractService implements DataSource, Service, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
private static final Flipper FLIPPER_ONE = new Flipper(1);
protected static final Flipper FLIPPER_ONE = new Flipper(1);
final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
protected final Logger logger = Logger.getLogger(DataJdbcSource.class.getSimpleName());
final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
protected final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
final String name;
protected final String name;
final URL conf;
protected final URL conf;
final boolean cacheForbidden;
protected final boolean cacheForbidden;
private final PoolJdbcSource readPool;
protected final PoolJdbcSource readPool;
private final PoolJdbcSource writePool;
protected final PoolJdbcSource writePool;
@Resource(name = "$")
private DataCacheListener cacheListener;
protected DataCacheListener cacheListener;
private final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
protected final BiFunction<DataSource, Class, List> fullloader = (s, t) -> querySheet(false, false, t, null, null, (FilterNode) null).list(true);
public DataJdbcSource(String unitName, Properties readprop, Properties writeprop) {
this.name = unitName;
@@ -71,15 +71,15 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
writePool.close();
}
private Connection createReadSQLConnection() {
protected Connection createReadSQLConnection() {
return readPool.poll();
}
private <T> Connection createWriteSQLConnection() {
protected <T> Connection createWriteSQLConnection() {
return writePool.poll();
}
private void closeSQLConnection(final Connection sqlconn) {
protected void closeSQLConnection(final Connection sqlconn) {
if (sqlconn == null) return;
try {
sqlconn.close();
@@ -93,7 +93,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return loadEntityInfo(t);
}
private <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
protected <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
}
@@ -150,7 +150,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.runAsync(() -> insert(values), getExecutor());
}
private <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
protected <T> void insert(final Connection conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return;
try {
if (!info.isVirtualEntity()) {
@@ -251,7 +251,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
}
}
private <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
protected <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
final EntityInfo<T> info, T... values) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = info.autoGenerated ? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
@@ -328,7 +328,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> delete(values), getExecutor());
}
private <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
protected <T> int delete(final Connection conn, final EntityInfo<T> info, T... values) {
if (values.length == 0) return -1;
final Attribute primary = info.getPrimary();
Serializable[] ids = new Serializable[values.length];
@@ -358,7 +358,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> delete(clazz, ids), getExecutor());
}
private <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
protected <T> int delete(final Connection conn, final EntityInfo<T> info, Serializable... keys) {
if (keys.length == 0) return -1;
int c = -1;
int c2 = 0;
@@ -430,7 +430,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> delete(clazz, flipper, node), getExecutor());
}
private <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
protected <T> int delete(final Connection conn, final EntityInfo<T> info, final Flipper flipper, final FilterNode node) {
int c = -1;
try {
if (!info.isVirtualEntity()) {
@@ -521,7 +521,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> update(values), getExecutor());
}
private <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
protected <T> int update(final Connection conn, final EntityInfo<T> info, T... values) {
try {
Class clazz = info.getType();
int c = -1;
@@ -617,7 +617,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, column, value), getExecutor());
}
private <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, Serializable id, String column, final Serializable value) {
try {
int c = -1;
if (!info.isVirtualEntity()) {
@@ -684,7 +684,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, column, value, node), getExecutor());
}
private <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
protected <T> int updateColumn(Connection conn, final EntityInfo<T> info, String column, final Serializable value, FilterNode node) {
try {
int c = -1;
if (!info.isVirtualEntity()) {
@@ -766,7 +766,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, id, values), getExecutor());
}
private <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final Serializable id, final ColumnValue... values) {
if (values == null || values.length < 1) return -1;
try {
StringBuilder setsql = new StringBuilder();
@@ -882,7 +882,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> updateColumn(clazz, node, flipper, values), getExecutor());
}
private <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
protected <T> int updateColumn(final Connection conn, final EntityInfo<T> info, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
if (values == null || values.length < 1) return -1;
try {
StringBuilder setsql = new StringBuilder();
@@ -992,7 +992,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> updateColumn(bean, selects), getExecutor());
}
private <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final SelectColumn selects) {
if (bean == null || selects == null) return -1;
try {
final Class<T> clazz = (Class<T>) bean.getClass();
@@ -1068,7 +1068,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> updateColumn(bean, node, selects), getExecutor());
}
private <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {
protected <T> int updateColumns(final Connection conn, final EntityInfo<T> info, final T bean, final FilterNode node, final SelectColumn selects) {
if (bean == null || node == null || selects == null) return -1;
try {
final Class<T> clazz = (Class<T>) bean.getClass();
@@ -1877,7 +1877,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> queryColumnSheet(selectedColumn, clazz, flipper, node), getExecutor());
}
private <T, V extends Serializable> Sheet<V> queryColumnSheet(final boolean needtotal, final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
protected <T, V extends Serializable> Sheet<V> queryColumnSheet(final boolean needtotal, final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
Sheet<T> sheet = querySheet(true, needtotal, clazz, SelectColumn.createIncludes(selectedColumn), flipper, node);
final Sheet<V> rs = new Sheet<>();
if (sheet.isEmpty()) return rs;
@@ -2083,7 +2083,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return CompletableFuture.supplyAsync(() -> querySheet(clazz, selects, flipper, node), getExecutor());
}
private <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
protected <T> Sheet<T> querySheet(final boolean readcache, final boolean needtotal, final Class<T> clazz, final SelectColumn selects, final Flipper flipper, final FilterNode node) {
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (readcache && cache != null && cache.isFullLoaded()) {
@@ -2134,7 +2134,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
}
}
private static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
if (str == null) return sb;
int pos1 = str.indexOf(ch1, from);
if (pos1 < 0) return sb;
@@ -2145,7 +2145,7 @@ public class DataJdbcSource extends AbstractService implements DataSource, Servi
return multisplit(ch1, ch2, split, sb, str, pos2 + 1);
}
private int[] directExecute(final Connection conn, String... sqls) {
protected int[] directExecute(final Connection conn, String... sqls) {
if (sqls.length == 0) return new int[0];
try {
conn.setReadOnly(false);

View File

@@ -5,14 +5,13 @@
*/
package org.redkale.source;
import com.sun.istack.internal.logging.Logger;
import java.io.Serializable;
import java.lang.reflect.*;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.*;
import java.util.logging.Level;
import java.util.logging.*;
import javax.persistence.*;
import org.redkale.util.*;
@@ -32,7 +31,7 @@ public final class EntityInfo<T> {
private static final ConcurrentHashMap<Class, EntityInfo> entityInfos = new ConcurrentHashMap<>();
//日志
private static final Logger logger = Logger.getLogger(EntityInfo.class);
private static final Logger logger = Logger.getLogger(EntityInfo.class.getSimpleName());
//Entity类名
private final Class<T> type;
@@ -194,7 +193,7 @@ public final class EntityInfo<T> {
try {
loader = type.getAnnotation(VirtualEntity.class).loader().newInstance();
} catch (Exception e) {
logger.severe(type + " init @VirtualEntity.loader error", e);
logger.log(Level.SEVERE, type + " init @VirtualEntity.loader error", e);
}
this.fullloader = loader;
} else {
@@ -207,7 +206,7 @@ public final class EntityInfo<T> {
try {
dts = (dt == null) ? null : dt.strategy().newInstance();
} catch (Exception e) {
logger.severe(type + " init DistributeTableStrategy error", e);
logger.log(Level.SEVERE, type + " init DistributeTableStrategy error", e);
}
this.tableStrategy = dts;
@@ -216,7 +215,7 @@ public final class EntityInfo<T> {
try {
cp = this.creator.getClass().getMethod("create", Object[].class).getAnnotation(Creator.ConstructorParameters.class);
} catch (Exception e) {
logger.severe(type + " cannot find ConstructorParameters Creator", e);
logger.log(Level.SEVERE, type + " cannot find ConstructorParameters Creator", e);
}
this.constructorParameters = (cp == null || cp.value().length < 1) ? null : cp.value();
Attribute idAttr0 = null;

View File

@@ -85,6 +85,13 @@ public class PoolJdbcSource {
if (this.isOracle()) {
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) > 0");
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "INSTR(${keystr}, ${column}) = 0");
if (!this.props.containsKey(JDBC_TABLENOTEXIST_SQLSTATES)) {
this.props.setProperty(JDBC_TABLENOTEXIST_SQLSTATES, "42000;42S02");
}
if (!this.props.containsKey(JDBC_TABLECOPY_SQLTEMPLATE)) {
//注意:此语句复制表结构会导致默认值和主键信息的丢失
this.props.setProperty(JDBC_TABLECOPY_SQLTEMPLATE, "CREATE TABLE ${newtable} AS SELECT * FROM ${oldtable} WHERE 1=2");
}
} else if (this.isSqlserver()) {
this.props.setProperty(JDBC_CONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) > 0");
this.props.setProperty(JDBC_NOTCONTAIN_SQLTEMPLATE, "CHARINDEX(${column}, ${keystr}) = 0");

View File

@@ -17,7 +17,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "1.8.0";
return "1.8.1";
}
public static int getMajorVersion() {