This commit is contained in:
地平线
2015-03-20 10:07:31 +08:00
parent d81461ccc2
commit b768e0e8ef
23 changed files with 495 additions and 389 deletions

View File

@@ -11,6 +11,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
/** /**
* 当Service是Remote模式时 用该注解标注在方法上可使数据变成异步传输, 该注解只能标注在返回类型为void的public方法上 * 当Service是Remote模式时 用该注解标注在方法上可使数据变成异步传输, 该注解只能标注在返回类型为void的public方法上
* 不再起作用, 屏蔽掉
* *
* @author zhangjx * @author zhangjx
*/ */
@@ -18,6 +19,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Documented @Documented
@Target({TYPE, METHOD}) @Target({TYPE, METHOD})
@Retention(RUNTIME) @Retention(RUNTIME)
@Deprecated
public @interface Async { public @interface Async {
} }

View File

@@ -5,9 +5,9 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import java.io.IOException; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.concurrent.*; import java.util.concurrent.*;
@@ -29,6 +29,12 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
public abstract void setWriteTimeoutSecond(int writeTimeoutSecond); public abstract void setWriteTimeoutSecond(int writeTimeoutSecond);
public final <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(srcs, 0, srcs.length, attachment, handler);
}
protected abstract <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler);
public abstract void dispose(); //同close 只是去掉throws IOException public abstract void dispose(); //同close 只是去掉throws IOException
public static AsyncConnection create(final String protocol, final SocketAddress address) throws IOException { public static AsyncConnection create(final String protocol, final SocketAddress address) throws IOException {
@@ -103,6 +109,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
channel.send(src, remoteAddress, attachment, handler); channel.send(src, remoteAddress, attachment, handler);
} }
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(srcs, offset, length, remoteAddress, attachment, handler);
}
@Override @Override
public void setReadTimeoutSecond(int readTimeoutSecond) { public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond; this.readTimeoutSecond = readTimeoutSecond;
@@ -210,6 +221,24 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
} }
} }
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS,
attachment, new CompletionHandler<Long, A>() {
@Override
public void completed(Long result, A attachment) {
handler.completed(result.intValue(), attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
}
@Override @Override
public void setReadTimeoutSecond(int readTimeoutSecond) { public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond; this.readTimeoutSecond = readTimeoutSecond;
@@ -266,6 +295,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
} catch (IOException io) { } catch (IOException io) {
} }
} }
}; };
} }

View File

@@ -7,16 +7,16 @@ package com.wentch.redkale.net;
import java.io.*; import java.io.*;
import java.lang.invoke.*; import java.lang.invoke.*;
import java.lang.ref.SoftReference; import java.lang.ref.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import sun.misc.Cleaner; import sun.misc.*;
import sun.security.action.GetIntegerAction; import sun.security.action.*;
/** /**
* *
@@ -336,6 +336,35 @@ public final class AsyncDatagramChannel implements AsynchronousByteChannel, Mult
implSend(src, target, attachment, handler); implSend(src, target, attachment, handler);
} }
public <A> void send(ByteBuffer[] srcs, final int offset, final int length, SocketAddress target, A attachment, final CompletionHandler<Integer, ? super A> handler) {
if (handler == null) throw new NullPointerException("'handler' is null");
final ByteBuffer[] buffers = srcs;
implSend(buffers[offset], target, attachment, new CompletionHandler<Integer, A>() {
private int index = offset;
private int resultSum;
private final int max = length - 1;
@Override
public void completed(Integer result, A attachment) {
resultSum += result;
if (buffers[index].hasRemaining()) {
implSend(buffers[index], target, attachment, this);
} else if (index == max) {
handler.completed(resultSum, attachment);
} else {
implSend(buffers[++index], target, attachment, this);
}
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
}
private <A> Future<Integer> implWrite(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) { private <A> Future<Integer> implWrite(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
int n = 0; int n = 0;
Throwable exc = null; Throwable exc = null;

View File

@@ -1,74 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.logging.Level;
/**
*
* @author zhangjx
* @param <A>
*/
public final class AsyncWriteHandler<A> implements CompletionHandler<Integer, A> {
protected final ByteBuffer buffer;
protected final AsynchronousByteChannel channel;
protected final Context context;
protected final ByteBuffer buffer2;
protected final A attachment;
protected final CompletionHandler hander;
public AsyncWriteHandler(Context context, ByteBuffer buffer, AsynchronousByteChannel channel) {
this(context, buffer, channel, null, null, null);
}
public AsyncWriteHandler(Context context, ByteBuffer buffer, AsynchronousByteChannel channel, ByteBuffer buffer2, A attachment, CompletionHandler hander) {
this.buffer = buffer;
this.channel = channel;
this.context = context;
this.buffer2 = buffer2;
this.attachment = attachment;
this.hander = hander;
}
@Override
public void completed(Integer result, A attachment) {
if (buffer.hasRemaining()) {
this.channel.write(buffer, attachment, this);
return;
}
if (context != null) context.offerBuffer(buffer);
if (hander != null) {
if (buffer2 == null) {
hander.completed(result, attachment);
} else {
this.channel.write(buffer2, attachment, hander);
}
}
}
@Override
public void failed(Throwable exc, A attachment) {
if (context != null) context.offerBuffer(buffer);
if (hander == null) {
try {
this.channel.close();
} catch (Exception e) {
context.logger.log(Level.FINE, "AsyncWriteHandler close channel erroneous", e);
}
} else {
hander.failed(exc, attachment);
}
}
}

View File

