SncpClientCodecTest

This commit is contained in:
redkale
2023-02-02 21:40:58 +08:00
parent 7f379f9874
commit 6973e6b159
13 changed files with 135 additions and 90 deletions

View File

@@ -31,7 +31,7 @@ public final class Uint128SimpledCoder<R extends Reader, W extends Writer> exten
if (value == null) {
out.writeNull();
} else {
bsSimpledCoder.convertTo(out, value.directBytes());
bsSimpledCoder.convertTo(out, value.getBytes());
}
}

View File

@@ -37,7 +37,7 @@ import org.redkale.util.*;
*/
public abstract class Sncp {
private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).write(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes();
private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).writeTo(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes();
private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length);

View File

@@ -3,6 +3,7 @@
*/
package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import org.redkale.net.*;
import org.redkale.net.client.*;
@@ -12,14 +13,21 @@ import org.redkale.net.client.*;
*/
public class SncpClient extends Client<SncpClientConnection, SncpClientRequest, SncpClientResult> {
private InetSocketAddress sncpAddress;
@SuppressWarnings("OverridableMethodCallInConstructor")
public SncpClient(String name, AsyncGroup group, String key, ClientAddress address, int maxConns, int maxPipelines) {
public SncpClient(String name, AsyncGroup group, InetSocketAddress sncpAddress, ClientAddress address, int maxConns, int maxPipelines) {
super(name, group, true, address, maxConns, maxPipelines, null, null, null); //maxConns
this.sncpAddress = sncpAddress;
}
@Override
protected SncpClientConnection createClientConnection(int index, AsyncConnection channel) {
throw new UnsupportedOperationException("Not supported yet.");
public SncpClientConnection createClientConnection(int index, AsyncConnection channel) {
return new SncpClientConnection(this, index, channel);
}
public InetSocketAddress getSncpAddress() {
return sncpAddress;
}
}

View File

@@ -18,7 +18,7 @@ public class SncpClientConnection extends ClientConnection<SncpClientRequest, Sn
public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) {
super(client, index, channel);
requestPool = ObjectPool.createUnsafePool(Thread.currentThread(), 256,
ObjectPool.createSafePool(256, t -> new SncpClientRequest(null), SncpClientRequest::prepare, SncpClientRequest::recycle)
ObjectPool.createSafePool(256, t -> new SncpClientRequest(), SncpClientRequest::prepare, SncpClientRequest::recycle)
);
}

View File

@@ -3,10 +3,9 @@
*/
package org.redkale.net.sncp;
import java.net.InetSocketAddress;
import java.util.Objects;
import org.redkale.net.client.*;
import org.redkale.util.*;
import org.redkale.util.ByteArray;
/**
*
@@ -14,34 +13,19 @@ import org.redkale.util.*;
*/
public class SncpClientRequest extends ClientRequest {
private final InetSocketAddress clientSncpAddress;
private final byte[] addrBytes;
private final int addrPort;
private SncpHeader header;
private long seqid;
private Uint128 serviceid;
private int serviceVersion;
private Uint128 actionid;
private byte[] bodyContent;
public SncpClientRequest(InetSocketAddress clientSncpAddress) {
this.clientSncpAddress = clientSncpAddress;
this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress();
this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort();
public SncpClientRequest() {
}
public SncpClientRequest prepare(long seqid, Uint128 serviceid, int serviceVersion, Uint128 actionid, String traceid, byte[] bodyContent) {
public SncpClientRequest prepare(SncpHeader header, long seqid, String traceid, byte[] bodyContent) {
super.prepare();
this.header = header;
this.seqid = seqid;
this.serviceid = serviceid;
this.serviceVersion = serviceVersion;
this.actionid = actionid;
this.traceid = traceid;
this.bodyContent = bodyContent;
return this;
@@ -54,50 +38,39 @@ public class SncpClientRequest extends ClientRequest {
@Override
protected boolean recycle() {
boolean rs = super.recycle();
this.header = null;
this.seqid = 0;
this.serviceVersion = 0;
this.serviceid = null;
this.actionid = null;
this.bodyContent = null;
return rs;
}
@Override
public void writeTo(ClientConnection conn, ByteArray array) {
if (bodyContent == null) {
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, 0, 0);
} else {
header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, bodyContent.length, 0);
array.put(bodyContent);
}
}
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"
+ "seqid = " + seqid
+ ", serviceVersion = " + serviceVersion
+ ", serviceid = " + serviceid
+ ", actionid = " + actionid
+ ", bodyLength = " + (bodyContent == null ? -1 : bodyContent.length)
+ "header=" + header
+ ", seqid =" + seqid
+ ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]"
+ "}";
}
public SncpHeader getHeader() {
return header;
}
public long getSeqid() {
return seqid;
}
public Uint128 getServiceid() {
return serviceid;
}
public int getServiceVersion() {
return serviceVersion;
}
public Uint128 getActionid() {
return actionid;
}
public InetSocketAddress getClientSncpAddress() {
return clientSncpAddress;
}
public byte[] getBodyContent() {
return bodyContent;
}

View File

@@ -5,6 +5,7 @@ package org.redkale.net.sncp;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.redkale.util.ByteArray;
/**
@@ -47,6 +48,14 @@ public class SncpClientResult {
return header == null ? null : header.getSeqid();
}
@Override
public String toString() {
return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{"
+ "header=" + header
+ ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]"
+ "}";
}
public int getBodyLength() {
return header.getBodyLength();
}

View File

@@ -18,7 +18,7 @@ public class SncpHeader {
private static final byte[] EMPTY_ADDR = new byte[4];
private long seqid;
private Long seqid;
private Uint128 serviceid;
@@ -92,39 +92,29 @@ public class SncpHeader {
return size;
}
public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) {
public ByteArray writeTo(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) {
byte[] newAddrBytes = address == null ? EMPTY_ADDR : address.getAddress().getAddress();
int newAddrPort = address == null ? 0 : address.getPort();
return write(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode);
return writeTo(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode);
}
public ByteArray write(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) {
int offset = 0;
array.putLong(offset, newSeqid);
offset += 8;
array.putChar(offset, (char) HEADER_SIZE);
offset += 2;
array.putUint128(offset, serviceid);
offset += 16;
array.putInt(offset, serviceVersion);
offset += 4;
array.putUint128(offset, actionid);
offset += 16;
array.put(offset, newAddrBytes);
offset += newAddrBytes.length; //4
array.putChar(offset, (char) newAddrPort);
offset += 2;
array.putInt(offset, bodyLength);
offset += 4;
array.putInt(offset, retcode); //4
public ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) {
array.putLong(newSeqid); //8
array.putChar((char) HEADER_SIZE); //2
array.putUint128(serviceid); //16
array.putInt(serviceVersion); //4
array.putUint128(actionid); //16
array.put(newAddrBytes); //4
array.putChar((char) newAddrPort); //2
array.putInt(bodyLength); //4
array.putInt(retcode); //4
return array;
}
@Override
public String toString() {
return getClass().getSimpleName()
+ "{seqid=" + this.seqid
+ ",serviceid=" + this.serviceid
+ (this.seqid == null ? ("{serviceid=" + this.serviceid) : ("{seqid=" + this.seqid + ",serviceid=" + this.serviceid))
+ ",serviceVersion=" + this.serviceVersion
+ ",actionid=" + this.actionid
+ ",address=" + getAddress()
@@ -146,14 +136,16 @@ public class SncpHeader {
//供client端request和response的header判断
public boolean checkValid(SncpHeader other) {
return this.seqid == other.seqid && Objects.equals(this.serviceid, other.serviceid) && Objects.equals(this.actionid, other.actionid);
return Objects.equals(this.seqid, other.seqid)
&& Objects.equals(this.serviceid, other.serviceid)
&& Objects.equals(this.actionid, other.actionid);
}
public long getSeqid() {
public Long getSeqid() {
return seqid;
}
public void setSeqid(long seqid) {
public void setSeqid(Long seqid) {
this.seqid = seqid;
}

View File

@@ -522,7 +522,7 @@ public final class SncpOldClient {
}
private void fillHeader(ByteArray buffer, SncpAction action, long seqid, String traceid, int bodyLength) {
action.header.write(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0
action.header.writeTo(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0
}
protected static final class SncpAction {

View File

@@ -107,7 +107,7 @@ public class SncpRequest extends Request<SncpContext> {
@Override
public String toString() {
return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + "}";
return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}";
}
@Override

View File

@@ -81,7 +81,7 @@ public class SncpResponse extends Response<SncpContext, SncpRequest> {
protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) {
SncpHeader header = request.getHeader();
header.write(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode);
header.writeTo(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode);
}
}

View File

@@ -775,7 +775,7 @@ public final class ByteArray implements ByteTuple {
* @return ByteArray
*/
public ByteArray putUint128(Uint128 value) {
return this.put(value.directBytes());
return this.put(value.value);
}
/**
@@ -787,7 +787,7 @@ public final class ByteArray implements ByteTuple {
* @return ByteArray
*/
public ByteArray putUint128(int offset, Uint128 value) {
return this.put(offset, value.directBytes());
return this.put(offset, value.value);
}
public ByteArray putByte(short value) {

View File

@@ -21,14 +21,13 @@ public final class Uint128 extends Number implements Comparable<Uint128> {
public static final Uint128 ZERO = new Uint128(new byte[16]);
protected final byte[] value;
final byte[] value;
// private Uint128(long v1, long v2) { //暂时不用
// this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32),
// (byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32),
// (byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2};
// }
private Uint128(byte[] bytes) {
if (bytes == null || bytes.length != 16) {
throw new NumberFormatException("Not 16 length bytes");
@@ -40,10 +39,6 @@ public final class Uint128 extends Number implements Comparable<Uint128> {
return Arrays.copyOf(value, value.length);
}
public byte[] directBytes() {
return value;
}
public static Uint128 create(byte[] bs) {
if (bs[15] == 0 && bs[14] == 0 && bs[13] == 0 && bs[12] == 0
&& bs[11] == 0 && bs[10] == 0 && bs[9] == 0 && bs[8] == 0

View File

@@ -0,0 +1,68 @@
/*
*
*/
package org.redkale.test.sncp;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.*;
import org.junit.jupiter.api.*;
import org.redkale.net.AsyncIOGroup;
import org.redkale.net.client.*;
import org.redkale.net.sncp.*;
import org.redkale.util.*;
/**
*
* @author zhangjx
*/
public class SncpClientCodecTest {
private boolean main;
public static void main(String[] args) throws Throwable {
SncpClientCodecTest test = new SncpClientCodecTest();
test.main = true;
test.run();
}
@Test
public void run() throws Exception {
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), Utility.cpus(), 16);
SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection());
SncpClientCodec codec = new SncpClientCodec(conn);
List respResults = new ArrayList();
try {
Field respResultsField = ClientCodec.class.getDeclaredField("respResults");
respResultsField.setAccessible(true);
respResults = (List) respResultsField.get(codec);
} catch (Exception e) {
e.printStackTrace();
}
//----------------------------------------------
ByteBuffer realBuf;
{
SncpHeader header = new SncpHeader(sncpAddress, Uint128.ZERO, Uint128.ZERO);
SncpClientRequest request = new SncpClientRequest();
ByteArray writeArray = new ByteArray();
request.prepare(header, 1, "", new byte[20]);
System.out.println("request.1 = " + request);
request.writeTo(conn, writeArray);
request.prepare(header, 2, "", new byte[25]);
System.out.println("request.2 = " + request);
request.writeTo(conn, writeArray);
System.out.println(writeArray.getBytes().length);
realBuf = ByteBuffer.wrap(writeArray.getBytes());
}
System.out.println("sncp.realBuf = " + realBuf.remaining());
codec.decodeMessages(realBuf, new ByteArray());
if (!main) {
Assertions.assertEquals(2, respResults.size());
}
System.out.println(respResults);
}
}