Compare commits
40 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
333ba0f162 | ||
|
|
e039b0e9f6 | ||
|
|
b3c5e3beca | ||
|
|
70ba45c3bd | ||
|
|
14ae44fcac | ||
|
|
f7618f5da4 | ||
|
|
d6c6e4c02e | ||
|
|
9365052b85 | ||
|
|
ef88063094 | ||
|
|
4cbaf85eea | ||
|
|
6c07123da3 | ||
|
|
04a4ce12c7 | ||
|
|
e00ed8ae37 | ||
|
|
218a79b60a | ||
|
|
19ae86c71f | ||
|
|
9250d4e64d | ||
|
|
9f5aa58a31 | ||
|
|
c8b0a88573 | ||
|
|
7cf3f49aa1 | ||
|
|
be2c803f82 | ||
|
|
fd1197e8dc | ||
|
|
115f91b64a | ||
|
|
dfb800473a | ||
|
|
c69c1bb134 | ||
|
|
5b501c7c2f | ||
|
|
8d5ce56ec2 | ||
|
|
15c97ddc18 | ||
|
|
36d7fbf4e9 | ||
|
|
919e7aa5c6 | ||
|
|
2174de2b71 | ||
|
|
cb9e914e44 | ||
|
|
15f856b762 | ||
|
|
991dba0d62 | ||
|
|
aa2685d6e4 | ||
|
|
0952150328 | ||
|
|
e6ef4d1546 | ||
|
|
16cf85abb9 | ||
|
|
5afe0ead94 | ||
|
|
b6c933f989 | ||
|
|
feaf1a1f06 |
@@ -1,9 +1,10 @@
|
||||
|
||||
|
||||
handlers = java.util.logging.ConsoleHandler
|
||||
# handlers = java.util.logging.FileHandler
|
||||
|
||||
############################################################
|
||||
.level = FINER
|
||||
.level = FINEST
|
||||
|
||||
java.level = INFO
|
||||
javax.level = INFO
|
||||
@@ -14,11 +15,11 @@ jdk.level = INFO
|
||||
|
||||
java.util.logging.FileHandler.level = FINER
|
||||
#10M
|
||||
java.util.logging.FileHandler.limit = 10485760
|
||||
java.util.logging.FileHandler.count = 10000
|
||||
java.util.logging.FileHandler.limit = 10M
|
||||
java.util.logging.FileHandler.count = 20
|
||||
java.util.logging.FileHandler.encoding = UTF-8
|
||||
java.util.logging.FileHandler.pattern = ${APP_HOME}/logs-%m/log-%d.log
|
||||
java.util.logging.FileHandler.unusual = ${APP_HOME}/logs-%m/log-warnerr-%d.log
|
||||
java.util.logging.FileHandler.append = true
|
||||
|
||||
java.util.logging.ConsoleHandler.level = FINER
|
||||
java.util.logging.ConsoleHandler.level = FINEST
|
||||
|
||||
@@ -171,6 +171,7 @@ public final class Application {
|
||||
this.singletonrun = singletonrun;
|
||||
this.config = config;
|
||||
System.setProperty("redkale.version", Redkale.getDotedVersion());
|
||||
System.setProperty("sun.nio.ch.maxCompletionHandlersOnStack", String.valueOf(Math.max(256, Runtime.getRuntime().availableProcessors() * 8)));
|
||||
|
||||
final File root = new File(System.getProperty(RESNAME_APP_HOME));
|
||||
this.resourceFactory.register(RESNAME_APP_TIME, long.class, this.startTime);
|
||||
@@ -388,7 +389,6 @@ public final class Application {
|
||||
}
|
||||
|
||||
public void init() throws Exception {
|
||||
System.setProperty("sun.nio.ch.internalThreadPoolSize", "" + Runtime.getRuntime().availableProcessors() * 4);
|
||||
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "" + Runtime.getRuntime().availableProcessors() * 4);
|
||||
System.setProperty("net.transport.poolmaxconns", "100");
|
||||
System.setProperty("net.transport.pinginterval", "30");
|
||||
|
||||
@@ -241,7 +241,21 @@ public class LogFileHandler extends Handler {
|
||||
}
|
||||
String limitstr = manager.getProperty(cname + ".limit");
|
||||
try {
|
||||
if (limitstr != null) this.limit = Math.abs(Integer.decode(limitstr));
|
||||
if (limitstr != null) {
|
||||
limitstr = limitstr.toUpperCase();
|
||||
boolean g = limitstr.indexOf('G') > 0;
|
||||
boolean m = limitstr.indexOf('M') > 0;
|
||||
boolean k = limitstr.indexOf('K') > 0;
|
||||
int ls = Math.abs(Integer.decode(limitstr.replace("G", "").replace("M", "").replace("K", "").replace("B", "")));
|
||||
if (g) {
|
||||
ls *= 1024 * 1024 * 1024;
|
||||
} else if (m) {
|
||||
ls *= 1024 * 1024;
|
||||
} else if (k) {
|
||||
ls *= 1024;
|
||||
}
|
||||
this.limit = ls;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
}
|
||||
String countstr = manager.getProperty(cname + ".count");
|
||||
|
||||
@@ -126,7 +126,14 @@ public class JsonByteBufferReader extends JsonReader {
|
||||
if (ch == '{') return "";
|
||||
if (ch == 'n' && nextChar() == 'u' && nextChar() == 'l' && nextChar() == 'l') return null;
|
||||
if (ch == 'N' && nextChar() == 'U' && nextChar() == 'L' && nextChar() == 'L') return null;
|
||||
throw new ConvertException("a json object text must begin with '{' (position = " + position + ") but '" + ch + "'");
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(ch);
|
||||
char one;
|
||||
try {
|
||||
while ((one = nextChar()) != 0) sb.append(one);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
throw new ConvertException("a json object text must begin with '{' (position = " + position + ") but '" + ch + "' in (" + sb + ")");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -143,7 +150,14 @@ public class JsonByteBufferReader extends JsonReader {
|
||||
if (ch == '[' || ch == '{') return SIGN_NOLENGTH;
|
||||
if (ch == 'n' && nextChar() == 'u' && nextChar() == 'l' && nextChar() == 'l') return SIGN_NULL;
|
||||
if (ch == 'N' && nextChar() == 'U' && nextChar() == 'L' && nextChar() == 'L') return SIGN_NULL;
|
||||
throw new ConvertException("a json array text must begin with '[' (position = " + position + ") but '" + ch + "'");
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(ch);
|
||||
char one;
|
||||
try {
|
||||
while ((one = nextChar()) != 0) sb.append(one);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
throw new ConvertException("a json array text must begin with '[' (position = " + position + ") but '" + ch + "' in (" + sb + ")");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -153,7 +167,14 @@ public class JsonByteBufferReader extends JsonReader {
|
||||
public final void readBlank() {
|
||||
char ch = nextGoodChar();
|
||||
if (ch == ':') return;
|
||||
throw new ConvertException("expected a ':' but '" + ch + "'(position = " + position + ")");
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(ch);
|
||||
char one;
|
||||
try {
|
||||
while ((one = nextChar()) != 0) sb.append(one);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
throw new ConvertException("expected a ':' but '" + ch + "'(position = " + position + ") in (" + sb + ")");
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -11,7 +11,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.function.Consumer;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
@@ -42,6 +42,9 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
|
||||
protected Consumer<AsyncConnection> beforeCloseListener;
|
||||
|
||||
//关联的事件数, 小于1表示没有事件
|
||||
protected final AtomicInteger eventing = new AtomicInteger();
|
||||
|
||||
public final long getLastReadTime() {
|
||||
return readtime;
|
||||
}
|
||||
@@ -50,6 +53,14 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
|
||||
return writetime;
|
||||
}
|
||||
|
||||
public final int increEventing() {
|
||||
return eventing.incrementAndGet();
|
||||
}
|
||||
|
||||
public final int decreEventing() {
|
||||
return eventing.decrementAndGet();
|
||||
}
|
||||
|
||||
public abstract boolean isTCP();
|
||||
|
||||
public abstract boolean shutdownInput();
|
||||
|
||||
@@ -218,7 +218,11 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet();
|
||||
response.finish(true);
|
||||
} else if (rs == 0) {
|
||||
request.offerReadBuffer(buffer);
|
||||
if (buffer.hasRemaining()) {
|
||||
request.setMoredata(buffer);
|
||||
} else {
|
||||
request.offerReadBuffer(buffer);
|
||||
}
|
||||
request.prepare();
|
||||
response.filter = this.headFilter;
|
||||
response.servlet = this;
|
||||
@@ -236,7 +240,11 @@ public abstract class PrepareServlet<K extends Serializable, C extends Context,
|
||||
buffer.clear();
|
||||
request.channel.read(buffer, buffer, this);
|
||||
} else {
|
||||
request.offerReadBuffer(buffer);
|
||||
if (buffer.hasRemaining()) {
|
||||
request.setMoredata(buffer);
|
||||
} else {
|
||||
request.offerReadBuffer(buffer);
|
||||
}
|
||||
request.prepare();
|
||||
try {
|
||||
response.filter = PrepareServlet.this.headFilter;
|
||||
|
||||
@@ -31,6 +31,10 @@ public abstract class Request<C extends Context> {
|
||||
|
||||
protected boolean keepAlive;
|
||||
|
||||
protected boolean more; //pipeline模式
|
||||
|
||||
protected ByteBuffer moredata; //pipeline模式
|
||||
|
||||
protected AsyncConnection channel;
|
||||
|
||||
protected ByteBuffer readBuffer;
|
||||
@@ -50,6 +54,16 @@ public abstract class Request<C extends Context> {
|
||||
this.jsonConvert = context.getJsonConvert();
|
||||
}
|
||||
|
||||
protected void setMoredata(ByteBuffer buffer) {
|
||||
this.moredata = buffer;
|
||||
}
|
||||
|
||||
protected ByteBuffer removeMoredata() {
|
||||
ByteBuffer rs = this.moredata;
|
||||
this.moredata = null;
|
||||
return rs;
|
||||
}
|
||||
|
||||
protected ByteBuffer pollReadBuffer() {
|
||||
ByteBuffer buffer = this.readBuffer;
|
||||
this.readBuffer = null;
|
||||
@@ -90,6 +104,8 @@ public abstract class Request<C extends Context> {
|
||||
protected void recycle() {
|
||||
createtime = 0;
|
||||
keepAlive = false;
|
||||
more = false;
|
||||
moredata = null;
|
||||
attributes.clear();
|
||||
channel = null; // close it by response
|
||||
}
|
||||
|
||||
@@ -251,7 +251,8 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
}
|
||||
this.recycleListener = null;
|
||||
}
|
||||
if (request.keepAlive && channel != null) {
|
||||
if (request.more) removeChannel();
|
||||
if (request.keepAlive && !request.more && channel != null) {
|
||||
if (channel.isOpen()) {
|
||||
AsyncConnection conn = removeChannel();
|
||||
this.recycle();
|
||||
@@ -279,24 +280,44 @@ public abstract class Response<C extends Context, R extends Request<C>> {
|
||||
|
||||
public void finish(ByteBuffer buffer) {
|
||||
if (!this.inited) return; //避免重复关闭
|
||||
this.channel.write(buffer, buffer, finishHandler);
|
||||
ByteBuffer data = this.request.removeMoredata();
|
||||
final AsyncConnection conn = this.channel;
|
||||
final boolean more = data != null && this.request.keepAlive;
|
||||
this.request.more = more;
|
||||
conn.write(buffer, buffer, finishHandler);
|
||||
if (more) new PrepareRunner(this.context, conn, data, null).run();
|
||||
}
|
||||
|
||||
public void finish(boolean kill, ByteBuffer buffer) {
|
||||
if (!this.inited) return; //避免重复关闭
|
||||
if (kill) refuseAlive();
|
||||
this.channel.write(buffer, buffer, finishHandler);
|
||||
ByteBuffer data = this.request.removeMoredata();
|
||||
final AsyncConnection conn = this.channel;
|
||||
final boolean more = data != null && this.request.keepAlive;
|
||||
this.request.more = more;
|
||||
conn.write(buffer, buffer, finishHandler);
|
||||
if (more) new PrepareRunner(this.context, conn, data, null).run();
|
||||
}
|
||||
|
||||
public void finish(ByteBuffer... buffers) {
|
||||
if (!this.inited) return; //避免重复关闭
|
||||
this.channel.write(buffers, buffers, finishHandler2);
|
||||
final AsyncConnection conn = this.channel;
|
||||
ByteBuffer data = this.request.removeMoredata();
|
||||
final boolean more = data != null && this.request.keepAlive;
|
||||
this.request.more = more;
|
||||
conn.write(buffers, buffers, finishHandler2);
|
||||
if (more) new PrepareRunner(this.context, conn, data, null).run();
|
||||
}
|
||||
|
||||
public void finish(boolean kill, ByteBuffer... buffers) {
|
||||
if (!this.inited) return; //避免重复关闭
|
||||
if (kill) refuseAlive();
|
||||
this.channel.write(buffers, buffers, finishHandler2);
|
||||
final AsyncConnection conn = this.channel;
|
||||
ByteBuffer data = this.request.removeMoredata();
|
||||
final boolean more = data != null && this.request.keepAlive;
|
||||
this.request.more = more;
|
||||
conn.write(buffers, buffers, finishHandler2);
|
||||
if (more) new PrepareRunner(this.context, conn, data, null).run();
|
||||
}
|
||||
|
||||
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {
|
||||
|
||||
@@ -32,7 +32,8 @@ public class TcpAioProtocolServer extends ProtocolServer {
|
||||
|
||||
@Override
|
||||
public void open(AnyValue config) throws IOException {
|
||||
group = AsynchronousChannelGroup.withThreadPool(context.executor);
|
||||
//group = AsynchronousChannelGroup.withThreadPool(context.executor);
|
||||
group = AsynchronousChannelGroup.withFixedThreadPool(context.executor.getCorePoolSize(), context.executor.getThreadFactory());
|
||||
this.serverChannel = AsynchronousServerSocketChannel.open(group);
|
||||
|
||||
final Set<SocketOption<?>> options = this.serverChannel.supportedOptions();
|
||||
|
||||
@@ -22,17 +22,17 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
*/
|
||||
public class TcpNioAsyncConnection extends AsyncConnection {
|
||||
|
||||
private int readTimeoutSeconds;
|
||||
protected int readTimeoutSeconds;
|
||||
|
||||
private int writeTimeoutSeconds;
|
||||
protected int writeTimeoutSeconds;
|
||||
|
||||
private final Selector selector;
|
||||
protected final Selector selector;
|
||||
|
||||
private SelectionKey key;
|
||||
protected SelectionKey key;
|
||||
|
||||
private final SocketChannel channel;
|
||||
protected final SocketChannel channel;
|
||||
|
||||
private final SocketAddress remoteAddress;
|
||||
protected final SocketAddress remoteAddress;
|
||||
|
||||
ByteBuffer readBuffer;
|
||||
|
||||
@@ -362,4 +362,5 @@ public class TcpNioAsyncConnection extends AsyncConnection {
|
||||
public final boolean isTCP() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import java.net.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.*;
|
||||
import org.redkale.util.AnyValue;
|
||||
|
||||
/**
|
||||
@@ -27,9 +27,9 @@ public class TcpNioProtocolServer extends ProtocolServer {
|
||||
|
||||
private ServerSocketChannel serverChannel;
|
||||
|
||||
private NIOThreadWorker[] workers;
|
||||
private NioThreadWorker[] workers;
|
||||
|
||||
private NIOThreadWorker currWorker;
|
||||
private NioThreadWorker currWorker;
|
||||
|
||||
private boolean running;
|
||||
|
||||
@@ -82,11 +82,11 @@ public class TcpNioProtocolServer extends ProtocolServer {
|
||||
@Override
|
||||
public void accept() throws IOException {
|
||||
this.serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
|
||||
final CountDownLatch cdl = new CountDownLatch(1);
|
||||
this.running = true;
|
||||
this.workers = new NIOThreadWorker[Runtime.getRuntime().availableProcessors()];
|
||||
this.workers = new NioThreadWorker[Runtime.getRuntime().availableProcessors()];
|
||||
final CountDownLatch wkcdl = new CountDownLatch(workers.length);
|
||||
for (int i = 0; i < workers.length; i++) {
|
||||
workers[i] = new NIOThreadWorker();
|
||||
workers[i] = new NioThreadWorker(wkcdl, i + 1, workers.length);
|
||||
workers[i].setDaemon(true);
|
||||
workers[i].start();
|
||||
}
|
||||
@@ -95,6 +95,12 @@ public class TcpNioProtocolServer extends ProtocolServer {
|
||||
}
|
||||
workers[workers.length - 1].next = workers[0];
|
||||
currWorker = workers[0];
|
||||
try {
|
||||
wkcdl.await(3, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
final CountDownLatch cdl = new CountDownLatch(1);
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
@@ -111,12 +117,6 @@ public class TcpNioProtocolServer extends ProtocolServer {
|
||||
if (key.isAcceptable()) {
|
||||
try {
|
||||
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
|
||||
channel.configureBlocking(false);
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
|
||||
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
||||
createCounter.incrementAndGet();
|
||||
livingCounter.incrementAndGet();
|
||||
currWorker.addChannel(channel);
|
||||
@@ -134,45 +134,91 @@ public class TcpNioProtocolServer extends ProtocolServer {
|
||||
}
|
||||
}.start();
|
||||
try {
|
||||
cdl.await();
|
||||
cdl.await(3, TimeUnit.SECONDS);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (!this.running) return;
|
||||
this.running = false;
|
||||
serverChannel.close();
|
||||
acceptSelector.close();
|
||||
for (NIOThreadWorker worker : workers) {
|
||||
for (NioThreadWorker worker : workers) {
|
||||
worker.interrupt();
|
||||
}
|
||||
this.running = false;
|
||||
}
|
||||
|
||||
class NIOThreadWorker extends Thread {
|
||||
class NioThreadWorker extends Thread {
|
||||
|
||||
final Selector selector;
|
||||
|
||||
NIOThreadWorker next;
|
||||
final CountDownLatch cdl;
|
||||
|
||||
public NIOThreadWorker() {
|
||||
private final Queue<TcpNioAsyncConnection> connected;
|
||||
|
||||
private final CopyOnWriteArrayList<TcpNioAsyncConnection> done;
|
||||
|
||||
protected volatile Thread ownerThread;
|
||||
|
||||
NioThreadWorker next;
|
||||
|
||||
public NioThreadWorker(final CountDownLatch cdl, int idx, int count) {
|
||||
this.cdl = cdl;
|
||||
String idxstr = "000000" + idx;
|
||||
this.setName("NioThreadWorker:" + context.getServerAddress().getPort() + "-" + idxstr.substring(idxstr.length() - ("" + count).length()));
|
||||
try {
|
||||
this.selector = Selector.open();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.connected = new ArrayBlockingQueue<>(1000000);
|
||||
this.done = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
|
||||
public void addChannel(SocketChannel channel) throws IOException {
|
||||
AsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
|
||||
context.runAsync(new PrepareRunner(context, conn, null, null));
|
||||
public boolean addChannel(SocketChannel channel) throws IOException {
|
||||
TcpNioAsyncConnection conn = new TcpNioAsyncConnection(channel, null, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
|
||||
return connected.add(conn);
|
||||
}
|
||||
|
||||
protected void processConnected() {
|
||||
TcpNioAsyncConnection schannel;
|
||||
try {
|
||||
while ((schannel = connected.poll()) != null) {
|
||||
SocketChannel channel = schannel.channel;
|
||||
channel.configureBlocking(false);
|
||||
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
|
||||
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
|
||||
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
|
||||
channel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);
|
||||
channel.setOption(StandardSocketOptions.SO_SNDBUF, 16 * 1024);
|
||||
channel.register(selector, SelectionKey.OP_READ).attach(schannel);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// do nothing
|
||||
}
|
||||
synchronized (done) {
|
||||
for (TcpNioAsyncConnection conn : done) {
|
||||
if (conn.key != null && conn.key.isValid()) {
|
||||
conn.key.interestOps(SelectionKey.OP_WRITE);
|
||||
}
|
||||
}
|
||||
done.clear();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSameThread() {
|
||||
return this.ownerThread == Thread.currentThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
this.ownerThread = Thread.currentThread();
|
||||
if (cdl != null) cdl.countDown();
|
||||
while (running) {
|
||||
processConnected();
|
||||
try {
|
||||
selector.select(50);
|
||||
} catch (IOException e) {
|
||||
@@ -207,13 +253,28 @@ public class TcpNioProtocolServer extends ProtocolServer {
|
||||
return;
|
||||
}
|
||||
if (conn == null) return;
|
||||
if (key.isWritable()) {
|
||||
if (conn.writeHandler != null) writeOP(key, socket, conn);
|
||||
} else if (key.isReadable()) {
|
||||
if (key.isReadable()) {
|
||||
if (conn.readHandler != null) readOP(key, socket, conn);
|
||||
} else if (key.isWritable()) {
|
||||
if (conn.writeHandler != null) writeOP(key, socket, conn);
|
||||
}
|
||||
}
|
||||
|
||||
private void closeOP(SelectionKey key) {
|
||||
if (key == null) return;
|
||||
TcpNioAsyncConnection conn = (TcpNioAsyncConnection) key.attachment();
|
||||
try {
|
||||
if (key.isValid()) {
|
||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
socketChannel.close();
|
||||
key.attach(null);
|
||||
key.cancel();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
}
|
||||
conn.dispose();
|
||||
}
|
||||
|
||||
private void readOP(SelectionKey key, SocketChannel socket, TcpNioAsyncConnection conn) {
|
||||
final CompletionHandler handler = conn.removeReadHandler();
|
||||
final ByteBuffer buffer = conn.removeReadBuffer();
|
||||
|
||||
@@ -165,17 +165,18 @@ public class HttpRequest extends Request<HttpContext> {
|
||||
header.addValue(name, value);
|
||||
}
|
||||
}
|
||||
array.clear();
|
||||
if (buffer.hasRemaining()) array.write(buffer, buffer.remaining());
|
||||
if (this.contentType != null && this.contentType.contains("boundary=")) {
|
||||
this.boundary = true;
|
||||
}
|
||||
if (this.contentType != null && this.contentType.contains("boundary=")) this.boundary = true;
|
||||
if (this.boundary) this.keepAlive = false; //文件上传必须设置keepAlive为false,因为文件过大时用户不一定会skip掉多余的数据
|
||||
|
||||
array.clear();
|
||||
if (this.contentLength > 0 && (this.contentType == null || !this.boundary)) {
|
||||
if (this.contentLength > context.getMaxbody()) return -1;
|
||||
array.write(buffer, Math.min((int) this.contentLength, buffer.remaining()));
|
||||
int lr = (int) this.contentLength - array.size();
|
||||
return lr > 0 ? lr : 0;
|
||||
}
|
||||
if (buffer.hasRemaining() && (this.boundary || !this.keepAlive)) array.write(buffer, buffer.remaining()); //文件上传、HTTP1.0或Connection:close
|
||||
//暂不考虑是keep-alive且存在body却没有指定Content-Length的情况
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -37,9 +37,20 @@ import org.redkale.util.Comment;
|
||||
*/
|
||||
public abstract class WebSocket<G extends Serializable, T> {
|
||||
|
||||
@Comment("强制关闭结果码")
|
||||
public static final int CLOSECODE_FORCED = 1;
|
||||
//--------------------------- CLOSECODE -------------------------------
|
||||
@Comment("服务器主动关闭")
|
||||
public static final int CLOSECODE_SERVERCLOSE = 3001;
|
||||
|
||||
@Comment("客户端主动关闭")
|
||||
public static final int CLOSECODE_CLIENTCLOSE = 3002;
|
||||
|
||||
@Comment("异常关闭")
|
||||
public static final int CLOSECODE_WSEXCEPTION = 3003;
|
||||
|
||||
@Comment("异常数据强制关闭")
|
||||
public static final int CLOSECODE_ILLPACKET = 3004;
|
||||
|
||||
//---------------------------- RETCODE --------------------------------
|
||||
@Comment("消息不合法")
|
||||
public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2
|
||||
|
||||
@@ -803,7 +814,7 @@ public abstract class WebSocket<G extends Serializable, T> {
|
||||
*/
|
||||
public final void close() {
|
||||
if (this._runner != null) {
|
||||
CompletableFuture<Void> future = this._runner.closeRunner(CLOSECODE_FORCED, "user close");
|
||||
CompletableFuture<Void> future = this._runner.closeRunner(CLOSECODE_SERVERCLOSE, "user close");
|
||||
if (future != null) future.join();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,7 +243,7 @@ public class WebSocketEngine {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
|
||||
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
} else {
|
||||
CompletableFuture<Integer> future = null;
|
||||
@@ -302,7 +302,7 @@ public class WebSocketEngine {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
|
||||
if (future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
|
||||
return future == null ? CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY) : future;
|
||||
} else {
|
||||
CompletableFuture<Integer> future = null;
|
||||
|
||||
@@ -92,13 +92,13 @@ public abstract class WebSocketNode {
|
||||
|
||||
protected abstract CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, WebSocketRange wsrange, Object message, boolean last);
|
||||
|
||||
protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress addr);
|
||||
protected abstract CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr);
|
||||
|
||||
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress addr);
|
||||
protected abstract CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr);
|
||||
|
||||
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress addr);
|
||||
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr);
|
||||
|
||||
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress addr);
|
||||
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
final CompletableFuture<Void> connect(final Serializable userid) {
|
||||
@@ -244,7 +244,7 @@ public abstract class WebSocketNode {
|
||||
}
|
||||
return future == null ? CompletableFuture.completedFuture(0) : future;
|
||||
});
|
||||
return localFuture.thenCombine(remoteFuture, (a, b) -> a + b);
|
||||
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a + b);
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
@@ -75,7 +75,7 @@ class WebSocketRunner implements Runnable {
|
||||
public void completed(Integer count, Void attachment1) {
|
||||
if (count < 1) {
|
||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid=" + webSocket.getUserid() + ") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
|
||||
closeRunner(0, "read buffer count is " + count);
|
||||
closeRunner(CLOSECODE_ILLPACKET, "read buffer count is " + count);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
@@ -182,17 +182,17 @@ class WebSocketRunner implements Runnable {
|
||||
}
|
||||
} else if (packet.type == FrameType.CLOSE) {
|
||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner onMessage by CLOSE FrameType : " + packet);
|
||||
closeRunner(0, "received CLOSE frame-type message");
|
||||
closeRunner(CLOSECODE_CLIENTCLOSE, "received CLOSE frame-type message");
|
||||
return;
|
||||
} else {
|
||||
context.getLogger().log(Level.WARNING, "WebSocketRunner onMessage by unknown FrameType : " + packet);
|
||||
closeRunner(0, "received unknown frame-type message");
|
||||
closeRunner(CLOSECODE_ILLPACKET, "received unknown frame-type message");
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
context.getLogger().log(Level.WARNING, "WebSocketRunner(userid=" + webSocket.getUserid() + ") onMessage by received error", e);
|
||||
closeRunner(0, "websocket-received error");
|
||||
closeRunner(CLOSECODE_WSEXCEPTION, "websocket-received error");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,9 +200,9 @@ class WebSocketRunner implements Runnable {
|
||||
public void failed(Throwable exc, Void attachment2) {
|
||||
if (exc != null) {
|
||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner read WebSocketPacket failed, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", exc);
|
||||
closeRunner(0, "read websocket-packet failed");
|
||||
closeRunner(CLOSECODE_WSEXCEPTION, "read websocket-packet failed");
|
||||
} else {
|
||||
closeRunner(RETCODE_ILLEGALBUFFER, "decode websocket-packet error");
|
||||
closeRunner(CLOSECODE_WSEXCEPTION, "decode websocket-packet error");
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -212,7 +212,7 @@ class WebSocketRunner implements Runnable {
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read bytes from channel, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds", e);
|
||||
closeRunner(0, "read bytes from channel error");
|
||||
closeRunner(CLOSECODE_WSEXCEPTION, "read bytes from channel error");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ public final class SncpRequest extends Request<SncpContext> {
|
||||
@Override
|
||||
protected int readHeader(ByteBuffer buffer) {
|
||||
if (buffer.remaining() < HEADER_SIZE) {
|
||||
if (buffer.hasRemaining()) buffer.get(new byte[buffer.remaining()]);
|
||||
this.ping = true;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -56,13 +56,13 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress addr, Object message, boolean last, Serializable userid) {
|
||||
public CompletableFuture<Integer> sendMessage(@RpcTargetAddress InetSocketAddress targetAddress, Object message, boolean last, Serializable userid) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.sendMessage(message, last, userid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress addr, final WebSocketRange wsrange, Object message, boolean last) {
|
||||
public CompletableFuture<Integer> broadcastMessage(@RpcTargetAddress InetSocketAddress targetAddress, final WebSocketRange wsrange, Object message, boolean last) {
|
||||
if (this.localEngine == null) return CompletableFuture.completedFuture(RETCODE_GROUP_EMPTY);
|
||||
return this.localEngine.broadcastMessage(wsrange, message, last);
|
||||
}
|
||||
@@ -124,15 +124,15 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
|
||||
/**
|
||||
* 强制关闭用户的WebSocket
|
||||
*
|
||||
* @param userid Serializable
|
||||
* @param sncpAddr InetSocketAddress
|
||||
* @param userid Serializable
|
||||
* @param targetAddress InetSocketAddress
|
||||
*
|
||||
* @return 无返回值
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, InetSocketAddress sncpAddr) {
|
||||
public CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) {
|
||||
//不能从sncpNodeAddresses中移除,因为engine.forceCloseWebSocket 会调用到disconnect
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + sncpAddr);
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " forceCloseWebSocket from " + targetAddress);
|
||||
if (localEngine == null) return CompletableFuture.completedFuture(0);
|
||||
return CompletableFuture.completedFuture(localEngine.forceCloseLocalWebSocket(userid));
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import java.sql.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.*;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.service.Local;
|
||||
import org.redkale.util.*;
|
||||
@@ -45,8 +45,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop) {
|
||||
return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, prop, this.logger);
|
||||
protected PoolSource<Connection> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop) {
|
||||
return new PoolJdbcSource(this.name, this.persistxml, rwtype, queue, semaphore, prop, this.logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -483,7 +483,8 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
|
||||
final Map<Class, String> joinTabalis = node == null ? null : node.getJoinTabalis();
|
||||
final CharSequence join = node == null ? null : node.createSQLJoin(this, false, joinTabalis, new HashSet<>(), info);
|
||||
final CharSequence where = node == null ? null : node.createSQLExpress(info, joinTabalis);
|
||||
if ("mysql".equals(this.readPool.getDbtype()) || "postgresql".equals(this.readPool.getDbtype())) {
|
||||
final String dbtype = this.readPool.getDbtype();
|
||||
if ("mysql".equals(dbtype) || "postgresql".equals(dbtype)) {
|
||||
final String listsql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
|
||||
+ ((where == null || where.length() == 0) ? "" : (" WHERE " + where)) + createSQLOrderby(info, flipper) + (flipper == null || flipper.getLimit() < 1 ? "" : (" LIMIT " + flipper.getLimit() + " OFFSET " + flipper.getOffset()));
|
||||
if (info.isLoggable(logger, Level.FINEST, listsql)) {
|
||||
@@ -497,13 +498,17 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
|
||||
set.close();
|
||||
ps.close();
|
||||
long total = list.size();
|
||||
final String countsql = "SELECT COUNT(*) FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
|
||||
+ ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
|
||||
ps = conn.prepareStatement(countsql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
|
||||
set = ps.executeQuery();
|
||||
if (set.next()) total = set.getLong(1);
|
||||
set.close();
|
||||
ps.close();
|
||||
if (needtotal) {
|
||||
final String countsql = "SELECT COUNT(*) FROM " + info.getTable(node) + " a" + (join == null ? "" : join) + ((where == null || where.length() == 0) ? "" : (" WHERE " + where));
|
||||
if (info.isLoggable(logger, Level.FINEST, countsql)) {
|
||||
logger.finest(info.getType().getSimpleName() + " query countsql=" + countsql);
|
||||
}
|
||||
ps = conn.prepareStatement(countsql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
|
||||
set = ps.executeQuery();
|
||||
if (set.next()) total = set.getLong(1);
|
||||
set.close();
|
||||
ps.close();
|
||||
}
|
||||
return CompletableFuture.completedFuture(new Sheet<>(total, list));
|
||||
}
|
||||
final String sql = "SELECT " + info.getQueryColumns("a", selects) + " FROM " + info.getTable(node) + " a" + (join == null ? "" : join)
|
||||
@@ -589,12 +594,15 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
|
||||
* 直接本地执行SQL语句进行查询,远程模式不可用 <br>
|
||||
* 通常用于复杂的关联查询 <br>
|
||||
*
|
||||
* @param sql SQL语句
|
||||
* @param consumer 回调函数
|
||||
* @param <V> 泛型
|
||||
* @param sql SQL语句
|
||||
* @param handler 回调函数
|
||||
*
|
||||
* @return 结果
|
||||
*/
|
||||
@Local
|
||||
@Override
|
||||
public void directQuery(String sql, Consumer<ResultSet> consumer) {
|
||||
public <V> V directQuery(String sql, Function<ResultSet, V> handler) {
|
||||
final Connection conn = readPool.poll();
|
||||
try {
|
||||
if (logger.isLoggable(Level.FINEST)) logger.finest("direct query sql=" + sql);
|
||||
@@ -602,9 +610,10 @@ public class DataJdbcSource extends DataSqlSource<Connection> {
|
||||
final Statement statement = conn.createStatement();
|
||||
//final PreparedStatement statement = conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
|
||||
final ResultSet set = statement.executeQuery(sql);// ps.executeQuery();
|
||||
consumer.accept(set);
|
||||
V rs = handler.apply(set);
|
||||
set.close();
|
||||
statement.close();
|
||||
return rs;
|
||||
} catch (Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
} finally {
|
||||
|
||||
141
src/org/redkale/source/DataMemorySource.java
Normal file
141
src/org/redkale/source/DataMemorySource.java
Normal file
@@ -0,0 +1,141 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package org.redkale.source;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.URL;
|
||||
import java.sql.ResultSet;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.*;
|
||||
import org.redkale.service.Local;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
/**
|
||||
* DataSource的Memory实现类 <br>
|
||||
* 注意: javax.persistence.jdbc.url 需要指定为 memory:source
|
||||
*
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
* @author zhangjx
|
||||
*/
|
||||
@Local
|
||||
@AutoLoad(false)
|
||||
@SuppressWarnings("unchecked")
|
||||
@ResourceType(DataSource.class)
|
||||
public class DataMemorySource extends DataSqlSource<Void> {
|
||||
|
||||
public DataMemorySource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
|
||||
super(unitName, persistxml, readprop, writeprop);
|
||||
this.cacheForbidden = false;
|
||||
}
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public String getType() {
|
||||
return "memory";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isOnlyCache(EntityInfo info) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public int directExecute(String sql) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public int[] directExecute(String... sqls) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Local
|
||||
@Override
|
||||
public <V> V directQuery(String sql, Function<ResultSet, V> handler) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAsync() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String prepareParamSign(int index) {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PoolSource<Void> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> insertDB(EntityInfo<T> info, T... values) {
|
||||
return CompletableFuture.completedFuture(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> deleteDB(EntityInfo<T> info, Flipper flipper, String sql) {
|
||||
return CompletableFuture.completedFuture(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, T... values) {
|
||||
return CompletableFuture.completedFuture(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, Flipper flipper, String sql, boolean prepared, Object... params) {
|
||||
return CompletableFuture.completedFuture(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T, N extends Number> CompletableFuture<Map<String, N>> getNumberMapDB(EntityInfo<T> info, String sql, FilterFuncColumn... columns) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Number> getNumberResultDB(EntityInfo<T> info, String sql, Number defVal, String column) {
|
||||
return CompletableFuture.completedFuture(defVal);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapDB(EntityInfo<T> info, String sql, String keyColumn) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<T> findDB(EntityInfo<T> info, String sql, boolean onlypk, SelectColumn selects) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Serializable> findColumnDB(EntityInfo<T> info, String sql, boolean onlypk, String column, Serializable defValue) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Boolean> existsDB(EntityInfo<T> info, String sql, boolean onlypk) {
|
||||
return CompletableFuture.completedFuture(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> CompletableFuture<Sheet<T>> querySheetDB(EntityInfo<T> info, boolean needtotal, SelectColumn selects, Flipper flipper, FilterNode node) {
|
||||
return CompletableFuture.completedFuture(new Sheet<>());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -56,11 +56,11 @@ public final class DataSources {
|
||||
private DataSources() {
|
||||
}
|
||||
|
||||
public static DataSource createDataSource(final String unitName, Properties prop) throws IOException {
|
||||
public static DataSource createDataSource2(final String unitName, Properties prop) throws IOException {
|
||||
return new DataJdbcSource(unitName, null, prop, prop);
|
||||
}
|
||||
|
||||
public static DataSource createDataSource(final String unitName, Properties readprop, Properties writeprop) throws IOException {
|
||||
public static DataSource createDataSource2(final String unitName, Properties readprop, Properties writeprop) throws IOException {
|
||||
return new DataJdbcSource(unitName, null, readprop, writeprop);
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ public final class DataSources {
|
||||
|
||||
public static DataSource createDataSource(final String unitName, URL persistxml) throws IOException {
|
||||
if (persistxml == null) persistxml = DataSources.class.getResource("/persistence.xml");
|
||||
InputStream in = persistxml.openStream();
|
||||
InputStream in = persistxml == null ? null : persistxml.openStream();
|
||||
if (in == null) return null;
|
||||
Map<String, Properties> map = loadPersistenceXml(in);
|
||||
Properties readprop = null;
|
||||
@@ -103,6 +103,8 @@ public final class DataSources {
|
||||
}
|
||||
if (readprop == null) throw new IOException("Cannot find (resource.name = '" + unitName + "') DataSource");
|
||||
if (writeprop == null) writeprop = readprop;
|
||||
if (readprop.getProperty(JDBC_URL, "").startsWith("memory:source")) return new DataMemorySource(unitName, persistxml, readprop, writeprop);
|
||||
|
||||
String impl = readprop.getProperty(JDBC_DATASOURCE_CLASS, DataJdbcSource.class.getName());
|
||||
if (DataJdbcSource.class.getName().equals(impl)) {
|
||||
try {
|
||||
|
||||
@@ -67,9 +67,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
|
||||
@SuppressWarnings({"OverridableMethodCallInConstructor", "LeakingThisInConstructor"})
|
||||
public DataSqlSource(String unitName, URL persistxml, Properties readprop, Properties writeprop) {
|
||||
if (readprop == null) readprop = new Properties();
|
||||
if (writeprop == null) writeprop = readprop;
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
this.threads = Integer.decode(readprop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 16));
|
||||
int maxconns = Math.max(8, Integer.decode(readprop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 200)));
|
||||
int maxconns = Math.max(8, Integer.decode(readprop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Math.min(1000, Runtime.getRuntime().availableProcessors() * 200))));
|
||||
if (readprop != writeprop) {
|
||||
this.threads += Integer.decode(writeprop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 16));
|
||||
maxconns = 0;
|
||||
@@ -92,7 +94,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
return t;
|
||||
});
|
||||
final int bufferCapacity = Math.max(8 * 1024, Integer.decode(readprop.getProperty(JDBC_CONNECTIONSCAPACITY, "" + 8 * 1024)));
|
||||
this.bufferPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), this.threads,
|
||||
this.bufferPool = new ObjectPool<>(new AtomicLong(), new AtomicLong(), Math.max(maxconns, this.threads * 2),
|
||||
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> {
|
||||
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false;
|
||||
e.clear();
|
||||
@@ -102,8 +104,9 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
this.persistxml = persistxml;
|
||||
this.cacheForbidden = "NONE".equalsIgnoreCase(readprop.getProperty(JDBC_CACHE_MODE));
|
||||
ArrayBlockingQueue<DBChannel> queue = maxconns > 0 ? new ArrayBlockingQueue(maxconns) : null;
|
||||
this.readPool = createPoolSource(this, "read", queue, readprop);
|
||||
this.writePool = createPoolSource(this, "write", queue, writeprop);
|
||||
Semaphore semaphore = maxconns > 0 ? new Semaphore(maxconns) : null;
|
||||
this.readPool = createPoolSource(this, "read", queue, semaphore, readprop);
|
||||
this.writePool = createPoolSource(this, "write", queue, semaphore, writeprop);
|
||||
}
|
||||
|
||||
@Local
|
||||
@@ -113,7 +116,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public abstract int[] directExecute(String... sqls);
|
||||
|
||||
@Local
|
||||
public abstract void directQuery(String sql, Consumer<ResultSet> consumer);
|
||||
public abstract <V> V directQuery(String sql, Function<ResultSet, V> handler);
|
||||
|
||||
//是否异步, 为true则只能调用pollAsync方法,为false则只能调用poll方法
|
||||
protected abstract boolean isAsync();
|
||||
@@ -122,7 +125,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
protected abstract String prepareParamSign(int index);
|
||||
|
||||
//创建连接池
|
||||
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Properties prop);
|
||||
protected abstract PoolSource<DBChannel> createPoolSource(DataSource source, String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop);
|
||||
|
||||
//插入纪录
|
||||
protected abstract <T> CompletableFuture<Integer> insertDB(final EntityInfo<T> info, T... values);
|
||||
@@ -205,8 +208,8 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Local
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
readPool.close();
|
||||
writePool.close();
|
||||
if (readPool != null) readPool.close();
|
||||
if (writePool != null) writePool.close();
|
||||
}
|
||||
|
||||
@Local
|
||||
@@ -226,7 +229,11 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
}
|
||||
|
||||
protected <T> EntityInfo<T> loadEntityInfo(Class<T> clazz) {
|
||||
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool.props, this, fullloader);
|
||||
return EntityInfo.load(clazz, this.cacheForbidden, this.readPool == null ? null : this.readPool.props, this, fullloader);
|
||||
}
|
||||
|
||||
protected boolean isOnlyCache(EntityInfo info) {
|
||||
return info.isVirtualEntity();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -282,7 +289,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
info.createPrimaryValue(value);
|
||||
}
|
||||
}
|
||||
if (info.isVirtualEntity()) return insertCache(info, values);
|
||||
if (isOnlyCache(info)) return insertCache(info, values);
|
||||
return insertDB(info, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -303,7 +310,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
info.createPrimaryValue(value);
|
||||
}
|
||||
}
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> insertCache(info, values), getExecutor());
|
||||
}
|
||||
if (isAsync()) return insertDB(info, values).whenComplete((rs, t) -> {
|
||||
@@ -390,7 +397,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T> int delete(Class<T> clazz, Serializable... ids) {
|
||||
if (ids.length == 0) return -1;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return deleteCache(info, -1, ids);
|
||||
if (isOnlyCache(info)) return deleteCache(info, -1, ids);
|
||||
return deleteCompose(info, ids).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -404,7 +411,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Serializable... ids) {
|
||||
if (ids.length == 0) return CompletableFuture.completedFuture(-1);
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, ids), getExecutor());
|
||||
}
|
||||
if (isAsync()) return deleteCompose(info, ids).whenComplete((rs, t) -> {
|
||||
@@ -436,7 +443,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Override
|
||||
public <T> int delete(Class<T> clazz, final Flipper flipper, FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return deleteCache(info, -1, flipper, node);
|
||||
if (isOnlyCache(info)) return deleteCache(info, -1, flipper, node);
|
||||
return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -449,7 +456,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> deleteAsync(final Class<T> clazz, final Flipper flipper, FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> deleteCache(info, -1, flipper, node), getExecutor());
|
||||
}
|
||||
if (isAsync()) return DataSqlSource.this.deleteCompose(info, flipper, node).whenComplete((rs, t) -> {
|
||||
@@ -561,7 +568,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
checkEntity("update", false, values);
|
||||
final Class<T> clazz = (Class<T>) values[0].getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, values);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, values);
|
||||
return updateDB(info, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -578,7 +585,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
if (future != null) return future;
|
||||
final Class<T> clazz = (Class<T>) values[0].getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return CompletableFuture.supplyAsync(() -> updateCache(info, -1, values), getExecutor());
|
||||
if (isOnlyCache(info)) return CompletableFuture.supplyAsync(() -> updateCache(info, -1, values), getExecutor());
|
||||
if (isAsync()) return updateDB(info, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -609,7 +616,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Override
|
||||
public <T> int updateColumn(Class<T> clazz, Serializable id, String column, Serializable value) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, id, column, value);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, id, column, value);
|
||||
return updateColumnCompose(info, id, column, value).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -622,7 +629,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final Serializable id, final String column, final Serializable value) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, column, value), getExecutor());
|
||||
}
|
||||
if (isAsync()) return updateColumnCompose(info, id, column, value).whenComplete((rs, t) -> {
|
||||
@@ -666,7 +673,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Override
|
||||
public <T> int updateColumn(Class<T> clazz, String column, Serializable value, FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, column, value, node);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, column, value, node);
|
||||
return DataSqlSource.this.updateColumnCompose(info, column, value, node).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -679,7 +686,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final String column, final Serializable value, final FilterNode node) {
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, column, value, node), getExecutor());
|
||||
}
|
||||
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, column, value, node).whenComplete((rs, t) -> {
|
||||
@@ -740,7 +747,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T> int updateColumn(final Class<T> clazz, final Serializable id, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return -1;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, id, values);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, id, values);
|
||||
return DataSqlSource.this.updateColumnCompose(info, id, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -754,7 +761,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final Serializable id, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return CompletableFuture.completedFuture(-1);
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, id, values), getExecutor());
|
||||
}
|
||||
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, id, values).whenComplete((rs, t) -> {
|
||||
@@ -819,7 +826,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T> int updateColumn(final Class<T> clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return -1;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, node, flipper, values);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, node, flipper, values);
|
||||
return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -833,7 +840,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final Class<T> clazz, final FilterNode node, final Flipper flipper, final ColumnValue... values) {
|
||||
if (values == null || values.length < 1) return CompletableFuture.completedFuture(-1);
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, node, flipper, values), getExecutor());
|
||||
}
|
||||
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, node, flipper, values).whenComplete((rs, t) -> {
|
||||
@@ -890,22 +897,22 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final T bean, final String... columns) {
|
||||
return updateColumn(bean, SelectColumn.createIncludes(columns));
|
||||
return updateColumn(bean, SelectColumn.includes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final String... columns) {
|
||||
return updateColumnAsync(bean, SelectColumn.createIncludes(columns));
|
||||
return updateColumnAsync(bean, SelectColumn.includes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> int updateColumn(final T bean, final FilterNode node, final String... columns) {
|
||||
return updateColumn(bean, node, SelectColumn.createIncludes(columns));
|
||||
return updateColumn(bean, node, SelectColumn.includes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> CompletableFuture<Integer> updateColumnAsync(final T bean, final FilterNode node, final String... columns) {
|
||||
return updateColumnAsync(bean, node, SelectColumn.createIncludes(columns));
|
||||
return updateColumnAsync(bean, node, SelectColumn.includes(columns));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -913,7 +920,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
if (bean == null || selects == null) return -1;
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, false, bean, null, selects);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, false, bean, null, selects);
|
||||
return DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -928,7 +935,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
if (bean == null || selects == null) return CompletableFuture.completedFuture(-1);
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, false, bean, null, selects), getExecutor());
|
||||
}
|
||||
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, false, bean, null, selects).whenComplete((rs, t) -> {
|
||||
@@ -952,7 +959,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
if (bean == null || node == null || selects == null) return -1;
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) return updateCache(info, -1, true, bean, node, selects);
|
||||
if (isOnlyCache(info)) return updateCache(info, -1, true, bean, node, selects);
|
||||
return DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).whenComplete((rs, t) -> {
|
||||
if (t != null) {
|
||||
futureCompleteConsumer.accept(rs, t);
|
||||
@@ -967,7 +974,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
if (bean == null || node == null || selects == null) return CompletableFuture.completedFuture(-1);
|
||||
Class<T> clazz = (Class) bean.getClass();
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
if (info.isVirtualEntity()) {
|
||||
if (isOnlyCache(info)) {
|
||||
return CompletableFuture.supplyAsync(() -> updateCache(info, -1, true, bean, node, selects), getExecutor());
|
||||
}
|
||||
if (isAsync()) return DataSqlSource.this.updateColumnCompose(info, true, bean, node, selects).whenComplete((rs, t) -> {
|
||||
@@ -1158,7 +1165,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <N extends Number> Map<String, N> getNumberMap(final Class entityClass, final FilterNode node, final FilterFuncColumn... columns) {
|
||||
final EntityInfo info = loadEntityInfo(entityClass);
|
||||
final EntityCache cache = info.getCache();
|
||||
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
|
||||
if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) {
|
||||
final Map map = new HashMap<>();
|
||||
if (node == null || node.isCacheUseable(this)) {
|
||||
for (FilterFuncColumn ffc : columns) {
|
||||
@@ -1176,7 +1183,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <N extends Number> CompletableFuture<Map<String, N>> getNumberMapAsync(final Class entityClass, final FilterNode node, final FilterFuncColumn... columns) {
|
||||
final EntityInfo info = loadEntityInfo(entityClass);
|
||||
final EntityCache cache = info.getCache();
|
||||
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
|
||||
if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) {
|
||||
final Map map = new HashMap<>();
|
||||
if (node == null || node.isCacheUseable(this)) {
|
||||
for (FilterFuncColumn ffc : columns) {
|
||||
@@ -1264,7 +1271,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public Number getNumberResult(final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) {
|
||||
final EntityInfo info = loadEntityInfo(entityClass);
|
||||
final EntityCache cache = info.getCache();
|
||||
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
|
||||
if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) {
|
||||
if (node == null || node.isCacheUseable(this)) {
|
||||
return cache.getNumberResult(func, defVal, column, node);
|
||||
}
|
||||
@@ -1276,7 +1283,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public CompletableFuture<Number> getNumberResultAsync(final Class entityClass, final FilterFunc func, final Number defVal, final String column, final FilterNode node) {
|
||||
final EntityInfo info = loadEntityInfo(entityClass);
|
||||
final EntityCache cache = info.getCache();
|
||||
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
|
||||
if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) {
|
||||
if (node == null || node.isCacheUseable(this)) {
|
||||
return CompletableFuture.completedFuture(cache.getNumberResult(func, defVal, column, node));
|
||||
}
|
||||
@@ -1321,7 +1328,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T, K extends Serializable, N extends Number> Map<K, N> queryColumnMap(final Class<T> entityClass, final String keyColumn, final FilterFunc func, final String funcColumn, FilterNode node) {
|
||||
final EntityInfo info = loadEntityInfo(entityClass);
|
||||
final EntityCache cache = info.getCache();
|
||||
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
|
||||
if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) {
|
||||
if (node == null || node.isCacheUseable(this)) {
|
||||
return cache.queryColumnMap(keyColumn, func, funcColumn, node);
|
||||
}
|
||||
@@ -1333,7 +1340,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
public <T, K extends Serializable, N extends Number> CompletableFuture<Map<K, N>> queryColumnMapAsync(final Class<T> entityClass, final String keyColumn, final FilterFunc func, final String funcColumn, FilterNode node) {
|
||||
final EntityInfo info = loadEntityInfo(entityClass);
|
||||
final EntityCache cache = info.getCache();
|
||||
if (cache != null && (info.isVirtualEntity() || cache.isFullLoaded())) {
|
||||
if (cache != null && (isOnlyCache(info) || cache.isFullLoaded())) {
|
||||
if (node == null || node.isCacheUseable(this)) {
|
||||
return CompletableFuture.completedFuture(cache.queryColumnMap(keyColumn, func, funcColumn, node));
|
||||
}
|
||||
@@ -1717,7 +1724,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
|
||||
@Override
|
||||
public <T, V extends Serializable> List<V> queryColumnList(final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||
final List<T> list = queryList(clazz, SelectColumn.createIncludes(selectedColumn), flipper, node);
|
||||
final List<T> list = queryList(clazz, SelectColumn.includes(selectedColumn), flipper, node);
|
||||
final List<V> rs = new ArrayList<>();
|
||||
if (list.isEmpty()) return rs;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
@@ -1730,7 +1737,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
|
||||
@Override
|
||||
public <T, V extends Serializable> CompletableFuture<List<V>> queryColumnListAsync(final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||
return queryListAsync(clazz, SelectColumn.createIncludes(selectedColumn), flipper, node).thenApply((List<T> list) -> {
|
||||
return queryListAsync(clazz, SelectColumn.includes(selectedColumn), flipper, node).thenApply((List<T> list) -> {
|
||||
final List<V> rs = new ArrayList<>();
|
||||
if (list.isEmpty()) return rs;
|
||||
final EntityInfo<T> info = loadEntityInfo(clazz);
|
||||
@@ -1766,7 +1773,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
|
||||
@Override
|
||||
public <T, V extends Serializable> Sheet<V> queryColumnSheet(final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||
Sheet<T> sheet = querySheet(clazz, SelectColumn.createIncludes(selectedColumn), flipper, node);
|
||||
Sheet<T> sheet = querySheet(clazz, SelectColumn.includes(selectedColumn), flipper, node);
|
||||
final Sheet<V> rs = new Sheet<>();
|
||||
if (sheet.isEmpty()) return rs;
|
||||
rs.setTotal(sheet.getTotal());
|
||||
@@ -1782,7 +1789,7 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
|
||||
|
||||
@Override
|
||||
public <T, V extends Serializable> CompletableFuture<Sheet<V>> queryColumnSheetAsync(final String selectedColumn, final Class<T> clazz, final Flipper flipper, final FilterNode node) {
|
||||
return querySheetAsync(clazz, SelectColumn.createIncludes(selectedColumn), flipper, node).thenApply((Sheet<T> sheet) -> {
|
||||
return querySheetAsync(clazz, SelectColumn.includes(selectedColumn), flipper, node).thenApply((Sheet<T> sheet) -> {
|
||||
final Sheet<V> rs = new Sheet<>();
|
||||
if (sheet.isEmpty()) return rs;
|
||||
rs.setTotal(sheet.getTotal());
|
||||
|
||||
@@ -99,7 +99,12 @@ public final class EntityCache<T> {
|
||||
}
|
||||
|
||||
public void fullLoad() {
|
||||
if (info.fullloader == null) return;
|
||||
if (info.fullloader == null) {
|
||||
this.list = new ConcurrentLinkedQueue();
|
||||
this.map = new ConcurrentHashMap();
|
||||
this.fullloaded = true;
|
||||
return;
|
||||
}
|
||||
this.fullloaded = false;
|
||||
ConcurrentHashMap newmap = new ConcurrentHashMap();
|
||||
List<T> all = info.fullloader.apply(info.source, type);
|
||||
@@ -437,7 +442,7 @@ public final class EntityCache<T> {
|
||||
public int insert(T value) {
|
||||
if (value == null) return 0;
|
||||
final T rs = newReproduce.apply(this.creator.create(), value); //确保同一主键值的map与list中的对象必须共用。
|
||||
T old = this.map.put(this.primary.get(rs), rs);
|
||||
T old = this.map.putIfAbsent(this.primary.get(rs), rs);
|
||||
if (old == null) {
|
||||
this.list.add(rs);
|
||||
return 1;
|
||||
|
||||
@@ -235,11 +235,12 @@ public final class EntityInfo<T> {
|
||||
}
|
||||
//---------------------------------------------
|
||||
Table t = type.getAnnotation(Table.class);
|
||||
if (type.getAnnotation(VirtualEntity.class) != null) {
|
||||
if (type.getAnnotation(VirtualEntity.class) != null || "memory".equalsIgnoreCase(source.getType())) {
|
||||
this.table = null;
|
||||
BiFunction<DataSource, Class, List> loader = null;
|
||||
try {
|
||||
loader = type.getAnnotation(VirtualEntity.class).loader().getDeclaredConstructor().newInstance();
|
||||
VirtualEntity ve = type.getAnnotation(VirtualEntity.class);
|
||||
if (ve != null) loader = ve.loader().getDeclaredConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, type + " init @VirtualEntity.loader error", e);
|
||||
}
|
||||
|
||||
@@ -33,8 +33,8 @@ public class PoolJdbcSource extends PoolSource<Connection> {
|
||||
|
||||
protected final URL persistxml;
|
||||
|
||||
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Properties prop, Logger logger) {
|
||||
super(rwtype, prop, logger);
|
||||
public PoolJdbcSource(String unitName, URL persistxml, String rwtype, ArrayBlockingQueue aqueue, Semaphore semaphore, Properties prop, Logger logger) {
|
||||
super(rwtype, semaphore, prop, logger);
|
||||
this.unitName = unitName;
|
||||
this.persistxml = persistxml;
|
||||
this.source = createDataSource(prop);
|
||||
|
||||
@@ -23,6 +23,8 @@ import static org.redkale.source.DataSources.*;
|
||||
*/
|
||||
public abstract class PoolSource<DBChannel> {
|
||||
|
||||
protected final AtomicLong closeCounter = new AtomicLong();
|
||||
|
||||
protected final AtomicLong usingCounter = new AtomicLong();
|
||||
|
||||
protected final AtomicLong creatCounter = new AtomicLong();
|
||||
@@ -64,7 +66,7 @@ public abstract class PoolSource<DBChannel> {
|
||||
protected Properties attributes = new Properties();
|
||||
|
||||
@SuppressWarnings("OverridableMethodCallInConstructor")
|
||||
public PoolSource(String rwtype, Properties prop, Logger logger) {
|
||||
public PoolSource(String rwtype, Semaphore semaphore, Properties prop, Logger logger) {
|
||||
this.logger = logger;
|
||||
this.rwtype = rwtype;
|
||||
this.props = prop;
|
||||
@@ -76,7 +78,7 @@ public abstract class PoolSource<DBChannel> {
|
||||
this.readTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_READTIMEOUT_SECONDS, "3"));
|
||||
this.writeTimeoutSeconds = Integer.decode(prop.getProperty(JDBC_WRITETIMEOUT_SECONDS, "3"));
|
||||
this.maxconns = Math.max(8, Integer.decode(prop.getProperty(JDBC_CONNECTIONS_LIMIT, "" + Runtime.getRuntime().availableProcessors() * 100)));
|
||||
this.semaphore = new Semaphore(this.maxconns);
|
||||
this.semaphore = semaphore == null ? new Semaphore(this.maxconns) : semaphore;
|
||||
String dbtype0 = "";
|
||||
{ //jdbc:mysql:// jdbc:microsoft:sqlserver:// 取://之前的到最后一个:之间的字符串
|
||||
int pos = this.url.indexOf("://");
|
||||
@@ -161,6 +163,10 @@ public abstract class PoolSource<DBChannel> {
|
||||
return dbtype;
|
||||
}
|
||||
|
||||
public final long getCloseCount() {
|
||||
return closeCounter.longValue();
|
||||
}
|
||||
|
||||
public final long getUsingCount() {
|
||||
return usingCounter.longValue();
|
||||
}
|
||||
|
||||
@@ -40,8 +40,8 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
||||
|
||||
protected final ArrayBlockingQueue<AsyncConnection> connQueue;
|
||||
|
||||
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
|
||||
super(rwtype, prop, logger);
|
||||
public PoolTcpSource(String rwtype, ArrayBlockingQueue queue, Semaphore semaphore, Properties prop, Logger logger, ObjectPool<ByteBuffer> bufferPool, ThreadPoolExecutor executor) {
|
||||
super(rwtype, semaphore, prop, logger);
|
||||
this.bufferPool = bufferPool;
|
||||
this.executor = executor;
|
||||
try {
|
||||
@@ -55,7 +55,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
||||
@Override
|
||||
public void offerConnection(final AsyncConnection conn) {
|
||||
if (conn == null) return;
|
||||
if (connQueue.offer(conn)) {
|
||||
if (conn.isOpen() && connQueue.offer(conn)) {
|
||||
saveCounter.incrementAndGet();
|
||||
usingCounter.decrementAndGet();
|
||||
} else {
|
||||
@@ -136,6 +136,7 @@ public abstract class PoolTcpSource extends PoolSource<AsyncConnection> {
|
||||
return AsyncConnection.createTCP(group, this.servaddr, this.readTimeoutSeconds, this.writeTimeoutSeconds).thenCompose(conn -> {
|
||||
conn.beforeCloseListener((c) -> {
|
||||
semaphore.release();
|
||||
closeCounter.incrementAndGet();
|
||||
usingCounter.decrementAndGet();
|
||||
});
|
||||
CompletableFuture<AsyncConnection> future = new CompletableFuture();
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
*/
|
||||
package org.redkale.util;
|
||||
|
||||
/**
|
||||
/**
|
||||
* <p>
|
||||
* 详情见: https://redkale.org
|
||||
*
|
||||
@@ -17,7 +17,7 @@ public final class Redkale {
|
||||
}
|
||||
|
||||
public static String getDotedVersion() {
|
||||
return "1.9.5";
|
||||
return "1.9.6";
|
||||
}
|
||||
|
||||
public static int getMajorVersion() {
|
||||
|
||||
@@ -70,6 +70,7 @@ public class SelectColumn implements Predicate<String> {
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* class中的字段名
|
||||
*
|
||||
* @param columns 包含的字段名集合
|
||||
@@ -81,6 +82,18 @@ public class SelectColumn implements Predicate<String> {
|
||||
}
|
||||
|
||||
/**
|
||||
* class中的字段名
|
||||
*
|
||||
* @param columns 包含的字段名集合
|
||||
*
|
||||
* @return SelectColumn
|
||||
*/
|
||||
public static SelectColumn includes(String... columns) {
|
||||
return new SelectColumn(columns, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* class中的字段名
|
||||
*
|
||||
* @param cols 包含的字段名集合
|
||||
@@ -93,6 +106,20 @@ public class SelectColumn implements Predicate<String> {
|
||||
}
|
||||
|
||||
/**
|
||||
* class中的字段名
|
||||
*
|
||||
* @param cols 包含的字段名集合
|
||||
* @param columns 包含的字段名集合
|
||||
*
|
||||
* @return SelectColumn
|
||||
*/
|
||||
public static SelectColumn includes(String[] cols, String... columns) {
|
||||
return new SelectColumn(Utility.append(cols, columns), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
*
|
||||
* class中的字段名
|
||||
*
|
||||
* @param columns 排除的字段名集合
|
||||
@@ -104,6 +131,18 @@ public class SelectColumn implements Predicate<String> {
|
||||
}
|
||||
|
||||
/**
|
||||
* class中的字段名
|
||||
*
|
||||
* @param columns 排除的字段名集合
|
||||
*
|
||||
* @return SelectColumn
|
||||
*/
|
||||
public static SelectColumn excludes(String... columns) {
|
||||
return new SelectColumn(columns, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* class中的字段名
|
||||
*
|
||||
* @param cols 排除的字段名集合
|
||||
@@ -115,6 +154,19 @@ public class SelectColumn implements Predicate<String> {
|
||||
return new SelectColumn(Utility.append(cols, columns), true);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* class中的字段名
|
||||
*
|
||||
* @param cols 排除的字段名集合
|
||||
* @param columns 排除的字段名集合
|
||||
*
|
||||
* @return SelectColumn
|
||||
*/
|
||||
public static SelectColumn excludes(String[] cols, String... columns) {
|
||||
return new SelectColumn(Utility.append(cols, columns), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean test(final String column) {
|
||||
if (this.columns != null) {
|
||||
|
||||
@@ -391,7 +391,7 @@ public final class Utility {
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 将int数组用分隔符拼接成字符串
|
||||
*
|
||||
@@ -616,7 +616,7 @@ public final class Utility {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 判断指定值是否包含指定的数组中,包含返回true
|
||||
*
|
||||
@@ -1742,10 +1742,10 @@ public final class Utility {
|
||||
conn.setRequestProperty(en.getKey(), en.getValue());
|
||||
}
|
||||
}
|
||||
if (body != null) {
|
||||
{
|
||||
conn.setDoInput(true);
|
||||
conn.setDoOutput(true);
|
||||
conn.getOutputStream().write(body.getBytes(UTF_8));
|
||||
conn.getOutputStream().write(body == null ? new byte[0] : body.getBytes(UTF_8));
|
||||
}
|
||||
conn.connect();
|
||||
int rs = conn.getResponseCode();
|
||||
|
||||
Reference in New Issue
Block a user