From 6973e6b15905824c0ba4d9b9d03c3b02738cb2e5 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 2 Feb 2023 21:40:58 +0800 Subject: [PATCH] SncpClientCodecTest --- .../convert/ext/Uint128SimpledCoder.java | 2 +- src/main/java/org/redkale/net/sncp/Sncp.java | 2 +- .../java/org/redkale/net/sncp/SncpClient.java | 14 +++- .../net/sncp/SncpClientConnection.java | 2 +- .../redkale/net/sncp/SncpClientRequest.java | 65 ++++++------------ .../redkale/net/sncp/SncpClientResult.java | 9 +++ .../java/org/redkale/net/sncp/SncpHeader.java | 46 ++++++------- .../org/redkale/net/sncp/SncpOldClient.java | 2 +- .../org/redkale/net/sncp/SncpRequest.java | 2 +- .../org/redkale/net/sncp/SncpResponse.java | 2 +- src/main/java/org/redkale/util/ByteArray.java | 4 +- src/main/java/org/redkale/util/Uint128.java | 7 +- .../test/sncp/SncpClientCodecTest.java | 68 +++++++++++++++++++ 13 files changed, 135 insertions(+), 90 deletions(-) create mode 100644 src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java diff --git a/src/main/java/org/redkale/convert/ext/Uint128SimpledCoder.java b/src/main/java/org/redkale/convert/ext/Uint128SimpledCoder.java index 28169923b..a44f96073 100644 --- a/src/main/java/org/redkale/convert/ext/Uint128SimpledCoder.java +++ b/src/main/java/org/redkale/convert/ext/Uint128SimpledCoder.java @@ -31,7 +31,7 @@ public final class Uint128SimpledCoder exten if (value == null) { out.writeNull(); } else { - bsSimpledCoder.convertTo(out, value.directBytes()); + bsSimpledCoder.convertTo(out, value.getBytes()); } } diff --git a/src/main/java/org/redkale/net/sncp/Sncp.java b/src/main/java/org/redkale/net/sncp/Sncp.java index b6acd79bd..02688cfcd 100644 --- a/src/main/java/org/redkale/net/sncp/Sncp.java +++ b/src/main/java/org/redkale/net/sncp/Sncp.java @@ -37,7 +37,7 @@ import org.redkale.util.*; */ public abstract class Sncp { - private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).write(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes(); + private static final byte[] PING_BYTES = new SncpHeader(null, Uint128.ZERO, Uint128.ZERO).writeTo(new ByteArray(SncpHeader.HEADER_SIZE), null, 0, 0, 0).getBytes(); private static final byte[] PONG_BYTES = Arrays.copyOf(PING_BYTES, PING_BYTES.length); diff --git a/src/main/java/org/redkale/net/sncp/SncpClient.java b/src/main/java/org/redkale/net/sncp/SncpClient.java index 0d7e86e0f..c59ebf61d 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpClient.java @@ -3,6 +3,7 @@ */ package org.redkale.net.sncp; +import java.net.InetSocketAddress; import org.redkale.net.*; import org.redkale.net.client.*; @@ -12,14 +13,21 @@ import org.redkale.net.client.*; */ public class SncpClient extends Client { + private InetSocketAddress sncpAddress; + @SuppressWarnings("OverridableMethodCallInConstructor") - public SncpClient(String name, AsyncGroup group, String key, ClientAddress address, int maxConns, int maxPipelines) { + public SncpClient(String name, AsyncGroup group, InetSocketAddress sncpAddress, ClientAddress address, int maxConns, int maxPipelines) { super(name, group, true, address, maxConns, maxPipelines, null, null, null); //maxConns + this.sncpAddress = sncpAddress; } @Override - protected SncpClientConnection createClientConnection(int index, AsyncConnection channel) { - throw new UnsupportedOperationException("Not supported yet."); + public SncpClientConnection createClientConnection(int index, AsyncConnection channel) { + return new SncpClientConnection(this, index, channel); + } + + public InetSocketAddress getSncpAddress() { + return sncpAddress; } } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientConnection.java b/src/main/java/org/redkale/net/sncp/SncpClientConnection.java index 2b6671043..18497159b 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientConnection.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientConnection.java @@ -18,7 +18,7 @@ public class SncpClientConnection extends ClientConnection new SncpClientRequest(null), SncpClientRequest::prepare, SncpClientRequest::recycle) + ObjectPool.createSafePool(256, t -> new SncpClientRequest(), SncpClientRequest::prepare, SncpClientRequest::recycle) ); } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java index f0b3d209c..2da07737e 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientRequest.java @@ -3,10 +3,9 @@ */ package org.redkale.net.sncp; -import java.net.InetSocketAddress; import java.util.Objects; import org.redkale.net.client.*; -import org.redkale.util.*; +import org.redkale.util.ByteArray; /** * @@ -14,34 +13,19 @@ import org.redkale.util.*; */ public class SncpClientRequest extends ClientRequest { - private final InetSocketAddress clientSncpAddress; - - private final byte[] addrBytes; - - private final int addrPort; + private SncpHeader header; private long seqid; - private Uint128 serviceid; - - private int serviceVersion; - - private Uint128 actionid; - private byte[] bodyContent; - public SncpClientRequest(InetSocketAddress clientSncpAddress) { - this.clientSncpAddress = clientSncpAddress; - this.addrBytes = clientSncpAddress == null ? new byte[4] : clientSncpAddress.getAddress().getAddress(); - this.addrPort = clientSncpAddress == null ? 0 : clientSncpAddress.getPort(); + public SncpClientRequest() { } - public SncpClientRequest prepare(long seqid, Uint128 serviceid, int serviceVersion, Uint128 actionid, String traceid, byte[] bodyContent) { + public SncpClientRequest prepare(SncpHeader header, long seqid, String traceid, byte[] bodyContent) { super.prepare(); + this.header = header; this.seqid = seqid; - this.serviceid = serviceid; - this.serviceVersion = serviceVersion; - this.actionid = actionid; this.traceid = traceid; this.bodyContent = bodyContent; return this; @@ -54,50 +38,39 @@ public class SncpClientRequest extends ClientRequest { @Override protected boolean recycle() { boolean rs = super.recycle(); + this.header = null; this.seqid = 0; - this.serviceVersion = 0; - this.serviceid = null; - this.actionid = null; this.bodyContent = null; return rs; } @Override public void writeTo(ClientConnection conn, ByteArray array) { - + if (bodyContent == null) { + header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, 0, 0); + } else { + header.writeTo(array, header.getAddrBytes(), header.getAddrPort(), seqid, bodyContent.length, 0); + array.put(bodyContent); + } } @Override public String toString() { return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{" - + "seqid = " + seqid - + ", serviceVersion = " + serviceVersion - + ", serviceid = " + serviceid - + ", actionid = " + actionid - + ", bodyLength = " + (bodyContent == null ? -1 : bodyContent.length) + + "header=" + header + + ", seqid =" + seqid + + ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]" + "}"; } + public SncpHeader getHeader() { + return header; + } + public long getSeqid() { return seqid; } - public Uint128 getServiceid() { - return serviceid; - } - - public int getServiceVersion() { - return serviceVersion; - } - - public Uint128 getActionid() { - return actionid; - } - - public InetSocketAddress getClientSncpAddress() { - return clientSncpAddress; - } - public byte[] getBodyContent() { return bodyContent; } diff --git a/src/main/java/org/redkale/net/sncp/SncpClientResult.java b/src/main/java/org/redkale/net/sncp/SncpClientResult.java index 91f683791..58e6a3201 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientResult.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientResult.java @@ -5,6 +5,7 @@ package org.redkale.net.sncp; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Objects; import org.redkale.util.ByteArray; /** @@ -47,6 +48,14 @@ public class SncpClientResult { return header == null ? null : header.getSeqid(); } + @Override + public String toString() { + return getClass().getSimpleName() + "_" + Objects.hashCode(this) + "{" + + "header=" + header + + ", body=[" + (bodyContent == null ? -1 : bodyContent.length) + "]" + + "}"; + } + public int getBodyLength() { return header.getBodyLength(); } diff --git a/src/main/java/org/redkale/net/sncp/SncpHeader.java b/src/main/java/org/redkale/net/sncp/SncpHeader.java index 1c766d356..c17cbba06 100644 --- a/src/main/java/org/redkale/net/sncp/SncpHeader.java +++ b/src/main/java/org/redkale/net/sncp/SncpHeader.java @@ -18,7 +18,7 @@ public class SncpHeader { private static final byte[] EMPTY_ADDR = new byte[4]; - private long seqid; + private Long seqid; private Uint128 serviceid; @@ -92,39 +92,29 @@ public class SncpHeader { return size; } - public ByteArray write(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) { + public ByteArray writeTo(ByteArray array, InetSocketAddress address, long newSeqid, int bodyLength, int retcode) { byte[] newAddrBytes = address == null ? EMPTY_ADDR : address.getAddress().getAddress(); int newAddrPort = address == null ? 0 : address.getPort(); - return write(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode); + return writeTo(array, newAddrBytes, newAddrPort, newSeqid, bodyLength, retcode); } - public ByteArray write(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) { - int offset = 0; - array.putLong(offset, newSeqid); - offset += 8; - array.putChar(offset, (char) HEADER_SIZE); - offset += 2; - array.putUint128(offset, serviceid); - offset += 16; - array.putInt(offset, serviceVersion); - offset += 4; - array.putUint128(offset, actionid); - offset += 16; - array.put(offset, newAddrBytes); - offset += newAddrBytes.length; //4 - array.putChar(offset, (char) newAddrPort); - offset += 2; - array.putInt(offset, bodyLength); - offset += 4; - array.putInt(offset, retcode); //4 + public ByteArray writeTo(ByteArray array, byte[] newAddrBytes, int newAddrPort, long newSeqid, int bodyLength, int retcode) { + array.putLong(newSeqid); //8 + array.putChar((char) HEADER_SIZE); //2 + array.putUint128(serviceid); //16 + array.putInt(serviceVersion); //4 + array.putUint128(actionid); //16 + array.put(newAddrBytes); //4 + array.putChar((char) newAddrPort); //2 + array.putInt(bodyLength); //4 + array.putInt(retcode); //4 return array; } @Override public String toString() { return getClass().getSimpleName() - + "{seqid=" + this.seqid - + ",serviceid=" + this.serviceid + + (this.seqid == null ? ("{serviceid=" + this.serviceid) : ("{seqid=" + this.seqid + ",serviceid=" + this.serviceid)) + ",serviceVersion=" + this.serviceVersion + ",actionid=" + this.actionid + ",address=" + getAddress() @@ -146,14 +136,16 @@ public class SncpHeader { //供client端request和response的header判断 public boolean checkValid(SncpHeader other) { - return this.seqid == other.seqid && Objects.equals(this.serviceid, other.serviceid) && Objects.equals(this.actionid, other.actionid); + return Objects.equals(this.seqid, other.seqid) + && Objects.equals(this.serviceid, other.serviceid) + && Objects.equals(this.actionid, other.actionid); } - public long getSeqid() { + public Long getSeqid() { return seqid; } - public void setSeqid(long seqid) { + public void setSeqid(Long seqid) { this.seqid = seqid; } diff --git a/src/main/java/org/redkale/net/sncp/SncpOldClient.java b/src/main/java/org/redkale/net/sncp/SncpOldClient.java index c4fe60010..6bde336a6 100644 --- a/src/main/java/org/redkale/net/sncp/SncpOldClient.java +++ b/src/main/java/org/redkale/net/sncp/SncpOldClient.java @@ -522,7 +522,7 @@ public final class SncpOldClient { } private void fillHeader(ByteArray buffer, SncpAction action, long seqid, String traceid, int bodyLength) { - action.header.write(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0 + action.header.writeTo(buffer, addrBytes, addrPort, seqid, bodyLength, 0); //结果码, 请求方固定传0 } protected static final class SncpAction { diff --git a/src/main/java/org/redkale/net/sncp/SncpRequest.java b/src/main/java/org/redkale/net/sncp/SncpRequest.java index 1649e3c9e..c530679f7 100644 --- a/src/main/java/org/redkale/net/sncp/SncpRequest.java +++ b/src/main/java/org/redkale/net/sncp/SncpRequest.java @@ -107,7 +107,7 @@ public class SncpRequest extends Request { @Override public String toString() { - return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + "}"; + return SncpRequest.class.getSimpleName() + "{header=" + this.header + ",bodyOffset=" + this.bodyOffset + ",body=[" + (this.body == null ? -1 : this.body.length) + "]}"; } @Override diff --git a/src/main/java/org/redkale/net/sncp/SncpResponse.java b/src/main/java/org/redkale/net/sncp/SncpResponse.java index fa66c0fa0..d2a814cba 100644 --- a/src/main/java/org/redkale/net/sncp/SncpResponse.java +++ b/src/main/java/org/redkale/net/sncp/SncpResponse.java @@ -81,7 +81,7 @@ public class SncpResponse extends Response { protected void fillHeader(ByteArray buffer, int bodyLength, int retcode) { SncpHeader header = request.getHeader(); - header.write(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode); + header.writeTo(buffer, this.addrBytes, this.addrPort, header.getSeqid(), bodyLength, retcode); } } diff --git a/src/main/java/org/redkale/util/ByteArray.java b/src/main/java/org/redkale/util/ByteArray.java index 5bb57c44c..52d494fc5 100644 --- a/src/main/java/org/redkale/util/ByteArray.java +++ b/src/main/java/org/redkale/util/ByteArray.java @@ -775,7 +775,7 @@ public final class ByteArray implements ByteTuple { * @return ByteArray */ public ByteArray putUint128(Uint128 value) { - return this.put(value.directBytes()); + return this.put(value.value); } /** @@ -787,7 +787,7 @@ public final class ByteArray implements ByteTuple { * @return ByteArray */ public ByteArray putUint128(int offset, Uint128 value) { - return this.put(offset, value.directBytes()); + return this.put(offset, value.value); } public ByteArray putByte(short value) { diff --git a/src/main/java/org/redkale/util/Uint128.java b/src/main/java/org/redkale/util/Uint128.java index 372aa29fb..9bde6b9f1 100644 --- a/src/main/java/org/redkale/util/Uint128.java +++ b/src/main/java/org/redkale/util/Uint128.java @@ -21,14 +21,13 @@ public final class Uint128 extends Number implements Comparable { public static final Uint128 ZERO = new Uint128(new byte[16]); - protected final byte[] value; + final byte[] value; // private Uint128(long v1, long v2) { //暂时不用 // this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32), // (byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32), // (byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2}; // } - private Uint128(byte[] bytes) { if (bytes == null || bytes.length != 16) { throw new NumberFormatException("Not 16 length bytes"); @@ -40,10 +39,6 @@ public final class Uint128 extends Number implements Comparable { return Arrays.copyOf(value, value.length); } - public byte[] directBytes() { - return value; - } - public static Uint128 create(byte[] bs) { if (bs[15] == 0 && bs[14] == 0 && bs[13] == 0 && bs[12] == 0 && bs[11] == 0 && bs[10] == 0 && bs[9] == 0 && bs[8] == 0 diff --git a/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java new file mode 100644 index 000000000..a1ff07a35 --- /dev/null +++ b/src/test/java/org/redkale/test/sncp/SncpClientCodecTest.java @@ -0,0 +1,68 @@ +/* + * + */ +package org.redkale.test.sncp; + +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.*; +import org.junit.jupiter.api.*; +import org.redkale.net.AsyncIOGroup; +import org.redkale.net.client.*; +import org.redkale.net.sncp.*; +import org.redkale.util.*; + +/** + * + * @author zhangjx + */ +public class SncpClientCodecTest { + + private boolean main; + + public static void main(String[] args) throws Throwable { + SncpClientCodecTest test = new SncpClientCodecTest(); + test.main = true; + test.run(); + } + + @Test + public void run() throws Exception { + InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); + InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); + final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); + SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), Utility.cpus(), 16); + SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); + SncpClientCodec codec = new SncpClientCodec(conn); + List respResults = new ArrayList(); + try { + Field respResultsField = ClientCodec.class.getDeclaredField("respResults"); + respResultsField.setAccessible(true); + respResults = (List) respResultsField.get(codec); + } catch (Exception e) { + e.printStackTrace(); + } + //---------------------------------------------- + ByteBuffer realBuf; + { + SncpHeader header = new SncpHeader(sncpAddress, Uint128.ZERO, Uint128.ZERO); + SncpClientRequest request = new SncpClientRequest(); + ByteArray writeArray = new ByteArray(); + request.prepare(header, 1, "", new byte[20]); + System.out.println("request.1 = " + request); + request.writeTo(conn, writeArray); + request.prepare(header, 2, "", new byte[25]); + System.out.println("request.2 = " + request); + request.writeTo(conn, writeArray); + System.out.println(writeArray.getBytes().length); + realBuf = ByteBuffer.wrap(writeArray.getBytes()); + } + System.out.println("sncp.realBuf = " + realBuf.remaining()); + codec.decodeMessages(realBuf, new ByteArray()); + if (!main) { + Assertions.assertEquals(2, respResults.size()); + } + System.out.println(respResults); + } +}