From c6ef28c358ffc6f0fb11f9034157062f6a1d3d75 Mon Sep 17 00:00:00 2001 From: redkale Date: Thu, 2 Feb 2023 15:31:19 +0800 Subject: [PATCH] =?UTF-8?q?sncp=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/client/ClientCodec.java | 1 + .../org/redkale/net/sncp/SncpClientCodec.java | 24 ++++++++++++++---- .../net/sncp/SncpClientConnection.java | 12 +++++++++ .../redkale/net/sncp/SncpClientRequest.java | 6 ++++- .../redkale/net/sncp/SncpClientResult.java | 25 +++++++++++-------- .../java/org/redkale/net/sncp/SncpHeader.java | 14 ++++++----- .../org/redkale/net/sncp/SncpRequest.java | 11 +++----- .../test/sncp/SncpTestServiceImpl.java | 4 +-- 8 files changed, 66 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index 1bccf8ade..41cc5d37a 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -69,6 +69,7 @@ public abstract class ClientCodec implements Complet for (ClientResponse cr : respResults) { if (cr.isError()) { connection.dispose(null); + return; } else { ClientFuture respFuture = connection.pollRespFuture(cr.getRequestid()); if (respFuture != null) { diff --git a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java index 63fef3087..9f7f967a3 100644 --- a/src/main/java/org/redkale/net/sncp/SncpClientCodec.java +++ b/src/main/java/org/redkale/net/sncp/SncpClientCodec.java @@ -6,7 +6,8 @@ package org.redkale.net.sncp; import java.nio.ByteBuffer; import java.util.logging.Logger; import org.redkale.net.client.ClientCodec; -import org.redkale.util.ByteArray; +import static org.redkale.net.sncp.SncpHeader.HEADER_SIZE; +import org.redkale.util.*; /** * @@ -16,6 +17,8 @@ public class SncpClientCodec extends ClientCodec resultPool = ObjectPool.createUnsafePool(256, t -> new SncpClientResult(), SncpClientResult::prepare, SncpClientResult::recycle); + private ByteArray recyclableArray; protected ByteArray halfBodyBytes; @@ -28,6 +31,15 @@ public class SncpClientCodec extends ClientCodec { + private final ObjectPool requestPool; + public SncpClientConnection(SncpClient client, int index, AsyncConnection channel) { super(client, index, channel); + requestPool = ObjectPool.createUnsafePool(Thread.currentThread(), 256, + ObjectPool.createSafePool(256, t -> new SncpClientRequest(null), SncpClientRequest::prepare, SncpClientRequest::recycle) + ); } @Override @@ -21,4 +27,10 @@ public class SncpClientConnection extends ClientConnection { return HEADER_SIZE - buffer.remaining(); //小于60 } this.header = new SncpHeader(); - if (!this.header.read(buffer)) { - if (context.getLogger().isLoggable(Level.FINEST)) { - context.getLogger().finest("sncp buffer header.length not " + HEADER_SIZE); - } + int headerSize = this.header.read(buffer); + if (headerSize != HEADER_SIZE) { + context.getLogger().log(Level.WARNING, "sncp buffer header.length not " + HEADER_SIZE); return -1; } if (this.header.getRetcode() != 0) { // retcode - if (context.getLogger().isLoggable(Level.FINEST)) { - context.getLogger().finest("sncp buffer header.retcode not 0"); - } + context.getLogger().log(Level.WARNING, "sncp buffer header.retcode not 0"); return -1; } this.body = new byte[this.header.getBodyLength()]; diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index 2415a772b..8153e305f 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -29,8 +29,8 @@ public class SncpTestServiceImpl implements SncpTestIService { @Override public void run() { try { - Thread.sleep(1000); - System.out.println(Thread.currentThread().getName() + " sleep 1秒后运行了异步方法-----------queryResultAsync方法"); + Thread.sleep(200); + System.out.println(Thread.currentThread().getName() + " sleep 200ms后运行了异步方法-----------queryResultAsync方法"); future.complete("异步result: " + bean); } catch (Exception e) { e.printStackTrace();