diff --git a/src/com/wentch/redkale/convert/bson/BsonReader.java b/src/com/wentch/redkale/convert/bson/BsonReader.java index 048a801fc..aabedb4dc 100644 --- a/src/com/wentch/redkale/convert/bson/BsonReader.java +++ b/src/com/wentch/redkale/convert/bson/BsonReader.java @@ -35,7 +35,7 @@ public final class BsonReader implements Reader { } public static ObjectPool createPool(int max) { - return new ObjectPool<>(max, (Object... params) -> new BsonReader(), (x) -> x.recycle()); + return new ObjectPool<>(max, (Object... params) -> new BsonReader(), null, (x) -> x.recycle()); } public BsonReader(byte[] bytes) { diff --git a/src/com/wentch/redkale/convert/bson/BsonWriter.java b/src/com/wentch/redkale/convert/bson/BsonWriter.java index f8d51f3a2..4145d7bb2 100644 --- a/src/com/wentch/redkale/convert/bson/BsonWriter.java +++ b/src/com/wentch/redkale/convert/bson/BsonWriter.java @@ -21,7 +21,7 @@ public final class BsonWriter implements Writer { private byte[] content; public static ObjectPool createPool(int max) { - return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), (x) -> x.recycle()); + return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), null, (x) -> x.recycle()); } public byte[] toArray() { diff --git a/src/com/wentch/redkale/convert/json/JsonReader.java b/src/com/wentch/redkale/convert/json/JsonReader.java index 3fbf37d0f..0cbf7eb72 100644 --- a/src/com/wentch/redkale/convert/json/JsonReader.java +++ b/src/com/wentch/redkale/convert/json/JsonReader.java @@ -22,7 +22,7 @@ public final class JsonReader implements Reader { private int limit; public static ObjectPool createPool(int max) { - return new ObjectPool<>(max, (Object... params) -> new JsonReader(), (x) -> x.recycle()); + return new ObjectPool<>(max, (Object... params) -> new JsonReader(), null, (x) -> x.recycle()); } public JsonReader() { diff --git a/src/com/wentch/redkale/convert/json/JsonWriter.java b/src/com/wentch/redkale/convert/json/JsonWriter.java index c980856a3..9fad1d692 100644 --- a/src/com/wentch/redkale/convert/json/JsonWriter.java +++ b/src/com/wentch/redkale/convert/json/JsonWriter.java @@ -27,7 +27,7 @@ public final class JsonWriter implements Writer { private char[] content; public static ObjectPool createPool(int max) { - return new ObjectPool<>(max, (Object... params) -> new JsonWriter(), (x) -> x.recycle()); + return new ObjectPool<>(max, (Object... params) -> new JsonWriter(), null, (x) -> x.recycle()); } public JsonWriter() { diff --git a/src/com/wentch/redkale/net/Response.java b/src/com/wentch/redkale/net/Response.java index ce8abfe60..f1258b30c 100644 --- a/src/com/wentch/redkale/net/Response.java +++ b/src/com/wentch/redkale/net/Response.java @@ -22,6 +22,8 @@ public abstract class Response { protected AsyncConnection channel; + private boolean inited = true; + private final CompletionHandler finishHandler = new CompletionHandler() { @Override @@ -87,7 +89,12 @@ public abstract class Response { return ch; } + protected void prepare() { + inited = true; + } + protected boolean recycle() { + if (!inited) return false; boolean keepAlive = request.keepAlive; request.recycle(); if (channel != null) { @@ -101,6 +108,7 @@ public abstract class Response { } channel = null; } + this.inited = false; return true; } diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index e187ca69a..e9af5e96b 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -52,7 +52,7 @@ public final class Transport { AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter"); int rcapacity = 8192; this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; e.clear(); return true; diff --git a/src/com/wentch/redkale/net/http/ByteArray.java b/src/com/wentch/redkale/net/http/ByteArray.java index c86cdbcf4..eac9cbe38 100644 --- a/src/com/wentch/redkale/net/http/ByteArray.java +++ b/src/com/wentch/redkale/net/http/ByteArray.java @@ -5,9 +5,9 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.util.Utility; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; +import com.wentch.redkale.util.*; +import java.nio.*; +import java.nio.charset.*; /** * @@ -76,6 +76,10 @@ public final class ByteArray { return -1; } + public void removeLastByte() { + if (count > 0) count--; + } + public void add(byte value) { if (count >= content.length - 1) { byte[] ns = new byte[content.length + 8]; @@ -101,6 +105,10 @@ public final class ByteArray { return new String(content, 0, count); } + public String toString(final Charset charset) { + return toString(0, count, charset); + } + public String toString(final int offset, int len, final Charset charset) { if (charset == null) return new String(Utility.decodeUTF8(content, offset, len)); return new String(content, offset, len, charset); diff --git a/src/com/wentch/redkale/net/http/HttpRequest.java b/src/com/wentch/redkale/net/http/HttpRequest.java index d18dd7e6e..f3f534ce1 100644 --- a/src/com/wentch/redkale/net/http/HttpRequest.java +++ b/src/com/wentch/redkale/net/http/HttpRequest.java @@ -58,9 +58,12 @@ public final class HttpRequest extends Request { protected boolean boundary = false; - protected HttpRequest(Context context, JsonFactory factory) { + private final String remoteAddrHeader; + + protected HttpRequest(Context context, JsonFactory factory, String remoteAddrHeader) { super(context); this.convert = factory.getConvert(); + this.remoteAddrHeader = remoteAddrHeader; } protected void setKeepAlive(boolean keepAlive) { @@ -222,6 +225,10 @@ public final class HttpRequest extends Request { } public String getRemoteAddr() { + if (remoteAddrHeader != null) { + String val = getHeader(remoteAddrHeader); + if (val != null) return val; + } return String.valueOf(getRemoteAddress()); } @@ -235,7 +242,7 @@ public final class HttpRequest extends Request { return this.getClass().getSimpleName() + "{method:" + this.method + ", requestURI:" + this.requestURI + ", contentType:" + this.contentType + ", connection:" + this.connection + ", protocol:" + this.protocol + ", contentLength:" + this.contentLength + ", cookiestr:" + this.cookiestr - + ", host:" + this.host + ", params:" + this.params + ", header:" + this.header + "}"; + + ", host:" + this.host + ", params:" + this.params + ", header:" + this.header + "body:" + (array == null ? "null" : array.toString()) + "}"; } public final MultiContext getMultiContext() { diff --git a/src/com/wentch/redkale/net/http/HttpResponse.java b/src/com/wentch/redkale/net/http/HttpResponse.java index 1269fe4a0..8a1089ae6 100644 --- a/src/com/wentch/redkale/net/http/HttpResponse.java +++ b/src/com/wentch/redkale/net/http/HttpResponse.java @@ -106,7 +106,7 @@ public final class HttpResponse extends Response { private final HttpCookie defcookie; public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { - return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).recycle()); + return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).prepare(), (x) -> ((HttpResponse) x).recycle()); } protected HttpResponse(HttpContext context, HttpRequest request, String[][] defaultAddHeaders, String[][] defaultSetHeaders, HttpCookie defcookie) { diff --git a/src/com/wentch/redkale/net/http/HttpServer.java b/src/com/wentch/redkale/net/http/HttpServer.java index 6d4bf97e1..241395078 100644 --- a/src/com/wentch/redkale/net/http/HttpServer.java +++ b/src/com/wentch/redkale/net/http/HttpServer.java @@ -51,7 +51,7 @@ public final class HttpServer extends Server { AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter"); int rcapacity = Math.max(this.capacity, 8 * 1024); ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; e.clear(); return true; @@ -64,7 +64,21 @@ public final class HttpServer extends Server { String[][] defaultAddHeaders = null; String[][] defaultSetHeaders = null; HttpCookie defaultCookie = null; + String remoteAddrHeader = null; if (config != null) { + AnyValue reqs = config == null ? null : config.getAnyValue("request"); + if (reqs != null) { + AnyValue raddr = reqs.getAnyValue("remoteaddr"); + remoteAddrHeader = raddr == null ? null : raddr.getValue("value"); + if (remoteAddrHeader != null) { + if (remoteAddrHeader.startsWith("request.headers.")) { + remoteAddrHeader = remoteAddrHeader.substring("request.headers.".length()); + } else { + remoteAddrHeader = null; + } + } + } + AnyValue resps = config == null ? null : config.getAnyValue("response"); if (resps != null) { AnyValue[] addHeaders = resps.getAnyValues("addheader"); @@ -110,13 +124,14 @@ public final class HttpServer extends Server { final String[][] addHeaders = defaultAddHeaders; final String[][] setHeaders = defaultSetHeaders; final HttpCookie defCookie = defaultCookie; + final String addrHeader = remoteAddrHeader; AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); ObjectPool responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath); responsePool.setCreator((Object... params) - -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory), addHeaders, setHeaders, defCookie)); + -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory, addrHeader), addHeaders, setHeaders, defCookie)); return httpcontext; } diff --git a/src/com/wentch/redkale/net/http/MultiContext.java b/src/com/wentch/redkale/net/http/MultiContext.java index 8eacb9096..12436a939 100644 --- a/src/com/wentch/redkale/net/http/MultiContext.java +++ b/src/com/wentch/redkale/net/http/MultiContext.java @@ -6,11 +6,11 @@ package com.wentch.redkale.net.http; import java.io.*; -import java.nio.charset.Charset; +import java.nio.charset.*; import java.util.*; import java.util.concurrent.atomic.*; import java.util.logging.*; -import java.util.regex.Pattern; +import java.util.regex.*; /** * @@ -30,7 +30,9 @@ public final class MultiContext { private final String boundary; - private final ByteArrayOutputStream buf = new ByteArrayOutputStream(); + private final byte[] endboundarray; + + private final ByteArray buf = new ByteArray(64); private final Map parameters = new HashMap<>(); @@ -65,6 +67,7 @@ public final class MultiContext { this.charset = charsetName == null ? UTF8 : charsetName; this.contentType = contentType.trim(); this.boundary = parseBoundary(this.contentType); + this.endboundarray = ("--" + this.boundary + "--").getBytes(); this.in = in instanceof BufferedInputStream ? in : new BufferedInputStream(in); this.fielnamePattern = fielnameRegex == null || fielnameRegex.isEmpty() ? null : Pattern.compile(fielnameRegex); } @@ -117,7 +120,7 @@ public final class MultiContext { public Iterable listMultiPart() throws IOException { if (!isMultipart()) return emptyIterable; - final boolean debug = false; + final boolean debug = true; final String boundarystr = "--" + this.boundary; final Pattern fielnameReg = this.fielnamePattern; final String endboundary = boundarystr + "--"; @@ -139,7 +142,7 @@ public final class MultiContext { lastentry.skip(); if (finaled.get()) return false; } - if (boundaryline == null) boundaryline = readLine(); + if (boundaryline == null) boundaryline = readBoundary(); //if (debug) System.out.print("boundaryline=" + boundaryline + " "); if (endboundary.equals(boundaryline) || !boundarystr.equals(boundaryline)) { //结尾或异常 lastentry = null; @@ -240,16 +243,32 @@ public final class MultiContext { } private String readLine() throws IOException { - int lasted = '\r'; - buf.reset(); + return readLine(false); + } + + private String readBoundary() throws IOException { + return readLine(true); + } + + private String readLine(boolean bd) throws IOException { // bd : 是否是读取boundary + byte lasted = '\r'; + buf.clear(); + final int bc = this.endboundarray.length; + int c = 0; for (;;) { int b = in.read(); + c++; if (b == -1 || (lasted == '\r' && b == '\n')) break; - if (lasted != '\r') buf.write(lasted); - lasted = b; + if (lasted != '\r') buf.add(lasted); + lasted = (byte) b; + if (bd && bc == c) { + buf.add(lasted); + if (buf.equal(this.endboundarray)) break; + buf.removeLastByte(); + } } - if (buf.size() == 0) return ""; - return buf.toString(this.charset.name()).trim(); + if (buf.count() == 0) return ""; + return buf.toString(this.charset).trim(); } private static String parseValue(final String str, String name) { diff --git a/src/com/wentch/redkale/net/sncp/SncpResponse.java b/src/com/wentch/redkale/net/sncp/SncpResponse.java index f37c5ec96..95fee505c 100644 --- a/src/com/wentch/redkale/net/sncp/SncpResponse.java +++ b/src/com/wentch/redkale/net/sncp/SncpResponse.java @@ -26,7 +26,7 @@ public final class SncpResponse extends Response { public static final int RETCODE_THROWEXCEPTION = 10011; //内部异常 public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { - return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).recycle()); + return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x)-> ((SncpResponse) x).prepare(), (x) -> ((SncpResponse) x).recycle()); } protected SncpResponse(Context context, SncpRequest request) { diff --git a/src/com/wentch/redkale/net/sncp/SncpServer.java b/src/com/wentch/redkale/net/sncp/SncpServer.java index 86b0a6c41..09a91a1b0 100644 --- a/src/com/wentch/redkale/net/sncp/SncpServer.java +++ b/src/com/wentch/redkale/net/sncp/SncpServer.java @@ -42,7 +42,7 @@ public final class SncpServer extends Server { AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter"); int rcapacity = Math.max(this.capacity, 8 * 1024); ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> { if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; e.clear(); return true; diff --git a/src/com/wentch/redkale/util/ObjectPool.java b/src/com/wentch/redkale/util/ObjectPool.java index aadde2650..f9f6ead35 100644 --- a/src/com/wentch/redkale/util/ObjectPool.java +++ b/src/com/wentch/redkale/util/ObjectPool.java @@ -25,32 +25,35 @@ public final class ObjectPool { private Creator creator; + private final Consumer prepare; + private final Predicate recycler; private final AtomicLong creatCounter; private final AtomicLong cycleCounter; - public ObjectPool(Class clazz, Predicate recycler) { - this(2, clazz, recycler); + public ObjectPool(Class clazz, Consumer prepare, Predicate recycler) { + this(2, clazz, prepare, recycler); } - public ObjectPool(int max, Class clazz, Predicate recycler) { - this(max, Creator.create(clazz), recycler); + public ObjectPool(int max, Class clazz, Consumer prepare, Predicate recycler) { + this(max, Creator.create(clazz), prepare, recycler); } - public ObjectPool(Creator creator, Predicate recycler) { - this(2, creator, recycler); + public ObjectPool(Creator creator, Consumer prepare, Predicate recycler) { + this(2, creator, prepare, recycler); } - public ObjectPool(int max, Creator creator, Predicate recycler) { - this(null, null, max, creator, recycler); + public ObjectPool(int max, Creator creator, Consumer prepare, Predicate recycler) { + this(null, null, max, creator, prepare, recycler); } - public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Predicate recycler) { + public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Consumer prepare, Predicate recycler) { this.creatCounter = creatCounter; this.cycleCounter = cycleCounter; this.creator = creator; + this.prepare = prepare; this.recycler = recycler; this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); this.debug = logger.isLoggable(Level.FINER); @@ -65,7 +68,8 @@ public final class ObjectPool { if (result == null) { if (creatCounter != null) creatCounter.incrementAndGet(); result = this.creator.create(); - } + } + if(prepare != null) prepare.accept(result); return result; }