sncp优化

This commit is contained in:
redkale
2023-02-02 15:31:19 +08:00
parent 2ef5eb6cd0
commit c6ef28c358
8 changed files with 66 additions and 31 deletions

View File

@@ -69,6 +69,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
for (ClientResponse<R, P> cr : respResults) {
if (cr.isError()) {
connection.dispose(null);
return;
} else {
ClientFuture<R, P> respFuture = connection.pollRespFuture(cr.getRequestid());
if (respFuture != null) {

View File

@@ -6,7 +6,8 @@ package org.redkale.net.sncp;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
import org.redkale.net.client.ClientCodec;
import org.redkale.util.ByteArray;
import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE;
import org.redkale.util.*;
/**
*
@@ -16,6 +17,8 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
protected static final Logger logger = Logger.getLogger(SncpClientCodec.class.getSimpleName());
private final ObjectPool<SncpClientResult> resultPool = ObjectPool.createUnsafePool(256, t -> new SncpClientResult(), SncpClientResult::prepare, SncpClientResult::recycle);
private ByteArray recyclableArray;
protected ByteArray halfBodyBytes;
@@ -28,6 +31,15 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
super(connection);
}
protected SncpClientResult pollResult(SncpClientRequest request) {
SncpClientResult rs = resultPool.get();
return rs;
}
protected void offerResult(SncpClientResult rs) {
resultPool.accept(rs);
}
protected ByteArray pollArray() {
if (recyclableArray == null) {
recyclableArray = new ByteArray();
@@ -49,8 +61,9 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
halfHeaderBytes.put(buffer, SncpHeader.HEADER_SIZE - halfHeaderBytes.length());
//读取完整header
SncpClientResult result = new SncpClientResult();
if (!result.readHeader(halfHeaderBytes)) {
occurError(null, null); //request不一定存在
int headerSize = result.readHeader(halfHeaderBytes);
if (headerSize != HEADER_SIZE) {
occurError(null, new SncpException("sncp header length must be " + HEADER_SIZE + ", but " + headerSize)); //request不一定存在
return;
}
halfHeaderBytes = null;
@@ -93,8 +106,9 @@ public class SncpClientCodec extends ClientCodec<SncpClientRequest, SncpClientRe
return;
}
SncpClientResult result = new SncpClientResult();
if (!result.readHeader(buffer)) {
occurError(null, null); //request不一定存在
int headerSize = result.readHeader(buffer);
if (headerSize != HEADER_SIZE) {
occurError(null, new SncpException("sncp header length must be " + HEADER_SIZE + ", but " + headerSize)); //request不一定存在
return;
}
if (result.getBodyLength() < 1) {

View File

@@ -5,6 +5,7 @@ package org.redkale.net.sncp;
import org.redkale.net.AsyncConnection;
import org.redkale.net.client.*;
import org.redkale.util.ObjectPool;
/**
*
@@ -12,8 +13,13 @@ import org.redkale.net.client.*;
*/
public class SncpClientConnection extends ClientConnection<SncpClientRequest, SncpClientResult> {
private final ObjectPool<SncpClientRequest> requestPool;
public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) {
super(client, index, channel);
requestPool = ObjectPool.createUnsafePool(Thread.currentThread(), 256,
ObjectPool.createSafePool(256, t -> new SncpClientRequest(null), SncpClientRequest::prepare, SncpClientRequest::recycle)
);
}
@Override
@@ -21,4 +27,10 @@ public class SncpClientConnection extends ClientConnection<SncpClientRequest, Sn
return new SncpClientCodec(this);
}
protected void offerResult(SncpClientRequest req, SncpClientResult rs) {
SncpClientCodec c = getCodec();
c.offerResult(rs);
requestPool.accept(req);
}
}

View File

@@ -47,6 +47,10 @@ public class SncpClientRequest extends ClientRequest {
return this;
}
@Override
protected void prepare() {
}
@Override
protected boolean recycle() {
boolean rs = super.recycle();
@@ -60,7 +64,7 @@ public class SncpClientRequest extends ClientRequest {
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
}
@Override

View File

@@ -17,18 +17,23 @@ public class SncpClientResult {
private byte[] bodyContent;
protected boolean readHeader(ByteBuffer buffer) {
SncpHeader h = new SncpHeader();
boolean rs = h.read(buffer);
this.header = h;
return rs;
protected void prepare() {
}
protected boolean readHeader(ByteArray array) {
SncpHeader h = new SncpHeader();
boolean rs = h.read(array);
this.header = h;
return rs;
protected boolean recycle() {
this.header = null;
this.bodyContent = null;
return true;
}
protected int readHeader(ByteBuffer buffer) {
this.header = new SncpHeader();
return this.header.read(buffer);
}
protected int readHeader(ByteArray array) {
this.header = new SncpHeader();
return this.header.read(array);
}
protected boolean readBody(ByteBuffer buffer) {

View File

@@ -54,9 +54,10 @@ public class SncpHeader {
this.actionid = actionid;
}
public boolean read(ByteBuffer buffer) {
public int read(ByteBuffer buffer) {
this.seqid = buffer.getLong(); //8
this.valid = buffer.getChar() != HEADER_SIZE; //2
int size = buffer.getChar();
this.valid = size != HEADER_SIZE; //2
this.serviceid = Uint128.read(buffer); //16
this.serviceVersion = buffer.getInt(); //4
this.actionid = Uint128.read(buffer); //16
@@ -65,14 +66,15 @@ public class SncpHeader {
this.addrPort = buffer.getChar(); //port 2
this.bodyLength = buffer.getInt(); //4
this.retcode = buffer.getInt(); //4
return this.valid;
return size;
}
public boolean read(ByteArray array) {
public int read(ByteArray array) {
int offset = 0;
this.seqid = array.getLong(offset); //8
offset += 8;
this.valid = array.getChar(offset) != HEADER_SIZE; //2
int size = array.getChar(offset);
this.valid = size != HEADER_SIZE; //2
offset += 2;
this.serviceid = array.getUint128(offset); //16
offset += 16;
@@ -87,7 +89,7 @@ public class SncpHeader {
this.bodyLength = array.getInt(offset); //4
offset += 4;
this.retcode = array.getInt(offset); //4
return this.valid;
return size;
}
public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) {

View File

@@ -57,16 +57,13 @@ public class SncpRequest extends Request<SncpContext> {
return HEADER_SIZE - buffer.remaining(); //小于60
}
this.header = new SncpHeader();
if (!this.header.read(buffer)) {
if (context.getLogger().isLoggable(Level.FINEST)) {
context.getLogger().finest("sncp buffer header.length not " + HEADER_SIZE);
}
int headerSize = this.header.read(buffer);
if (headerSize != HEADER_SIZE) {
context.getLogger().log(Level.WARNING, "sncp buffer header.length not " + HEADER_SIZE);
return -1;
}
if (this.header.getRetcode() != 0) { // retcode
if (context.getLogger().isLoggable(Level.FINEST)) {
context.getLogger().finest("sncp buffer header.retcode not 0");
}
context.getLogger().log(Level.WARNING, "sncp buffer header.retcode not 0");
return -1;
}
this.body = new byte[this.header.getBodyLength()];

View File

@@ -29,8 +29,8 @@ public class SncpTestServiceImpl implements SncpTestIService {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " sleep 1秒后运行了异步方法-----------queryResultAsync方法");
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + " sleep 200ms后运行了异步方法-----------queryResultAsync方法");
future.complete("异步result: " + bean);
} catch (Exception e) {
e.printStackTrace();