@@ -33,7 +33,6 @@ public final class PrepareRunner implements Runnable {
public void run() { public void run() {
final PrepareServlet prepare = context.prepare; final PrepareServlet prepare = context.prepare;
final ObjectPool<? extends Response> responsePool = context.responsePool; final ObjectPool<? extends Response> responsePool = context.responsePool;
final ByteBuffer buffer = context.pollBuffer();
if (data != null) { if (data != null) {
final Response response = responsePool.poll(); final Response response = responsePool.poll();
response.init(channel); response.init(channel);
@@ -41,11 +40,11 @@ public final class PrepareRunner implements Runnable {
prepare.prepare(data, response.request, response); prepare.prepare(data, response.request, response);
} catch (Throwable t) { } catch (Throwable t) {
context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t); context.logger.log(Level.WARNING, "prepare servlet abort, forece to close channel ", t);
context.offerBuffer(buffer);
response.finish(true); response.finish(true);
} }
return; return;
} }
final ByteBuffer buffer = context.pollBuffer();
try { try {
channel.read(buffer, null, new CompletionHandler<Integer, Void>() { channel.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override @Override
@@ -63,7 +62,7 @@ public final class PrepareRunner implements Runnable {
// buffer.flip(); // buffer.flip();
// byte[] bytes = new byte[buffer.remaining()]; // byte[] bytes = new byte[buffer.remaining()];
// buffer.get(bytes); // buffer.get(bytes);
// System.out.println(new String(bytes)); // System.println(new String(bytes));
// } // }
buffer.flip(); buffer.flip();
final Response response = responsePool.poll(); final Response response = responsePool.poll();

View File

@@ -22,6 +22,12 @@ public abstract class Request {
protected AsyncConnection channel; protected AsyncConnection channel;
/**
* properties 与 attributes 的区别在于调用recycle时 attributes会被清空而properties会保留;
* properties 通常存放需要永久绑定在request里的一些对象
*/
private final Map<String, Object> properties = new HashMap<>();
protected final Map<String, Object> attributes = new HashMap<>(); protected final Map<String, Object> attributes = new HashMap<>();
protected Request(Context context) { protected Request(Context context) {
@@ -47,6 +53,23 @@ public abstract class Request {
channel = null; // close it by response channel = null; // close it by response
} }
protected void setProperty(String name, Object value) {
properties.put(name, value);
}
@SuppressWarnings("unchecked")
protected <T> T getProperty(String name) {
return (T) properties.get(name);
}
protected void removeProperty(String name) {
properties.remove(name);
}
protected Map<String, Object> getProperties() {
return properties;
}
public void setAttribute(String name, Object value) { public void setAttribute(String name, Object value) {
attributes.put(name, value); attributes.put(name, value);
} }

View File

@@ -22,22 +22,56 @@ public abstract class Response<R extends Request> {
protected AsyncConnection channel; protected AsyncConnection channel;
protected final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() { private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer result, ByteBuffer attachment) { public void completed(Integer result, ByteBuffer attachment) {
if (attachment.hasRemaining()) { if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this); channel.write(attachment, attachment, this);
} else { } else {
Response.this.context.offerBuffer(attachment); context.offerBuffer(attachment);
Response.this.finish(); finish();
} }
} }
@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
Response.this.context.offerBuffer(attachment); context.offerBuffer(attachment);
Response.this.finish(true); finish(true);
}
};
private final CompletionHandler finishHandler2 = new CompletionHandler<Integer, ByteBuffer[]>() {
@Override
public void completed(Integer result, ByteBuffer[] attachments) {
int index = -1;
for (int i = 0; i < attachments.length; i++) {
if (attachments[i].hasRemaining()) {
index = i;
break;
} else {
context.offerBuffer(attachments[i]);
}
}
if (index == 0) {
channel.write(attachments, attachments, this);
} else if (index > 0) {
ByteBuffer[] newattachs = new ByteBuffer[attachments.length - index];
System.arraycopy(attachments, index, newattachs, 0, newattachs.length);
channel.write(newattachs, newattachs, this);
} else {
finish();
}
}
@Override
public void failed(Throwable exc, ByteBuffer[] attachments) {
for (ByteBuffer attachment : attachments) {
context.offerBuffer(attachment);
}
finish(true);
} }
}; };
@@ -64,8 +98,8 @@ public abstract class Response<R extends Request> {
if (channel.isOpen()) channel.close(); if (channel.isOpen()) channel.close();
} catch (Exception e) { } catch (Exception e) {
} }
channel = null;
} }
channel = null;
} }
return true; return true;
} }
@@ -85,22 +119,84 @@ public abstract class Response<R extends Request> {
} }
public void finish(boolean kill) { public void finish(boolean kill) {
//System.out.println("耗时: " + (System.currentTimeMillis() - request.createtime)); //System.println("耗时: " + (System.currentTimeMillis() - request.createtime));
if (kill) refuseAlive(); if (kill) refuseAlive();
this.context.responsePool.offer(this); this.context.responsePool.offer(this);
} }
public void finish(ByteBuffer buffer) { public void finish(ByteBuffer buffer) {
finish(buffer, false); finish(false, buffer);
} }
public void finish(ByteBuffer buffer, boolean kill) { public void finish(boolean kill, ByteBuffer buffer) {
if (kill) refuseAlive(); if (kill) refuseAlive();
send(buffer, buffer, finishHandler); this.channel.write(buffer, buffer, finishHandler);
} }
public <A> void send(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) { public void finish(ByteBuffer... buffers) {
this.channel.write(buffer, attachment, handler); finish(false, buffers);
}
public void finish(boolean kill, ByteBuffer... buffers) {
if (kill) refuseAlive();
this.channel.write(buffers, buffers, finishHandler2);
}
protected <A> void send(final ByteBuffer buffer, final A attachment, final CompletionHandler<Integer, A> handler) {
this.channel.write(buffer, attachment, new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer result, A attachment) {
if (buffer.hasRemaining()) {
channel.write(buffer, attachment, this);
} else {
context.offerBuffer(buffer);
if (handler != null) handler.completed(result, attachment);
}
}
@Override
public void failed(Throwable exc, A attachment) {
context.offerBuffer(buffer);
if (handler != null) handler.failed(exc, attachment);
}
});
}
protected <A> void send(final ByteBuffer[] buffers, A attachment, CompletionHandler<Integer, A> handler) {
this.channel.write(buffers, attachment, new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer result, A attachment) {
int index = -1;
for (int i = 0; i < buffers.length; i++) {
if (buffers[i].hasRemaining()) {
index = i;
break;
}
context.offerBuffer(buffers[i]);
}
if (index == 0) {
channel.write(buffers, attachment, this);
} else if (index > 0) {
ByteBuffer[] newattachs = new ByteBuffer[buffers.length - index];
System.arraycopy(buffers, index, newattachs, 0, newattachs.length);
channel.write(newattachs, attachment, this);
} else {
if (handler != null) handler.completed(result, attachment);
}
}
@Override
public void failed(Throwable exc, A attachment) {
for (ByteBuffer buffer : buffers) {
context.offerBuffer(buffer);
}
if (handler != null) handler.failed(exc, attachment);
}
});
} }
public Context getContext() { public Context getContext() {

View File

@@ -5,16 +5,16 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.util.*;
import com.wentch.redkale.watch.WatchFactory; import com.wentch.redkale.watch.*;
import java.io.*; import java.io.*;
import java.lang.reflect.Method; import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.nio.charset.Charset; import java.nio.charset.*;
import java.text.*; import java.text.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
/** /**
@@ -108,7 +108,7 @@ public abstract class Server {
public void start() throws IOException { public void start() throws IOException {
this.context = this.createContext(); this.context = this.createContext();
this.context.prepare.init(this.context, config); this.context.prepare.init(this.context, config);
this.watch.inject(this.context.prepare); if (this.watch != null) this.watch.inject(this.context.prepare);
this.transport = ProtocolServer.create(this.protocol, context); this.transport = ProtocolServer.create(this.protocol, context);
this.transport.open(); this.transport.open();
transport.setOption(StandardSocketOptions.SO_REUSEADDR, true); transport.setOption(StandardSocketOptions.SO_REUSEADDR, true);

View File

@@ -5,19 +5,17 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.net.PrepareServlet; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.Context;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.util.Utility;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import com.wentch.redkale.watch.WatchFactory; import com.wentch.redkale.util.*;
import java.io.IOException; import com.wentch.redkale.watch.*;
import java.nio.ByteBuffer; import java.io.*;
import java.nio.*;
import java.util.*; import java.util.*;
import java.util.AbstractMap.SimpleEntry; import java.util.AbstractMap.SimpleEntry;
import java.util.function.Predicate; import java.util.function.*;
import java.util.logging.Level; import java.util.logging.*;
import java.util.regex.Pattern; import java.util.regex.*;
/** /**
* *
@@ -88,7 +86,7 @@ public final class HttpPrepareServlet extends PrepareServlet<HttpRequest, HttpRe
+ flashports.replace("$", "" + request.getContext().getServerAddress().getPort()) + "\"/>" + flashports.replace("$", "" + request.getContext().getServerAddress().getPort()) + "\"/>"
+ "</cross-domain-policy>").getBytes()).asReadOnlyBuffer(); + "</cross-domain-policy>").getBytes()).asReadOnlyBuffer();
} }
response.finish(flashPolicyBuffer.duplicate(), true); response.finish(true, flashPolicyBuffer.duplicate());
return; return;
} }
final String uri = request.getRequestURI(); final String uri = request.getRequestURI();

View File

@@ -5,11 +5,11 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.net.AsyncConnection; import com.wentch.redkale.net.*;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.util.*;
import java.io.IOException; import java.io.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.channels.CompletionHandler; import java.nio.channels.*;
/** /**
* 在appliation.xml中的HTTP类型的server节点加上forwardproxy="true"表示该HttpServer支持正向代理 * 在appliation.xml中的HTTP类型的server节点加上forwardproxy="true"表示该HttpServer支持正向代理
@@ -76,21 +76,15 @@ public final class HttpProxyServlet extends HttpServlet {
final ByteBuffer buffer0 = response.getContext().pollBuffer(); final ByteBuffer buffer0 = response.getContext().pollBuffer();
buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes()); buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes());
buffer0.flip(); buffer0.flip();
response.send(buffer0, null, new CompletionHandler<Integer, Void>() { response.sendBody(buffer0, null, new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
if (buffer0.hasRemaining()) {
response.send(buffer0, attachment, this);
return;
}
response.getContext().offerBuffer(buffer0);
new ProxyCompletionHandler(remote, request, response).completed(0, null); new ProxyCompletionHandler(remote, request, response).completed(0, null);
} }
@Override @Override
public void failed(Throwable exc, Void attachment) { public void failed(Throwable exc, Void attachment) {
response.getContext().offerBuffer(buffer0);
response.finish(true); response.finish(true);
try { try {
remote.close(); remote.close();
@@ -124,7 +118,7 @@ public final class HttpProxyServlet extends HttpServlet {
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {
rbuffer.flip(); rbuffer.flip();
CompletionHandler parent = this; CompletionHandler parent = this;
response.send(rbuffer, null, new CompletionHandler<Integer, Void>() { response.sendBody(rbuffer.duplicate().asReadOnlyBuffer(), null, new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {

View File

@@ -131,7 +131,7 @@ public final class HttpRequest extends Request {
break; break;
case "Connection": case "Connection":
this.connection = value; this.connection = value;
this.keepAlive = "Keep-Alive".equalsIgnoreCase(value); this.setKeepAlive("Keep-Alive".equalsIgnoreCase(value));
break; break;
default: default:
header.addValue(name, value); header.addValue(name, value);
@@ -158,7 +158,7 @@ public final class HttpRequest extends Request {
@Override @Override
protected void prepare() { protected void prepare() {
} }
private void parseBody() { private void parseBody() {
if (this.boundary || array.isEmpty()) return; if (this.boundary || array.isEmpty()) return;
addParameter(array, 0, array.count()); addParameter(array, 0, array.count());
@@ -200,6 +200,22 @@ public final class HttpRequest extends Request {
return true; return true;
} }
@Override
protected void setProperty(String name, Object value) {
super.setProperty(name, value);
}
@Override
@SuppressWarnings("unchecked")
protected <T> T getProperty(String name) {
return super.getProperty(name);
}
@Override
protected void removeProperty(String name) {
super.removeProperty(name);
}
@Override @Override
public HttpContext getContext() { public HttpContext getContext() {
return (HttpContext) this.context; return (HttpContext) this.context;

View File

@@ -5,11 +5,10 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.net.Context; import com.wentch.redkale.net.*;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.util.*;
import java.io.*; import java.io.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*; import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.*; import static java.nio.file.StandardWatchEventKinds.*;
import java.util.*; import java.util.*;
@@ -169,7 +168,6 @@ public final class HttpResourceServlet extends HttpServlet {
} }
if (uri.length() == 0 || uri.equals("/")) uri = "/index.html"; if (uri.length() == 0 || uri.equals("/")) uri = "/index.html";
//System.out.println(request); //System.out.println(request);
response.skipHeader();
FileEntry entry = watcher == null ? createFileEntry(uri) : files.get(uri); FileEntry entry = watcher == null ? createFileEntry(uri) : files.get(uri);
if (entry == null) { if (entry == null) {
entry = createFileEntry(uri); entry = createFileEntry(uri);
@@ -178,14 +176,14 @@ public final class HttpResourceServlet extends HttpServlet {
if (entry == null) { if (entry == null) {
response.finish404(); response.finish404();
} else { } else {
entry.send(request, response); response.finishFile(entry.file, entry.content);
} }
} }
private FileEntry createFileEntry(String uri) { private FileEntry createFileEntry(String uri) {
File file = new File(root, uri); File file = new File(root, uri);
if (!file.isFile() || !file.canRead()) return null; if (!file.isFile() || !file.canRead()) return null;
FileEntry en = new FileEntry(this, uri, file); FileEntry en = new FileEntry(this, file);
if (watcher == null) return en; if (watcher == null) return en;
try { try {
Path p = file.getParentFile().toPath(); Path p = file.getParentFile().toPath();
@@ -198,43 +196,26 @@ public final class HttpResourceServlet extends HttpServlet {
private static final class FileEntry { private static final class FileEntry {
private final LongAdder counter = new LongAdder(); final File file;
private final File file;
private final HttpResourceServlet servlet; private final HttpResourceServlet servlet;
private final String uri; ByteBuffer content;
private String mimeType; public FileEntry(final HttpResourceServlet servlet, File file) {
private long length;
private String etag;
private byte[] header;
private ByteBuffer content;
public FileEntry(final HttpResourceServlet servlet, String uri, File file) {
this.servlet = servlet; this.servlet = servlet;
this.uri = uri;
this.file = file; this.file = file;
this.mimeType = MimeType.getByFilename(file.getName());
if (this.mimeType == null) this.mimeType = "application/octet-stream";
update(); update();
} }
public void update() { public void update() {
this.length = file.length();
this.etag = file.lastModified() + "-" + this.length;
this.header = ("HTTP/1.1 200 OK\r\nContent-Type:" + mimeType + "\r\nETag:" + etag + (servlet.ranges != null && servlet.ranges.test(uri) ? "\r\nAccept-Ranges:bytes" : "") + "\r\nContent-Length:" + length + "\r\n\r\n").getBytes();
if (this.content != null) { if (this.content != null) {
this.servlet.cachedLength.add(0L - this.content.remaining()); this.servlet.cachedLength.add(0L - this.content.remaining());
this.content = null; this.content = null;
} }
if (this.length > this.servlet.cachelengthmax) return; long length = this.file.length();
if (this.servlet.cachedLength.longValue() + this.length > this.servlet.cachelimit) return; if (length > this.servlet.cachelengthmax) return;
if (this.servlet.cachedLength.longValue() + length > this.servlet.cachelimit) return;
try { try {
FileInputStream in = new FileInputStream(file); FileInputStream in = new FileInputStream(file);
ByteArrayOutputStream out = new ByteArrayOutputStream((int) file.length()); ByteArrayOutputStream out = new ByteArrayOutputStream((int) file.length());
@@ -244,9 +225,9 @@ public final class HttpResourceServlet extends HttpServlet {
out.write(bytes, 0, pos); out.write(bytes, 0, pos);
} }
in.close(); in.close();
ByteBuffer buf = ByteBuffer.allocateDirect((int) (this.header.length + file.length())); byte[] bs = out.toByteArray();
buf.put(this.header); ByteBuffer buf = ByteBuffer.allocateDirect(bs.length);
buf.put(out.toByteArray()); buf.put(bs);
buf.flip(); buf.flip();
this.content = buf.asReadOnlyBuffer(); this.content = buf.asReadOnlyBuffer();
this.servlet.cachedLength.add(this.content.remaining()); this.servlet.cachedLength.add(this.content.remaining());
@@ -265,64 +246,5 @@ public final class HttpResourceServlet extends HttpServlet {
return this.content == null ? 0L : this.content.remaining(); return this.content == null ? 0L : this.content.remaining();
} }
public void send(HttpRequest request, final HttpResponse response) throws IOException {
counter.increment();
final String match = request.getHeader("If-None-Match");
if (match != null && this.etag.equals(match)) {
response.finish304();
return;
}
final boolean acceptRange = (servlet.ranges != null && servlet.ranges.test(request.getRequestURI()));
String range = acceptRange ? request.getHeader("Range") : null;
if (acceptRange) {
String ifRange = request.getHeader("If-Range");
if (ifRange != null && !this.etag.equals(ifRange)) range = null;
}
if (content != null && range == null) {
response.finish(content.duplicate());
return;
}
final HttpContext context = response.getContext();
final ByteBuffer buffer = context.pollBuffer();
if (range != null && (!range.startsWith("bytes=") || range.indexOf(',') >= 0)) range = null;
if (range == null) {
buffer.put(header);
buffer.flip();
response.finishFile(buffer, file);
return;
}
range = range.substring("bytes=".length());
int pos = range.indexOf('-');
final long start = pos == 0 ? 0 : Integer.parseInt(range.substring(0, pos));
final long end = (pos == range.length() - 1) ? -1 : Long.parseLong(range.substring(pos + 1));
long clen = end > 0 ? (end - start + 1) : (file.length() - start);
buffer.put(("HTTP/1.1 206 Partial Content\r\nContent-Type:" + mimeType + "\r\nAccept-Ranges:bytes\r\nContent-Range:bytes " + start + "-" + (end > 0 ? end : length - 1) + "/" + length + "\r\nContent-Length:" + clen + "\r\n\r\n").getBytes());
buffer.flip();
final ByteBuffer body = this.content;
if (body == null) {
response.finishFile(buffer, file, start, end > 0 ? clen : end);
} else {
final ByteBuffer body2 = body.duplicate();
body2.position((int) (this.header.length + start));
if (end > 0) body2.limit((int) (body2.position() + end - start + 1));
response.send(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
response.getContext().offerBuffer(attachment);
response.finish(body2);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
if (attachment.limit() != attachment.capacity()) {
response.getContext().offerBuffer(attachment);
}
response.finish(true);
}
});
}
}
} }
} }

View File

@@ -150,21 +150,26 @@ public final class HttpResponse extends Response<HttpRequest> {
public void finishJson(Object obj) { public void finishJson(Object obj) {
this.contentType = "text/plain; charset=utf-8"; this.contentType = "text/plain; charset=utf-8";
finishString(request.convert.convertTo(obj)); finish(request.convert.convertTo(obj));
} }
public void finishJson(Type type, Object obj) { public void finishJson(Type type, Object obj) {
this.contentType = "text/plain; charset=utf-8"; this.contentType = "text/plain; charset=utf-8";
finishString(request.convert.convertTo(type, obj)); finish(request.convert.convertTo(type, obj));
} }
public void finishJson(Object... objs) { public void finishJson(Object... objs) {
this.contentType = "text/plain; charset=utf-8"; this.contentType = "text/plain; charset=utf-8";
finishString(request.convert.convertTo(objs)); finish(request.convert.convertTo(objs));
} }
public void finishString(String obj) { public void finish(String obj) {
if (obj == null) obj = "null"; if (obj == null || obj.isEmpty()) {
final ByteBuffer headbuf = createHeader();
headbuf.flip();
super.finish(headbuf);
return;
}
if (context.getCharset() == null) { if (context.getCharset() == null) {
final char[] chars = Utility.charArray(obj); final char[] chars = Utility.charArray(obj);
this.contentLength = Utility.encodeUTF8Length(chars); this.contentLength = Utility.encodeUTF8Length(chars);
@@ -172,27 +177,23 @@ public final class HttpResponse extends Response<HttpRequest> {
ByteBuffer buf2 = Utility.encodeUTF8(headbuf, (int) this.contentLength, chars); ByteBuffer buf2 = Utility.encodeUTF8(headbuf, (int) this.contentLength, chars);
headbuf.flip(); headbuf.flip();
if (buf2 == null) { if (buf2 == null) {
super.send(headbuf, headbuf, finishHandler); super.finish(headbuf);
} else { } else {
super.send(headbuf, buf2, new AsyncWriteHandler<>(this.context, headbuf, this.channel, buf2, buf2, finishHandler)); super.finish(headbuf, buf2);
} }
} else { } else {
ByteBuffer buffer = context.getCharset().encode(obj); ByteBuffer buffer = context.getCharset().encode(obj);
this.contentLength = buffer.remaining(); this.contentLength = buffer.remaining();
send(buffer, buffer, finishHandler); final ByteBuffer headbuf = createHeader();
headbuf.flip();
super.finish(headbuf, buffer);
} }
} }
public void finish(int status, String message) { public void finish(int status, String message) {
this.status = status; this.status = status;
if (status != 200) super.refuseAlive(); if (status != 200) super.refuseAlive();
if (message == null || message.isEmpty()) { finish(message);
ByteBuffer headbuf = createHeader();
headbuf.flip();
super.send(headbuf, headbuf, finishHandler);
} else {
finishString(message);
}
} }
public void finish304() { public void finish304() {
@@ -203,15 +204,14 @@ public final class HttpResponse extends Response<HttpRequest> {
super.finish(buffer404.duplicate()); super.finish(buffer404.duplicate());
} }
@Override public <A> void sendBody(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
public <A> void send(ByteBuffer buffer, A attachment, CompletionHandler<Integer, A> handler) {
if (!this.headsended) { if (!this.headsended) {
ByteBuffer headbuf = createHeader(); ByteBuffer headbuf = createHeader();
headbuf.flip(); headbuf.flip();
if (buffer == null) { if (buffer == null) {
super.send(headbuf, attachment, handler); super.send(headbuf, attachment, handler);
} else { } else {
super.send(headbuf, attachment, new AsyncWriteHandler<>(this.context, headbuf, this.channel, buffer, attachment, handler)); super.send(new ByteBuffer[]{headbuf, headbuf}, attachment, handler);
} }
} else { } else {
super.send(buffer, attachment, handler); super.send(buffer, attachment, handler);
@@ -234,7 +234,7 @@ public final class HttpResponse extends Response<HttpRequest> {
return; return;
} }
this.contentLength = file.length(); this.contentLength = file.length();
if (this.contentType == null) this.contentType = MimeType.getByFilename(file.getName()); this.contentType = MimeType.getByFilename(file.getName());
if (this.contentType == null) this.contentType = "application/octet-stream"; if (this.contentType == null) this.contentType = "application/octet-stream";
String range = request.getHeader("Range"); String range = request.getHeader("Range");
if (range != null && (!range.startsWith("bytes=") || range.indexOf(',') >= 0)) range = null; if (range != null && (!range.startsWith("bytes=") || range.indexOf(',') >= 0)) range = null;
@@ -252,41 +252,23 @@ public final class HttpResponse extends Response<HttpRequest> {
this.contentLength = clen; this.contentLength = clen;
len = end > 0 ? clen : end; len = end > 0 ? clen : end;
} }
ByteBuffer buffer = createHeader(); this.addHeader("ETag", file.lastModified() + "-" + length);
buffer.flip(); ByteBuffer hbuffer = createHeader();
hbuffer.flip();
if (fileBody == null) { if (fileBody == null) {
HttpResponse.this.finishFile(buffer, file, start, len); finishFile(hbuffer, file, start, len);
} else { } else {
final ByteBuffer body = fileBody.duplicate().asReadOnlyBuffer(); final ByteBuffer body = fileBody.duplicate().asReadOnlyBuffer();
if (start >= 0) { if (start >= 0) {
body.position((int) start); body.position((int) start);
if (len > 0) body.limit((int) (body.position() + len)); if (len > 0) body.limit((int) (body.position() + len));
} }
send(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { super.finish(hbuffer, body);
@Override
public void completed(Integer result, ByteBuffer attachment) {
context.offerBuffer(attachment);
finish(body);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
if (attachment.limit() != attachment.capacity()) {
context.offerBuffer(attachment);
}
finish(true);
}
});
} }
} }
protected <A> void finishFile(ByteBuffer buffer, File file) throws IOException { private <A> void finishFile(ByteBuffer hbuffer, File file, long offset, long length) throws IOException {
finishFile(buffer, file, -1L, -1L); this.channel.write(hbuffer, hbuffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length));
}
protected <A> void finishFile(ByteBuffer buffer, File file, long offset, long length) throws IOException {
send(buffer, buffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length));
} }
private ByteBuffer createHeader() { private ByteBuffer createHeader() {
@@ -424,7 +406,7 @@ public final class HttpResponse extends Response<HttpRequest> {
} }
} }
attachment.flip(); attachment.flip();
send(attachment, attachment, this); channel.write(attachment, attachment, this);
} }
} }

View File

@@ -5,9 +5,10 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.net.Context; import com.wentch.redkale.net.*;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.util.*;
import java.io.*; import java.io.*;
import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
@@ -74,7 +75,7 @@ public abstract class WebSocketServlet extends HttpServlet {
response.setHeader("Connection", "Upgrade"); response.setHeader("Connection", "Upgrade");
response.addHeader("Upgrade", "websocket"); response.addHeader("Upgrade", "websocket");
response.addHeader("Sec-WebSocket-Accept", key); response.addHeader("Sec-WebSocket-Accept", key);
response.send(null, null, new CompletionHandler<Integer, Void>() { response.sendBody((ByteBuffer) null, null, new CompletionHandler<Integer, Void>() {
@Override @Override
public void completed(Integer result, Void attachment) { public void completed(Integer result, Void attachment) {

View File

@@ -141,27 +141,27 @@ public final class SncpClient {
final SncpAction action = actions[index]; final SncpAction action = actions[index];
final long seqid = System.nanoTime(); final long seqid = System.nanoTime();
final TwoLong actionid = action.actionid; final TwoLong actionid = action.actionid;
ByteBuffer buffer = transport.pollBuffer(); final ByteBuffer buffer = transport.pollBuffer();
if ((HEADER_SIZE + bodyLength) > buffer.limit()) { final AsyncConnection conn = transport.pollConnection();
if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); final int readto = conn.getReadTimeoutSecond();
final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); final int writeto = conn.getWriteTimeoutSecond();
AsyncConnection conn = transport.pollConnection(); try {
final int readto = conn.getReadTimeoutSecond(); if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
final int writeto = conn.getWriteTimeoutSecond(); if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
int pos = 0; final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
final byte[] all = new byte[bodyLength]; int pos = 0;
all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8); final byte[] all = new byte[bodyLength];
all[pos++] = (byte) (bytesarray.length & 0xff); all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8);
for (byte[] bs : bytesarray) { all[pos++] = (byte) (bytesarray.length & 0xff);
all[pos++] = (byte) ((bs.length & 0xff000000) >> 24); for (byte[] bs : bytesarray) {
all[pos++] = (byte) ((bs.length & 0xff0000) >> 16); all[pos++] = (byte) ((bs.length & 0xff000000) >> 24);
all[pos++] = (byte) ((bs.length & 0xff00) >> 8); all[pos++] = (byte) ((bs.length & 0xff0000) >> 16);
all[pos++] = (byte) (bs.length & 0xff); all[pos++] = (byte) ((bs.length & 0xff00) >> 8);
System.arraycopy(bs, 0, all, pos, bs.length); all[pos++] = (byte) (bs.length & 0xff);
pos += bs.length; System.arraycopy(bs, 0, all, pos, bs.length);
} pos += bs.length;
if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos); }
try { if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos);
pos = 0; pos = 0;
for (int i = patch - 1; i >= 0; i--) { for (int i = patch - 1; i >= 0; i--) {
fillHeader(buffer, seqid, actionid, patch, i, bodyLength); fillHeader(buffer, seqid, actionid, patch, i, bodyLength);
@@ -169,55 +169,86 @@ public final class SncpClient {
buffer.put(all, pos, len); buffer.put(all, pos, len);
pos += len; pos += len;
buffer.flip(); buffer.flip();
Thread.sleep(10);
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear(); buffer.clear();
} }
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); } else { //只有一帧的数据
buffer.flip(); {
} catch (Exception e) { //---------------------head----------------------------------
throw new RuntimeException(e); fillHeader(buffer, seqid, actionid, 1, 0, bodyLength);
} finally { //---------------------body----------------------------------
transport.offerConnection(conn); buffer.putChar((char) bytesarray.length); //参数数组大小
} for (byte[] bs : bytesarray) {
} else { buffer.putInt(bs.length);
{ buffer.put(bs);
//---------------------head---------------------------------- }
fillHeader(buffer, seqid, actionid, 1, 0, bodyLength); buffer.flip();
//---------------------body----------------------------------
buffer.putChar((char) bytesarray.length); //参数数组大小
for (byte[] bs : bytesarray) {
buffer.putInt(bs.length);
buffer.put(bs);
} }
buffer.flip(); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear();
} }
if (action.async) { conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
transport.async(buffer, null, null); buffer.flip();
return null; long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
final int frameCount = buffer.get();
if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount);
int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")");
final int bodylen = buffer.getInt();
final byte[] body = new byte[bodylen];
if (frameCount == 1) {
buffer.get(body);
return body;
} else {
int received = 0;
for (int i = 0; i < frameCount; i++) {
received += buffer.remaining();
buffer.get(body, (frameCount - frameIndex - 1) * (buffer.capacity() - HEADER_SIZE), buffer.remaining());
if (i == frameCount - 1) break;
buffer.clear();
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive next.seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid);
rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive next.nameid =" + rnameid);
ractionid1 = buffer.getLong();
ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")");
if (buffer.get() < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but next.frame.count != " + frameCount);
frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex);
int rretcode = buffer.getInt();
if (rretcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + rretcode + ")");
int rbodylen = buffer.getInt();
if (rbodylen != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen);
}
if (received != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received);
return body;
} }
buffer = transport.send(buffer); } catch (RuntimeException ex) {
throw ex;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
transport.offerBuffer(buffer);
transport.offerConnection(conn);
} }
long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
int frameCount = buffer.get();
if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount);
int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")");
int bodylen = buffer.getInt();
byte[] bytes = new byte[bodylen];
buffer.get(bytes);
transport.offerBuffer(buffer);
return bytes;
} }
} }

View File

@@ -78,6 +78,10 @@ public final class SncpContext extends Context {
} }
} }
protected void removeRequestEntity(long seqid) {
requests.remove(seqid);
}
protected RequestEntry getRequestEntity(long seqid) { protected RequestEntry getRequestEntity(long seqid) {
return requests.get(seqid); return requests.get(seqid);
} }

View File

@@ -5,17 +5,15 @@
*/ */
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.util.DebugMethodVisitor;
import com.wentch.redkale.util.TwoLong;
import com.wentch.redkale.convert.bson.BsonConvert;
import static com.wentch.redkale.net.sncp.SncpClient.getOnMethod; import static com.wentch.redkale.net.sncp.SncpClient.getOnMethod;
import com.wentch.redkale.service.Service; import com.wentch.redkale.service.*;
import com.wentch.redkale.util.*;
import java.io.*; import java.io.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.util.*; import java.util.*;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.Resource; import javax.annotation.*;
import jdk.internal.org.objectweb.asm.*; import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.Opcodes.*; import static jdk.internal.org.objectweb.asm.Opcodes.*;
import jdk.internal.org.objectweb.asm.Type; import jdk.internal.org.objectweb.asm.Type;
@@ -84,14 +82,12 @@ public class SncpDynServlet extends SncpServlet {
if (action == null) { if (action == null) {
response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid response.finish(SncpResponse.RETCODE_ILLACTIONID, null); //无效actionid
} else { } else {
byte[] rs = null;
try { try {
rs = action.action(request.getParamBytes()); response.finish(0, action.action(request.getParamBytes()));
} catch (Throwable t) { } catch (Throwable t) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t); response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null); response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
} }
response.finish(0, rs);
} }
} }

