From b768e0e8ef2c98d2e1acdec2f401b6a0040c9162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Fri, 20 Mar 2015 10:07:31 +0800 Subject: [PATCH] --- src/com/wentch/redkale/net/Async.java | 2 + .../wentch/redkale/net/AsyncConnection.java | 34 +++- .../redkale/net/AsyncDatagramChannel.java | 37 ++++- .../wentch/redkale/net/AsyncWriteHandler.java | 74 --------- src/com/wentch/redkale/net/PrepareRunner.java | 5 +- src/com/wentch/redkale/net/Request.java | 23 +++ src/com/wentch/redkale/net/Response.java | 120 +++++++++++-- src/com/wentch/redkale/net/Server.java | 12 +- .../redkale/net/http/HttpPrepareServlet.java | 20 +-- .../redkale/net/http/HttpProxyServlet.java | 20 +-- .../wentch/redkale/net/http/HttpRequest.java | 20 ++- .../redkale/net/http/HttpResourceServlet.java | 104 ++---------- .../wentch/redkale/net/http/HttpResponse.java | 72 +++----- .../redkale/net/http/WebSocketServlet.java | 7 +- .../wentch/redkale/net/sncp/SncpClient.java | 157 +++++++++++------- .../wentch/redkale/net/sncp/SncpContext.java | 4 + .../redkale/net/sncp/SncpDynServlet.java | 14 +- .../wentch/redkale/net/sncp/SncpRequest.java | 63 ++++--- .../wentch/redkale/net/sncp/SncpResponse.java | 37 ++++- .../wentch/redkale/source/DataJDBCSource.java | 24 ++- .../wentch/redkale/source/DataJPASource.java | 14 +- src/com/wentch/redkale/source/DataSource.java | 12 +- src/com/wentch/redkale/util/ObjectPool.java | 9 + 23 files changed, 495 insertions(+), 389 deletions(-) delete mode 100644 src/com/wentch/redkale/net/AsyncWriteHandler.java diff --git a/src/com/wentch/redkale/net/Async.java b/src/com/wentch/redkale/net/Async.java index df584a41a..a3539b753 100644 --- a/src/com/wentch/redkale/net/Async.java +++ b/src/com/wentch/redkale/net/Async.java @@ -11,6 +11,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * 当Service是Remote模式时, 用该注解标注在方法上可使数据变成异步传输, 该注解只能标注在返回类型为void的public方法上 + * 不再起作用, 屏蔽掉 * * @author zhangjx */ @@ -18,6 +19,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; @Documented @Target({TYPE, METHOD}) @Retention(RUNTIME) +@Deprecated public @interface Async { } diff --git a/src/com/wentch/redkale/net/AsyncConnection.java b/src/com/wentch/redkale/net/AsyncConnection.java index d6897fa1c..f93f027a3 100644 --- a/src/com/wentch/redkale/net/AsyncConnection.java +++ b/src/com/wentch/redkale/net/AsyncConnection.java @@ -5,9 +5,9 @@ */ package com.wentch.redkale.net; -import java.io.IOException; +import java.io.*; import java.net.*; -import java.nio.ByteBuffer; +import java.nio.*; import java.nio.channels.*; import java.util.concurrent.*; @@ -29,6 +29,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl public abstract void setWriteTimeoutSecond(int writeTimeoutSecond); + public final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { + write(srcs, 0, srcs.length, attachment, handler); + } + + protected abstract void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + public abstract void dispose(); //同close, 只是去掉throws IOException public static AsyncConnection create(final String protocol, final SocketAddress address) throws IOException { @@ -103,6 +109,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl channel.send(src, remoteAddress, attachment, handler); } + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + channel.send(srcs, offset, length, remoteAddress, attachment, handler); + } + @Override public void setReadTimeoutSecond(int readTimeoutSecond) { this.readTimeoutSecond = readTimeoutSecond; @@ -210,6 +221,24 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS, + attachment, new CompletionHandler() { + + @Override + public void completed(Long result, A attachment) { + handler.completed(result.intValue(), attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } + + }); + } + @Override public void setReadTimeoutSecond(int readTimeoutSecond) { this.readTimeoutSecond = readTimeoutSecond; @@ -266,6 +295,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } catch (IOException io) { } } + }; } diff --git a/src/com/wentch/redkale/net/AsyncDatagramChannel.java b/src/com/wentch/redkale/net/AsyncDatagramChannel.java index 477afd464..4d50ff544 100644 --- a/src/com/wentch/redkale/net/AsyncDatagramChannel.java +++ b/src/com/wentch/redkale/net/AsyncDatagramChannel.java @@ -7,16 +7,16 @@ package com.wentch.redkale.net; import java.io.*; import java.lang.invoke.*; -import java.lang.ref.SoftReference; +import java.lang.ref.*; import java.lang.reflect.*; import java.net.*; -import java.nio.ByteBuffer; +import java.nio.*; import java.nio.channels.*; import java.security.*; import java.util.*; import java.util.concurrent.*; -import sun.misc.Cleaner; -import sun.security.action.GetIntegerAction; +import sun.misc.*; +import sun.security.action.*; /** * @@ -336,6 +336,35 @@ public final class AsyncDatagramChannel implements AsynchronousByteChannel, Mult implSend(src, target, attachment, handler); } + public void send(ByteBuffer[] srcs, final int offset, final int length, SocketAddress target, A attachment, final CompletionHandler handler) { + if (handler == null) throw new NullPointerException("'handler' is null"); + final ByteBuffer[] buffers = srcs; + implSend(buffers[offset], target, attachment, new CompletionHandler() { + private int index = offset; + + private int resultSum; + + private final int max = length - 1; + + @Override + public void completed(Integer result, A attachment) { + resultSum += result; + if (buffers[index].hasRemaining()) { + implSend(buffers[index], target, attachment, this); + } else if (index == max) { + handler.completed(resultSum, attachment); + } else { + implSend(buffers[++index], target, attachment, this); + } + } + + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } + }); + } + private Future implWrite(ByteBuffer src, A attachment, CompletionHandler handler) { int n = 0; Throwable exc = null; diff --git a/src/com/wentch/redkale/net/AsyncWriteHandler.java b/src/com/wentch/redkale/net/AsyncWriteHandler.java deleted file mode 100644 index 6d0611b28..000000000 --- a/src/com/wentch/redkale/net/AsyncWriteHandler.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.wentch.redkale.net; - -import java.nio.ByteBuffer; -import java.nio.channels.*; -import java.util.logging.Level; - -/** - * - * @author zhangjx - * @param - */ -public final class AsyncWriteHandler implements CompletionHandler { - - protected final ByteBuffer buffer; - - protected final AsynchronousByteChannel channel; - - protected final Context context; - - protected final ByteBuffer buffer2; - - protected final A attachment; - - protected final CompletionHandler hander; - - public AsyncWriteHandler(Context context, ByteBuffer buffer, AsynchronousByteChannel channel) { - this(context, buffer, channel, null, null, null); - } - - public AsyncWriteHandler(Context context, ByteBuffer buffer, AsynchronousByteChannel channel, ByteBuffer buffer2, A attachment, CompletionHandler hander) { - this.buffer = buffer; - this.channel = channel; - this.context = context; - this.buffer2 = buffer2; - this.attachment = attachment; - this.hander = hander; - } - - @Override - public void completed(Integer result, A attachment) { - if (buffer.hasRemaining()) { - this.channel.write(buffer, attachment, this); - return; - } - if (context != null) context.offerBuffer(buffer); - if (hander != null) { - if (buffer2 == null) { - hander.completed(result, attachment); - } else { - this.channel.write(buffer2, attachment, hander); - } - } - } - - @Override - public void failed(Throwable exc, A attachment) { - if (context != null) context.offerBuffer(buffer); - if (hander == null) { - try { - this.channel.close(); - } catch (Exception e) { - context.logger.log(Level.FINE, "AsyncWriteHandler close channel erroneous", e); - } - } else { - hander.failed(exc, attachment); - } - } - -} diff --git a/src/com/wentch/redkale/net/PrepareRunner.java b/src/com/wentch/redkale/net/PrepareRunner.java index 6be6b3d8b..9ea3b2cd5 100644 --- a/src/com/wentch/redkale/net/PrepareRunner.java +++ b/src/com/wentch/redkale/net/PrepareRunner.java @@ -33,7 +33,6 @@ public final class PrepareRunner implements Runnable { public void run() { final PrepareServlet prepare = context.prepare; final ObjectPool responsePool = context.responsePool; - final ByteBuffer buffer = context.pollBuffer(); if (data != null) { final Response response = responsePool.poll(); response.init(channel); @@ -41,11 +40,11 @@ public final class PrepareRunner implements Runnable { prepare.prepare(data, response.request, response); } catch (Throwable t) { context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t); - context.offerBuffer(buffer); response.finish(true); } return; } + final ByteBuffer buffer = context.pollBuffer(); try { channel.read(buffer, null, new CompletionHandler() { @Override @@ -63,7 +62,7 @@ public final class PrepareRunner implements Runnable { // buffer.flip(); // byte[] bytes = new byte[buffer.remaining()]; // buffer.get(bytes); -// System.out.println(new String(bytes)); +// System.println(new String(bytes)); // } buffer.flip(); final Response response = responsePool.poll(); diff --git a/src/com/wentch/redkale/net/Request.java b/src/com/wentch/redkale/net/Request.java index 089a814cc..998b32707 100644 --- a/src/com/wentch/redkale/net/Request.java +++ b/src/com/wentch/redkale/net/Request.java @@ -22,6 +22,12 @@ public abstract class Request { protected AsyncConnection channel; + /** + * properties 与 attributes 的区别在于:调用recycle时, attributes会被清空而properties会保留; + * properties 通常存放需要永久绑定在request里的一些对象 + */ + private final Map properties = new HashMap<>(); + protected final Map attributes = new HashMap<>(); protected Request(Context context) { @@ -47,6 +53,23 @@ public abstract class Request { channel = null; // close it by response } + protected void setProperty(String name, Object value) { + properties.put(name, value); + } + + @SuppressWarnings("unchecked") + protected T getProperty(String name) { + return (T) properties.get(name); + } + + protected void removeProperty(String name) { + properties.remove(name); + } + + protected Map getProperties() { + return properties; + } + public void setAttribute(String name, Object value) { attributes.put(name, value); } diff --git a/src/com/wentch/redkale/net/Response.java b/src/com/wentch/redkale/net/Response.java index a14563169..7588722cc 100644 --- a/src/com/wentch/redkale/net/Response.java +++ b/src/com/wentch/redkale/net/Response.java @@ -22,22 +22,56 @@ public abstract class Response { protected AsyncConnection channel; - protected final CompletionHandler finishHandler = new CompletionHandler() { + private final CompletionHandler finishHandler = new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { if (attachment.hasRemaining()) { channel.write(attachment, attachment, this); } else { - Response.this.context.offerBuffer(attachment); - Response.this.finish(); + context.offerBuffer(attachment); + finish(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { - Response.this.context.offerBuffer(attachment); - Response.this.finish(true); + context.offerBuffer(attachment); + finish(true); + } + + }; + + private final CompletionHandler finishHandler2 = new CompletionHandler() { + + @Override + public void completed(Integer result, ByteBuffer[] attachments) { + int index = -1; + for (int i = 0; i < attachments.length; i++) { + if (attachments[i].hasRemaining()) { + index = i; + break; + } else { + context.offerBuffer(attachments[i]); + } + } + if (index == 0) { + channel.write(attachments, attachments, this); + } else if (index > 0) { + ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index]; + System.arraycopy(attachments, index, newattachs, 0, newattachs.length); + channel.write(newattachs, newattachs, this); + } else { + finish(); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer[] attachments) { + for (ByteBuffer attachment : attachments) { + context.offerBuffer(attachment); + } + finish(true); } }; @@ -64,8 +98,8 @@ public abstract class Response { if (channel.isOpen()) channel.close(); } catch (Exception e) { } - channel = null; } + channel = null; } return true; } @@ -85,22 +119,84 @@ public abstract class Response { } public void finish(boolean kill) { - //System.out.println("耗时: " + (System.currentTimeMillis() - request.createtime)); + //System.println("耗时: " + (System.currentTimeMillis() - request.createtime)); if (kill) refuseAlive(); this.context.responsePool.offer(this); } public void finish(ByteBuffer buffer) { - finish(buffer, false); + finish(false, buffer); } - public void finish(ByteBuffer buffer, boolean kill) { + public void finish(boolean kill, ByteBuffer buffer) { if (kill) refuseAlive(); - send(buffer, buffer, finishHandler); + this.channel.write(buffer, buffer, finishHandler); } - public void send(ByteBuffer buffer, A attachment, CompletionHandler handler) { - this.channel.write(buffer, attachment, handler); + public void finish(ByteBuffer... buffers) { + finish(false, buffers); + } + + public void finish(boolean kill, ByteBuffer... buffers) { + if (kill) refuseAlive(); + this.channel.write(buffers, buffers, finishHandler2); + } + + protected void send(final ByteBuffer buffer, final A attachment, final CompletionHandler handler) { + this.channel.write(buffer, attachment, new CompletionHandler() { + + @Override + public void completed(Integer result, A attachment) { + if (buffer.hasRemaining()) { + channel.write(buffer, attachment, this); + } else { + context.offerBuffer(buffer); + if (handler != null) handler.completed(result, attachment); + } + } + + @Override + public void failed(Throwable exc, A attachment) { + context.offerBuffer(buffer); + if (handler != null) handler.failed(exc, attachment); + } + + }); + } + + protected void send(final ByteBuffer[] buffers, A attachment, CompletionHandler handler) { + this.channel.write(buffers, attachment, new CompletionHandler() { + + @Override + public void completed(Integer result, A attachment) { + int index = -1; + for (int i = 0; i < buffers.length; i++) { + if (buffers[i].hasRemaining()) { + index = i; + break; + } + context.offerBuffer(buffers[i]); + } + if (index == 0) { + channel.write(buffers, attachment, this); + } else if (index > 0) { + ByteBuffer[] newattachs = new ByteBuffer[buffers.length - index]; + System.arraycopy(buffers, index, newattachs, 0, newattachs.length); + channel.write(newattachs, attachment, this); + } else { + if (handler != null) handler.completed(result, attachment); + } + } + + @Override + public void failed(Throwable exc, A attachment) { + for (ByteBuffer buffer : buffers) { + context.offerBuffer(buffer); + } + if (handler != null) handler.failed(exc, attachment); + } + + }); } public Context getContext() { diff --git a/src/com/wentch/redkale/net/Server.java b/src/com/wentch/redkale/net/Server.java index 3f05f17f6..cc7380c24 100644 --- a/src/com/wentch/redkale/net/Server.java +++ b/src/com/wentch/redkale/net/Server.java @@ -5,16 +5,16 @@ */ package com.wentch.redkale.net; -import com.wentch.redkale.util.AnyValue; -import com.wentch.redkale.watch.WatchFactory; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; import java.io.*; -import java.lang.reflect.Method; +import java.lang.reflect.*; import java.net.*; -import java.nio.charset.Charset; +import java.nio.charset.*; import java.text.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import java.util.logging.*; /** @@ -108,7 +108,7 @@ public abstract class Server { public void start() throws IOException { this.context = this.createContext(); this.context.prepare.init(this.context, config); - this.watch.inject(this.context.prepare); + if (this.watch != null) this.watch.inject(this.context.prepare); this.transport = ProtocolServer.create(this.protocol, context); this.transport.open(); transport.setOption(StandardSocketOptions.SO_REUSEADDR, true); diff --git a/src/com/wentch/redkale/net/http/HttpPrepareServlet.java b/src/com/wentch/redkale/net/http/HttpPrepareServlet.java index 8c4e9c4f4..f7641d59e 100644 --- a/src/com/wentch/redkale/net/http/HttpPrepareServlet.java +++ b/src/com/wentch/redkale/net/http/HttpPrepareServlet.java @@ -5,19 +5,17 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.net.PrepareServlet; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.util.AnyValue; -import com.wentch.redkale.util.Utility; +import com.wentch.redkale.net.*; import com.wentch.redkale.util.AnyValue.DefaultAnyValue; -import com.wentch.redkale.watch.WatchFactory; -import java.io.IOException; -import java.nio.ByteBuffer; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.io.*; +import java.nio.*; import java.util.*; import java.util.AbstractMap.SimpleEntry; -import java.util.function.Predicate; -import java.util.logging.Level; -import java.util.regex.Pattern; +import java.util.function.*; +import java.util.logging.*; +import java.util.regex.*; /** * @@ -88,7 +86,7 @@ public final class HttpPrepareServlet extends PrepareServlet" + "").getBytes()).asReadOnlyBuffer(); } - response.finish(flashPolicyBuffer.duplicate(), true); + response.finish(true, flashPolicyBuffer.duplicate()); return; } final String uri = request.getRequestURI(); diff --git a/src/com/wentch/redkale/net/http/HttpProxyServlet.java b/src/com/wentch/redkale/net/http/HttpProxyServlet.java index 6429fa8e7..e7be2b861 100644 --- a/src/com/wentch/redkale/net/http/HttpProxyServlet.java +++ b/src/com/wentch/redkale/net/http/HttpProxyServlet.java @@ -5,11 +5,11 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.net.AsyncConnection; -import com.wentch.redkale.util.AnyValue; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import java.io.*; +import java.nio.*; +import java.nio.channels.*; /** * 在appliation.xml中的HTTP类型的server节点加上forwardproxy="true"表示该HttpServer支持正向代理 @@ -76,21 +76,15 @@ public final class HttpProxyServlet extends HttpServlet { final ByteBuffer buffer0 = response.getContext().pollBuffer(); buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes()); buffer0.flip(); - response.send(buffer0, null, new CompletionHandler() { + response.sendBody(buffer0, null, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { - if (buffer0.hasRemaining()) { - response.send(buffer0, attachment, this); - return; - } - response.getContext().offerBuffer(buffer0); new ProxyCompletionHandler(remote, request, response).completed(0, null); } @Override public void failed(Throwable exc, Void attachment) { - response.getContext().offerBuffer(buffer0); response.finish(true); try { remote.close(); @@ -124,7 +118,7 @@ public final class HttpProxyServlet extends HttpServlet { public void completed(Integer result, Void attachment) { rbuffer.flip(); CompletionHandler parent = this; - response.send(rbuffer, null, new CompletionHandler() { + response.sendBody(rbuffer.duplicate().asReadOnlyBuffer(), null, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { diff --git a/src/com/wentch/redkale/net/http/HttpRequest.java b/src/com/wentch/redkale/net/http/HttpRequest.java index 97bbbfa8f..d18dd7e6e 100644 --- a/src/com/wentch/redkale/net/http/HttpRequest.java +++ b/src/com/wentch/redkale/net/http/HttpRequest.java @@ -131,7 +131,7 @@ public final class HttpRequest extends Request { break; case "Connection": this.connection = value; - this.keepAlive = "Keep-Alive".equalsIgnoreCase(value); + this.setKeepAlive("Keep-Alive".equalsIgnoreCase(value)); break; default: header.addValue(name, value); @@ -158,7 +158,7 @@ public final class HttpRequest extends Request { @Override protected void prepare() { } - + private void parseBody() { if (this.boundary || array.isEmpty()) return; addParameter(array, 0, array.count()); @@ -200,6 +200,22 @@ public final class HttpRequest extends Request { return true; } + @Override + protected void setProperty(String name, Object value) { + super.setProperty(name, value); + } + + @Override + @SuppressWarnings("unchecked") + protected T getProperty(String name) { + return super.getProperty(name); + } + + @Override + protected void removeProperty(String name) { + super.removeProperty(name); + } + @Override public HttpContext getContext() { return (HttpContext) this.context; diff --git a/src/com/wentch/redkale/net/http/HttpResourceServlet.java b/src/com/wentch/redkale/net/http/HttpResourceServlet.java index 06f6096b9..dfff67d6c 100644 --- a/src/com/wentch/redkale/net/http/HttpResourceServlet.java +++ b/src/com/wentch/redkale/net/http/HttpResourceServlet.java @@ -5,11 +5,10 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.util.AnyValue; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; import java.io.*; import java.nio.*; -import java.nio.channels.*; import java.nio.file.*; import static java.nio.file.StandardWatchEventKinds.*; import java.util.*; @@ -169,7 +168,6 @@ public final class HttpResourceServlet extends HttpServlet { } if (uri.length() == 0 || uri.equals("/")) uri = "/index.html"; //System.out.println(request); - response.skipHeader(); FileEntry entry = watcher == null ? createFileEntry(uri) : files.get(uri); if (entry == null) { entry = createFileEntry(uri); @@ -178,14 +176,14 @@ public final class HttpResourceServlet extends HttpServlet { if (entry == null) { response.finish404(); } else { - entry.send(request, response); + response.finishFile(entry.file, entry.content); } } private FileEntry createFileEntry(String uri) { File file = new File(root, uri); if (!file.isFile() || !file.canRead()) return null; - FileEntry en = new FileEntry(this, uri, file); + FileEntry en = new FileEntry(this, file); if (watcher == null) return en; try { Path p = file.getParentFile().toPath(); @@ -198,43 +196,26 @@ public final class HttpResourceServlet extends HttpServlet { private static final class FileEntry { - private final LongAdder counter = new LongAdder(); - - private final File file; + final File file; private final HttpResourceServlet servlet; - private final String uri; + ByteBuffer content; - private String mimeType; - - private long length; - - private String etag; - - private byte[] header; - - private ByteBuffer content; - - public FileEntry(final HttpResourceServlet servlet, String uri, File file) { + public FileEntry(final HttpResourceServlet servlet, File file) { this.servlet = servlet; - this.uri = uri; this.file = file; - this.mimeType = MimeType.getByFilename(file.getName()); - if (this.mimeType == null) this.mimeType = "application/octet-stream"; update(); } public void update() { - this.length = file.length(); - this.etag = file.lastModified() + "-" + this.length; - this.header = ("HTTP/1.1 200 OK\r\nContent-Type:" + mimeType + "\r\nETag:" + etag + (servlet.ranges != null && servlet.ranges.test(uri) ? "\r\nAccept-Ranges:bytes" : "") + "\r\nContent-Length:" + length + "\r\n\r\n").getBytes(); if (this.content != null) { this.servlet.cachedLength.add(0L - this.content.remaining()); this.content = null; } - if (this.length > this.servlet.cachelengthmax) return; - if (this.servlet.cachedLength.longValue() + this.length > this.servlet.cachelimit) return; + long length = this.file.length(); + if (length > this.servlet.cachelengthmax) return; + if (this.servlet.cachedLength.longValue() + length > this.servlet.cachelimit) return; try { FileInputStream in = new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream((int) file.length()); @@ -244,9 +225,9 @@ public final class HttpResourceServlet extends HttpServlet { out.write(bytes, 0, pos); } in.close(); - ByteBuffer buf = ByteBuffer.allocateDirect((int) (this.header.length + file.length())); - buf.put(this.header); - buf.put(out.toByteArray()); + byte[] bs = out.toByteArray(); + ByteBuffer buf = ByteBuffer.allocateDirect(bs.length); + buf.put(bs); buf.flip(); this.content = buf.asReadOnlyBuffer(); this.servlet.cachedLength.add(this.content.remaining()); @@ -265,64 +246,5 @@ public final class HttpResourceServlet extends HttpServlet { return this.content == null ? 0L : this.content.remaining(); } - public void send(HttpRequest request, final HttpResponse response) throws IOException { - counter.increment(); - final String match = request.getHeader("If-None-Match"); - if (match != null && this.etag.equals(match)) { - response.finish304(); - return; - } - final boolean acceptRange = (servlet.ranges != null && servlet.ranges.test(request.getRequestURI())); - String range = acceptRange ? request.getHeader("Range") : null; - if (acceptRange) { - String ifRange = request.getHeader("If-Range"); - if (ifRange != null && !this.etag.equals(ifRange)) range = null; - } - if (content != null && range == null) { - response.finish(content.duplicate()); - return; - } - final HttpContext context = response.getContext(); - final ByteBuffer buffer = context.pollBuffer(); - if (range != null && (!range.startsWith("bytes=") || range.indexOf(',') >= 0)) range = null; - if (range == null) { - buffer.put(header); - buffer.flip(); - response.finishFile(buffer, file); - return; - } - range = range.substring("bytes=".length()); - int pos = range.indexOf('-'); - final long start = pos == 0 ? 0 : Integer.parseInt(range.substring(0, pos)); - final long end = (pos == range.length() - 1) ? -1 : Long.parseLong(range.substring(pos + 1)); - long clen = end > 0 ? (end - start + 1) : (file.length() - start); - buffer.put(("HTTP/1.1 206 Partial Content\r\nContent-Type:" + mimeType + "\r\nAccept-Ranges:bytes\r\nContent-Range:bytes " + start + "-" + (end > 0 ? end : length - 1) + "/" + length + "\r\nContent-Length:" + clen + "\r\n\r\n").getBytes()); - buffer.flip(); - final ByteBuffer body = this.content; - if (body == null) { - response.finishFile(buffer, file, start, end > 0 ? clen : end); - } else { - final ByteBuffer body2 = body.duplicate(); - body2.position((int) (this.header.length + start)); - if (end > 0) body2.limit((int) (body2.position() + end - start + 1)); - response.send(buffer, buffer, new CompletionHandler() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - response.getContext().offerBuffer(attachment); - response.finish(body2); - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - if (attachment.limit() != attachment.capacity()) { - response.getContext().offerBuffer(attachment); - } - response.finish(true); - } - }); - } - - } } } diff --git a/src/com/wentch/redkale/net/http/HttpResponse.java b/src/com/wentch/redkale/net/http/HttpResponse.java index 842b1a3e1..afe3060aa 100644 --- a/src/com/wentch/redkale/net/http/HttpResponse.java +++ b/src/com/wentch/redkale/net/http/HttpResponse.java @@ -150,21 +150,26 @@ public final class HttpResponse extends Response { public void finishJson(Object obj) { this.contentType = "text/plain; charset=utf-8"; - finishString(request.convert.convertTo(obj)); + finish(request.convert.convertTo(obj)); } public void finishJson(Type type, Object obj) { this.contentType = "text/plain; charset=utf-8"; - finishString(request.convert.convertTo(type, obj)); + finish(request.convert.convertTo(type, obj)); } public void finishJson(Object... objs) { this.contentType = "text/plain; charset=utf-8"; - finishString(request.convert.convertTo(objs)); + finish(request.convert.convertTo(objs)); } - public void finishString(String obj) { - if (obj == null) obj = "null"; + public void finish(String obj) { + if (obj == null || obj.isEmpty()) { + final ByteBuffer headbuf = createHeader(); + headbuf.flip(); + super.finish(headbuf); + return; + } if (context.getCharset() == null) { final char[] chars = Utility.charArray(obj); this.contentLength = Utility.encodeUTF8Length(chars); @@ -172,27 +177,23 @@ public final class HttpResponse extends Response { ByteBuffer buf2 = Utility.encodeUTF8(headbuf, (int) this.contentLength, chars); headbuf.flip(); if (buf2 == null) { - super.send(headbuf, headbuf, finishHandler); + super.finish(headbuf); } else { - super.send(headbuf, buf2, new AsyncWriteHandler<>(this.context, headbuf, this.channel, buf2, buf2, finishHandler)); + super.finish(headbuf, buf2); } } else { ByteBuffer buffer = context.getCharset().encode(obj); this.contentLength = buffer.remaining(); - send(buffer, buffer, finishHandler); + final ByteBuffer headbuf = createHeader(); + headbuf.flip(); + super.finish(headbuf, buffer); } } public void finish(int status, String message) { this.status = status; if (status != 200) super.refuseAlive(); - if (message == null || message.isEmpty()) { - ByteBuffer headbuf = createHeader(); - headbuf.flip(); - super.send(headbuf, headbuf, finishHandler); - } else { - finishString(message); - } + finish(message); } public void finish304() { @@ -203,15 +204,14 @@ public final class HttpResponse extends Response { super.finish(buffer404.duplicate()); } - @Override - public void send(ByteBuffer buffer, A attachment, CompletionHandler handler) { + public void sendBody(ByteBuffer buffer, A attachment, CompletionHandler handler) { if (!this.headsended) { ByteBuffer headbuf = createHeader(); headbuf.flip(); if (buffer == null) { super.send(headbuf, attachment, handler); } else { - super.send(headbuf, attachment, new AsyncWriteHandler<>(this.context, headbuf, this.channel, buffer, attachment, handler)); + super.send(new ByteBuffer[]{headbuf, headbuf}, attachment, handler); } } else { super.send(buffer, attachment, handler); @@ -234,7 +234,7 @@ public final class HttpResponse extends Response { return; } this.contentLength = file.length(); - if (this.contentType == null) this.contentType = MimeType.getByFilename(file.getName()); + this.contentType = MimeType.getByFilename(file.getName()); if (this.contentType == null) this.contentType = "application/octet-stream"; String range = request.getHeader("Range"); if (range != null && (!range.startsWith("bytes=") || range.indexOf(',') >= 0)) range = null; @@ -252,41 +252,23 @@ public final class HttpResponse extends Response { this.contentLength = clen; len = end > 0 ? clen : end; } - ByteBuffer buffer = createHeader(); - buffer.flip(); + this.addHeader("ETag", file.lastModified() + "-" + length); + ByteBuffer hbuffer = createHeader(); + hbuffer.flip(); if (fileBody == null) { - HttpResponse.this.finishFile(buffer, file, start, len); + finishFile(hbuffer, file, start, len); } else { final ByteBuffer body = fileBody.duplicate().asReadOnlyBuffer(); if (start >= 0) { body.position((int) start); if (len > 0) body.limit((int) (body.position() + len)); } - send(buffer, buffer, new CompletionHandler() { - - @Override - public void completed(Integer result, ByteBuffer attachment) { - context.offerBuffer(attachment); - finish(body); - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - if (attachment.limit() != attachment.capacity()) { - context.offerBuffer(attachment); - } - finish(true); - } - }); + super.finish(hbuffer, body); } } - protected void finishFile(ByteBuffer buffer, File file) throws IOException { - finishFile(buffer, file, -1L, -1L); - } - - protected void finishFile(ByteBuffer buffer, File file, long offset, long length) throws IOException { - send(buffer, buffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length)); + private void finishFile(ByteBuffer hbuffer, File file, long offset, long length) throws IOException { + this.channel.write(hbuffer, hbuffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length)); } private ByteBuffer createHeader() { @@ -424,7 +406,7 @@ public final class HttpResponse extends Response { } } attachment.flip(); - send(attachment, attachment, this); + channel.write(attachment, attachment, this); } } diff --git a/src/com/wentch/redkale/net/http/WebSocketServlet.java b/src/com/wentch/redkale/net/http/WebSocketServlet.java index 9aa5b5934..d82e754c2 100644 --- a/src/com/wentch/redkale/net/http/WebSocketServlet.java +++ b/src/com/wentch/redkale/net/http/WebSocketServlet.java @@ -5,9 +5,10 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.util.AnyValue; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; import java.io.*; +import java.nio.*; import java.nio.channels.*; import java.security.*; import java.util.*; @@ -74,7 +75,7 @@ public abstract class WebSocketServlet extends HttpServlet { response.setHeader("Connection", "Upgrade"); response.addHeader("Upgrade", "websocket"); response.addHeader("Sec-WebSocket-Accept", key); - response.send(null, null, new CompletionHandler() { + response.sendBody((ByteBuffer) null, null, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index c3a05491c..cc368ab7b 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -141,27 +141,27 @@ public final class SncpClient { final SncpAction action = actions[index]; final long seqid = System.nanoTime(); final TwoLong actionid = action.actionid; - ByteBuffer buffer = transport.pollBuffer(); - if ((HEADER_SIZE + bodyLength) > buffer.limit()) { - if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); - final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); - AsyncConnection conn = transport.pollConnection(); - final int readto = conn.getReadTimeoutSecond(); - final int writeto = conn.getWriteTimeoutSecond(); - int pos = 0; - final byte[] all = new byte[bodyLength]; - all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8); - all[pos++] = (byte) (bytesarray.length & 0xff); - for (byte[] bs : bytesarray) { - all[pos++] = (byte) ((bs.length & 0xff000000) >> 24); - all[pos++] = (byte) ((bs.length & 0xff0000) >> 16); - all[pos++] = (byte) ((bs.length & 0xff00) >> 8); - all[pos++] = (byte) (bs.length & 0xff); - System.arraycopy(bs, 0, all, pos, bs.length); - pos += bs.length; - } - if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos); - try { + final ByteBuffer buffer = transport.pollBuffer(); + final AsyncConnection conn = transport.pollConnection(); + final int readto = conn.getReadTimeoutSecond(); + final int writeto = conn.getWriteTimeoutSecond(); + try { + if ((HEADER_SIZE + bodyLength) > buffer.limit()) { + if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); + final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); + int pos = 0; + final byte[] all = new byte[bodyLength]; + all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8); + all[pos++] = (byte) (bytesarray.length & 0xff); + for (byte[] bs : bytesarray) { + all[pos++] = (byte) ((bs.length & 0xff000000) >> 24); + all[pos++] = (byte) ((bs.length & 0xff0000) >> 16); + all[pos++] = (byte) ((bs.length & 0xff00) >> 8); + all[pos++] = (byte) (bs.length & 0xff); + System.arraycopy(bs, 0, all, pos, bs.length); + pos += bs.length; + } + if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos); pos = 0; for (int i = patch - 1; i >= 0; i--) { fillHeader(buffer, seqid, actionid, patch, i, bodyLength); @@ -169,55 +169,86 @@ public final class SncpClient { buffer.put(all, pos, len); pos += len; buffer.flip(); + Thread.sleep(10); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); buffer.clear(); } - conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); - buffer.flip(); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - transport.offerConnection(conn); - } - } else { - { - //---------------------head---------------------------------- - fillHeader(buffer, seqid, actionid, 1, 0, bodyLength); - //---------------------body---------------------------------- - buffer.putChar((char) bytesarray.length); //参数数组大小 - for (byte[] bs : bytesarray) { - buffer.putInt(bs.length); - buffer.put(bs); + } else { //只有一帧的数据 + { + //---------------------head---------------------------------- + fillHeader(buffer, seqid, actionid, 1, 0, bodyLength); + //---------------------body---------------------------------- + buffer.putChar((char) bytesarray.length); //参数数组大小 + for (byte[] bs : bytesarray) { + buffer.putInt(bs.length); + buffer.put(bs); + } + buffer.flip(); } - buffer.flip(); + conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); + buffer.clear(); } - if (action.async) { - transport.async(buffer, null, null); - return null; + conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); + buffer.flip(); + long rseqid = buffer.getLong(); + if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid); + if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); + long rserviceid = buffer.getLong(); + if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid); + long rnameid = buffer.getLong(); + if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive nameid =" + rnameid); + long ractionid1 = buffer.getLong(); + long ractionid2 = buffer.getLong(); + if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); + final int frameCount = buffer.get(); + if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount); + int frameIndex = buffer.get(); + if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex); + final int retcode = buffer.getInt(); + if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")"); + final int bodylen = buffer.getInt(); + final byte[] body = new byte[bodylen]; + if (frameCount == 1) { + buffer.get(body); + return body; + } else { + int received = 0; + for (int i = 0; i < frameCount; i++) { + received += buffer.remaining(); + buffer.get(body, (frameCount - frameIndex - 1) * (buffer.capacity() - HEADER_SIZE), buffer.remaining()); + if (i == frameCount - 1) break; + buffer.clear(); + conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); + buffer.flip(); + rseqid = buffer.getLong(); + if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive next.seqid =" + rseqid); + if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); + rserviceid = buffer.getLong(); + if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid); + rnameid = buffer.getLong(); + if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive next.nameid =" + rnameid); + ractionid1 = buffer.getLong(); + ractionid2 = buffer.getLong(); + if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")"); + if (buffer.get() < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but next.frame.count != " + frameCount); + frameIndex = buffer.get(); + if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex); + int rretcode = buffer.getInt(); + if (rretcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + rretcode + ")"); + int rbodylen = buffer.getInt(); + if (rbodylen != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen); + } + if (received != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received); + return body; } - buffer = transport.send(buffer); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + transport.offerBuffer(buffer); + transport.offerConnection(conn); } - long rseqid = buffer.getLong(); - if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid); - if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); - long rserviceid = buffer.getLong(); - if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid); - long rnameid = buffer.getLong(); - if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive nameid =" + rnameid); - long ractionid1 = buffer.getLong(); - long ractionid2 = buffer.getLong(); - if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); - int frameCount = buffer.get(); - if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount); - int frameIndex = buffer.get(); - if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex); - int retcode = buffer.getInt(); - if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")"); - int bodylen = buffer.getInt(); - byte[] bytes = new byte[bodylen]; - buffer.get(bytes); - transport.offerBuffer(buffer); - return bytes; } } diff --git a/src/com/wentch/redkale/net/sncp/SncpContext.java b/src/com/wentch/redkale/net/sncp/SncpContext.java index ee75bd70d..6b1d0cab1 100644 --- a/src/com/wentch/redkale/net/sncp/SncpContext.java +++ b/src/com/wentch/redkale/net/sncp/SncpContext.java @@ -78,6 +78,10 @@ public final class SncpContext extends Context { } } + protected void removeRequestEntity(long seqid) { + requests.remove(seqid); + } + protected RequestEntry getRequestEntity(long seqid) { return requests.get(seqid); } diff --git a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java index 6d8b5796c..0c7c0e984 100644 --- a/src/com/wentch/redkale/net/sncp/SncpDynServlet.java +++ b/src/com/wentch/redkale/net/sncp/SncpDynServlet.java @@ -5,17 +5,15 @@ */ package com.wentch.redkale.net.sncp; -import com.wentch.redkale.util.AnyValue; -import com.wentch.redkale.util.DebugMethodVisitor; -import com.wentch.redkale.util.TwoLong; -import com.wentch.redkale.convert.bson.BsonConvert; +import com.wentch.redkale.convert.bson.*; import static com.wentch.redkale.net.sncp.SncpClient.getOnMethod; -import com.wentch.redkale.service.Service; +import com.wentch.redkale.service.*; +import com.wentch.redkale.util.*; import java.io.*; import java.lang.reflect.*; import java.util.*; import java.util.logging.*; -import javax.annotation.Resource; +import javax.annotation.*; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.Opcodes.*; import jdk.internal.org.objectweb.asm.Type; @@ -84,14 +82,12 @@ public class SncpDynServlet extends SncpServlet { if (action == null) { response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid } else { - byte[] rs = null; try { - rs = action.action(request.getParamBytes()); + response.finish(0, action.action(request.getParamBytes())); } catch (Throwable t) { response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); } - response.finish(0, rs); } } diff --git a/src/com/wentch/redkale/net/sncp/SncpRequest.java b/src/com/wentch/redkale/net/sncp/SncpRequest.java index a1d616d54..2aaacb9d5 100644 --- a/src/com/wentch/redkale/net/sncp/SncpRequest.java +++ b/src/com/wentch/redkale/net/sncp/SncpRequest.java @@ -16,36 +16,36 @@ import java.nio.*; * @author zhangjx */ public final class SncpRequest extends Request { - + public static final int HEADER_SIZE = 52; - + protected final BsonConvert convert; - + private long seqid; - + private int framecount; - + private int frameindex; - + private long nameid; - + private long serviceid; - + private TwoLong actionid; - + private int bodylength; - + private byte[][] paramBytes; - + private boolean ping; - + private byte[] body; - + protected SncpRequest(SncpContext context, BsonFactory factory) { super(context); this.convert = factory.getConvert(); } - + @Override protected int readHeader(ByteBuffer buffer) { if (buffer.remaining() < HEADER_SIZE) { @@ -85,20 +85,21 @@ public final class SncpRequest extends Request { RequestEntry entry = scontext.getRequestEntity(this.seqid); if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]); entry.add(buffer, (this.framecount - this.frameindex - 1) * (buffer.capacity() - HEADER_SIZE)); - + if (entry.isCompleted()) { //数据读取完毕 this.body = entry.body; + scontext.removeRequestEntity(this.seqid); return 0; } else { scontext.expireRequestEntry(10 * 1000); //10秒过期 - } + } return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE } - + @Override protected void readBody(ByteBuffer buffer) { } - + @Override protected void prepare() { if (this.body == null) return; @@ -115,22 +116,14 @@ public final class SncpRequest extends Request { } this.paramBytes = bbytes; } - + @Override public String toString() { return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid + ",serviceid=" + this.serviceid + ",actionid=" + this.actionid + ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",bodylength=" + this.bodylength + "}"; } - - protected void setKeepAlive(boolean keepAlive) { - this.keepAlive = keepAlive; - } - - protected boolean isKeepAlive() { - return this.keepAlive; - } - + @Override protected void recycle() { this.seqid = 0; @@ -144,29 +137,29 @@ public final class SncpRequest extends Request { this.ping = false; super.recycle(); } - + protected boolean isPing() { return ping; } - + public byte[][] getParamBytes() { return paramBytes; } - + public long getSeqid() { return seqid; } - + public long getServiceid() { return serviceid; } - + public long getNameid() { return nameid; } - + public TwoLong getActionid() { return actionid; } - + } diff --git a/src/com/wentch/redkale/net/sncp/SncpResponse.java b/src/com/wentch/redkale/net/sncp/SncpResponse.java index 42bbfc5af..f37c5ec96 100644 --- a/src/com/wentch/redkale/net/sncp/SncpResponse.java +++ b/src/com/wentch/redkale/net/sncp/SncpResponse.java @@ -6,6 +6,7 @@ package com.wentch.redkale.net.sncp; import com.wentch.redkale.net.*; +import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; import com.wentch.redkale.util.*; import java.nio.*; import java.util.concurrent.atomic.*; @@ -34,6 +35,32 @@ public final class SncpResponse extends Response { public void finish(final int retcode, final byte[] bytes) { ByteBuffer buffer = context.pollBuffer(); + final int bodyLength = (bytes == null ? 0 : bytes.length); + final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); + if (patch <= 1) { + //---------------------head---------------------------------- + fillHeader(buffer, retcode, 1, 0, bodyLength); + //---------------------body---------------------------------- + if (bytes != null) buffer.put(bytes); + buffer.flip(); + finish(buffer); + } else { + final ByteBuffer[] buffers = new ByteBuffer[patch]; + int pos = 0; + for (int i = patch - 1; i >= 0; i--) { + if (i != patch - 1) buffer = context.pollBuffer(); + fillHeader(buffer, retcode, patch, i, bodyLength); + buffers[i] = buffer; + int len = Math.min(buffer.remaining(), bytes.length - pos); + buffer.put(bytes, pos, len); + pos += len; + buffer.flip(); + } + finish(buffers); + } + } + + private void fillHeader(ByteBuffer buffer, int retcode, int frameCount, int frameIndex, int bodyLength) { //---------------------head---------------------------------- buffer.putLong(request.getSeqid()); buffer.putChar((char) SncpRequest.HEADER_SIZE); @@ -42,13 +69,9 @@ public final class SncpResponse extends Response { TwoLong actionid = request.getActionid(); buffer.putLong(actionid.getFirst()); buffer.putLong(actionid.getSecond()); - buffer.put((byte) 1); // frame count - buffer.put((byte) 0); //frame index + buffer.put((byte) frameCount); // frame count + buffer.put((byte) frameIndex); //frame index buffer.putInt(retcode); - buffer.putInt((bytes == null ? 0 : bytes.length)); - //---------------------body---------------------------------- - if (bytes != null) buffer.put(bytes); - buffer.flip(); - finish(buffer); + buffer.putInt(bodyLength); } } diff --git a/src/com/wentch/redkale/source/DataJDBCSource.java b/src/com/wentch/redkale/source/DataJDBCSource.java index bd41405f0..9ebdffd72 100644 --- a/src/com/wentch/redkale/source/DataJDBCSource.java +++ b/src/com/wentch/redkale/source/DataJDBCSource.java @@ -49,6 +49,8 @@ public final class DataJDBCSource implements DataSource { private static final String JDBC_SOURCE = "javax.persistence.jdbc.source"; + private static final Flipper FLIPPER_ONE = new Flipper(1); + private final Logger logger = Logger.getLogger(DataJDBCSource.class.getSimpleName()); private final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST)); @@ -1101,7 +1103,7 @@ public final class DataJDBCSource implements DataSource { if (r != null || cache.isFullLoaded()) return r; } final Connection conn = createReadSQLConnection(); - try { + try { if (debug.get()) logger.finest(clazz.getSimpleName() + " find sql=" + info.query.sql.replace("?", String.valueOf(pk))); final PreparedStatement prestmt = conn.prepareStatement(info.query.sql); prestmt.setObject(1, pk); @@ -1198,6 +1200,20 @@ public final class DataJDBCSource implements DataSource { } } + /** + * 根据过滤对象FilterBean查询第一个符合条件的对象 + * + * @param + * @param clazz + * @param bean + * @return + */ + @Override + public T find(final Class clazz, final FilterBean bean) { + Sheet sheet = querySheet(clazz, FLIPPER_ONE, bean); + return sheet.isEmpty() ? null : sheet.list().get(0); + } + /** * 根据唯一索引获取对象 * @@ -1650,7 +1666,7 @@ public final class DataJDBCSource implements DataSource { public void connectionErrorOccurred(ConnectionEvent event) { usingCounter.decrementAndGet(); if ("08S01".equals(event.getSQLException().getSQLState())) return; //MySQL特性, 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException - dataSource.logger.log(Level.WARNING, "connectionErronOccurred", event.getSQLException()); + dataSource.logger.log(Level.WARNING, "connectionErronOccurred " + event.getSQLException().getSQLState(), event.getSQLException()); } }; try { @@ -1780,7 +1796,9 @@ public final class DataJDBCSource implements DataSource { return poll(0, null); } } catch (SQLException ex) { - dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort", ex); + if (!"08S01".equals(ex.getSQLState())) {//MySQL特性, 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException + dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort " + ex.getSQLState(), ex); + } return poll(0, null); } return conn; diff --git a/src/com/wentch/redkale/source/DataJPASource.java b/src/com/wentch/redkale/source/DataJPASource.java index e9c566cca..096071e2b 100644 --- a/src/com/wentch/redkale/source/DataJPASource.java +++ b/src/com/wentch/redkale/source/DataJPASource.java @@ -5,13 +5,12 @@ */ package com.wentch.redkale.source; -import com.wentch.redkale.util.Sheet; -import com.wentch.redkale.util.Attribute; import static com.wentch.redkale.source.FilterExpress.*; -import java.io.Serializable; -import java.lang.reflect.Array; +import com.wentch.redkale.util.*; +import java.io.*; +import java.lang.reflect.*; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import java.util.logging.*; import javax.persistence.*; import javax.persistence.criteria.*; @@ -45,6 +44,11 @@ final class DataJPASource implements DataSource { throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. } + @Override + public T find(Class clazz, FilterBean bean) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + private static class DataJPAConnection extends DataConnection { private final EntityManager manager; diff --git a/src/com/wentch/redkale/source/DataSource.java b/src/com/wentch/redkale/source/DataSource.java index d09922f35..40f14392e 100644 --- a/src/com/wentch/redkale/source/DataSource.java +++ b/src/com/wentch/redkale/source/DataSource.java @@ -5,7 +5,7 @@ */ package com.wentch.redkale.source; -import com.wentch.redkale.util.Sheet; +import com.wentch.redkale.util.*; import java.io.*; import java.util.*; @@ -316,6 +316,16 @@ public interface DataSource { */ public T[] findByColumn(Class clazz, final SelectColumn selects, String column, Serializable... keys); + /** + * 根据过滤对象FilterBean查询第一个符合条件的对象 + * + * @param + * @param clazz + * @param bean + * @return + */ + public T find(final Class clazz, final FilterBean bean); + //-----------------------list---------------------------- /** * 根据指定字段值查询对象某个字段的集合 diff --git a/src/com/wentch/redkale/util/ObjectPool.java b/src/com/wentch/redkale/util/ObjectPool.java index 34ffeb50c..aadde2650 100644 --- a/src/com/wentch/redkale/util/ObjectPool.java +++ b/src/com/wentch/redkale/util/ObjectPool.java @@ -8,6 +8,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; +import java.util.logging.*; /** * @@ -16,6 +17,10 @@ import java.util.function.*; */ public final class ObjectPool { + private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName()); + + private final boolean debug; + private final Queue queue; private Creator creator; @@ -48,6 +53,7 @@ public final class ObjectPool { this.creator = creator; this.recycler = recycler; this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); + this.debug = logger.isLoggable(Level.FINER); } public void setCreator(Creator creator) { @@ -66,6 +72,9 @@ public final class ObjectPool { public void offer(final T e) { if (e != null && recycler.test(e)) { if (cycleCounter != null) cycleCounter.incrementAndGet(); + if (debug) queue.forEach(t -> { + if (t == e) logger.log(Level.WARNING, "repeat offer the same object(" + e + ")", new Exception()); + }); queue.offer(e); } }