This commit is contained in:
地平线
2015-03-16 18:55:36 +08:00
parent 935f52e9df
commit d81461ccc2
27 changed files with 424 additions and 617 deletions

View File

@@ -5,10 +5,9 @@
*/ */
package com.wentch.redkale.convert.bson; package com.wentch.redkale.convert.bson;
import com.wentch.redkale.convert.Convert; import com.wentch.redkale.convert.*;
import com.wentch.redkale.convert.Factory; import com.wentch.redkale.util.*;
import com.wentch.redkale.util.ObjectPool; import java.lang.reflect.*;
import java.lang.reflect.Type;
/** /**
* *
@@ -16,9 +15,9 @@ import java.lang.reflect.Type;
*/ */
public final class BsonConvert extends Convert<BsonReader, BsonWriter> { public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
private static final ObjectPool<BsonReader> readerPool = new ObjectPool<>(Integer.getInteger("convert.bson.pool.size", 16), BsonReader.class); private static final ObjectPool<BsonReader> readerPool = BsonReader.createPool(Integer.getInteger("convert.bson.pool.size", 16));
private static final ObjectPool<BsonWriter> writerPool = new ObjectPool<>(Integer.getInteger("convert.bson.pool.size", 16), BsonWriter.class); private static final ObjectPool<BsonWriter> writerPool = BsonWriter.createPool(Integer.getInteger("convert.bson.pool.size", 16));
protected BsonConvert(Factory<BsonReader, BsonWriter> factory) { protected BsonConvert(Factory<BsonReader, BsonWriter> factory) {
super(factory); super(factory);

View File

@@ -5,18 +5,15 @@
*/ */
package com.wentch.redkale.convert.bson; package com.wentch.redkale.convert.bson;
import com.wentch.redkale.convert.ConvertException; import com.wentch.redkale.convert.*;
import com.wentch.redkale.convert.DeMember; import com.wentch.redkale.util.*;
import com.wentch.redkale.convert.Reader; import java.util.concurrent.atomic.*;
import com.wentch.redkale.util.ObjectPool.Poolable;
import com.wentch.redkale.util.Utility;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
* @author zhangjx * @author zhangjx
*/ */
public final class BsonReader implements Reader, Poolable { public final class BsonReader implements Reader {
public static final short SIGN_OBJECTB = (short) 0xBB; public static final short SIGN_OBJECTB = (short) 0xBB;
@@ -37,6 +34,10 @@ public final class BsonReader implements Reader, Poolable {
public BsonReader() { public BsonReader() {
} }
public static ObjectPool<BsonReader> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new BsonReader(), (x) -> x.recycle());
}
public BsonReader(byte[] bytes) { public BsonReader(byte[] bytes) {
setBytes(bytes, 0, bytes.length); setBytes(bytes, 0, bytes.length);
} }
@@ -55,19 +56,15 @@ public final class BsonReader implements Reader, Poolable {
//this.limit = start + len - 1; //this.limit = start + len - 1;
} }
@Override protected boolean recycle() {
public void prepare() {
}
@Override
public void release() {
this.position = -1; this.position = -1;
//this.limit = -1; //this.limit = -1;
this.content = null; this.content = null;
return true;
} }
public void close() { public void close() {
this.release(); this.recycle();
} }
/** /**

View File

@@ -5,18 +5,14 @@
*/ */
package com.wentch.redkale.convert.bson; package com.wentch.redkale.convert.bson;
import com.wentch.redkale.util.Utility; import com.wentch.redkale.convert.*;
import com.wentch.redkale.util.Attribute; import com.wentch.redkale.util.*;
import com.wentch.redkale.convert.ConvertException;
import com.wentch.redkale.convert.Reader;
import com.wentch.redkale.convert.Writer;
import com.wentch.redkale.util.ObjectPool.Poolable;
/** /**
* *
* @author zhangjx * @author zhangjx
*/ */
public final class BsonWriter implements Writer, Poolable { public final class BsonWriter implements Writer {
private static final int defaultSize = Integer.getInteger("convert.bson.writer.buffer.defsize", 1024); private static final int defaultSize = Integer.getInteger("convert.bson.writer.buffer.defsize", 1024);
@@ -24,6 +20,10 @@ public final class BsonWriter implements Writer, Poolable {
private byte[] content; private byte[] content;
public static ObjectPool<BsonWriter> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new BsonWriter(), (x) -> x.recycle());
}
public byte[] toArray() { public byte[] toArray() {
if (count == content.length) return content; if (count == content.length) return content;
byte[] newdata = new byte[count]; byte[] newdata = new byte[count];
@@ -75,16 +75,12 @@ public final class BsonWriter implements Writer, Poolable {
count += len; count += len;
} }
@Override protected boolean recycle() {
public void prepare() {
}
@Override
public void release() {
this.count = 0; this.count = 0;
if (this.content.length > defaultSize) { if (this.content.length > defaultSize) {
this.content = new byte[defaultSize]; this.content = new byte[defaultSize];
} }
return true;
} }
//------------------------------------------------------------------------ //------------------------------------------------------------------------

View File

@@ -5,10 +5,9 @@
*/ */
package com.wentch.redkale.convert.json; package com.wentch.redkale.convert.json;
import com.wentch.redkale.util.Utility; import com.wentch.redkale.convert.*;
import com.wentch.redkale.util.ObjectPool; import com.wentch.redkale.util.*;
import com.wentch.redkale.convert.Convert; import java.lang.reflect.*;
import java.lang.reflect.Type;
/** /**
* *
@@ -17,9 +16,9 @@ import java.lang.reflect.Type;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final class JsonConvert extends Convert<JsonReader, JsonWriter> { public final class JsonConvert extends Convert<JsonReader, JsonWriter> {
private static final ObjectPool<JsonReader> readerPool = new ObjectPool<>(Integer.getInteger("convert.json.pool.size", 16), JsonReader.class); private static final ObjectPool<JsonReader> readerPool = JsonReader.createPool(Integer.getInteger("convert.json.pool.size", 16));
private static final ObjectPool<JsonWriter> writerPool = new ObjectPool<>(Integer.getInteger("convert.json.pool.size", 16), JsonWriter.class); private static final ObjectPool<JsonWriter> writerPool = JsonWriter.createPool(Integer.getInteger("convert.json.pool.size", 16));
protected JsonConvert(JsonFactory factory) { protected JsonConvert(JsonFactory factory) {
super(factory); super(factory);

View File

@@ -5,18 +5,15 @@
*/ */
package com.wentch.redkale.convert.json; package com.wentch.redkale.convert.json;
import com.wentch.redkale.convert.ConvertException; import com.wentch.redkale.convert.*;
import com.wentch.redkale.convert.DeMember; import com.wentch.redkale.util.*;
import com.wentch.redkale.convert.Reader; import java.util.concurrent.atomic.*;
import com.wentch.redkale.util.ObjectPool.Poolable;
import com.wentch.redkale.util.Utility;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
* @author zhangjx * @author zhangjx
*/ */
public final class JsonReader implements Reader, Poolable { public final class JsonReader implements Reader {
private int position = -1; private int position = -1;
@@ -24,6 +21,10 @@ public final class JsonReader implements Reader, Poolable {
private int limit; private int limit;
public static ObjectPool<JsonReader> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new JsonReader(), (x) -> x.recycle());
}
public JsonReader() { public JsonReader() {
} }
@@ -53,19 +54,15 @@ public final class JsonReader implements Reader, Poolable {
this.limit = start + len - 1; this.limit = start + len - 1;
} }
@Override protected boolean recycle() {
public void prepare() {
}
@Override
public void release() {
this.position = -1; this.position = -1;
this.limit = -1; this.limit = -1;
this.text = null; this.text = null;
return true;
} }
public void close() { public void close() {
this.release(); this.recycle();
} }
/** /**

View File

@@ -5,10 +5,8 @@
*/ */
package com.wentch.redkale.convert.json; package com.wentch.redkale.convert.json;
import com.wentch.redkale.util.Attribute; import com.wentch.redkale.convert.*;
import com.wentch.redkale.util.Utility; import com.wentch.redkale.util.*;
import com.wentch.redkale.convert.Writer;
import com.wentch.redkale.util.ObjectPool.Poolable;
/** /**
* *
@@ -16,7 +14,7 @@ import com.wentch.redkale.util.ObjectPool.Poolable;
* *
* @author zhangjx * @author zhangjx
*/ */
public final class JsonWriter implements Writer, Poolable { public final class JsonWriter implements Writer {
private static final char[] CHARS_TUREVALUE = "true".toCharArray(); private static final char[] CHARS_TUREVALUE = "true".toCharArray();
@@ -28,6 +26,10 @@ public final class JsonWriter implements Writer, Poolable {
private char[] content; private char[] content;
public static ObjectPool<JsonWriter> createPool(int max) {
return new ObjectPool<>(max, (Object... params) -> new JsonWriter(), (x) -> x.recycle());
}
public JsonWriter() { public JsonWriter() {
this(defaultSize); this(defaultSize);
} }
@@ -87,16 +89,12 @@ public final class JsonWriter implements Writer, Poolable {
if (quote) content[count++] = '"'; if (quote) content[count++] = '"';
} }
@Override protected boolean recycle() {
public void prepare() {
}
@Override
public void release() {
this.count = 0; this.count = 0;
if (this.content.length > defaultSize) { if (this.content.length > defaultSize) {
this.content = new char[defaultSize]; this.content = new char[defaultSize];
} }
return true;
} }
public char[] toArray() { public char[] toArray() {

View File

@@ -1,61 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.*;
/**
*
* @author zhangjx
*/
public final class BufferPool {
private final int capacity;
private final ArrayBlockingQueue<ByteBuffer> queue;
private final AtomicLong creatCounter;
private final AtomicLong cycleCounter;
public BufferPool(AtomicLong creatCounter, AtomicLong cycleCounter, int capacity) {
this(creatCounter, cycleCounter, capacity, 0);
}
public BufferPool(AtomicLong creatCounter, AtomicLong cycleCounter, int capacity, int max) {
this.capacity = capacity;
this.queue = new ArrayBlockingQueue<>(Math.max(32, max));
this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter;
}
public ByteBuffer poll() {
ByteBuffer result = queue.poll();
if (result == null) {
creatCounter.incrementAndGet();
result = ByteBuffer.allocateDirect(capacity);
}
return result;
}
public void offer(final ByteBuffer e) {
if (e != null && !e.isReadOnly() && e.capacity() == this.capacity) {
cycleCounter.incrementAndGet();
e.clear();
queue.offer(e);
}
}
public long getCreatCount() {
return creatCounter.longValue();
}
public long getCycleCount() {
return cycleCounter.longValue();
}
}

View File

@@ -1,228 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net;
import java.nio.*;
/**
*
* @author zhangjx
*/
public final class ChunkBuffer {
final ByteBuffer buffer;
private final BufferPool pool;
ChunkBuffer(BufferPool pool, ByteBuffer buffer) {
this.pool = pool;
this.buffer = buffer;
}
public void release() {
//pool.offer(this);
}
public int limit() {
return buffer.limit();
}
public void limit(int limit) {
buffer.limit(limit);
}
public int position() {
return buffer.position();
}
public void position(int position) {
buffer.position(position);
}
public void clear() {
buffer.clear();
}
public ChunkBuffer flip() {
buffer.flip();
return this;
}
public int remaining() {
return buffer.remaining();
}
public boolean hasRemaining() {
return buffer.hasRemaining();
}
public boolean isReadOnly() {
return buffer.isReadOnly();
}
public boolean hasArray() {
return buffer.hasArray();
}
public byte[] array() {
return buffer.array();
}
public int arrayOffset() {
return buffer.arrayOffset();
}
public boolean isDirect() {
return buffer.isDirect();
}
public ChunkBuffer slice() {
buffer.slice();
return this;
}
public ChunkBuffer duplicate() {
buffer.duplicate();
return this;
}
public ChunkBuffer asReadOnlyBuffer() {
buffer.asReadOnlyBuffer();
return this;
}
public byte get() {
return buffer.get();
}
public ChunkBuffer put(byte b) {
buffer.put(b);
return this;
}
public byte get(int index) {
return buffer.get(index);
}
public ChunkBuffer put(int index, byte b) {
buffer.put(index, b);
return this;
}
public ChunkBuffer compact() {
buffer.compact();
return this;
}
public char getChar() {
return buffer.getChar();
}
public ChunkBuffer putChar(char value) {
buffer.putChar(value);
return this;
}
public char getChar(int index) {
return buffer.getChar(index);
}
public ChunkBuffer putChar(int index, char value) {
buffer.putChar(index, value);
return this;
}
public short getShort() {
return buffer.getShort();
}
public ChunkBuffer putShort(short value) {
buffer.putShort(value);
return this;
}
public short getShort(int index) {
return buffer.getShort(index);
}
public ChunkBuffer putShort(int index, short value) {
buffer.putShort(index, value);
return this;
}
public int getInt() {
return buffer.getInt();
}
public ChunkBuffer putInt(int value) {
buffer.putInt(value);
return this;
}
public int getInt(int index) {
return buffer.getInt(index);
}
public ChunkBuffer putInt(int index, int value) {
buffer.putInt(index, value);
return this;
}
public long getLong() {
return buffer.getLong();
}
public ChunkBuffer putLong(long value) {
buffer.putLong(value);
return this;
}
public long getLong(int index) {
return buffer.getLong(index);
}
public ChunkBuffer putLong(int index, long value) {
buffer.putLong(index, value);
return this;
}
public float getFloat() {
return buffer.getFloat();
}
public ChunkBuffer putFloat(float value) {
buffer.putFloat(value);
return this;
}
public float getFloat(int index) {
return buffer.getFloat(index);
}
public ChunkBuffer putFloat(int index, float value) {
buffer.putFloat(index, value);
return this;
}
public double getDouble() {
return buffer.getDouble();
}
public ChunkBuffer putDouble(double value) {
buffer.putDouble(value);
return this;
}
public double getDouble(int index) {
return buffer.getDouble(index);
}
public ChunkBuffer putDouble(int index, double value) {
buffer.putDouble(index, value);
return this;
}
}

View File

@@ -5,11 +5,12 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import com.wentch.redkale.watch.WatchFactory; import com.wentch.redkale.util.*;
import java.net.InetSocketAddress; import com.wentch.redkale.watch.*;
import java.nio.ByteBuffer; import java.net.*;
import java.nio.charset.Charset; import java.nio.*;
import java.util.concurrent.ExecutorService; import java.nio.charset.*;
import java.util.concurrent.*;
import java.util.logging.*; import java.util.logging.*;
/** /**
@@ -24,9 +25,9 @@ public class Context {
protected final ExecutorService executor; protected final ExecutorService executor;
protected final BufferPool bufferPool; protected final ObjectPool<ByteBuffer> bufferPool;
protected final ResponsePool responsePool; protected final ObjectPool<Response> responsePool;
protected final PrepareServlet prepare; protected final PrepareServlet prepare;
@@ -44,7 +45,7 @@ public class Context {
protected final WatchFactory watch; protected final WatchFactory watch;
public Context(long serverStartTime, Logger logger, ExecutorService executor, BufferPool bufferPool, ResponsePool responsePool, public Context(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool, ObjectPool<Response> responsePool,
final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final WatchFactory watch, final int maxbody, Charset charset, InetSocketAddress address, final PrepareServlet prepare, final WatchFactory watch,
final int readTimeoutSecond, final int writeTimeoutSecond) { final int readTimeoutSecond, final int writeTimeoutSecond) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;

View File

@@ -5,9 +5,10 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import java.nio.ByteBuffer; import com.wentch.redkale.util.*;
import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.logging.Level; import java.util.logging.*;
/** /**
* *
@@ -31,7 +32,7 @@ public final class PrepareRunner implements Runnable {
@Override @Override
public void run() { public void run() {
final PrepareServlet prepare = context.prepare; final PrepareServlet prepare = context.prepare;
final ResponsePool responsePool = context.responsePool; final ObjectPool<? extends Response> responsePool = context.responsePool;
final ByteBuffer buffer = context.pollBuffer(); final ByteBuffer buffer = context.pollBuffer();
if (data != null) { if (data != null) {
final Response response = responsePool.poll(); final Response response = responsePool.poll();

View File

@@ -5,11 +5,11 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import java.io.IOException; import java.io.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.channels.CompletionHandler; import java.nio.channels.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.logging.Level; import java.util.logging.*;
/** /**
* *
@@ -28,10 +28,11 @@ public abstract class PrepareServlet<R extends Request, P extends Response<R>> i
final int rs = request.readHeader(buffer); final int rs = request.readHeader(buffer);
if (rs < 0) { if (rs < 0) {
response.context.offerBuffer(buffer); response.context.offerBuffer(buffer);
illRequestCounter.incrementAndGet(); if (rs != Integer.MIN_VALUE) illRequestCounter.incrementAndGet();
response.finish(true); response.finish(true);
} else if (rs == 0) { } else if (rs == 0) {
response.context.offerBuffer(buffer); response.context.offerBuffer(buffer);
request.prepare();
this.execute(request, response); this.execute(request, response);
} else { } else {
buffer.clear(); buffer.clear();
@@ -48,6 +49,7 @@ public abstract class PrepareServlet<R extends Request, P extends Response<R>> i
request.channel.read(buffer, buffer, this); request.channel.read(buffer, buffer, this);
} else { } else {
response.context.offerBuffer(buffer); response.context.offerBuffer(buffer);
request.prepare();
try { try {
execute(request, response); execute(request, response);
} catch (Exception e) { } catch (Exception e) {

View File

@@ -5,7 +5,7 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import java.nio.ByteBuffer; import java.nio.*;
import java.util.*; import java.util.*;
/** /**
@@ -29,7 +29,7 @@ public abstract class Request {
} }
/** /**
* 返回值: -1数据不合法 0解析完毕 >0: 需再读取的字节数。 * 返回值:Integer.MIN_VALUE: 帧数据; -1数据不合法 0解析完毕 >0: 需再读取的字节数。
* *
* @param buffer * @param buffer
* @return * @return
@@ -38,6 +38,8 @@ public abstract class Request {
protected abstract void readBody(ByteBuffer buffer); protected abstract void readBody(ByteBuffer buffer);
protected abstract void prepare();
protected void recycle() { protected void recycle() {
createtime = 0; createtime = 0;
keepAlive = false; keepAlive = false;

View File

@@ -5,7 +5,7 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
/** /**
@@ -53,7 +53,7 @@ public abstract class Response<R extends Request> {
return ch; return ch;
} }
protected void recycle() { protected boolean recycle() {
boolean keepAlive = request.keepAlive; boolean keepAlive = request.keepAlive;
request.recycle(); request.recycle();
if (channel != null) { if (channel != null) {
@@ -67,6 +67,7 @@ public abstract class Response<R extends Request> {
channel = null; channel = null;
} }
} }
return true;
} }
protected void refuseAlive() { protected void refuseAlive() {

View File

@@ -1,69 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.*;
/**
*
* @author zhangjx
* @param <T>
*/
public final class ResponsePool<T extends Response> {
public static interface ResponseFactory<T> {
T createResponse();
}
private final AtomicLong creatCounter;
private final AtomicLong cycleCounter;
private ResponseFactory<T> factory;
private final ArrayBlockingQueue<T> queue;
public ResponsePool(AtomicLong creatCounter, AtomicLong cycleCounter) {
this(creatCounter, cycleCounter, 0);
}
public ResponsePool(AtomicLong creatCounter, AtomicLong cycleCounter, int max) {
this.queue = new ArrayBlockingQueue<>(Math.max(32, max));
this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter;
}
public void setResponseFactory(ResponseFactory<T> factory) {
this.factory = factory;
}
public T poll() {
T result = queue.poll();
if (result == null) {
creatCounter.incrementAndGet();
result = factory.createResponse();
}
return result;
}
public void offer(final T e) {
if (e != null) {
cycleCounter.incrementAndGet();
e.recycle();
queue.offer(e);
}
}
public long getCreatCount() {
return creatCounter.longValue();
}
public long getCycleCount() {
return cycleCounter.longValue();
}
}

View File

@@ -5,10 +5,11 @@
*/ */
package com.wentch.redkale.net; package com.wentch.redkale.net;
import com.wentch.redkale.watch.WatchFactory; import com.wentch.redkale.util.*;
import java.io.IOException; import com.wentch.redkale.watch.*;
import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@@ -22,7 +23,7 @@ public final class Transport {
protected SocketAddress[] remoteAddres; protected SocketAddress[] remoteAddres;
protected BufferPool bufferPool; protected ObjectPool<ByteBuffer> bufferPool;
protected String name; protected String name;
@@ -49,7 +50,13 @@ public final class Transport {
this.group = g; this.group = g;
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.creatCounter"); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.creatCounter");
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "_" + protocol + ".Buffer.cycleCounter");
this.bufferPool = new BufferPool(createBufferCounter, cycleBufferCounter, 8192, bufferPoolSize); int rcapacity = 8192;
this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
this.remoteAddres = addresses; this.remoteAddres = addresses;
} }

View File

@@ -5,19 +5,16 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.convert.json.JsonConvert; import com.wentch.redkale.convert.json.*;
import com.wentch.redkale.convert.json.JsonFactory; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.PrepareServlet; import com.wentch.redkale.util.*;
import com.wentch.redkale.net.ResponsePool; import com.wentch.redkale.watch.*;
import com.wentch.redkale.net.Context; import java.net.*;
import com.wentch.redkale.net.BufferPool; import java.nio.*;
import com.wentch.redkale.util.Utility; import java.nio.charset.*;
import com.wentch.redkale.watch.WatchFactory; import java.security.*;
import java.net.InetSocketAddress; import java.util.concurrent.*;
import java.nio.charset.Charset; import java.util.logging.*;
import java.security.SecureRandom;
import java.util.concurrent.ExecutorService;
import java.util.logging.Logger;
/** /**
* *
@@ -31,8 +28,8 @@ public final class HttpContext extends Context {
protected final SecureRandom random = new SecureRandom(); protected final SecureRandom random = new SecureRandom();
public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, BufferPool bufferPool, public HttpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool,
ResponsePool responsePool, int maxbody, Charset charset, InetSocketAddress address, ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address,
PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond, String contextPath) { PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond, String contextPath) {
super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset, super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset,
address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); address, prepare, watch, readTimeoutSecond, writeTimeoutSecond);
@@ -59,7 +56,7 @@ public final class HttpContext extends Context {
return executor; return executor;
} }
protected ResponsePool getResponsePool() { protected ObjectPool<Response> getResponsePool() {
return responsePool; return responsePool;
} }

View File

@@ -5,11 +5,8 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.convert.json.JsonFactory; import com.wentch.redkale.convert.json.*;
import com.wentch.redkale.convert.json.JsonConvert; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.AsyncConnection;
import com.wentch.redkale.net.Request;
import com.wentch.redkale.net.Context;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
@@ -158,6 +155,10 @@ public final class HttpRequest extends Request {
array.add(buffer, buffer.remaining()); array.add(buffer, buffer.remaining());
} }
@Override
protected void prepare() {
}
private void parseBody() { private void parseBody() {
if (this.boundary || array.isEmpty()) return; if (this.boundary || array.isEmpty()) return;
addParameter(array, 0, array.count()); addParameter(array, 0, array.count());

View File

@@ -288,7 +288,7 @@ public final class HttpResourceServlet extends HttpServlet {
if (range == null) { if (range == null) {
buffer.put(header); buffer.put(header);
buffer.flip(); buffer.flip();
response.send(buffer, file); response.finishFile(buffer, file);
return; return;
} }
range = range.substring("bytes=".length()); range = range.substring("bytes=".length());
@@ -300,7 +300,7 @@ public final class HttpResourceServlet extends HttpServlet {
buffer.flip(); buffer.flip();
final ByteBuffer body = this.content; final ByteBuffer body = this.content;
if (body == null) { if (body == null) {
response.send(buffer, file, start, end > 0 ? clen : end); response.finishFile(buffer, file, start, end > 0 ? clen : end);
} else { } else {
final ByteBuffer body2 = body.duplicate(); final ByteBuffer body2 = body.duplicate();
body2.position((int) (this.header.length + start)); body2.position((int) (this.header.length + start));

View File

@@ -6,9 +6,9 @@
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.net.*; import com.wentch.redkale.net.*;
import com.wentch.redkale.util.*;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import com.wentch.redkale.util.AnyValue.Entry; import com.wentch.redkale.util.AnyValue.Entry;
import com.wentch.redkale.util.*;
import java.io.*; import java.io.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.net.*; import java.net.*;
@@ -17,6 +17,7 @@ import java.nio.channels.*;
import java.nio.file.*; import java.nio.file.*;
import java.text.*; import java.text.*;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.*;
/** /**
* *
@@ -98,6 +99,10 @@ public final class HttpResponse extends Response<HttpRequest> {
private final DefaultAnyValue header = new DefaultAnyValue(); private final DefaultAnyValue header = new DefaultAnyValue();
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((HttpResponse) x).recycle());
}
protected HttpResponse(HttpContext context, HttpRequest request) { protected HttpResponse(HttpContext context, HttpRequest request) {
super(context, request); super(context, request);
} }
@@ -108,14 +113,14 @@ public final class HttpResponse extends Response<HttpRequest> {
} }
@Override @Override
protected void recycle() { protected boolean recycle() {
this.status = 200; this.status = 200;
this.contentLength = -1; this.contentLength = -1;
this.contentType = null; this.contentType = null;
this.cookies = null; this.cookies = null;
this.headsended = false; this.headsended = false;
this.header.clear(); this.header.clear();
super.recycle(); return super.recycle();
} }
protected String getHttpCode(int status) { protected String getHttpCode(int status) {
@@ -143,22 +148,22 @@ public final class HttpResponse extends Response<HttpRequest> {
} }
} }
public void sendJson(Object obj) { public void finishJson(Object obj) {
this.contentType = "text/plain; charset=utf-8"; this.contentType = "text/plain; charset=utf-8";
sendString(request.convert.convertTo(obj)); finishString(request.convert.convertTo(obj));
} }
public void sendJson(Type type, Object obj) { public void finishJson(Type type, Object obj) {
this.contentType = "text/plain; charset=utf-8"; this.contentType = "text/plain; charset=utf-8";
sendString(request.convert.convertTo(type, obj)); finishString(request.convert.convertTo(type, obj));
} }
public void sendJson(Object... objs) { public void finishJson(Object... objs) {
this.contentType = "text/plain; charset=utf-8"; this.contentType = "text/plain; charset=utf-8";
sendString(request.convert.convertTo(objs)); finishString(request.convert.convertTo(objs));
} }
public void sendString(String obj) { public void finishString(String obj) {
if (obj == null) obj = "null"; if (obj == null) obj = "null";
if (context.getCharset() == null) { if (context.getCharset() == null) {
final char[] chars = Utility.charArray(obj); final char[] chars = Utility.charArray(obj);
@@ -186,16 +191,16 @@ public final class HttpResponse extends Response<HttpRequest> {
headbuf.flip(); headbuf.flip();
super.send(headbuf, headbuf, finishHandler); super.send(headbuf, headbuf, finishHandler);
} else { } else {
sendString(message); finishString(message);
} }
} }
public void finish304() { public void finish304() {
finish(buffer304.duplicate()); super.finish(buffer304.duplicate());
} }
public void finish404() { public void finish404() {
finish(buffer404.duplicate()); super.finish(buffer404.duplicate());
} }
@Override @Override
@@ -213,11 +218,11 @@ public final class HttpResponse extends Response<HttpRequest> {
} }
} }
public <A> void send(File file) throws IOException { public <A> void finish(File file) throws IOException {
send(file, null); finishFile(file, null);
} }
protected <A> void send(final File file, final ByteBuffer fileBody) throws IOException { protected <A> void finishFile(final File file, final ByteBuffer fileBody) throws IOException {
if (file == null || !file.isFile() || !file.canRead()) { if (file == null || !file.isFile() || !file.canRead()) {
finish404(); finish404();
return; return;
@@ -250,7 +255,7 @@ public final class HttpResponse extends Response<HttpRequest> {
ByteBuffer buffer = createHeader(); ByteBuffer buffer = createHeader();
buffer.flip(); buffer.flip();
if (fileBody == null) { if (fileBody == null) {
send(buffer, file, start, len); HttpResponse.this.finishFile(buffer, file, start, len);
} else { } else {
final ByteBuffer body = fileBody.duplicate().asReadOnlyBuffer(); final ByteBuffer body = fileBody.duplicate().asReadOnlyBuffer();
if (start >= 0) { if (start >= 0) {
@@ -276,11 +281,11 @@ public final class HttpResponse extends Response<HttpRequest> {
} }
} }
protected <A> void send(ByteBuffer buffer, File file) throws IOException { protected <A> void finishFile(ByteBuffer buffer, File file) throws IOException {
send(buffer, file, -1L, -1L); finishFile(buffer, file, -1L, -1L);
} }
protected <A> void send(ByteBuffer buffer, File file, long offset, long length) throws IOException { protected <A> void finishFile(ByteBuffer buffer, File file, long offset, long length) throws IOException {
send(buffer, buffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length)); send(buffer, buffer, new TransferFileHandler(AsynchronousFileChannel.open(file.toPath(), options, ((HttpContext) context).getExecutor()), offset, length));
} }

View File

@@ -5,12 +5,10 @@
*/ */
package com.wentch.redkale.net.http; package com.wentch.redkale.net.http;
import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.Server; import com.wentch.redkale.util.*;
import com.wentch.redkale.net.ResponsePool; import com.wentch.redkale.watch.*;
import com.wentch.redkale.net.Context; import java.nio.*;
import com.wentch.redkale.net.BufferPool;
import com.wentch.redkale.watch.WatchFactory;
import java.util.*; import java.util.*;
import java.util.AbstractMap.SimpleEntry; import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@@ -50,7 +48,13 @@ public final class HttpServer extends Server {
final int port = this.address.getPort(); final int port = this.address.getPort();
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.creatCounter"); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.creatCounter");
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Buffer.cycleCounter");
BufferPool bufferPool = new BufferPool(createBufferCounter, cycleBufferCounter, Math.max(this.capacity, 8 * 1024), this.bufferPoolSize); int rcapacity = Math.max(this.capacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
HttpPrepareServlet prepare = new HttpPrepareServlet(); HttpPrepareServlet prepare = new HttpPrepareServlet();
this.servlets.entrySet().stream().forEach((en) -> { this.servlets.entrySet().stream().forEach((en) -> {
prepare.addHttpServlet(en.getKey().getKey(), en.getKey().getValue(), en.getValue()); prepare.addHttpServlet(en.getKey().getKey(), en.getKey().getValue(), en.getValue());
@@ -58,10 +62,10 @@ public final class HttpServer extends Server {
this.servlets.clear(); this.servlets.clear();
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter"); AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("HTTP_" + port + ".Response.cycleCounter");
HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, ObjectPool<Response> responsePool = HttpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
new ResponsePool(createResponseCounter, cycleResponseCounter, this.responsePoolSize), HttpContext httpcontext = new HttpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool,
this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath); this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, contextPath);
httpcontext.getResponsePool().setResponseFactory(() -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory))); responsePool.setCreator((Object... params) -> new HttpResponse(httpcontext, new HttpRequest(httpcontext, httpcontext.jsonFactory)));
return httpcontext; return httpcontext;
} }

View File

@@ -5,17 +5,16 @@
*/ */
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.net.Async; import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.net.Transport; import com.wentch.redkale.net.*;
import com.wentch.redkale.convert.bson.BsonConvert;
import com.wentch.redkale.service.MultiService;
import com.wentch.redkale.service.RemoteOn;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import com.wentch.redkale.util.TwoLong; import com.wentch.redkale.service.*;
import com.wentch.redkale.util.*;
import java.lang.reflect.*; import java.lang.reflect.*;
import java.nio.ByteBuffer; import java.nio.*;
import java.util.*; import java.util.*;
import java.util.logging.Logger; import java.util.concurrent.*;
import java.util.logging.*;
/** /**
* *
@@ -25,6 +24,8 @@ public final class SncpClient {
private final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName()); private final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName());
private final boolean debug = logger.isLoggable(Level.FINEST);
protected static final class SncpAction { protected static final class SncpAction {
protected final TwoLong actionid; protected final TwoLong actionid;
@@ -115,22 +116,7 @@ public final class SncpClient {
return convert.convertFrom(actions[index].resultTypes, send(convert, transport, index, params)); return convert.convertFrom(actions[index].resultTypes, send(convert, transport, index, params));
} }
private byte[] send(final BsonConvert convert, Transport transport, final int index, Object... params) { private void fillHeader(ByteBuffer buffer, long seqid, TwoLong actionid, int frameCount, int frameIndex, int bodyLength) {
int bodyLength = 2;
Type[] myparamtypes = actions[index].paramTypes;
byte[][] bytesarray = new byte[params.length][];
for (int i = 0; i < bytesarray.length; i++) {
bytesarray[i] = convert.convertTo(myparamtypes[i], params[i]);
bodyLength += 2 + bytesarray[i].length;
}
ByteBuffer buffer = transport.pollBuffer();
if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
throw new RuntimeException("send buffer size too large(" + (HEADER_SIZE + bodyLength) + ")");
}
final SncpAction action = actions[index];
final long seqid = System.nanoTime();
final TwoLong actionid = action.actionid;
{
//---------------------head---------------------------------- //---------------------head----------------------------------
buffer.putLong(seqid); //序列号 buffer.putLong(seqid); //序列号
buffer.putChar((char) HEADER_SIZE); //header长度 buffer.putChar((char) HEADER_SIZE); //header长度
@@ -138,13 +124,69 @@ public final class SncpClient {
buffer.putLong(this.nameid); buffer.putLong(this.nameid);
buffer.putLong(actionid.getFirst()); buffer.putLong(actionid.getFirst());
buffer.putLong(actionid.getSecond()); buffer.putLong(actionid.getSecond());
buffer.put((byte) 0); //剩下还有多少帧数 0表示只有当前一帧数据 buffer.put((byte) frameCount); //数据的帧数, 最小值为1
buffer.put((byte) frameIndex); //数据的帧数序号, 从frame.count-1开始, 0表示最后一帧
buffer.putInt(0); //结果码, 请求方固定传0 buffer.putInt(0); //结果码, 请求方固定传0
buffer.putChar((char) bodyLength); //body长度 buffer.putInt(bodyLength); //body长度
}
private byte[] send(final BsonConvert convert, Transport transport, final int index, Object... params) {
int bodyLength = 2;
Type[] myparamtypes = actions[index].paramTypes;
byte[][] bytesarray = new byte[params.length][];
for (int i = 0; i < bytesarray.length; i++) {
bytesarray[i] = convert.convertTo(myparamtypes[i], params[i]);
bodyLength += 4 + bytesarray[i].length;
}
final SncpAction action = actions[index];
final long seqid = System.nanoTime();
final TwoLong actionid = action.actionid;
ByteBuffer buffer = transport.pollBuffer();
if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
final int patch = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
AsyncConnection conn = transport.pollConnection();
final int readto = conn.getReadTimeoutSecond();
final int writeto = conn.getWriteTimeoutSecond();
int pos = 0;
final byte[] all = new byte[bodyLength];
all[pos++] = (byte) ((bytesarray.length & 0xff00) >> 8);
all[pos++] = (byte) (bytesarray.length & 0xff);
for (byte[] bs : bytesarray) {
all[pos++] = (byte) ((bs.length & 0xff000000) >> 24);
all[pos++] = (byte) ((bs.length & 0xff0000) >> 16);
all[pos++] = (byte) ((bs.length & 0xff00) >> 8);
all[pos++] = (byte) (bs.length & 0xff);
System.arraycopy(bs, 0, all, pos, bs.length);
pos += bs.length;
}
if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos);
try {
pos = 0;
for (int i = patch - 1; i >= 0; i--) {
fillHeader(buffer, seqid, actionid, patch, i, bodyLength);
int len = Math.min(buffer.remaining(), all.length - pos);
buffer.put(all, pos, len);
pos += len;
buffer.flip();
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear();
}
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
transport.offerConnection(conn);
}
} else {
{
//---------------------head----------------------------------
fillHeader(buffer, seqid, actionid, 1, 0, bodyLength);
//---------------------body---------------------------------- //---------------------body----------------------------------
buffer.putChar((char) bytesarray.length); //参数数组大小 buffer.putChar((char) bytesarray.length); //参数数组大小
for (byte[] bs : bytesarray) { for (byte[] bs : bytesarray) {
buffer.putChar((char) bs.length); buffer.putInt(bs.length);
buffer.put(bs); buffer.put(bs);
} }
buffer.flip(); buffer.flip();
@@ -154,7 +196,7 @@ public final class SncpClient {
return null; return null;
} }
buffer = transport.send(buffer); buffer = transport.send(buffer);
}
long rseqid = buffer.getLong(); long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid); if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
@@ -165,10 +207,13 @@ public final class SncpClient {
long ractionid1 = buffer.getLong(); long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong(); long ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
int frame = buffer.get(); int frameCount = buffer.get();
if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount);
int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
int retcode = buffer.getInt(); int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")"); if (retcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + retcode + ")");
int bodylen = buffer.getChar(); int bodylen = buffer.getInt();
byte[] bytes = new byte[bodylen]; byte[] bytes = new byte[bodylen];
buffer.get(bytes); buffer.get(bytes);
transport.offerBuffer(buffer); transport.offerBuffer(buffer);

View File

@@ -5,17 +5,16 @@
*/ */
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.BsonConvert; import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.convert.bson.BsonFactory; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.ResponsePool; import com.wentch.redkale.util.*;
import com.wentch.redkale.net.BufferPool; import com.wentch.redkale.watch.*;
import com.wentch.redkale.net.PrepareServlet; import java.net.*;
import com.wentch.redkale.net.Context; import java.nio.*;
import com.wentch.redkale.watch.WatchFactory; import java.nio.charset.*;
import java.net.InetSocketAddress; import java.util.*;
import java.nio.charset.Charset; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService; import java.util.logging.*;
import java.util.logging.Logger;
/** /**
* *
@@ -23,16 +22,66 @@ import java.util.logging.Logger;
*/ */
public final class SncpContext extends Context { public final class SncpContext extends Context {
protected static class RequestEntry {
protected final long seqid;
protected final byte[] body;
protected final long time = System.currentTimeMillis();
private int received;
public RequestEntry(long seqid, byte[] body) {
this.seqid = seqid;
this.body = body;
}
public void add(ByteBuffer buffer, int pos) {
this.received += buffer.remaining();
buffer.get(body, pos, buffer.remaining());
}
public boolean isCompleted() {
return this.body.length <= this.received;
}
}
private final ConcurrentHashMap<Long, RequestEntry> requests = new ConcurrentHashMap<>();
protected final BsonFactory bsonFactory; protected final BsonFactory bsonFactory;
public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, BufferPool bufferPool, public SncpContext(long serverStartTime, Logger logger, ExecutorService executor, ObjectPool<ByteBuffer> bufferPool,
ResponsePool responsePool, int maxbody, Charset charset, InetSocketAddress address, ObjectPool<Response> responsePool, int maxbody, Charset charset, InetSocketAddress address,
PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) { PrepareServlet prepare, WatchFactory watch, int readTimeoutSecond, int writeTimeoutSecond) {
super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset, super(serverStartTime, logger, executor, bufferPool, responsePool, maxbody, charset,
address, prepare, watch, readTimeoutSecond, writeTimeoutSecond); address, prepare, watch, readTimeoutSecond, writeTimeoutSecond);
this.bsonFactory = BsonFactory.root(); this.bsonFactory = BsonFactory.root();
} }
protected RequestEntry addRequestEntity(long seqid, byte[] bodys) {
RequestEntry entry = new RequestEntry(seqid, bodys);
requests.put(seqid, entry);
return entry;
}
protected void expireRequestEntry(long milliSecond) {
if (requests.size() < 32) return;
List<Long> seqids = new ArrayList<>();
long t = System.currentTimeMillis() - milliSecond;
requests.forEach((x, y) -> {
if (y.time < t) seqids.add(x);
});
for (long seqid : seqids) {
requests.remove(seqid);
}
}
protected RequestEntry getRequestEntity(long seqid) {
return requests.get(seqid);
}
protected WatchFactory getWatchFactory() { protected WatchFactory getWatchFactory() {
return watch; return watch;
} }
@@ -41,7 +90,7 @@ public final class SncpContext extends Context {
return executor; return executor;
} }
protected ResponsePool getResponsePool() { protected ObjectPool<Response> getResponsePool() {
return responsePool; return responsePool;
} }

View File

@@ -5,12 +5,11 @@
*/ */
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.BsonConvert; import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.convert.bson.BsonFactory; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.Request; import com.wentch.redkale.net.sncp.SncpContext.RequestEntry;
import com.wentch.redkale.net.Context; import com.wentch.redkale.util.*;
import com.wentch.redkale.util.TwoLong; import java.nio.*;
import java.nio.ByteBuffer;
/** /**
* *
@@ -18,13 +17,15 @@ import java.nio.ByteBuffer;
*/ */
public final class SncpRequest extends Request { public final class SncpRequest extends Request {
public static final int HEADER_SIZE = 49; public static final int HEADER_SIZE = 52;
protected final BsonConvert convert; protected final BsonConvert convert;
private long seqid; private long seqid;
private int frame; private int framecount;
private int frameindex;
private long nameid; private long nameid;
@@ -38,14 +39,16 @@ public final class SncpRequest extends Request {
private boolean ping; private boolean ping;
protected SncpRequest(Context context, BsonFactory factory) { private byte[] body;
protected SncpRequest(SncpContext context, BsonFactory factory) {
super(context); super(context);
this.convert = factory.getConvert(); this.convert = factory.getConvert();
} }
@Override @Override
protected int readHeader(ByteBuffer buffer) { protected int readHeader(ByteBuffer buffer) {
if (buffer.remaining() < 8) { if (buffer.remaining() < HEADER_SIZE) {
this.ping = true; this.ping = true;
return 0; return 0;
} }
@@ -58,29 +61,66 @@ public final class SncpRequest extends Request {
this.serviceid = buffer.getLong(); this.serviceid = buffer.getLong();
this.nameid = buffer.getLong(); this.nameid = buffer.getLong();
this.actionid = new TwoLong(buffer.getLong(), buffer.getLong()); this.actionid = new TwoLong(buffer.getLong(), buffer.getLong());
this.frame = buffer.get(); this.framecount = buffer.get();
this.frameindex = buffer.get();
if (buffer.getInt() != 0) { if (buffer.getInt() != 0) {
context.getLogger().finest("sncp buffer header.retcode not 0"); context.getLogger().finest("sncp buffer header.retcode not 0");
return -1; return -1;
} }
this.bodylength = buffer.getChar(); this.bodylength = buffer.getInt();
//---------------------body---------------------------------- //---------------------body----------------------------------
if (this.framecount == 1) { //只有一帧的数据
int paramlen = buffer.getChar(); int paramlen = buffer.getChar();
byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[] byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[]
for (int i = 1; i <= paramlen; i++) { for (int i = 1; i <= paramlen; i++) {
byte[] bytes = new byte[(int) buffer.getChar()]; byte[] bytes = new byte[buffer.getInt()];
buffer.get(bytes); buffer.get(bytes);
bbytes[i] = bytes; bbytes[i] = bytes;
} }
this.paramBytes = bbytes; this.paramBytes = bbytes;
return 0; return 0;
} }
//多帧数据
final SncpContext scontext = (SncpContext) this.context;
RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
entry.add(buffer, (this.framecount - this.frameindex - 1) * (buffer.capacity() - HEADER_SIZE));
if (entry.isCompleted()) { //数据读取完毕
this.body = entry.body;
return 0;
} else {
scontext.expireRequestEntry(10 * 1000); //10秒过期
}
return Integer.MIN_VALUE; //多帧数据返回 Integer.MIN_VALUE
}
@Override
protected void readBody(ByteBuffer buffer) {
}
@Override
protected void prepare() {
if (this.body == null) return;
byte[] bytes = this.body;
int pos = 0;
int paramlen = ((0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++]));
byte[][] bbytes = new byte[paramlen + 1][]; //占位第0个byte[]
for (int i = 1; i <= paramlen; i++) {
byte[] bs = new byte[(0xff000000 & (bytes[pos++] << 24)) | (0xff0000 & (bytes[pos++] << 16))
| (0xff00 & (bytes[pos++] << 8)) | (0xff & bytes[pos++])];
System.arraycopy(bytes, pos, bs, 0, bs.length);
pos += bs.length;
bbytes[i] = bs;
}
this.paramBytes = bbytes;
}
@Override @Override
public String toString() { public String toString() {
return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid + ",actionid=" + this.actionid + ",serviceid=" + this.serviceid + ",actionid=" + this.actionid
+ ",frame=" + this.frame + ",bodylength=" + this.bodylength + "}"; + ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",bodylength=" + this.bodylength + "}";
} }
protected void setKeepAlive(boolean keepAlive) { protected void setKeepAlive(boolean keepAlive) {
@@ -91,17 +131,15 @@ public final class SncpRequest extends Request {
return this.keepAlive; return this.keepAlive;
} }
@Override
protected void readBody(ByteBuffer buffer) {
}
@Override @Override
protected void recycle() { protected void recycle() {
this.seqid = 0; this.seqid = 0;
this.frame = 0; this.framecount = 0;
this.frameindex = 0;
this.serviceid = 0; this.serviceid = 0;
this.actionid = null; this.actionid = null;
this.bodylength = 0; this.bodylength = 0;
this.body = null;
this.paramBytes = null; this.paramBytes = null;
this.ping = false; this.ping = false;
super.recycle(); super.recycle();

View File

@@ -5,10 +5,10 @@
*/ */
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.net.Response; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.Context; import com.wentch.redkale.util.*;
import com.wentch.redkale.util.TwoLong; import java.nio.*;
import java.nio.ByteBuffer; import java.util.concurrent.atomic.*;
/** /**
* *
@@ -24,6 +24,10 @@ public final class SncpResponse extends Response<SncpRequest> {
public static final int RETCODE_THROWEXCEPTION = 10011; //内部异常 public static final int RETCODE_THROWEXCEPTION = 10011; //内部异常
public static ObjectPool<Response> createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<Response> creator) {
return new ObjectPool<>(creatCounter, cycleCounter, max, creator, (x) -> ((SncpResponse) x).recycle());
}
protected SncpResponse(Context context, SncpRequest request) { protected SncpResponse(Context context, SncpRequest request) {
super(context, request); super(context, request);
} }
@@ -38,9 +42,10 @@ public final class SncpResponse extends Response<SncpRequest> {
TwoLong actionid = request.getActionid(); TwoLong actionid = request.getActionid();
buffer.putLong(actionid.getFirst()); buffer.putLong(actionid.getFirst());
buffer.putLong(actionid.getSecond()); buffer.putLong(actionid.getSecond());
buffer.put((byte) 0); buffer.put((byte) 1); // frame count
buffer.put((byte) 0); //frame index
buffer.putInt(retcode); buffer.putInt(retcode);
buffer.putChar((char) (bytes == null ? 0 : bytes.length)); buffer.putInt((bytes == null ? 0 : bytes.length));
//---------------------body---------------------------------- //---------------------body----------------------------------
if (bytes != null) buffer.put(bytes); if (bytes != null) buffer.put(bytes);
buffer.flip(); buffer.flip();

View File

@@ -5,13 +5,11 @@
*/ */
package com.wentch.redkale.net.sncp; package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.BsonConvert; import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.convert.bson.BsonFactory; import com.wentch.redkale.net.*;
import com.wentch.redkale.net.ResponsePool; import com.wentch.redkale.util.*;
import com.wentch.redkale.net.BufferPool; import com.wentch.redkale.watch.*;
import com.wentch.redkale.net.Server; import java.nio.*;
import com.wentch.redkale.net.Context;
import com.wentch.redkale.watch.WatchFactory;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
@@ -42,17 +40,23 @@ public final class SncpServer extends Server {
final int port = this.address.getPort(); final int port = this.address.getPort();
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.creatCounter"); AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.creatCounter");
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter"); AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter");
BufferPool bufferPool = new BufferPool(createBufferCounter, cycleBufferCounter, Math.max(this.capacity, 8 * 1024), this.bufferPoolSize); int rcapacity = Math.max(this.capacity, 8 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;
e.clear();
return true;
});
SncpPrepareServlet prepare = new SncpPrepareServlet(); SncpPrepareServlet prepare = new SncpPrepareServlet();
final BsonConvert convert = BsonFactory.root().getConvert(); final BsonConvert convert = BsonFactory.root().getConvert();
this.services.stream().forEach(x -> x.getNames().forEach(y -> prepare.addSncpServlet(new SncpDynServlet(convert, y, x.getService(), x.getServiceConf())))); this.services.stream().forEach(x -> x.getNames().forEach(y -> prepare.addSncpServlet(new SncpDynServlet(convert, y, x.getService(), x.getServiceConf()))));
this.services.clear(); this.services.clear();
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter"); AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Response.cycleCounter");
SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, bufferPool, ObjectPool<Response> responsePool = SncpResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
new ResponsePool(createResponseCounter, cycleResponseCounter, this.responsePoolSize), SncpContext sncpcontext = new SncpContext(this.serverStartTime, this.logger, executor, bufferPool, responsePool,
this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond); this.maxbody, this.charset, this.address, prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond);
sncpcontext.getResponsePool().setResponseFactory(() -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext, sncpcontext.bsonFactory))); responsePool.setCreator((Object... params) -> new SncpResponse(sncpcontext, new SncpRequest(sncpcontext, sncpcontext.bsonFactory)));
return sncpcontext; return sncpcontext;
} }

View File

@@ -4,58 +4,77 @@
*/ */
package com.wentch.redkale.util; package com.wentch.redkale.util;
import java.util.Queue; import java.util.*;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
/** /**
* *
* @author zhangjx * @author zhangjx
* @param <T> * @param <T>
*/ */
public final class ObjectPool<T extends ObjectPool.Poolable> { public final class ObjectPool<T> {
public static interface Poolable {
public void prepare();
public void release();
}
private final Queue<T> queue; private final Queue<T> queue;
private final Creator<T> creator; private Creator<T> creator;
public ObjectPool(Class<T> clazz) { private final Predicate<T> recycler;
this(2, clazz);
private final AtomicLong creatCounter;
private final AtomicLong cycleCounter;
public ObjectPool(Class<T> clazz, Predicate<T> recycler) {
this(2, clazz, recycler);
} }
public ObjectPool(int max, Class<T> clazz) { public ObjectPool(int max, Class<T> clazz, Predicate<T> recycler) {
this(max, Creator.create(clazz)); this(max, Creator.create(clazz), recycler);
} }
public ObjectPool(Creator<T> creator) { public ObjectPool(Creator<T> creator, Predicate<T> recycler) {
this(2, creator); this(2, creator, recycler);
} }
public ObjectPool(int max, Creator<T> creator) { public ObjectPool(int max, Creator<T> creator, Predicate<T> recycler) {
this(null, null, max, creator, recycler);
}
public ObjectPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator<T> creator, Predicate<T> recycler) {
this.creatCounter = creatCounter;
this.cycleCounter = cycleCounter;
this.creator = creator; this.creator = creator;
this.recycler = recycler;
this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max)); this.queue = new ArrayBlockingQueue<>(Math.max(Runtime.getRuntime().availableProcessors() * 2, max));
} }
public void setCreator(Creator<T> creator) {
this.creator = creator;
}
public T poll() { public T poll() {
T result = queue.poll(); T result = queue.poll();
if (result == null) { if (result == null) {
if (creatCounter != null) creatCounter.incrementAndGet();
result = this.creator.create(); result = this.creator.create();
} else {
result.prepare();
} }
return result; return result;
} }
public void offer(final T e) { public void offer(final T e) {
if (e != null) { if (e != null && recycler.test(e)) {
e.release(); if (cycleCounter != null) cycleCounter.incrementAndGet();
queue.offer(e); queue.offer(e);
} }
} }
public long getCreatCount() {
return creatCounter.longValue();
}
public long getCycleCount() {
return cycleCounter.longValue();
}
} }

View File

@@ -33,13 +33,11 @@ public class Sheet<T> implements java.io.Serializable {
} }
public static <E> Sheet<E> asSheet(Collection<E> data) { public static <E> Sheet<E> asSheet(Collection<E> data) {
return new Sheet<>(data.size(), data); return data == null ? new Sheet<>() : new Sheet<>(data.size(), data);
} }
public void copyTo(Sheet<T> copy) { public void copyTo(Sheet<T> copy) {
if (copy == null) { if (copy == null) return;
return;
}
copy.total = this.total; copy.total = this.total;
if (this.getRows() != null) { if (this.getRows() != null) {
copy.setRows(new ArrayList<>(this.getRows())); copy.setRows(new ArrayList<>(this.getRows()));