This commit is contained in:
地平线
2015-11-03 16:04:48 +08:00
parent 62e6196bb4
commit 15273faa48
12 changed files with 84 additions and 90 deletions

View File

@@ -62,7 +62,7 @@
</properties>
</resources>
<!--
protocol: required server所启动的协议有HTTP、SNCP 目前只支持HTTP、SNCP。SNCP分TCP、UDP实现默认使用UDP实现TCP实现则使用SNCP.TCP值;
protocol: required server所启动的协议有HTTP、SNCP 目前只支持HTTP、SNCP。SNCP分TCP、UDP实现默认使用TCP实现UDP实现则使用SNCP.UDP值;
host: 服务所占address 默认: 0.0.0.0
port: required 服务所占端口
root: 如果是web类型服务则包含页面 默认:{APP_HOME}/root

View File

@@ -44,9 +44,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
}
public BsonWriter pollBsonWriter() {
final BsonWriter out = writerPool.poll();
out.setTiny(tiny);
return out;
return writerPool.poll().setTiny(tiny);
}
public void offerBsonWriter(BsonWriter out) {
@@ -85,8 +83,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
public byte[] convertTo(final Type type, Object value) {
if (type == null) return null;
final BsonWriter out = writerPool.poll();
out.setTiny(tiny);
final BsonWriter out = writerPool.poll().setTiny(tiny);
factory.loadEncoder(type).convertTo(out, value);
byte[] result = out.toArray();
writerPool.offer(out);
@@ -108,8 +105,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
public byte[] convertTo(Object value) {
if (value == null) {
final BsonWriter out = writerPool.poll();
out.setTiny(tiny);
final BsonWriter out = writerPool.poll().setTiny(tiny);
out.writeNull();
byte[] result = out.toArray();
writerPool.offer(out);
@@ -120,8 +116,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
public ByteBuffer convertToBuffer(final Type type, Object value) {
if (type == null) return null;
final BsonWriter out = writerPool.poll();
out.setTiny(tiny);
final BsonWriter out = writerPool.poll().setTiny(tiny);
factory.loadEncoder(type).convertTo(out, value);
ByteBuffer result = out.toBuffer();
writerPool.offer(out);
@@ -130,8 +125,7 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
public ByteBuffer convertToBuffer(Object value) {
if (value == null) {
final BsonWriter out = writerPool.poll();
out.setTiny(tiny);
final BsonWriter out = writerPool.poll().setTiny(tiny);
out.writeNull();
ByteBuffer result = out.toBuffer();
writerPool.offer(out);
@@ -139,4 +133,16 @@ public final class BsonConvert extends Convert<BsonReader, BsonWriter> {
}
return convertToBuffer(value.getClass(), value);
}
public BsonWriter convertToWriter(final Type type, Object value) {
if (type == null) return null;
final BsonWriter out = writerPool.poll().setTiny(tiny);
factory.loadEncoder(type).convertTo(out, value);
return out;
}
public BsonWriter convertToWriter(Object value) {
if (value == null) return null;
return convertToWriter(value.getClass(), value);
}
}

View File

@@ -50,7 +50,11 @@ public final class BsonReader implements Reader {
}
public final void setBytes(byte[] bytes) {
setBytes(bytes, 0, bytes.length);
if (bytes == null) {
this.position = 0;
} else {
setBytes(bytes, 0, bytes.length);
}
}
public final void setBytes(byte[] bytes, int start, int len) {

View File

@@ -62,8 +62,9 @@ public final class BsonWriter implements Writer {
return tiny;
}
public void setTiny(boolean tiny) {
public BsonWriter setTiny(boolean tiny) {
this.tiny = tiny;
return this;
}
//-----------------------------------------------------------------------

View File

@@ -125,7 +125,7 @@ public abstract class Server {
transport.accept();
final String threadName = "[" + Thread.currentThread().getName() + "] ";
logger.info(threadName + this.getClass().getSimpleName() + "." + protocol + " listen: " + address
+ ", threads: " + threads + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
+ ", threads: " + threads + ", bufferCapacity: " + capacity + ", bufferPoolSize: " + bufferPoolSize + ", responsePoolSize: " + responsePoolSize
+ ", started in " + (System.currentTimeMillis() - context.getServerStartTime()) + " ms");
}

View File

@@ -29,6 +29,8 @@ public final class Transport {
protected final int bufferPoolSize;
protected final int bufferCapacity;
protected final String protocol;
protected final AsynchronousChannelGroup group;
@@ -54,6 +56,7 @@ public final class Transport {
this.protocol = protocol;
this.aio = aio;
this.bufferPoolSize = bufferPoolSize;
this.bufferCapacity = 8192;
AsynchronousChannelGroup g = null;
try {
final AtomicInteger counter = new AtomicInteger();
@@ -70,7 +73,7 @@ public final class Transport {
this.group = g;
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "-" + name + "-" + protocol + ".Buffer.creatCounter");
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber(Transport.class.getSimpleName() + "-" + name + "-" + protocol + ".Buffer.cycleCounter");
int rcapacity = 8192;
final int rcapacity = bufferCapacity;
this.bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;

View File

@@ -38,7 +38,7 @@ public abstract class Sncp {
private static final java.lang.reflect.Type GROUPS_TYPE2 = new TypeToken<String[]>() {
}.getType();
public static final String DEFAULT_PROTOCOL = "UDP";
public static final String DEFAULT_PROTOCOL = "TCP";
static final String LOCALPREFIX = "_DynLocal";

View File

@@ -222,22 +222,21 @@ public final class SncpClient {
final int writeto = conn.getWriteTimeoutSecond();
try {
if ((HEADER_SIZE + bodyLength) > buffer.limit()) {
if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
//if (debug) logger.finest(this.serviceid + "," + this.nameid + "," + action + " sncp length : " + (HEADER_SIZE + bodyLength));
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
int pos = 0;
for (int i = frames - 1; i >= 0; i--) { //填充每一帧的数据
int len = Math.min(buffer.remaining() - HEADER_SIZE, bodyLength - pos);
fillHeader(buffer, seqid, actionid, bodyLength, frames, i, pos, len);
fillHeader(buffer, seqid, actionid, bodyLength, pos, len);
pos += bw.toBuffer(pos, buffer);
buffer.flip();
Thread.sleep(1);
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear();
}
convert.offerBsonWriter(bw);
} else { //只有一帧的数据
//---------------------head----------------------------------
fillHeader(buffer, seqid, actionid, bodyLength, 1, 0, 0, bodyLength);
fillHeader(buffer, seqid, actionid, bodyLength, 0, bodyLength);
//---------------------body----------------------------------
bw.toBuffer(buffer);
convert.offerBsonWriter(bw);
@@ -260,33 +259,31 @@ public final class SncpClient {
buffer.getInt(); //地址
buffer.getChar(); //端口
final int bodylen = buffer.getInt();
final int frameCount = buffer.get();
if (frameCount < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount);
int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
int bodyOffset = buffer.getInt();
int frameLength = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
int bodyOffset = buffer.getInt();
int frameLength = buffer.getChar();
final byte[] body = new byte[bodylen];
if (frameCount == 1) { //只有一帧的数据
if (bodylen == frameLength) { //只有一帧的数据
buffer.get(body, bodyOffset, frameLength);
return body;
} else { //读取多帧结果数据
int received = 0;
int lack = 0;
int lackoffset = 0;
for (int i = 0; i < frameCount; i++) {
received += frameLength;
while (received < bodylen) {
if (buffer.remaining() < frameLength) { //一帧缺失部分数据
lack = frameLength - buffer.remaining();
lackoffset = bodyOffset + buffer.remaining();
received += buffer.remaining();
buffer.get(body, bodyOffset, buffer.remaining());
} else {
lack = 0;
received += frameLength;
buffer.get(body, bodyOffset, frameLength);
}
if (i == frameCount - 1) break;
if (received >= bodylen) break;
if (buffer.hasRemaining()) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
@@ -313,13 +310,10 @@ public final class SncpClient {
buffer.getChar(); //端口
int rbodylen = buffer.getInt();
if (rbodylen != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen);
if (buffer.get() <= 1) throw new RuntimeException("sncp(" + action.method + ") receive count error, but next.frame.count != " + frameCount);
frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp(" + action.method + ") receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex);
bodyOffset = buffer.getInt();
frameLength = buffer.getInt();
int rretcode = buffer.getInt();
if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")");
bodyOffset = buffer.getInt();
frameLength = buffer.getChar();
}
if (received != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received);
return body;
@@ -334,7 +328,7 @@ public final class SncpClient {
}
}
private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int frameCount, int frameIndex, int bodyOffset, int frameLength) {
private void fillHeader(ByteBuffer buffer, long seqid, DLong actionid, int bodyLength, int bodyOffset, int frameLength) {
//---------------------head----------------------------------
buffer.putLong(seqid); //序列号
buffer.putChar((char) HEADER_SIZE); //header长度
@@ -344,11 +338,9 @@ public final class SncpClient {
buffer.putLong(actionid.getSecond());
buffer.put(addrBytes);
buffer.putChar((char) this.addrPort);
buffer.putInt(bodyLength); //body长度
buffer.put((byte) frameCount); //数据的帧数, 最小值为1
buffer.put((byte) frameIndex); //数据的帧数序号, 从frame.count-1开始, 0表示最后一帧
buffer.putInt(0); //结果码, 请求方固定传0
buffer.putInt(bodyLength); //body长度
buffer.putInt(bodyOffset);
buffer.putChar((char) frameLength); //一帧数据的长度
buffer.putInt(frameLength); //一帧数据的长度
buffer.putInt(0); //结果码, 请求方固定传0
}
}

View File

@@ -88,7 +88,9 @@ public final class SncpDynServlet extends SncpServlet {
BsonReader in = action.convert.pollBsonReader();
try {
in.setBytes(request.getBody());
response.finish(0, action.action(in));
BsonWriter bw = action.action(in);
response.finish(0, bw);
if (bw != null) action.convert.offerBsonWriter(bw);
} catch (Throwable t) {
response.getContext().getLogger().log(Level.INFO, "sncp execute error(" + request + ")", t);
response.finish(SncpResponse.RETCODE_THROWEXCEPTION, null);
@@ -107,7 +109,7 @@ public final class SncpDynServlet extends SncpServlet {
protected java.lang.reflect.Type[] paramTypes; //index=0表示返回参数的type void的返回参数类型为null
public abstract byte[] action(final BsonReader in) throws Throwable;
public abstract BsonWriter action(final BsonReader in) throws Throwable;
/*
*
@@ -122,7 +124,7 @@ public final class SncpDynServlet extends SncpServlet {
* public TestService service;
*
* @Override
* public byte[] action(final BsonReader in) throws Throwable {
* public BsonWriter action(final BsonReader in) throws Throwable {
* TestBean arg1 = convert.convertFrom(in, paramTypes[1]);
* String arg2 = convert.convertFrom(in, paramTypes[2]);
* int arg3 = convert.convertFrom(in, paramTypes[3]);
@@ -145,6 +147,7 @@ public final class SncpDynServlet extends SncpServlet {
final String serviceName = serviceClass.getName().replace('.', '/');
final String convertName = BsonConvert.class.getName().replace('.', '/');
final String convertReaderDesc = Type.getDescriptor(BsonReader.class);
final String convertWriterDesc = Type.getDescriptor(BsonWriter.class);
final String serviceDesc = Type.getDescriptor(serviceClass);
String newDynName = serviceName.substring(0, serviceName.lastIndexOf('/') + 1)
+ "DynAction" + serviceClass.getSimpleName() + "_" + method.getName() + "_" + actionid;
@@ -185,7 +188,7 @@ public final class SncpDynServlet extends SncpServlet {
throw new RuntimeException(ex); //不可能会发生
}
{ // action方法
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + ")[B", null, new String[]{"java/lang/Throwable"}));
mv = new DebugMethodVisitor(cw.visitMethod(ACC_PUBLIC, "action", "(" + convertReaderDesc + ")" + convertWriterDesc, null, new String[]{"java/lang/Throwable"}));
//mv.setDebug(true);
int iconst = ICONST_1;
int intconst = 1;
@@ -277,7 +280,7 @@ public final class SncpDynServlet extends SncpServlet {
mv.visitInsn(ICONST_0);
mv.visitInsn(AALOAD);
mv.visitVarInsn(ALOAD, store);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertTo", "(Ljava/lang/reflect/Type;Ljava/lang/Object;)[B", false);
mv.visitMethodInsn(INVOKEVIRTUAL, convertName, "convertToWriter", "(Ljava/lang/reflect/Type;Ljava/lang/Object;)" + convertWriterDesc, false);
mv.visitInsn(ARETURN);
store++;
if (maxStack < 10) maxStack = 10;

View File

@@ -24,12 +24,6 @@ public final class SncpRequest extends Request {
private long seqid;
private int framecount;
private int frameindex;
private int framelength;
private long nameid;
private long serviceid;
@@ -40,6 +34,8 @@ public final class SncpRequest extends Request {
private int bodyoffset;
private int framelength;
//private byte[][] paramBytes;
private boolean ping;
@@ -75,16 +71,15 @@ public final class SncpRequest extends Request {
this.remoteAddress = new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]), port);
}
this.bodylength = buffer.getInt();
this.framecount = buffer.get();
this.frameindex = buffer.get();
this.bodyoffset = buffer.getInt();
this.framelength = buffer.getInt();
if (buffer.getInt() != 0) {
context.getLogger().finest("sncp buffer header.retcode not 0");
return -1;
}
this.bodyoffset = buffer.getInt();
this.framelength = buffer.getChar();
//---------------------body----------------------------------
if (this.framecount == 1) { //只有一帧的数据
if (this.bodylength == this.framelength) { //只有一帧的数据
this.body = new byte[this.framelength];
buffer.get(body);
return 0;
@@ -121,15 +116,11 @@ public final class SncpRequest extends Request {
buffer.getInt(); //地址
buffer.getChar(); //端口
final int bodylen = buffer.getInt();
if (bodylen != this.bodylength) throw new RuntimeException("sncp frame receive bodylength = " + bodylen + ", but first bodylength =" + bodylength);
final int frameCount = buffer.get();
if (frameCount != this.framecount) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first frame.count =" + frameCount);
final int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp frame receive nameid = " + nameid + ", but first frame.count =" + frameCount + " & frame.index =" + frameIndex);
if (bodylen != this.bodylength) throw new RuntimeException("sncp frame receive bodylength = " + bodylen + ", but first bodylength =" + bodylength);
final int bodyOffset = buffer.getInt();
final int framelen = buffer.getInt();
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("sncp frame receive retcode error (retcode=" + retcode + ")");
final int bodyOffset = buffer.getInt();
final int framelen = buffer.getChar();
final SncpContext scontext = (SncpContext) this.context;
RequestEntry entry = scontext.getRequestEntity(this.seqid);
if (entry == null) entry = scontext.addRequestEntity(this.seqid, new byte[this.bodylength]);
@@ -150,22 +141,19 @@ public final class SncpRequest extends Request {
public String toString() {
return SncpRequest.class.getSimpleName() + "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid + ",actionid=" + this.actionid
+ ",framecount=" + this.framecount + ",frameindex=" + this.frameindex + ",framelength=" + this.framelength
+ ",bodylength=" + this.bodylength + ",bodyoffset=" + this.bodyoffset + ",remoteAddress=" + remoteAddress + "}";
+ ",bodylength=" + this.bodylength + ",bodyoffset=" + this.bodyoffset
+ ",framelength=" + this.framelength + ",remoteAddress=" + remoteAddress + "}";
}
@Override
protected void recycle() {
this.seqid = 0;
this.framecount = 0;
this.frameindex = 0;
this.framelength = 0;
this.serviceid = 0;
this.actionid = null;
this.bodylength = 0;
this.bodyoffset = 0;
this.body = null;
//this.paramBytes = null;
this.ping = false;
this.remoteAddress = null;
this.bufferbytes[0] = 0;
@@ -176,9 +164,6 @@ public final class SncpRequest extends Request {
return ping;
}
// public byte[][] getParamBytes() {
// return paramBytes;
// }
public byte[] getBody() {
return body;
}

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.sncp;
import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.net.*;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import com.wentch.redkale.util.*;
@@ -47,26 +48,27 @@ public final class SncpResponse extends Response<SncpRequest> {
this.addrPort = context.getServerAddress().getPort();
}
public void finish(final int retcode, final byte[] bytes) {
public void finish(final int retcode, final BsonWriter out) {
ByteBuffer buffer = context.pollBuffer();
final int bodyLength = (bytes == null ? 0 : bytes.length);
final int frames = bodyLength / (buffer.capacity() - HEADER_SIZE) + (bodyLength % (buffer.capacity() - HEADER_SIZE) > 0 ? 1 : 0);
if (frames <= 1) {
final int bodyLength = (out == null ? 0 : out.count());
final int bufsize = buffer.capacity() - HEADER_SIZE;
if (bufsize > bodyLength) { //只需一帧
//---------------------head----------------------------------
fillHeader(buffer, bodyLength, 1, 0, retcode, 0, bytes == null ? 0 : bytes.length);
fillHeader(buffer, bodyLength, 0, bodyLength, retcode);
//---------------------body----------------------------------
if (bytes != null) buffer.put(bytes);
out.toBuffer(buffer);
buffer.flip();
finish(buffer);
} else {
final int frames = (bodyLength / bufsize) + (bodyLength % bufsize > 0 ? 1 : 0);
final ByteBuffer[] buffers = new ByteBuffer[frames];
int pos = 0;
for (int i = frames - 1; i >= 0; i--) {
if (i != frames - 1) buffer = context.pollBuffer();
int len = Math.min(buffer.remaining() - HEADER_SIZE, bytes.length - pos);
fillHeader(buffer, bodyLength, frames, i, retcode, pos, len);
for (int i = 0; i < frames; i++) {
if (i != 0) buffer = context.pollBuffer();
int len = Math.min(bufsize, bodyLength - pos);
fillHeader(buffer, bodyLength, pos, len, retcode);
buffers[i] = buffer;
buffer.put(bytes, pos, len);
out.toBuffer(pos, buffer);
pos += len;
buffer.flip();
}
@@ -74,7 +76,7 @@ public final class SncpResponse extends Response<SncpRequest> {
}
}
private void fillHeader(ByteBuffer buffer, int bodyLength, int frameCount, int frameIndex, int retcode, int bodyOffset, int framelength) {
private void fillHeader(ByteBuffer buffer, int bodyLength, int bodyOffset, int framelength, int retcode) {
//---------------------head----------------------------------
buffer.putLong(request.getSeqid());
buffer.putChar((char) SncpRequest.HEADER_SIZE);
@@ -86,10 +88,8 @@ public final class SncpResponse extends Response<SncpRequest> {
buffer.put(addrBytes);
buffer.putChar((char) this.addrPort);
buffer.putInt(bodyLength);
buffer.put((byte) frameCount); // frame count
buffer.put((byte) frameIndex); //frame index
buffer.putInt(retcode);
buffer.putInt(bodyOffset);
buffer.putChar((char) framelength);
buffer.putInt(framelength);
buffer.putInt(retcode);
}
}

View File

@@ -47,7 +47,7 @@ public final class SncpServer extends Server {
final int port = this.address.getPort();
AtomicLong createBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.creatCounter");
AtomicLong cycleBufferCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SNCP_" + port + ".Buffer.cycleCounter");
int rcapacity = Math.max(this.capacity, 8 * 1024);
int rcapacity = Math.max(this.capacity, 4 * 1024);
ObjectPool<ByteBuffer> bufferPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, this.bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(rcapacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != rcapacity) return false;