This commit is contained in:
地平线
2015-03-27 18:15:13 +08:00
parent 2ca814fab2
commit 6615f58901
14 changed files with 97 additions and 36 deletions

View File

@@ -35,7 +35,7 @@ public final class BsonReader implements Reader {
} }
public static ObjectPool<BsonReader> createPool(int max) { public static ObjectPool<BsonReader> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new BsonReader(), (x) -> x.recycle()); return new ObjectPool<>(max, (Object... params) -> new BsonReader(), null, (x) -> x.recycle());
} }
public BsonReader(byte[] bytes) { public BsonReader(byte[] bytes) {

View File

@@ -21,7 +21,7 @@ public final class BsonWriter implements Writer {
private byte[] content; private byte[] content;
public static ObjectPool<BsonWriter> createPool(int max) { public static ObjectPool<BsonWriter> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), (x) -> x.recycle()); return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), null, (x) -> x.recycle());
} }
public byte[] toArray() { public byte[] toArray() {

View File

@@ -22,7 +22,7 @@ public final class JsonReader implements Reader {
private int limit; private int limit;
public static ObjectPool<JsonReader> createPool(int max) { public static ObjectPool<JsonReader> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new JsonReader(), (x) -> x.recycle()); return new ObjectPool<>(max, (Object... params) -> new JsonReader(), null, (x) -> x.recycle());
} }
public JsonReader() { public JsonReader() {

View File

@@ -27,7 +27,7 @@ public final class JsonWriter implements Writer {
private char[] content; private char[] content;
public static ObjectPool<JsonWriter> createPool(int max) { public static ObjectPool<JsonWriter> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new JsonWriter(), (x) -> x.recycle()); return new ObjectPool<>(max, (Object... params) -> new JsonWriter(), null, (x) -> x.recycle());
} }
public JsonWriter() { public JsonWriter() {

View File

@@ -22,6 +22,8 @@ public abstract class Response<R extends Request> {
protected AsyncConnection channel; protected AsyncConnection channel;
private boolean inited = true;
private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() { private final CompletionHandler finishHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
@@ -87,7 +89,12 @@ public abstract class Response<R extends Request> {
return ch; return ch;
} }
protected void prepare() {
inited = true;
}
protected boolean recycle() { protected boolean recycle() {
if (!inited) return false;
boolean keepAlive = request.keepAlive; boolean keepAlive = request.keepAlive;
request.recycle(); request.recycle();
if (channel != null) { if (channel != null) {
@@ -101,6 +108,7 @@ public abstract class Response<R extends Request> {
} }
channel = null; channel = null;
} }
this.inited = false;
return true; return true;
} }

View File

@@ -52,7 +52,7 @@ public final class Transport {
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter");
int rcapacity = 8192; int rcapacity = 8192;
this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear(); e.clear();
return true; return true;

View File

@@ -5,9 +5,9 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.util.Utility; import com.wentch.redkale.util.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.charset.Charset; import java.nio.charset.*;
/** /**
* *
@@ -76,6 +76,10 @@ public final class ByteArray {
return -1; return -1;
} }
public void removeLastByte() {
if (count > 0) count--;
}
public void add(byte value) { public void add(byte value) {
if (count >= content.length - 1) { if (count >= content.length - 1) {
byte[] ns = new byte[content.length + 8]; byte[] ns = new byte[content.length + 8];
@@ -101,6 +105,10 @@ public final class ByteArray {
return new String(content, 0, count); return new String(content, 0, count);
} }
public String toString(final Charset charset) {
return toString(0, count, charset);
}
public String toString(final int offset, int len, final Charset charset) { public String toString(final int offset, int len, final Charset charset) {
if (charset == null) return new String(Utility.decodeUTF8(content, offset, len)); if (charset == null) return new String(Utility.decodeUTF8(content, offset, len));
return new String(content, offset, len, charset); return new String(content, offset, len, charset);

View File

@@ -58,9 +58,12 @@ public final class HttpRequest extends Request {
protected boolean boundary = false; protected boolean boundary = false;
protected HttpRequest(Context context, JsonFactory factory) { private final String remoteAddrHeader;
protected HttpRequest(Context context, JsonFactory factory, String remoteAddrHeader) {
super(context); super(context);
this.convert = factory.getConvert(); this.convert = factory.getConvert();
this.remoteAddrHeader = remoteAddrHeader;
} }
protected void setKeepAlive(boolean keepAlive) { protected void setKeepAlive(boolean keepAlive) {
@@ -222,6 +225,10 @@ public final class HttpRequest extends Request {
} }
public String getRemoteAddr() { public String getRemoteAddr() {
if (remoteAddrHeader != null) {
String val = getHeader(remoteAddrHeader);
if (val != null) return val;
}
return String.valueOf(getRemoteAddress()); return String.valueOf(getRemoteAddress());
} }
@@ -235,7 +242,7 @@ public final class HttpRequest extends Request {
return this.getClass().getSimpleName() + "{method:" + this.method + ", requestURI:" + this.requestURI return this.getClass().getSimpleName() + "{method:" + this.method + ", requestURI:" + this.requestURI
+ ", contentType:" + this.contentType + ", connection:" + this.connection + ", protocol:" + this.protocol + ", contentType:" + this.contentType + ", connection:" + this.connection + ", protocol:" + this.protocol
+ ", contentLength:" + this.contentLength + ", cookiestr:" + this.cookiestr + ", contentLength:" + this.contentLength + ", cookiestr:" + this.cookiestr
+ ", host:" + this.host + ", params:" + this.params + ", header:" + this.header + "}"; + ", host:" + this.host + ", params:" + this.params + ", header:" + this.header + "body:" + (array == null ? "null" : array.toString()) + "}";
} }
public final MultiContext getMultiContext() { public final MultiContext getMultiContext() {

View File

@@ -106,7 +106,7 @@ public final class HttpResponse extends Response<HttpRequest> {
private final HttpCookie defcookie; private final HttpCookie defcookie;
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) { public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).recycle()); return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle());
} }
protected HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders, HttpCookie defcookie) { protected HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders, HttpCookie defcookie) {

View File

@@ -51,7 +51,7 @@ public final class HttpServer extends Server {
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter");
int rcapacity = Math.max(this.capacity, 8 * 1024); int rcapacity = Math.max(this.capacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear(); e.clear();
return true; return true;
@@ -64,7 +64,21 @@ public final class HttpServer extends Server {
String[][] defaultAddHeaders = null; String[][] defaultAddHeaders = null;
String[][] defaultSetHeaders = null; String[][] defaultSetHeaders = null;
HttpCookie defaultCookie = null; HttpCookie defaultCookie = null;
String remoteAddrHeader = null;
if (config != null) { if (config != null) {
AnyValue reqs = config == null ? null : config.getAnyValue("request");
if (reqs != null) {
AnyValue raddr = reqs.getAnyValue("remoteaddr");
remoteAddrHeader = raddr == null ? null : raddr.getValue("value");
if (remoteAddrHeader != null) {
if (remoteAddrHeader.startsWith("request.headers.")) {
remoteAddrHeader = remoteAddrHeader.substring("request.headers.".length());
} else {
remoteAddrHeader = null;
}
}
}
AnyValue resps = config == null ? null : config.getAnyValue("response"); AnyValue resps = config == null ? null : config.getAnyValue("response");
if (resps != null) { if (resps != null) {
AnyValue[] addHeaders = resps.getAnyValues("addheader"); AnyValue[] addHeaders = resps.getAnyValues("addheader");
@@ -110,13 +124,14 @@ public final class HttpServer extends Server {
final String[][] addHeaders = defaultAddHeaders; final String[][] addHeaders = defaultAddHeaders;
final String[][] setHeaders = defaultSetHeaders; final String[][] setHeaders = defaultSetHeaders;
final HttpCookie defCookie = defaultCookie; final HttpCookie defCookie = defaultCookie;
final String addrHeader = remoteAddrHeader;
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter");
ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool,
this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath); this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath);
responsePool.setCreator((Object... params) responsePool.setCreator((Object... params)
-> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory), addHeaders, setHeaders, defCookie)); -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory, addrHeader), addHeaders, setHeaders, defCookie));
return httpcontext; return httpcontext;
} }

View File

@@ -6,11 +6,11 @@
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import java.io.*; import java.io.*;
import java.nio.charset.Charset; import java.nio.charset.*;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.*; import java.util.logging.*;
import java.util.regex.Pattern; import java.util.regex.*;
/** /**
* *
@@ -30,7 +30,9 @@ public final class MultiContext {
private final String boundary; private final String boundary;
private final ByteArrayOutputStream buf = new ByteArrayOutputStream(); private final byte[] endboundarray;
private final ByteArray buf = new ByteArray(64);
private final Map<String, String> parameters = new HashMap<>(); private final Map<String, String> parameters = new HashMap<>();
@@ -65,6 +67,7 @@ public final class MultiContext {
this.charset = charsetName == null ? UTF8 : charsetName; this.charset = charsetName == null ? UTF8 : charsetName;
this.contentType = contentType.trim(); this.contentType = contentType.trim();
this.boundary = parseBoundary(this.contentType); this.boundary = parseBoundary(this.contentType);
this.endboundarray = ("--" + this.boundary + "--").getBytes();
this.in = in instanceof BufferedInputStream ? in : new BufferedInputStream(in); this.in = in instanceof BufferedInputStream ? in : new BufferedInputStream(in);
this.fielnamePattern = fielnameRegex == null || fielnameRegex.isEmpty() ? null : Pattern.compile(fielnameRegex); this.fielnamePattern = fielnameRegex == null || fielnameRegex.isEmpty() ? null : Pattern.compile(fielnameRegex);
} }
@@ -117,7 +120,7 @@ public final class MultiContext {
public Iterable<MultiPart> listMultiPart() throws IOException { public Iterable<MultiPart> listMultiPart() throws IOException {
if (!isMultipart()) return emptyIterable; if (!isMultipart()) return emptyIterable;
final boolean debug = false; final boolean debug = true;
final String boundarystr = "--" + this.boundary; final String boundarystr = "--" + this.boundary;
final Pattern fielnameReg = this.fielnamePattern; final Pattern fielnameReg = this.fielnamePattern;
final String endboundary = boundarystr + "--"; final String endboundary = boundarystr + "--";
@@ -139,7 +142,7 @@ public final class MultiContext {
lastentry.skip(); lastentry.skip();
if (finaled.get()) return false; if (finaled.get()) return false;
} }
if (boundaryline == null) boundaryline = readLine(); if (boundaryline == null) boundaryline = readBoundary();
//if (debug) System.out.print("boundaryline=" + boundaryline + " "); //if (debug) System.out.print("boundaryline=" + boundaryline + " ");
if (endboundary.equals(boundaryline) || !boundarystr.equals(boundaryline)) { //结尾或异常 if (endboundary.equals(boundaryline) || !boundarystr.equals(boundaryline)) { //结尾或异常
lastentry = null; lastentry = null;
@@ -240,16 +243,32 @@ public final class MultiContext {
} }
private String readLine() throws IOException { private String readLine() throws IOException {
int lasted = '\r'; return readLine(false);
buf.reset(); }
private String readBoundary() throws IOException {
return readLine(true);
}
private String readLine(boolean bd) throws IOException { // bd : 是否是读取boundary
byte lasted = '\r';
buf.clear();
final int bc = this.endboundarray.length;
int c = 0;
for (;;) { for (;;) {
int b = in.read(); int b = in.read();
c++;
if (b == -1 || (lasted == '\r' && b == '\n')) break; if (b == -1 || (lasted == '\r' && b == '\n')) break;
if (lasted != '\r') buf.write(lasted); if (lasted != '\r') buf.add(lasted);
lasted = b; lasted = (byte) b;
if (bd && bc == c) {
buf.add(lasted);
if (buf.equal(this.endboundarray)) break;
buf.removeLastByte();
} }
if (buf.size() == 0) return ""; }
return buf.toString(this.charset.name()).trim(); if (buf.count() == 0) return "";
return buf.toString(this.charset).trim();
} }
private static String parseValue(final String str, String name) { private static String parseValue(final String str, String name) {

View File

@@ -26,7 +26,7 @@ public final class SncpResponse extends Response<SncpRequest> {
public static final int RETCODE_THROWEXCEPTION = 10011; //内部异常 public static final int RETCODE_THROWEXCEPTION = 10011; //内部异常
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) { public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).recycle()); return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x)-> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle());
} }
protected SncpResponse(Context context, SncpRequest request) { protected SncpResponse(Context context, SncpRequest request) {

View File

@@ -42,7 +42,7 @@ public final class SncpServer extends Server {
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter");
int rcapacity = Math.max(this.capacity, 8 * 1024); int rcapacity = Math.max(this.capacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear(); e.clear();
return true; return true;

View File

@@ -25,32 +25,35 @@ public final class ObjectPool<T> {
private Creator<T> creator; private Creator<T> creator;
private final Consumer<T> prepare;
private final Predicate<T> recycler; private final Predicate<T> recycler;
private final AtomicLong creatCounter; private final AtomicLong creatCounter;
private final AtomicLong cycleCounter; private final AtomicLong cycleCounter;
public ObjectPool(Class<T> clazz, Predicate<T> recycler) { public ObjectPool(Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(2, clazz, recycler); this(2, clazz, prepare, recycler);
} }
public ObjectPool(int max, Class<T> clazz, Predicate<T> recycler) { public ObjectPool(int max, Class<T> clazz, Consumer<T> prepare, Predicate<T> recycler) {
this(max, Creator.create(clazz), recycler); this(max, Creator.create(clazz), prepare, recycler);
} }
public ObjectPool(Creator<T> creator, Predicate<T> recycler) { public ObjectPool(Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(2, creator, recycler); this(2, creator, prepare, recycler);
} }
public ObjectPool(int max, Creator<T> creator, Predicate<T> recycler) { public ObjectPool(int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this(null, null, max, creator, recycler); this(null, null, max, creator, prepare, recycler);
} }
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Predicate<T> recycler) { public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Consumer<T> prepare, Predicate<T> recycler) {
this.creatCounter = creatCounter; this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter; this.cycleCounter = cycleCounter;
this.creator = creator; this.creator = creator;
this.prepare = prepare;
this.recycler = recycler; this.recycler = recycler;
this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max));
this.debug = logger.isLoggable(Level.FINER); this.debug = logger.isLoggable(Level.FINER);
@@ -66,6 +69,7 @@ public final class ObjectPool<T> {
if (creatCounter != null) creatCounter.incrementAndGet(); if (creatCounter != null) creatCounter.incrementAndGet();
result = this.creator.create(); result = this.creator.create();
} }
if(prepare != null) prepare.accept(result);
return result; return result;
} }