From d81461ccc20910995b7adb690762000bccaba055 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Mon, 16 Mar 2015 18:55:36 +0800 Subject: [PATCH] --- .../redkale/convert/bson/BsonConvert.java | 11 +- .../redkale/convert/bson/BsonReader.java | 25 +- .../redkale/convert/bson/BsonWriter.java | 22 +- .../redkale/convert/json/JsonConvert.java | 11 +- .../redkale/convert/json/JsonReader.java | 25 +- .../redkale/convert/json/JsonWriter.java | 20 +- src/com/wentch/redkale/net/BufferPool.java | 61 ----- src/com/wentch/redkale/net/ChunkBuffer.java | 228 ------------------ src/com/wentch/redkale/net/Context.java | 17 +- src/com/wentch/redkale/net/PrepareRunner.java | 7 +- .../wentch/redkale/net/PrepareServlet.java | 12 +- src/com/wentch/redkale/net/Request.java | 6 +- src/com/wentch/redkale/net/Response.java | 5 +- src/com/wentch/redkale/net/ResponsePool.java | 69 ------ src/com/wentch/redkale/net/Transport.java | 17 +- .../wentch/redkale/net/http/HttpContext.java | 29 +-- .../wentch/redkale/net/http/HttpRequest.java | 11 +- .../redkale/net/http/HttpResourceServlet.java | 4 +- .../wentch/redkale/net/http/HttpResponse.java | 45 ++-- .../wentch/redkale/net/http/HttpServer.java | 24 +- .../wentch/redkale/net/sncp/SncpClient.java | 119 ++++++--- .../wentch/redkale/net/sncp/SncpContext.java | 77 ++++-- .../wentch/redkale/net/sncp/SncpRequest.java | 84 +++++-- .../wentch/redkale/net/sncp/SncpResponse.java | 17 +- .../wentch/redkale/net/sncp/SncpServer.java | 26 +- src/com/wentch/redkale/util/ObjectPool.java | 63 +++-- src/com/wentch/redkale/util/Sheet.java | 6 +- 27 files changed, 424 insertions(+), 617 deletions(-) delete mode 100644 src/com/wentch/redkale/net/BufferPool.java delete mode 100644 src/com/wentch/redkale/net/ChunkBuffer.java delete mode 100644 src/com/wentch/redkale/net/ResponsePool.java diff --git a/src/com/wentch/redkale/convert/bson/BsonConvert.java b/src/com/wentch/redkale/convert/bson/BsonConvert.java index dd3b27539..08e1952d4 100644 --- a/src/com/wentch/redkale/convert/bson/BsonConvert.java +++ b/src/com/wentch/redkale/convert/bson/BsonConvert.java @@ -5,10 +5,9 @@ */ package com.wentch.redkale.convert.bson; -import com.wentch.redkale.convert.Convert; -import com.wentch.redkale.convert.Factory; -import com.wentch.redkale.util.ObjectPool; -import java.lang.reflect.Type; +import com.wentch.redkale.convert.*; +import com.wentch.redkale.util.*; +import java.lang.reflect.*; /** * @@ -16,9 +15,9 @@ import java.lang.reflect.Type; */ public final class BsonConvert extends Convert { - private static final ObjectPool readerPool = new ObjectPool<>(Integer.getInteger("convert.bson.pool.size", 16), BsonReader.class); + private static final ObjectPool readerPool = BsonReader.createPool(Integer.getInteger("convert.bson.pool.size", 16)); - private static final ObjectPool writerPool = new ObjectPool<>(Integer.getInteger("convert.bson.pool.size", 16), BsonWriter.class); + private static final ObjectPool writerPool = BsonWriter.createPool(Integer.getInteger("convert.bson.pool.size", 16)); protected BsonConvert(Factory factory) { super(factory); diff --git a/src/com/wentch/redkale/convert/bson/BsonReader.java b/src/com/wentch/redkale/convert/bson/BsonReader.java index 8095f6035..048a801fc 100644 --- a/src/com/wentch/redkale/convert/bson/BsonReader.java +++ b/src/com/wentch/redkale/convert/bson/BsonReader.java @@ -5,18 +5,15 @@ */ package com.wentch.redkale.convert.bson; -import com.wentch.redkale.convert.ConvertException; -import com.wentch.redkale.convert.DeMember; -import com.wentch.redkale.convert.Reader; -import com.wentch.redkale.util.ObjectPool.Poolable; -import com.wentch.redkale.util.Utility; -import java.util.concurrent.atomic.AtomicInteger; +import com.wentch.redkale.convert.*; +import com.wentch.redkale.util.*; +import java.util.concurrent.atomic.*; /** * * @author zhangjx */ -public final class BsonReader implements Reader, Poolable { +public final class BsonReader implements Reader { public static final short SIGN_OBJECTB = (short) 0xBB; @@ -37,6 +34,10 @@ public final class BsonReader implements Reader, Poolable { public BsonReader() { } + public static ObjectPool createPool(int max) { + return new ObjectPool<>(max, (Object... params) -> new BsonReader(), (x) -> x.recycle()); + } + public BsonReader(byte[] bytes) { setBytes(bytes, 0, bytes.length); } @@ -55,19 +56,15 @@ public final class BsonReader implements Reader, Poolable { //this.limit = start + len - 1; } - @Override - public void prepare() { - } - - @Override - public void release() { + protected boolean recycle() { this.position = -1; //this.limit = -1; this.content = null; + return true; } public void close() { - this.release(); + this.recycle(); } /** diff --git a/src/com/wentch/redkale/convert/bson/BsonWriter.java b/src/com/wentch/redkale/convert/bson/BsonWriter.java index d64382a0b..f8d51f3a2 100644 --- a/src/com/wentch/redkale/convert/bson/BsonWriter.java +++ b/src/com/wentch/redkale/convert/bson/BsonWriter.java @@ -5,18 +5,14 @@ */ package com.wentch.redkale.convert.bson; -import com.wentch.redkale.util.Utility; -import com.wentch.redkale.util.Attribute; -import com.wentch.redkale.convert.ConvertException; -import com.wentch.redkale.convert.Reader; -import com.wentch.redkale.convert.Writer; -import com.wentch.redkale.util.ObjectPool.Poolable; +import com.wentch.redkale.convert.*; +import com.wentch.redkale.util.*; /** * * @author zhangjx */ -public final class BsonWriter implements Writer, Poolable { +public final class BsonWriter implements Writer { private static final int defaultSize = Integer.getInteger("convert.bson.writer.buffer.defsize", 1024); @@ -24,6 +20,10 @@ public final class BsonWriter implements Writer, Poolable { private byte[] content; + public static ObjectPool createPool(int max) { + return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), (x) -> x.recycle()); + } + public byte[] toArray() { if (count == content.length) return content; byte[] newdata = new byte[count]; @@ -75,16 +75,12 @@ public final class BsonWriter implements Writer, Poolable { count += len; } - @Override - public void prepare() { - } - - @Override - public void release() { + protected boolean recycle() { this.count = 0; if (this.content.length > defaultSize) { this.content = new byte[defaultSize]; } + return true; } //------------------------------------------------------------------------ diff --git a/src/com/wentch/redkale/convert/json/JsonConvert.java b/src/com/wentch/redkale/convert/json/JsonConvert.java index 3d69fe442..29b830567 100644 --- a/src/com/wentch/redkale/convert/json/JsonConvert.java +++ b/src/com/wentch/redkale/convert/json/JsonConvert.java @@ -5,10 +5,9 @@ */ package com.wentch.redkale.convert.json; -import com.wentch.redkale.util.Utility; -import com.wentch.redkale.util.ObjectPool; -import com.wentch.redkale.convert.Convert; -import java.lang.reflect.Type; +import com.wentch.redkale.convert.*; +import com.wentch.redkale.util.*; +import java.lang.reflect.*; /** * @@ -17,9 +16,9 @@ import java.lang.reflect.Type; @SuppressWarnings("unchecked") public final class JsonConvert extends Convert { - private static final ObjectPool readerPool = new ObjectPool<>(Integer.getInteger("convert.json.pool.size", 16), JsonReader.class); + private static final ObjectPool readerPool = JsonReader.createPool(Integer.getInteger("convert.json.pool.size", 16)); - private static final ObjectPool writerPool = new ObjectPool<>(Integer.getInteger("convert.json.pool.size", 16), JsonWriter.class); + private static final ObjectPool writerPool = JsonWriter.createPool(Integer.getInteger("convert.json.pool.size", 16)); protected JsonConvert(JsonFactory factory) { super(factory); diff --git a/src/com/wentch/redkale/convert/json/JsonReader.java b/src/com/wentch/redkale/convert/json/JsonReader.java index 2d6aa1c63..3fbf37d0f 100644 --- a/src/com/wentch/redkale/convert/json/JsonReader.java +++ b/src/com/wentch/redkale/convert/json/JsonReader.java @@ -5,18 +5,15 @@ */ package com.wentch.redkale.convert.json; -import com.wentch.redkale.convert.ConvertException; -import com.wentch.redkale.convert.DeMember; -import com.wentch.redkale.convert.Reader; -import com.wentch.redkale.util.ObjectPool.Poolable; -import com.wentch.redkale.util.Utility; -import java.util.concurrent.atomic.AtomicInteger; +import com.wentch.redkale.convert.*; +import com.wentch.redkale.util.*; +import java.util.concurrent.atomic.*; /** * * @author zhangjx */ -public final class JsonReader implements Reader, Poolable { +public final class JsonReader implements Reader { private int position = -1; @@ -24,6 +21,10 @@ public final class JsonReader implements Reader, Poolable { private int limit; + public static ObjectPool createPool(int max) { + return new ObjectPool<>(max, (Object... params) -> new JsonReader(), (x) -> x.recycle()); + } + public JsonReader() { } @@ -53,19 +54,15 @@ public final class JsonReader implements Reader, Poolable { this.limit = start + len - 1; } - @Override - public void prepare() { - } - - @Override - public void release() { + protected boolean recycle() { this.position = -1; this.limit = -1; this.text = null; + return true; } public void close() { - this.release(); + this.recycle(); } /** diff --git a/src/com/wentch/redkale/convert/json/JsonWriter.java b/src/com/wentch/redkale/convert/json/JsonWriter.java index 0f101613b..c980856a3 100644 --- a/src/com/wentch/redkale/convert/json/JsonWriter.java +++ b/src/com/wentch/redkale/convert/json/JsonWriter.java @@ -5,10 +5,8 @@ */ package com.wentch.redkale.convert.json; -import com.wentch.redkale.util.Attribute; -import com.wentch.redkale.util.Utility; -import com.wentch.redkale.convert.Writer; -import com.wentch.redkale.util.ObjectPool.Poolable; +import com.wentch.redkale.convert.*; +import com.wentch.redkale.util.*; /** * @@ -16,7 +14,7 @@ import com.wentch.redkale.util.ObjectPool.Poolable; * * @author zhangjx */ -public final class JsonWriter implements Writer, Poolable { +public final class JsonWriter implements Writer { private static final char[] CHARS_TUREVALUE = "true".toCharArray(); @@ -28,6 +26,10 @@ public final class JsonWriter implements Writer, Poolable { private char[] content; + public static ObjectPool createPool(int max) { + return new ObjectPool<>(max, (Object... params) -> new JsonWriter(), (x) -> x.recycle()); + } + public JsonWriter() { this(defaultSize); } @@ -87,16 +89,12 @@ public final class JsonWriter implements Writer, Poolable { if (quote) content[count++] = '"'; } - @Override - public void prepare() { - } - - @Override - public void release() { + protected boolean recycle() { this.count = 0; if (this.content.length > defaultSize) { this.content = new char[defaultSize]; } + return true; } public char[] toArray() { diff --git a/src/com/wentch/redkale/net/BufferPool.java b/src/com/wentch/redkale/net/BufferPool.java deleted file mode 100644 index bfb477f4a..000000000 --- a/src/com/wentch/redkale/net/BufferPool.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.wentch.redkale.net; - -import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.*; - -/** - * - * @author zhangjx - */ -public final class BufferPool { - - private final int capacity; - - private final ArrayBlockingQueue queue; - - private final AtomicLong creatCounter; - - private final AtomicLong cycleCounter; - - public BufferPool(AtomicLong creatCounter, AtomicLong cycleCounter, int capacity) { - this(creatCounter, cycleCounter, capacity, 0); - } - - public BufferPool(AtomicLong creatCounter, AtomicLong cycleCounter, int capacity, int max) { - this.capacity = capacity; - this.queue = new ArrayBlockingQueue<>(Math.max(32, max)); - this.creatCounter = creatCounter; - this.cycleCounter = cycleCounter; - } - - public ByteBuffer poll() { - ByteBuffer result = queue.poll(); - if (result == null) { - creatCounter.incrementAndGet(); - result = ByteBuffer.allocateDirect(capacity); - } - return result; - } - - public void offer(final ByteBuffer e) { - if (e != null && !e.isReadOnly() && e.capacity() == this.capacity) { - cycleCounter.incrementAndGet(); - e.clear(); - queue.offer(e); - } - } - - public long getCreatCount() { - return creatCounter.longValue(); - } - - public long getCycleCount() { - return cycleCounter.longValue(); - } -} diff --git a/src/com/wentch/redkale/net/ChunkBuffer.java b/src/com/wentch/redkale/net/ChunkBuffer.java deleted file mode 100644 index 2a571df05..000000000 --- a/src/com/wentch/redkale/net/ChunkBuffer.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.wentch.redkale.net; - -import java.nio.*; - -/** - * - * @author zhangjx - */ -public final class ChunkBuffer { - - final ByteBuffer buffer; - - private final BufferPool pool; - - ChunkBuffer(BufferPool pool, ByteBuffer buffer) { - this.pool = pool; - this.buffer = buffer; - } - - public void release() { - //pool.offer(this); - } - - public int limit() { - return buffer.limit(); - } - - public void limit(int limit) { - buffer.limit(limit); - } - - public int position() { - return buffer.position(); - } - - public void position(int position) { - buffer.position(position); - } - - public void clear() { - buffer.clear(); - } - - public ChunkBuffer flip() { - buffer.flip(); - return this; - } - - public int remaining() { - return buffer.remaining(); - } - - public boolean hasRemaining() { - return buffer.hasRemaining(); - } - - public boolean isReadOnly() { - return buffer.isReadOnly(); - } - - public boolean hasArray() { - return buffer.hasArray(); - } - - public byte[] array() { - return buffer.array(); - } - - public int arrayOffset() { - return buffer.arrayOffset(); - } - - public boolean isDirect() { - return buffer.isDirect(); - } - - public ChunkBuffer slice() { - buffer.slice(); - return this; - } - - public ChunkBuffer duplicate() { - buffer.duplicate(); - return this; - } - - public ChunkBuffer asReadOnlyBuffer() { - buffer.asReadOnlyBuffer(); - return this; - } - - public byte get() { - return buffer.get(); - } - - public ChunkBuffer put(byte b) { - buffer.put(b); - return this; - } - - public byte get(int index) { - return buffer.get(index); - } - - public ChunkBuffer put(int index, byte b) { - buffer.put(index, b); - return this; - } - - public ChunkBuffer compact() { - buffer.compact(); - return this; - } - - public char getChar() { - return buffer.getChar(); - } - - public ChunkBuffer putChar(char value) { - buffer.putChar(value); - return this; - } - - public char getChar(int index) { - return buffer.getChar(index); - } - - public ChunkBuffer putChar(int index, char value) { - buffer.putChar(index, value); - return this; - } - - public short getShort() { - return buffer.getShort(); - } - - public ChunkBuffer putShort(short value) { - buffer.putShort(value); - return this; - } - - public short getShort(int index) { - return buffer.getShort(index); - } - - public ChunkBuffer putShort(int index, short value) { - buffer.putShort(index, value); - return this; - } - - public int getInt() { - return buffer.getInt(); - } - - public ChunkBuffer putInt(int value) { - buffer.putInt(value); - return this; - } - - public int getInt(int index) { - return buffer.getInt(index); - } - - public ChunkBuffer putInt(int index, int value) { - buffer.putInt(index, value); - return this; - } - - public long getLong() { - return buffer.getLong(); - } - - public ChunkBuffer putLong(long value) { - buffer.putLong(value); - return this; - } - - public long getLong(int index) { - return buffer.getLong(index); - } - - public ChunkBuffer putLong(int index, long value) { - buffer.putLong(index, value); - return this; - } - - public float getFloat() { - return buffer.getFloat(); - } - - public ChunkBuffer putFloat(float value) { - buffer.putFloat(value); - return this; - } - - public float getFloat(int index) { - return buffer.getFloat(index); - } - - public ChunkBuffer putFloat(int index, float value) { - buffer.putFloat(index, value); - return this; - } - - public double getDouble() { - return buffer.getDouble(); - } - - public ChunkBuffer putDouble(double value) { - buffer.putDouble(value); - return this; - } - - public double getDouble(int index) { - return buffer.getDouble(index); - } - - public ChunkBuffer putDouble(int index, double value) { - buffer.putDouble(index, value); - return this; - } - -} diff --git a/src/com/wentch/redkale/net/Context.java b/src/com/wentch/redkale/net/Context.java index 53e7f756b..248cafd62 100644 --- a/src/com/wentch/redkale/net/Context.java +++ b/src/com/wentch/redkale/net/Context.java @@ -5,11 +5,12 @@ */ package com.wentch.redkale.net; -import com.wentch.redkale.watch.WatchFactory; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.concurrent.ExecutorService; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.net.*; +import java.nio.*; +import java.nio.charset.*; +import java.util.concurrent.*; import java.util.logging.*; /** @@ -24,9 +25,9 @@ public class Context { protected final ExecutorService executor; - protected final BufferPool bufferPool; + protected final ObjectPool bufferPool; - protected final ResponsePool responsePool; + protected final ObjectPool responsePool; protected final PrepareServlet prepare; @@ -44,7 +45,7 @@ public class Context { protected final WatchFactory watch; - public Context(long serverStartTime, Logger logger, ExecutorService executor, BufferPool bufferPool, ResponsePool responsePool, + public Context(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool bufferPool, ObjectPool responsePool, final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final WatchFactory watch, final int readTimeoutSecond, final int writeTimeoutSecond) { this.serverStartTime = serverStartTime; diff --git a/src/com/wentch/redkale/net/PrepareRunner.java b/src/com/wentch/redkale/net/PrepareRunner.java index 43a6beb1a..6be6b3d8b 100644 --- a/src/com/wentch/redkale/net/PrepareRunner.java +++ b/src/com/wentch/redkale/net/PrepareRunner.java @@ -5,9 +5,10 @@ */ package com.wentch.redkale.net; -import java.nio.ByteBuffer; +import com.wentch.redkale.util.*; +import java.nio.*; import java.nio.channels.*; -import java.util.logging.Level; +import java.util.logging.*; /** * @@ -31,7 +32,7 @@ public final class PrepareRunner implements Runnable { @Override public void run() { final PrepareServlet prepare = context.prepare; - final ResponsePool responsePool = context.responsePool; + final ObjectPool responsePool = context.responsePool; final ByteBuffer buffer = context.pollBuffer(); if (data != null) { final Response response = responsePool.poll(); diff --git a/src/com/wentch/redkale/net/PrepareServlet.java b/src/com/wentch/redkale/net/PrepareServlet.java index d72ba36df..0568dec2c 100644 --- a/src/com/wentch/redkale/net/PrepareServlet.java +++ b/src/com/wentch/redkale/net/PrepareServlet.java @@ -5,11 +5,11 @@ */ package com.wentch.redkale.net; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; +import java.io.*; +import java.nio.*; +import java.nio.channels.*; import java.util.concurrent.atomic.*; -import java.util.logging.Level; +import java.util.logging.*; /** * @@ -28,10 +28,11 @@ public abstract class PrepareServlet> i final int rs = request.readHeader(buffer); if (rs < 0) { response.context.offerBuffer(buffer); - illRequestCounter.incrementAndGet(); + if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet(); response.finish(true); } else if (rs == 0) { response.context.offerBuffer(buffer); + request.prepare(); this.execute(request, response); } else { buffer.clear(); @@ -48,6 +49,7 @@ public abstract class PrepareServlet> i request.channel.read(buffer, buffer, this); } else { response.context.offerBuffer(buffer); + request.prepare(); try { execute(request, response); } catch (Exception e) { diff --git a/src/com/wentch/redkale/net/Request.java b/src/com/wentch/redkale/net/Request.java index 5005bc621..089a814cc 100644 --- a/src/com/wentch/redkale/net/Request.java +++ b/src/com/wentch/redkale/net/Request.java @@ -5,7 +5,7 @@ */ package com.wentch.redkale.net; -import java.nio.ByteBuffer; +import java.nio.*; import java.util.*; /** @@ -29,7 +29,7 @@ public abstract class Request { } /** - * 返回值: -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。 + * 返回值:Integer.MIN_VALUE: 帧数据; -1:数据不合法; 0:解析完毕; >0: 需再读取的字节数。 * * @param buffer * @return @@ -38,6 +38,8 @@ public abstract class Request { protected abstract void readBody(ByteBuffer buffer); + protected abstract void prepare(); + protected void recycle() { createtime = 0; keepAlive = false; diff --git a/src/com/wentch/redkale/net/Response.java b/src/com/wentch/redkale/net/Response.java index a498e9a4e..a14563169 100644 --- a/src/com/wentch/redkale/net/Response.java +++ b/src/com/wentch/redkale/net/Response.java @@ -5,7 +5,7 @@ */ package com.wentch.redkale.net; -import java.nio.ByteBuffer; +import java.nio.*; import java.nio.channels.*; /** @@ -53,7 +53,7 @@ public abstract class Response { return ch; } - protected void recycle() { + protected boolean recycle() { boolean keepAlive = request.keepAlive; request.recycle(); if (channel != null) { @@ -67,6 +67,7 @@ public abstract class Response { channel = null; } } + return true; } protected void refuseAlive() { diff --git a/src/com/wentch/redkale/net/ResponsePool.java b/src/com/wentch/redkale/net/ResponsePool.java deleted file mode 100644 index 04d8aea77..000000000 --- a/src/com/wentch/redkale/net/ResponsePool.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.wentch.redkale.net; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.*; - -/** - * - * @author zhangjx - * @param - */ -public final class ResponsePool { - - public static interface ResponseFactory { - - T createResponse(); - } - - private final AtomicLong creatCounter; - - private final AtomicLong cycleCounter; - - private ResponseFactory factory; - - private final ArrayBlockingQueue queue; - - public ResponsePool(AtomicLong creatCounter, AtomicLong cycleCounter) { - this(creatCounter, cycleCounter, 0); - } - - public ResponsePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max) { - this.queue = new ArrayBlockingQueue<>(Math.max(32, max)); - this.creatCounter = creatCounter; - this.cycleCounter = cycleCounter; - } - - public void setResponseFactory(ResponseFactory factory) { - this.factory = factory; - } - - public T poll() { - T result = queue.poll(); - if (result == null) { - creatCounter.incrementAndGet(); - result = factory.createResponse(); - } - return result; - } - - public void offer(final T e) { - if (e != null) { - cycleCounter.incrementAndGet(); - e.recycle(); - queue.offer(e); - } - } - - public long getCreatCount() { - return creatCounter.longValue(); - } - - public long getCycleCount() { - return cycleCounter.longValue(); - } -} diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index 84dc68b12..9586183e6 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -5,10 +5,11 @@ */ package com.wentch.redkale.net; -import com.wentch.redkale.watch.WatchFactory; -import java.io.IOException; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.io.*; import java.net.*; -import java.nio.ByteBuffer; +import java.nio.*; import java.nio.channels.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -22,7 +23,7 @@ public final class Transport { protected SocketAddress[] remoteAddres; - protected BufferPool bufferPool; + protected ObjectPool bufferPool; protected String name; @@ -49,7 +50,13 @@ public final class Transport { this.group = g; AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.creatCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter"); - this.bufferPool = new BufferPool(createBufferCounter, cycleBufferCounter, 8192, bufferPoolSize); + int rcapacity = 8192; + this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; + e.clear(); + return true; + }); this.remoteAddres = addresses; } diff --git a/src/com/wentch/redkale/net/http/HttpContext.java b/src/com/wentch/redkale/net/http/HttpContext.java index b5a3f530b..79a107789 100644 --- a/src/com/wentch/redkale/net/http/HttpContext.java +++ b/src/com/wentch/redkale/net/http/HttpContext.java @@ -5,19 +5,16 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.convert.json.JsonConvert; -import com.wentch.redkale.convert.json.JsonFactory; -import com.wentch.redkale.net.PrepareServlet; -import com.wentch.redkale.net.ResponsePool; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.net.BufferPool; -import com.wentch.redkale.util.Utility; -import com.wentch.redkale.watch.WatchFactory; -import java.net.InetSocketAddress; -import java.nio.charset.Charset; -import java.security.SecureRandom; -import java.util.concurrent.ExecutorService; -import java.util.logging.Logger; +import com.wentch.redkale.convert.json.*; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.net.*; +import java.nio.*; +import java.nio.charset.*; +import java.security.*; +import java.util.concurrent.*; +import java.util.logging.*; /** * @@ -31,8 +28,8 @@ public final class HttpContext extends Context { protected final SecureRandom random = new SecureRandom(); - public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, BufferPool bufferPool, - ResponsePool responsePool, int maxbody, Charset charset, InetSocketAddress address, + public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool bufferPool, + ObjectPool responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond, String contextPath) { super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset, address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); @@ -59,7 +56,7 @@ public final class HttpContext extends Context { return executor; } - protected ResponsePool getResponsePool() { + protected ObjectPool getResponsePool() { return responsePool; } diff --git a/src/com/wentch/redkale/net/http/HttpRequest.java b/src/com/wentch/redkale/net/http/HttpRequest.java index 346aff4d7..97bbbfa8f 100644 --- a/src/com/wentch/redkale/net/http/HttpRequest.java +++ b/src/com/wentch/redkale/net/http/HttpRequest.java @@ -5,11 +5,8 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.convert.json.JsonFactory; -import com.wentch.redkale.convert.json.JsonConvert; -import com.wentch.redkale.net.AsyncConnection; -import com.wentch.redkale.net.Request; -import com.wentch.redkale.net.Context; +import com.wentch.redkale.convert.json.*; +import com.wentch.redkale.net.*; import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import java.io.*; import java.net.*; @@ -158,6 +155,10 @@ public final class HttpRequest extends Request { array.add(buffer, buffer.remaining()); } + @Override + protected void prepare() { + } + private void parseBody() { if (this.boundary || array.isEmpty()) return; addParameter(array, 0, array.count()); diff --git a/src/com/wentch/redkale/net/http/HttpResourceServlet.java b/src/com/wentch/redkale/net/http/HttpResourceServlet.java index d8e8a610d..06f6096b9 100644 --- a/src/com/wentch/redkale/net/http/HttpResourceServlet.java +++ b/src/com/wentch/redkale/net/http/HttpResourceServlet.java @@ -288,7 +288,7 @@ public final class HttpResourceServlet extends HttpServlet { if (range == null) { buffer.put(header); buffer.flip(); - response.send(buffer, file); + response.finishFile(buffer, file); return; } range = range.substring("bytes=".length()); @@ -300,7 +300,7 @@ public final class HttpResourceServlet extends HttpServlet { buffer.flip(); final ByteBuffer body = this.content; if (body == null) { - response.send(buffer, file, start, end > 0 ? clen : end); + response.finishFile(buffer, file, start, end > 0 ? clen : end); } else { final ByteBuffer body2 = body.duplicate(); body2.position((int) (this.header.length + start)); diff --git a/src/com/wentch/redkale/net/http/HttpResponse.java b/src/com/wentch/redkale/net/http/HttpResponse.java index 12047ddfb..842b1a3e1 100644 --- a/src/com/wentch/redkale/net/http/HttpResponse.java +++ b/src/com/wentch/redkale/net/http/HttpResponse.java @@ -6,9 +6,9 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.util.AnyValue.Entry; -import com.wentch.redkale.util.*; import java.io.*; import java.lang.reflect.*; import java.net.*; @@ -17,6 +17,7 @@ import java.nio.channels.*; import java.nio.file.*; import java.text.*; import java.util.*; +import java.util.concurrent.atomic.*; /** * @@ -98,6 +99,10 @@ public final class HttpResponse extends Response { private final DefaultAnyValue header = new DefaultAnyValue(); + public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { + return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).recycle()); + } + protected HttpResponse(HttpContext context, HttpRequest request) { super(context, request); } @@ -108,14 +113,14 @@ public final class HttpResponse extends Response { } @Override - protected void recycle() { + protected boolean recycle() { this.status = 200; this.contentLength = -1; this.contentType = null; this.cookies = null; this.headsended = false; this.header.clear(); - super.recycle(); + return super.recycle(); } protected String getHttpCode(int status) { @@ -143,22 +148,22 @@ public final class HttpResponse extends Response { } } - public void sendJson(Object obj) { + public void finishJson(Object obj) { this.contentType = "text/plain; charset=utf-8"; - sendString(request.convert.convertTo(obj)); + finishString(request.convert.convertTo(obj)); } - public void sendJson(Type type, Object obj) { + public void finishJson(Type type, Object obj) { this.contentType = "text/plain; charset=utf-8"; - sendString(request.convert.convertTo(type, obj)); + finishString(request.convert.convertTo(type, obj)); } - public void sendJson(Object... objs) { + public void finishJson(Object... objs) { this.contentType = "text/plain; charset=utf-8"; - sendString(request.convert.convertTo(objs)); + finishString(request.convert.convertTo(objs)); } - public void sendString(String obj) { + public void finishString(String obj) { if (obj == null) obj = "null"; if (context.getCharset() == null) { final char[] chars = Utility.charArray(obj); @@ -186,16 +191,16 @@ public final class HttpResponse extends Response { headbuf.flip(); super.send(headbuf, headbuf, finishHandler); } else { - sendString(message); + finishString(message); } } public void finish304() { - finish(buffer304.duplicate()); + super.finish(buffer304.duplicate()); } public void finish404() { - finish(buffer404.duplicate()); + super.finish(buffer404.duplicate()); } @Override @@ -213,11 +218,11 @@ public final class HttpResponse extends Response { } } - public void send(File file) throws IOException { - send(file, null); + public void finish(File file) throws IOException { + finishFile(file, null); } - protected void send(final File file, final ByteBuffer fileBody) throws IOException { + protected void finishFile(final File file, final ByteBuffer fileBody) throws IOException { if (file == null || !file.isFile() || !file.canRead()) { finish404(); return; @@ -250,7 +255,7 @@ public final class HttpResponse extends Response { ByteBuffer buffer = createHeader(); buffer.flip(); if (fileBody == null) { - send(buffer, file, start, len); + HttpResponse.this.finishFile(buffer, file, start, len); } else { final ByteBuffer body = fileBody.duplicate().asReadOnlyBuffer(); if (start >= 0) { @@ -276,11 +281,11 @@ public final class HttpResponse extends Response { } } - protected void send(ByteBuffer buffer, File file) throws IOException { - send(buffer, file, -1L, -1L); + protected void finishFile(ByteBuffer buffer, File file) throws IOException { + finishFile(buffer, file, -1L, -1L); } - protected void send(ByteBuffer buffer, File file, long offset, long length) throws IOException { + protected void finishFile(ByteBuffer buffer, File file, long offset, long length) throws IOException { send(buffer, buffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length)); } diff --git a/src/com/wentch/redkale/net/http/HttpServer.java b/src/com/wentch/redkale/net/http/HttpServer.java index 37629c145..7260bbbcf 100644 --- a/src/com/wentch/redkale/net/http/HttpServer.java +++ b/src/com/wentch/redkale/net/http/HttpServer.java @@ -5,12 +5,10 @@ */ package com.wentch.redkale.net.http; -import com.wentch.redkale.util.AnyValue; -import com.wentch.redkale.net.Server; -import com.wentch.redkale.net.ResponsePool; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.net.BufferPool; -import com.wentch.redkale.watch.WatchFactory; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.nio.*; import java.util.*; import java.util.AbstractMap.SimpleEntry; import java.util.concurrent.atomic.*; @@ -50,7 +48,13 @@ public final class HttpServer extends Server { final int port = this.address.getPort(); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.creatCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter"); - BufferPool bufferPool = new BufferPool(createBufferCounter, cycleBufferCounter, Math.max(this.capacity, 8 * 1024), this.bufferPoolSize); + int rcapacity = Math.max(this.capacity, 8 * 1024); + ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; + e.clear(); + return true; + }); HttpPrepareServlet prepare = new HttpPrepareServlet(); this.servlets.entrySet().stream().forEach((en) -> { prepare.addHttpServlet(en.getKey().getKey(), en.getKey().getValue(), en.getValue()); @@ -58,10 +62,10 @@ public final class HttpServer extends Server { this.servlets.clear(); AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); - HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, - new ResponsePool(createResponseCounter, cycleResponseCounter, this.responsePoolSize), + ObjectPool responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); + HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath); - httpcontext.getResponsePool().setResponseFactory(() -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory))); + responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory))); return httpcontext; } diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index 92dee61ff..c3a05491c 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -5,17 +5,16 @@ */ package com.wentch.redkale.net.sncp; -import com.wentch.redkale.net.Async; -import com.wentch.redkale.net.Transport; -import com.wentch.redkale.convert.bson.BsonConvert; -import com.wentch.redkale.service.MultiService; -import com.wentch.redkale.service.RemoteOn; +import com.wentch.redkale.convert.bson.*; +import com.wentch.redkale.net.*; import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; -import com.wentch.redkale.util.TwoLong; +import com.wentch.redkale.service.*; +import com.wentch.redkale.util.*; import java.lang.reflect.*; -import java.nio.ByteBuffer; +import java.nio.*; import java.util.*; -import java.util.logging.Logger; +import java.util.concurrent.*; +import java.util.logging.*; /** * @@ -25,6 +24,8 @@ public final class SncpClient { private final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName()); + private final boolean debug = logger.isLoggable(Level.FINEST); + protected static final class SncpAction { protected final TwoLong actionid; @@ -115,46 +116,87 @@ public final class SncpClient { return convert.convertFrom(actions[index].resultTypes, send(convert, transport, index, params)); } + private void fillHeader(ByteBuffer buffer, long seqid, TwoLong actionid, int frameCount, int frameIndex, int bodyLength) { + //---------------------head---------------------------------- + buffer.putLong(seqid); //序列号 + buffer.putChar((char) HEADER_SIZE); //header长度 + buffer.putLong(this.serviceid); + buffer.putLong(this.nameid); + buffer.putLong(actionid.getFirst()); + buffer.putLong(actionid.getSecond()); + buffer.put((byte) frameCount); //数据的帧数, 最小值为1 + buffer.put((byte) frameIndex); //数据的帧数序号, 从frame.count-1开始, 0表示最后一帧 + buffer.putInt(0); //结果码, 请求方固定传0 + buffer.putInt(bodyLength); //body长度 + } + private byte[] send(final BsonConvert convert, Transport transport, final int index, Object... params) { int bodyLength = 2; Type[] myparamtypes = actions[index].paramTypes; byte[][] bytesarray = new byte[params.length][]; for (int i = 0; i < bytesarray.length; i++) { bytesarray[i] = convert.convertTo(myparamtypes[i], params[i]); - bodyLength += 2 + bytesarray[i].length; - } - ByteBuffer buffer = transport.pollBuffer(); - if ((HEADER_SIZE + bodyLength) > buffer.limit()) { - throw new RuntimeException("send buffer size too large(" + (HEADER_SIZE + bodyLength) + ")"); + bodyLength += 4 + bytesarray[i].length; } final SncpAction action = actions[index]; final long seqid = System.nanoTime(); final TwoLong actionid = action.actionid; - { - //---------------------head---------------------------------- - buffer.putLong(seqid); //序列号 - buffer.putChar((char) HEADER_SIZE); //header长度 - buffer.putLong(this.serviceid); - buffer.putLong(this.nameid); - buffer.putLong(actionid.getFirst()); - buffer.putLong(actionid.getSecond()); - buffer.put((byte) 0); //剩下还有多少帧数据, 0表示只有当前一帧数据 - buffer.putInt(0); //结果码, 请求方固定传0 - buffer.putChar((char) bodyLength); //body长度 - //---------------------body---------------------------------- - buffer.putChar((char) bytesarray.length); //参数数组大小 + ByteBuffer buffer = transport.pollBuffer(); + if ((HEADER_SIZE + bodyLength) > buffer.limit()) { + if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength)); + final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0); + AsyncConnection conn = transport.pollConnection(); + final int readto = conn.getReadTimeoutSecond(); + final int writeto = conn.getWriteTimeoutSecond(); + int pos = 0; + final byte[] all = new byte[bodyLength]; + all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8); + all[pos++] = (byte) (bytesarray.length & 0xff); for (byte[] bs : bytesarray) { - buffer.putChar((char) bs.length); - buffer.put(bs); + all[pos++] = (byte) ((bs.length & 0xff000000) >> 24); + all[pos++] = (byte) ((bs.length & 0xff0000) >> 16); + all[pos++] = (byte) ((bs.length & 0xff00) >> 8); + all[pos++] = (byte) (bs.length & 0xff); + System.arraycopy(bs, 0, all, pos, bs.length); + pos += bs.length; } - buffer.flip(); + if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos); + try { + pos = 0; + for (int i = patch - 1; i >= 0; i--) { + fillHeader(buffer, seqid, actionid, patch, i, bodyLength); + int len = Math.min(buffer.remaining(), all.length - pos); + buffer.put(all, pos, len); + pos += len; + buffer.flip(); + conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); + buffer.clear(); + } + conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); + buffer.flip(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + transport.offerConnection(conn); + } + } else { + { + //---------------------head---------------------------------- + fillHeader(buffer, seqid, actionid, 1, 0, bodyLength); + //---------------------body---------------------------------- + buffer.putChar((char) bytesarray.length); //参数数组大小 + for (byte[] bs : bytesarray) { + buffer.putInt(bs.length); + buffer.put(bs); + } + buffer.flip(); + } + if (action.async) { + transport.async(buffer, null, null); + return null; + } + buffer = transport.send(buffer); } - if (action.async) { - transport.async(buffer, null, null); - return null; - } - buffer = transport.send(buffer); - long rseqid = buffer.getLong(); if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid); if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); @@ -165,10 +207,13 @@ public final class SncpClient { long ractionid1 = buffer.getLong(); long ractionid2 = buffer.getLong(); if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); - int frame = buffer.get(); + int frameCount = buffer.get(); + if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount); + int frameIndex = buffer.get(); + if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex); int retcode = buffer.getInt(); if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")"); - int bodylen = buffer.getChar(); + int bodylen = buffer.getInt(); byte[] bytes = new byte[bodylen]; buffer.get(bytes); transport.offerBuffer(buffer); diff --git a/src/com/wentch/redkale/net/sncp/SncpContext.java b/src/com/wentch/redkale/net/sncp/SncpContext.java index fc0f674ad..ee75bd70d 100644 --- a/src/com/wentch/redkale/net/sncp/SncpContext.java +++ b/src/com/wentch/redkale/net/sncp/SncpContext.java @@ -5,17 +5,16 @@ */ package com.wentch.redkale.net.sncp; -import com.wentch.redkale.convert.bson.BsonConvert; -import com.wentch.redkale.convert.bson.BsonFactory; -import com.wentch.redkale.net.ResponsePool; -import com.wentch.redkale.net.BufferPool; -import com.wentch.redkale.net.PrepareServlet; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.watch.WatchFactory; -import java.net.InetSocketAddress; -import java.nio.charset.Charset; -import java.util.concurrent.ExecutorService; -import java.util.logging.Logger; +import com.wentch.redkale.convert.bson.*; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.net.*; +import java.nio.*; +import java.nio.charset.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.*; /** * @@ -23,16 +22,66 @@ import java.util.logging.Logger; */ public final class SncpContext extends Context { + protected static class RequestEntry { + + protected final long seqid; + + protected final byte[] body; + + protected final long time = System.currentTimeMillis(); + + private int received; + + public RequestEntry(long seqid, byte[] body) { + this.seqid = seqid; + this.body = body; + } + + public void add(ByteBuffer buffer, int pos) { + this.received += buffer.remaining(); + buffer.get(body, pos, buffer.remaining()); + } + + public boolean isCompleted() { + return this.body.length <= this.received; + } + + } + + private final ConcurrentHashMap requests = new ConcurrentHashMap<>(); + protected final BsonFactory bsonFactory; - public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, BufferPool bufferPool, - ResponsePool responsePool, int maxbody, Charset charset, InetSocketAddress address, + public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool bufferPool, + ObjectPool responsePool, int maxbody, Charset charset, InetSocketAddress address, PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) { super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset, address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); this.bsonFactory = BsonFactory.root(); } + protected RequestEntry addRequestEntity(long seqid, byte[] bodys) { + RequestEntry entry = new RequestEntry(seqid, bodys); + requests.put(seqid, entry); + return entry; + } + + protected void expireRequestEntry(long milliSecond) { + if (requests.size() < 32) return; + List seqids = new ArrayList<>(); + long t = System.currentTimeMillis() - milliSecond; + requests.forEach((x, y) -> { + if (y.time < t) seqids.add(x); + }); + for (long seqid : seqids) { + requests.remove(seqid); + } + } + + protected RequestEntry getRequestEntity(long seqid) { + return requests.get(seqid); + } + protected WatchFactory getWatchFactory() { return watch; } @@ -41,7 +90,7 @@ public final class SncpContext extends Context { return executor; } - protected ResponsePool getResponsePool() { + protected ObjectPool getResponsePool() { return responsePool; } diff --git a/src/com/wentch/redkale/net/sncp/SncpRequest.java b/src/com/wentch/redkale/net/sncp/SncpRequest.java index ef2230627..a1d616d54 100644 --- a/src/com/wentch/redkale/net/sncp/SncpRequest.java +++ b/src/com/wentch/redkale/net/sncp/SncpRequest.java @@ -5,12 +5,11 @@ */ package com.wentch.redkale.net.sncp; -import com.wentch.redkale.convert.bson.BsonConvert; -import com.wentch.redkale.convert.bson.BsonFactory; -import com.wentch.redkale.net.Request; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.util.TwoLong; -import java.nio.ByteBuffer; +import com.wentch.redkale.convert.bson.*; +import com.wentch.redkale.net.*; +import com.wentch.redkale.net.sncp.SncpContext.RequestEntry; +import com.wentch.redkale.util.*; +import java.nio.*; /** * @@ -18,13 +17,15 @@ import java.nio.ByteBuffer; */ public final class SncpRequest extends Request { - public static final int HEADER_SIZE = 49; + public static final int HEADER_SIZE = 52; protected final BsonConvert convert; private long seqid; - private int frame; + private int framecount; + + private int frameindex; private long nameid; @@ -38,14 +39,16 @@ public final class SncpRequest extends Request { private boolean ping; - protected SncpRequest(Context context, BsonFactory factory) { + private byte[] body; + + protected SncpRequest(SncpContext context, BsonFactory factory) { super(context); this.convert = factory.getConvert(); } @Override protected int readHeader(ByteBuffer buffer) { - if (buffer.remaining() < 8) { + if (buffer.remaining() < HEADER_SIZE) { this.ping = true; return 0; } @@ -58,29 +61,66 @@ public final class SncpRequest extends Request { this.serviceid = buffer.getLong(); this.nameid = buffer.getLong(); this.actionid = new TwoLong(buffer.getLong(), buffer.getLong()); - this.frame = buffer.get(); + this.framecount = buffer.get(); + this.frameindex = buffer.get(); if (buffer.getInt() != 0) { context.getLogger().finest("sncp buffer header.retcode not 0"); return -1; } - this.bodylength = buffer.getChar(); + this.bodylength = buffer.getInt(); //---------------------body---------------------------------- - int paramlen = buffer.getChar(); + if (this.framecount == 1) { //只有一帧的数据 + int paramlen = buffer.getChar(); + byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[] + for (int i = 1; i <= paramlen; i++) { + byte[] bytes = new byte[buffer.getInt()]; + buffer.get(bytes); + bbytes[i] = bytes; + } + this.paramBytes = bbytes; + return 0; + } + //多帧数据 + final SncpContext scontext = (SncpContext) this.context; + RequestEntry entry = scontext.getRequestEntity(this.seqid); + if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]); + entry.add(buffer, (this.framecount - this.frameindex - 1) * (buffer.capacity() - HEADER_SIZE)); + + if (entry.isCompleted()) { //数据读取完毕 + this.body = entry.body; + return 0; + } else { + scontext.expireRequestEntry(10 * 1000); //10秒过期 + } + return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE + } + + @Override + protected void readBody(ByteBuffer buffer) { + } + + @Override + protected void prepare() { + if (this.body == null) return; + byte[] bytes = this.body; + int pos = 0; + int paramlen = ((0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++])); byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[] for (int i = 1; i <= paramlen; i++) { - byte[] bytes = new byte[(int) buffer.getChar()]; - buffer.get(bytes); - bbytes[i] = bytes; + byte[] bs = new byte[(0xff000000 & (bytes[pos++] << 24)) | (0xff0000 & (bytes[pos++] << 16)) + | (0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++])]; + System.arraycopy(bytes, pos, bs, 0, bs.length); + pos += bs.length; + bbytes[i] = bs; } this.paramBytes = bbytes; - return 0; } @Override public String toString() { return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid + ",serviceid=" + this.serviceid + ",actionid=" + this.actionid - + ",frame=" + this.frame + ",bodylength=" + this.bodylength + "}"; + + ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",bodylength=" + this.bodylength + "}"; } protected void setKeepAlive(boolean keepAlive) { @@ -91,17 +131,15 @@ public final class SncpRequest extends Request { return this.keepAlive; } - @Override - protected void readBody(ByteBuffer buffer) { - } - @Override protected void recycle() { this.seqid = 0; - this.frame = 0; + this.framecount = 0; + this.frameindex = 0; this.serviceid = 0; this.actionid = null; this.bodylength = 0; + this.body = null; this.paramBytes = null; this.ping = false; super.recycle(); diff --git a/src/com/wentch/redkale/net/sncp/SncpResponse.java b/src/com/wentch/redkale/net/sncp/SncpResponse.java index 8af354691..42bbfc5af 100644 --- a/src/com/wentch/redkale/net/sncp/SncpResponse.java +++ b/src/com/wentch/redkale/net/sncp/SncpResponse.java @@ -5,10 +5,10 @@ */ package com.wentch.redkale.net.sncp; -import com.wentch.redkale.net.Response; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.util.TwoLong; -import java.nio.ByteBuffer; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import java.nio.*; +import java.util.concurrent.atomic.*; /** * @@ -24,6 +24,10 @@ public final class SncpResponse extends Response { public static final int RETCODE_THROWEXCEPTION = 10011; //内部异常 + public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { + return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).recycle()); + } + protected SncpResponse(Context context, SncpRequest request) { super(context, request); } @@ -38,9 +42,10 @@ public final class SncpResponse extends Response { TwoLong actionid = request.getActionid(); buffer.putLong(actionid.getFirst()); buffer.putLong(actionid.getSecond()); - buffer.put((byte) 0); + buffer.put((byte) 1); // frame count + buffer.put((byte) 0); //frame index buffer.putInt(retcode); - buffer.putChar((char) (bytes == null ? 0 : bytes.length)); + buffer.putInt((bytes == null ? 0 : bytes.length)); //---------------------body---------------------------------- if (bytes != null) buffer.put(bytes); buffer.flip(); diff --git a/src/com/wentch/redkale/net/sncp/SncpServer.java b/src/com/wentch/redkale/net/sncp/SncpServer.java index d2c158a26..86b0a6c41 100644 --- a/src/com/wentch/redkale/net/sncp/SncpServer.java +++ b/src/com/wentch/redkale/net/sncp/SncpServer.java @@ -5,13 +5,11 @@ */ package com.wentch.redkale.net.sncp; -import com.wentch.redkale.convert.bson.BsonConvert; -import com.wentch.redkale.convert.bson.BsonFactory; -import com.wentch.redkale.net.ResponsePool; -import com.wentch.redkale.net.BufferPool; -import com.wentch.redkale.net.Server; -import com.wentch.redkale.net.Context; -import com.wentch.redkale.watch.WatchFactory; +import com.wentch.redkale.convert.bson.*; +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import com.wentch.redkale.watch.*; +import java.nio.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -42,17 +40,23 @@ public final class SncpServer extends Server { final int port = this.address.getPort(); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.creatCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter"); - BufferPool bufferPool = new BufferPool(createBufferCounter, cycleBufferCounter, Math.max(this.capacity, 8 * 1024), this.bufferPoolSize); + int rcapacity = Math.max(this.capacity, 8 * 1024); + ObjectPool bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false; + e.clear(); + return true; + }); SncpPrepareServlet prepare = new SncpPrepareServlet(); final BsonConvert convert = BsonFactory.root().getConvert(); this.services.stream().forEach(x -> x.getNames().forEach(y -> prepare.addSncpServlet(new SncpDynServlet(convert, y, x.getService(), x.getServiceConf())))); this.services.clear(); AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter"); - SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, bufferPool, - new ResponsePool(createResponseCounter, cycleResponseCounter, this.responsePoolSize), + ObjectPool responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); + SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool, this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond); - sncpcontext.getResponsePool().setResponseFactory(() -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext, sncpcontext.bsonFactory))); + responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext, sncpcontext.bsonFactory))); return sncpcontext; } diff --git a/src/com/wentch/redkale/util/ObjectPool.java b/src/com/wentch/redkale/util/ObjectPool.java index 3302d03cb..34ffeb50c 100644 --- a/src/com/wentch/redkale/util/ObjectPool.java +++ b/src/com/wentch/redkale/util/ObjectPool.java @@ -4,58 +4,77 @@ */ package com.wentch.redkale.util; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; /** * * @author zhangjx * @param */ -public final class ObjectPool { - - public static interface Poolable { - - public void prepare(); - - public void release(); - } +public final class ObjectPool { private final Queue queue; - private final Creator creator; + private Creator creator; - public ObjectPool(Class clazz) { - this(2, clazz); + private final Predicate recycler; + + private final AtomicLong creatCounter; + + private final AtomicLong cycleCounter; + + public ObjectPool(Class clazz, Predicate recycler) { + this(2, clazz, recycler); } - public ObjectPool(int max, Class clazz) { - this(max, Creator.create(clazz)); + public ObjectPool(int max, Class clazz, Predicate recycler) { + this(max, Creator.create(clazz), recycler); } - public ObjectPool(Creator creator) { - this(2, creator); + public ObjectPool(Creator creator, Predicate recycler) { + this(2, creator, recycler); } - public ObjectPool(int max, Creator creator) { + public ObjectPool(int max, Creator creator, Predicate recycler) { + this(null, null, max, creator, recycler); + } + + public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator, Predicate recycler) { + this.creatCounter = creatCounter; + this.cycleCounter = cycleCounter; this.creator = creator; + this.recycler = recycler; this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); } + public void setCreator(Creator creator) { + this.creator = creator; + } + public T poll() { T result = queue.poll(); if (result == null) { + if (creatCounter != null) creatCounter.incrementAndGet(); result = this.creator.create(); - } else { - result.prepare(); } return result; } public void offer(final T e) { - if (e != null) { - e.release(); + if (e != null && recycler.test(e)) { + if (cycleCounter != null) cycleCounter.incrementAndGet(); queue.offer(e); } } + + public long getCreatCount() { + return creatCounter.longValue(); + } + + public long getCycleCount() { + return cycleCounter.longValue(); + } } diff --git a/src/com/wentch/redkale/util/Sheet.java b/src/com/wentch/redkale/util/Sheet.java index f010b56cb..4316df164 100644 --- a/src/com/wentch/redkale/util/Sheet.java +++ b/src/com/wentch/redkale/util/Sheet.java @@ -33,13 +33,11 @@ public class Sheet implements java.io.Serializable { } public static Sheet asSheet(Collection data) { - return new Sheet<>(data.size(), data); + return data == null ? new Sheet<>() : new Sheet<>(data.size(), data); } public void copyTo(Sheet copy) { - if (copy == null) { - return; - } + if (copy == null) return; copy.total = this.total; if (this.getRows() != null) { copy.setRows(new ArrayList<>(this.getRows()));