View File

@@ -16,36 +16,36 @@ import java.nio.*;
* @author zhangjx * @author zhangjx
*/ */
public final class SncpRequest extends Request { public final class SncpRequest extends Request {
public static final int HEADER_SIZE = 52; public static final int HEADER_SIZE = 52;
protected final BsonConvert convert; protected final BsonConvert convert;
private long seqid; private long seqid;
private int framecount; private int framecount;
private int frameindex; private int frameindex;
private long nameid; private long nameid;
private long serviceid; private long serviceid;
private TwoLong actionid; private TwoLong actionid;
private int bodylength; private int bodylength;
private byte[][] paramBytes; private byte[][] paramBytes;
private boolean ping; private boolean ping;
private byte[] body; private byte[] body;
protected SncpRequest(SncpContext context, BsonFactory factory) { protected SncpRequest(SncpContext context, BsonFactory factory) {
super(context); super(context);
this.convert = factory.getConvert(); this.convert = factory.getConvert();
} }
@Override @Override
protected int readHeader(ByteBuffer buffer) { protected int readHeader(ByteBuffer buffer) {
if (buffer.remaining() < HEADER_SIZE) { if (buffer.remaining() < HEADER_SIZE) {
@@ -85,20 +85,21 @@ public final class SncpRequest extends Request {
RequestEntry entry = scontext.getRequestEntity(this.seqid); RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]); if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
entry.add(buffer, (this.framecount - this.frameindex - 1) * (buffer.capacity() - HEADER_SIZE)); entry.add(buffer, (this.framecount - this.frameindex - 1) * (buffer.capacity() - HEADER_SIZE));
if (entry.isCompleted()) { //数据读取完毕 if (entry.isCompleted()) { //数据读取完毕
this.body = entry.body; this.body = entry.body;
scontext.removeRequestEntity(this.seqid);
return 0; return 0;
} else { } else {
scontext.expireRequestEntry(10 * 1000); //10秒过期 scontext.expireRequestEntry(10 * 1000); //10秒过期
} }
return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE
} }
@Override @Override
protected void readBody(ByteBuffer buffer) { protected void readBody(ByteBuffer buffer) {
} }
@Override @Override
protected void prepare() { protected void prepare() {
if (this.body == null) return; if (this.body == null) return;
@@ -115,22 +116,14 @@ public final class SncpRequest extends Request {
} }
this.paramBytes = bbytes; this.paramBytes = bbytes;
} }
@Override @Override
public String toString() { public String toString() {
return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid + ",actionid=" + this.actionid + ",serviceid=" + this.serviceid + ",actionid=" + this.actionid
+ ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",bodylength=" + this.bodylength + "}"; + ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",bodylength=" + this.bodylength + "}";
} }
protected void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}
protected boolean isKeepAlive() {
return this.keepAlive;
}
@Override @Override
protected void recycle() { protected void recycle() {
this.seqid = 0; this.seqid = 0;
@@ -144,29 +137,29 @@ public final class SncpRequest extends Request {
this.ping = false; this.ping = false;
super.recycle(); super.recycle();
} }
protected boolean isPing() { protected boolean isPing() {
return ping; return ping;
} }
public byte[][] getParamBytes() { public byte[][] getParamBytes() {
return paramBytes; return paramBytes;
} }
public long getSeqid() { public long getSeqid() {
return seqid; return seqid;
} }
public long getServiceid() { public long getServiceid() {
return serviceid; return serviceid;
} }
public long getNameid() { public long getNameid() {
return nameid; return nameid;
} }
public TwoLong getActionid() { public TwoLong getActionid() {
return actionid; return actionid;
} }
} }

