优化sncp
This commit is contained in:
@@ -612,7 +612,7 @@ public final class Application {
|
||||
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_POOLMAXCONNS, System.getProperty("redkale.net.transport.pool.maxconns", "100"))
|
||||
.addValue(TransportFactory.NAME_PINGINTERVAL, System.getProperty("redkale.net.transport.ping.interval", "30"))
|
||||
.addValue(TransportFactory.NAME_CHECKINTERVAL, System.getProperty("redkale.net.transport.check.interval", "30"));
|
||||
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
|
||||
this.sncpTransportFactory.init(tarnsportConf, ByteBuffer.wrap(Sncp.getPingBytes()).asReadOnlyBuffer(), Sncp.getPongBytes().length);
|
||||
this.clusterAgent = cluster;
|
||||
this.messageAgents = mqs;
|
||||
if (compileMode || this.classLoader instanceof RedkaleClassLoader.RedkaleCacheClassLoader) {
|
||||
|
||||
@@ -1102,6 +1102,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
|
||||
decoder = createStreamDecoder(type);
|
||||
} else if (Map.class.isAssignableFrom(clazz)) {
|
||||
decoder = createMapDecoder(type);
|
||||
} else if (CompletionHandler.class.isAssignableFrom(clazz)) {
|
||||
decoder = CompletionHandlerSimpledCoder.instance;
|
||||
} else if (Optional.class == clazz) {
|
||||
decoder = new OptionalCoder(this, type);
|
||||
} else if (clazz == Object.class) {
|
||||
|
||||
@@ -12,7 +12,7 @@ import org.redkale.convert.*;
|
||||
import org.redkale.convert.bson.BsonConvert;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.net.http.HttpSimpleRequest;
|
||||
import org.redkale.net.sncp.SncpRequest;
|
||||
import org.redkale.net.sncp.Sncp;
|
||||
|
||||
/**
|
||||
* 存在MQ里面的数据结构<p>
|
||||
@@ -318,8 +318,8 @@ public class MessageRecord implements Serializable {
|
||||
sb.append(",\"respTopic\":\"").append(this.respTopic).append("\"");
|
||||
}
|
||||
if (this.content != null) {
|
||||
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpRequest.HEADER_SIZE) {
|
||||
int offset = SncpRequest.HEADER_SIZE + 1; //循环占位符
|
||||
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > Sncp.HEADER_SIZE) {
|
||||
int offset = Sncp.HEADER_SIZE + 1; //循环占位符
|
||||
Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset);
|
||||
sb.append(",\"content\":").append(rs);
|
||||
} else if (this.ctype == CTYPE_HTTP_REQUEST) {
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
package org.redkale.mq;
|
||||
|
||||
import org.redkale.convert.bson.BsonWriter;
|
||||
import static org.redkale.net.sncp.SncpRequest.HEADER_SIZE;
|
||||
import org.redkale.net.sncp.*;
|
||||
import org.redkale.util.ByteArray;
|
||||
|
||||
@@ -51,14 +50,14 @@ public class SncpMessageResponse extends SncpResponse {
|
||||
callback.run();
|
||||
}
|
||||
if (out == null) {
|
||||
final ByteArray result = new ByteArray(SncpRequest.HEADER_SIZE);
|
||||
final ByteArray result = new ByteArray(Sncp.HEADER_SIZE);
|
||||
fillHeader(result, 0, retcode);
|
||||
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null));
|
||||
return;
|
||||
}
|
||||
final int respBodyLength = out.count(); //body总长度
|
||||
final ByteArray result = out.toByteArray();
|
||||
fillHeader(result, respBodyLength - HEADER_SIZE, retcode);
|
||||
fillHeader(result, respBodyLength - Sncp.HEADER_SIZE, retcode);
|
||||
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import java.lang.annotation.*;
|
||||
import java.lang.reflect.*;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.CompletionHandler;
|
||||
import java.util.*;
|
||||
import org.redkale.annotation.*;
|
||||
@@ -38,9 +37,20 @@ import org.redkale.util.*;
|
||||
*/
|
||||
public abstract class Sncp {
|
||||
|
||||
public static final ByteBuffer PING_BUFFER = ByteBuffer.wrap("PING".getBytes()).asReadOnlyBuffer();
|
||||
public static final int HEADER_SIZE = 60;
|
||||
|
||||
public static final ByteBuffer PONG_BUFFER = ByteBuffer.wrap("PONG".getBytes()).asReadOnlyBuffer();
|
||||
private static final byte[] PING_BYTES = new ByteArray(HEADER_SIZE)
|
||||
.putLong(0L) //8 seqid
|
||||
.putChar((char) HEADER_SIZE) //2 headerSize
|
||||
.putUint128(Uint128.ZERO) //16 serviceid
|
||||
.putInt(0) //4 serviceVersion
|
||||
.putUint128(Uint128.ZERO) //16 actionid
|
||||
.put(new byte[6]) //6 addr
|
||||
.putInt(0) //4 bodyLength
|
||||
.putInt(0) //4 retcode
|
||||
.getBytes();
|
||||
|
||||
private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length);
|
||||
|
||||
static final String FIELDPREFIX = "_redkale";
|
||||
|
||||
@@ -70,6 +80,14 @@ public abstract class Sncp {
|
||||
private Sncp() {
|
||||
}
|
||||
|
||||
public static byte[] getPingBytes() {
|
||||
return Arrays.copyOf(PING_BYTES, PING_BYTES.length);
|
||||
}
|
||||
|
||||
public static byte[] getPongBytes() {
|
||||
return Arrays.copyOf(PONG_BYTES, PONG_BYTES.length);
|
||||
}
|
||||
|
||||
public static Uint128 actionid(final RpcAction action) {
|
||||
return hash(action.name());
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import org.redkale.convert.bson.*;
|
||||
import org.redkale.convert.json.*;
|
||||
import org.redkale.mq.*;
|
||||
import org.redkale.net.*;
|
||||
import static org.redkale.net.sncp.Sncp.HEADER_SIZE;
|
||||
import org.redkale.net.sncp.Sncp.SncpDyn;
|
||||
import static org.redkale.net.sncp.SncpRequest.*;
|
||||
import static org.redkale.net.sncp.SncpResponse.fillRespHeader;
|
||||
@@ -62,9 +63,6 @@ public final class SncpClient {
|
||||
|
||||
protected final String topic;
|
||||
|
||||
@Resource
|
||||
protected JsonConvert jsonConvert;
|
||||
|
||||
@Resource
|
||||
protected BsonConvert bsonConvert;
|
||||
|
||||
|
||||
@@ -21,6 +21,8 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
|
||||
|
||||
private final Object sncplock = new Object();
|
||||
|
||||
private final byte[] pongBytes = Sncp.getPongBytes();
|
||||
|
||||
@Override
|
||||
public void addServlet(SncpServlet servlet, Object attachment, AnyValue conf, Uint128... mappings) {
|
||||
synchronized (sncplock) {
|
||||
@@ -70,7 +72,7 @@ public class SncpDispatcherServlet extends DispatcherServlet<Uint128, SncpContex
|
||||
@Override
|
||||
public void execute(SncpRequest request, SncpResponse response) throws IOException {
|
||||
if (request.isPing()) {
|
||||
response.finishBuffer(false, Sncp.PONG_BUFFER.duplicate());
|
||||
response.finish(pongBytes);
|
||||
return;
|
||||
}
|
||||
SncpServlet servlet = (SncpServlet) mappingServlet(request.getServiceid());
|
||||
|
||||
@@ -11,6 +11,7 @@ import java.nio.ByteBuffer;
|
||||
import java.util.logging.*;
|
||||
import org.redkale.convert.bson.BsonConvert;
|
||||
import org.redkale.net.Request;
|
||||
import static org.redkale.net.sncp.Sncp.HEADER_SIZE;
|
||||
import org.redkale.util.Uint128;
|
||||
|
||||
/**
|
||||
@@ -22,8 +23,6 @@ import org.redkale.util.Uint128;
|
||||
*/
|
||||
public class SncpRequest extends Request<SncpContext> {
|
||||
|
||||
public static final int HEADER_SIZE = 60;
|
||||
|
||||
public static final byte[] DEFAULT_HEADER = new byte[HEADER_SIZE];
|
||||
|
||||
protected static final int READ_STATE_ROUTE = 1;
|
||||
@@ -63,14 +62,6 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
|
||||
@Override //request.header与response.header数据格式保持一致
|
||||
protected int readHeader(ByteBuffer buffer, Request last) {
|
||||
if (buffer.remaining() == Sncp.PING_BUFFER.remaining()) {
|
||||
if (buffer.hasRemaining()) {
|
||||
buffer.get(new byte[buffer.remaining()]);
|
||||
}
|
||||
this.ping = true; //Sncp.PING_BUFFER
|
||||
this.readState = READ_STATE_END;
|
||||
return 0;
|
||||
}
|
||||
//---------------------head----------------------------------
|
||||
if (this.readState == READ_STATE_ROUTE) {
|
||||
if (buffer.remaining() < HEADER_SIZE) {
|
||||
@@ -89,7 +80,7 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
buffer.get(addrBytes); //ipaddr //6
|
||||
this.bodyLength = buffer.getInt(); //4
|
||||
|
||||
if (buffer.getInt() != 0) { //4
|
||||
if (buffer.getInt() != 0) { //4 retcode
|
||||
if (context.getLogger().isLoggable(Level.FINEST)) {
|
||||
context.getLogger().finest("sncp buffer header.retcode not 0");
|
||||
}
|
||||
@@ -100,6 +91,13 @@ public class SncpRequest extends Request<SncpContext> {
|
||||
}
|
||||
//---------------------body----------------------------------
|
||||
if (this.readState == READ_STATE_BODY) {
|
||||
if (this.bodyLength == 0) {
|
||||
this.readState = READ_STATE_END;
|
||||
if (this.seqid == 0 && this.serviceid == Uint128.ZERO && this.actionid == Uint128.ZERO) {
|
||||
this.ping = true;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int len = Math.min(this.bodyLength, buffer.remaining());
|
||||
buffer.get(body, 0, len);
|
||||
this.bodyOffset = len;
|
||||
|
||||
@@ -5,10 +5,9 @@
|
||||
*/
|
||||
package org.redkale.net.sncp;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import org.redkale.convert.bson.BsonWriter;
|
||||
import org.redkale.net.Response;
|
||||
import static org.redkale.net.sncp.SncpRequest.HEADER_SIZE;
|
||||
import static org.redkale.net.sncp.Sncp.HEADER_SIZE;
|
||||
import org.redkale.util.*;
|
||||
|
||||
/**
|
||||
@@ -67,14 +66,9 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
|
||||
return super.recycle();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finishBuffer(boolean kill, ByteBuffer buffer) {
|
||||
super.finishBuffer(kill, buffer);
|
||||
}
|
||||
|
||||
public void finish(final int retcode, final BsonWriter out) {
|
||||
if (out == null) {
|
||||
final ByteArray buffer = new ByteArray(SncpRequest.HEADER_SIZE);
|
||||
final ByteArray buffer = new ByteArray(HEADER_SIZE);
|
||||
fillHeader(buffer, 0, retcode);
|
||||
finish(buffer);
|
||||
return;
|
||||
@@ -96,13 +90,13 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
|
||||
int offset = 0;
|
||||
buffer.putLong(offset, seqid);
|
||||
offset += 8;
|
||||
buffer.putChar(offset, (char) SncpRequest.HEADER_SIZE);
|
||||
buffer.putChar(offset, (char) HEADER_SIZE);
|
||||
offset += 2;
|
||||
Uint128.write(buffer, offset, serviceid);
|
||||
buffer.putUint128(offset, serviceid);
|
||||
offset += 16;
|
||||
buffer.putInt(offset, serviceVersion);
|
||||
offset += 4;
|
||||
Uint128.write(buffer, offset, actionid);
|
||||
buffer.putUint128(offset, actionid);
|
||||
offset += 16;
|
||||
buffer.put(offset, addrBytes);
|
||||
offset += addrBytes.length; //4
|
||||
|
||||
@@ -250,6 +250,22 @@ public final class ByteArray implements ByteTuple {
|
||||
return Double.longBitsToDouble(getLong(offset));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定位置的Uint128值,须确保0 <= offset+16 < length
|
||||
*
|
||||
* @param offset 位置
|
||||
*
|
||||
* @return Uint128值
|
||||
*/
|
||||
public Uint128 getUint128(int offset) {
|
||||
return Uint128.create(new byte[]{
|
||||
content[offset], content[offset + 1], content[offset + 2], content[offset + 3],
|
||||
content[offset + 4], content[offset + 5], content[offset + 6], content[offset + 7],
|
||||
content[offset + 8], content[offset + 9], content[offset + 10], content[offset + 11],
|
||||
content[offset + 12], content[offset + 13], content[offset + 14], content[offset + 15]
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取最后一个字节值,调用前须保证count大于0
|
||||
*
|
||||
@@ -751,6 +767,29 @@ public final class ByteArray implements ByteTuple {
|
||||
return this.putLong(offset, Double.doubleToLongBits(value));
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入一个Uint128值
|
||||
*
|
||||
* @param value Uint128值
|
||||
*
|
||||
* @return ByteArray
|
||||
*/
|
||||
public ByteArray putUint128(Uint128 value) {
|
||||
return this.put(value.directBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
* 指定位置写入一个Uint128值, content.length 必须不能小于offset+16
|
||||
*
|
||||
* @param offset 偏移量
|
||||
* @param value Uint128值
|
||||
*
|
||||
* @return ByteArray
|
||||
*/
|
||||
public ByteArray putUint128(int offset, Uint128 value) {
|
||||
return this.put(offset, value.directBytes());
|
||||
}
|
||||
|
||||
public ByteArray putByte(short value) {
|
||||
return put((byte) value);
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ public final class Uint128 extends Number implements Comparable<Uint128> {
|
||||
(byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2};
|
||||
}
|
||||
|
||||
protected Uint128(byte[] bytes) {
|
||||
private Uint128(byte[] bytes) {
|
||||
if (bytes == null || bytes.length != 16) {
|
||||
throw new NumberFormatException("Not 16 length bytes");
|
||||
}
|
||||
@@ -44,13 +44,25 @@ public final class Uint128 extends Number implements Comparable<Uint128> {
|
||||
return value;
|
||||
}
|
||||
|
||||
public static Uint128 create(byte[] bytes) {
|
||||
return new Uint128(bytes);
|
||||
public static Uint128 create(byte[] bs) {
|
||||
if (bs[15] == 0 && bs[14] == 0 && bs[13] == 0 && bs[12] == 0
|
||||
&& bs[11] == 0 && bs[10] == 0 && bs[9] == 0 && bs[8] == 0
|
||||
&& bs[7] == 0 && bs[6] == 0 && bs[5] == 0 && bs[4] == 0
|
||||
&& bs[3] == 0 && bs[2] == 0 && bs[1] == 0 && bs[0] == 0) {
|
||||
return ZERO;
|
||||
}
|
||||
return new Uint128(bs);
|
||||
}
|
||||
|
||||
public static Uint128 read(ByteBuffer buffer) {
|
||||
byte[] bs = new byte[16];
|
||||
buffer.get(bs);
|
||||
if (bs[15] == 0 && bs[14] == 0 && bs[13] == 0 && bs[12] == 0
|
||||
&& bs[11] == 0 && bs[10] == 0 && bs[9] == 0 && bs[8] == 0
|
||||
&& bs[7] == 0 && bs[6] == 0 && bs[5] == 0 && bs[4] == 0
|
||||
&& bs[3] == 0 && bs[2] == 0 && bs[1] == 0 && bs[0] == 0) {
|
||||
return ZERO;
|
||||
}
|
||||
return new Uint128(bs);
|
||||
}
|
||||
|
||||
@@ -59,16 +71,6 @@ public final class Uint128 extends Number implements Comparable<Uint128> {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public static ByteArray write(ByteArray array, Uint128 dlong) {
|
||||
array.put(dlong.value);
|
||||
return array;
|
||||
}
|
||||
|
||||
public static ByteArray write(ByteArray array, int offset, Uint128 dlong) {
|
||||
array.put(offset, dlong.value);
|
||||
return array;
|
||||
}
|
||||
|
||||
public boolean equals(byte[] bytes) {
|
||||
return Arrays.equals(this.value, bytes);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user