View File

@@ -6,6 +6,7 @@
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.net.*; import com.wentch.redkale.net.*;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import com.wentch.redkale.util.*; import com.wentch.redkale.util.*;
import java.nio.*; import java.nio.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@@ -34,6 +35,32 @@ public final class SncpResponse extends Response<SncpRequest> {
public void finish(final int retcode, final byte[] bytes) { public void finish(final int retcode, final byte[] bytes) {
ByteBuffer buffer = context.pollBuffer(); ByteBuffer buffer = context.pollBuffer();
final int bodyLength = (bytes == null ? 0 : bytes.length);
final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
if (patch <= 1) {
//---------------------head----------------------------------
fillHeader(buffer, retcode, 1, 0, bodyLength);
//---------------------body----------------------------------
if (bytes != null) buffer.put(bytes);
buffer.flip();
finish(buffer);
} else {
final ByteBuffer[] buffers = new ByteBuffer[patch];
int pos = 0;
for (int i = patch - 1; i >= 0; i--) {
if (i != patch - 1) buffer = context.pollBuffer();
fillHeader(buffer, retcode, patch, i, bodyLength);
buffers[i] = buffer;
int len = Math.min(buffer.remaining(), bytes.length - pos);
buffer.put(bytes, pos, len);
pos += len;
buffer.flip();
}
finish(buffers);
}
}
private void fillHeader(ByteBuffer buffer, int retcode, int frameCount, int frameIndex, int bodyLength) {
//---------------------head---------------------------------- //---------------------head----------------------------------
buffer.putLong(request.getSeqid()); buffer.putLong(request.getSeqid());
buffer.putChar((char) SncpRequest.HEADER_SIZE); buffer.putChar((char) SncpRequest.HEADER_SIZE);
@@ -42,13 +69,9 @@ public final class SncpResponse extends Response<SncpRequest> {
TwoLong actionid = request.getActionid(); TwoLong actionid = request.getActionid();
buffer.putLong(actionid.getFirst()); buffer.putLong(actionid.getFirst());
buffer.putLong(actionid.getSecond()); buffer.putLong(actionid.getSecond());
buffer.put((byte) 1); // frame count buffer.put((byte) frameCount); // frame count
buffer.put((byte) 0); //frame index buffer.put((byte) frameIndex); //frame index
buffer.putInt(retcode); buffer.putInt(retcode);
buffer.putInt((bytes == null ? 0 : bytes.length)); buffer.putInt(bodyLength);
//---------------------body----------------------------------
if (bytes != null) buffer.put(bytes);
buffer.flip();
finish(buffer);
} }
} }

View File

@@ -49,6 +49,8 @@ public final class DataJDBCSource implements DataSource {
private static final String JDBC_SOURCE = "javax.persistence.jdbc.source"; private static final String JDBC_SOURCE = "javax.persistence.jdbc.source";
private static final Flipper FLIPPER_ONE = new Flipper(1);
private final Logger logger = Logger.getLogger(DataJDBCSource.class.getSimpleName()); private final Logger logger = Logger.getLogger(DataJDBCSource.class.getSimpleName());
private final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST)); private final AtomicBoolean debug = new AtomicBoolean(logger.isLoggable(Level.FINEST));
@@ -1101,7 +1103,7 @@ public final class DataJDBCSource implements DataSource {
if (r != null || cache.isFullLoaded()) return r; if (r != null || cache.isFullLoaded()) return r;
} }
final Connection conn = createReadSQLConnection(); final Connection conn = createReadSQLConnection();
try { try {
if (debug.get()) logger.finest(clazz.getSimpleName() + " find sql=" + info.query.sql.replace("?", String.valueOf(pk))); if (debug.get()) logger.finest(clazz.getSimpleName() + " find sql=" + info.query.sql.replace("?", String.valueOf(pk)));
final PreparedStatement prestmt = conn.prepareStatement(info.query.sql); final PreparedStatement prestmt = conn.prepareStatement(info.query.sql);
prestmt.setObject(1, pk); prestmt.setObject(1, pk);
@@ -1198,6 +1200,20 @@ public final class DataJDBCSource implements DataSource {
} }
} }
/**
* 根据过滤对象FilterBean查询第一个符合条件的对象
*
* @param <T>
* @param clazz
* @param bean
* @return
*/
@Override
public <T> T find(final Class<T> clazz, final FilterBean bean) {
Sheet<T> sheet = querySheet(clazz, FLIPPER_ONE, bean);
return sheet.isEmpty() ? null : sheet.list().get(0);
}
/** /**
* 根据唯一索引获取对象 * 根据唯一索引获取对象
* *
@@ -1650,7 +1666,7 @@ public final class DataJDBCSource implements DataSource {
public void connectionErrorOccurred(ConnectionEvent event) { public void connectionErrorOccurred(ConnectionEvent event) {
usingCounter.decrementAndGet(); usingCounter.decrementAndGet();
if ("08S01".equals(event.getSQLException().getSQLState())) return; //MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException if ("08S01".equals(event.getSQLException().getSQLState())) return; //MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
dataSource.logger.log(Level.WARNING, "connectionErronOccurred", event.getSQLException()); dataSource.logger.log(Level.WARNING, "connectionErronOccurred " + event.getSQLException().getSQLState(), event.getSQLException());
} }
}; };
try { try {
@@ -1780,7 +1796,9 @@ public final class DataJDBCSource implements DataSource {
return poll(0, null); return poll(0, null);
} }
} catch (SQLException ex) { } catch (SQLException ex) {
dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort", ex); if (!"08S01".equals(ex.getSQLState())) {//MySQL特性 长时间连接没使用会抛出com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
dataSource.logger.log(Level.FINER, "result.getConnection from pooled connection abort " + ex.getSQLState(), ex);
}
return poll(0, null); return poll(0, null);
} }
return conn; return conn;

View File

@@ -5,13 +5,12 @@
*/ */
package com.wentch.redkale.source; package com.wentch.redkale.source;
import com.wentch.redkale.util.Sheet;
import com.wentch.redkale.util.Attribute;
import static com.wentch.redkale.source.FilterExpress.*; import static com.wentch.redkale.source.FilterExpress.*;
import java.io.Serializable; import com.wentch.redkale.util.*;
import java.lang.reflect.Array; import java.io.*;
import java.lang.reflect.*;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import javax.persistence.*; import javax.persistence.*;
import javax.persistence.criteria.*; import javax.persistence.criteria.*;
@@ -45,6 +44,11 @@ final class DataJPASource implements DataSource {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
} }
@Override
public <T> T find(Class<T> clazz, FilterBean bean) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
private static class DataJPAConnection extends DataConnection { private static class DataJPAConnection extends DataConnection {
private final EntityManager manager; private final EntityManager manager;

View File

@@ -5,7 +5,7 @@
*/ */
package com.wentch.redkale.source; package com.wentch.redkale.source;
import com.wentch.redkale.util.Sheet; import com.wentch.redkale.util.*;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
@@ -316,6 +316,16 @@ public interface DataSource {
*/ */
public <T> T[] findByColumn(Class<T> clazz, final SelectColumn selects, String column, Serializable... keys); public <T> T[] findByColumn(Class<T> clazz, final SelectColumn selects, String column, Serializable... keys);
/**
* 根据过滤对象FilterBean查询第一个符合条件的对象
*
* @param <T>
* @param clazz
* @param bean
* @return
*/
public <T> T find(final Class<T> clazz, final FilterBean bean);
//-----------------------list---------------------------- //-----------------------list----------------------------
/** /**
* 根据指定字段值查询对象某个字段的集合 * 根据指定字段值查询对象某个字段的集合

View File

@@ -8,6 +8,7 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.*; import java.util.function.*;
import java.util.logging.*;
/** /**
* *
@@ -16,6 +17,10 @@ import java.util.function.*;
*/ */
public final class ObjectPool<T> { public final class ObjectPool<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class.getSimpleName());
private final boolean debug;
private final Queue<T> queue; private final Queue<T> queue;
private Creator<T> creator; private Creator<T> creator;
@@ -48,6 +53,7 @@ public final class ObjectPool<T> {
this.creator = creator; this.creator = creator;
this.recycler = recycler; this.recycler = recycler;
this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max));
this.debug = logger.isLoggable(Level.FINER);
} }
public void setCreator(Creator<T> creator) { public void setCreator(Creator<T> creator) {
@@ -66,6 +72,9 @@ public final class ObjectPool<T> {
public void offer(final T e) { public void offer(final T e) {
if (e != null && recycler.test(e)) { if (e != null && recycler.test(e)) {
if (cycleCounter != null) cycleCounter.incrementAndGet(); if (cycleCounter != null) cycleCounter.incrementAndGet();
if (debug) queue.forEach(t -> {
if (t == e) logger.log(Level.WARNING, "repeat offer the same object(" + e + ")", new Exception());
});
queue.offer(e); queue.offer(e);
} }
} }