From 848f33bd2b244b4a96db5cc58343017cd05af89d Mon Sep 17 00:00:00 2001 From: redkale Date: Tue, 7 Feb 2023 17:27:55 +0800 Subject: [PATCH] =?UTF-8?q?udp=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/redkale/net/AsyncConnection.java | 8 +++---- .../org/redkale/net/AsyncNioConnection.java | 4 ++-- .../redkale/net/AsyncNioUdpConnection.java | 23 +++++++++++++++++-- .../net/AsyncNioUdpProtocolServer.java | 2 ++ .../java/org/redkale/test/sncp/SncpTest.java | 14 +++++------ .../test/sncp/SncpTestServiceImpl.java | 2 +- 6 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index bccd24de6..8be90e1b6 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -33,9 +33,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { //SSL protected SSLEngine sslEngine; - protected volatile long readtime; + protected volatile long readTime; - protected volatile long writetime; + protected volatile long writeTime; private Map attributes; //用于存储绑定在Connection上的对象集合 @@ -145,11 +145,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } public final long getLastReadTime() { - return readtime; + return readTime; } public final long getLastWriteTime() { - return writetime; + return writeTime; } public final boolean ssl() { diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index 942cff445..6a90c86f8 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -251,7 +251,7 @@ abstract class AsyncNioConnection extends AsyncConnection { public void doRead(boolean direct) { try { - this.readtime = System.currentTimeMillis(); + this.readTime = System.currentTimeMillis(); int readCount = 0; if (direct) { if (this.readByteBuffer == null) { @@ -284,7 +284,7 @@ abstract class AsyncNioConnection extends AsyncConnection { public void doWrite(boolean direct) { try { - this.writetime = System.currentTimeMillis(); + this.writeTime = System.currentTimeMillis(); int totalCount = 0; boolean hasRemain = true; boolean writeCompleted = true; diff --git a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java index f83d404fa..a978b143a 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpConnection.java @@ -13,6 +13,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque; import javax.net.ssl.SSLContext; import org.redkale.net.AsyncNioUdpProtocolServer.AsyncNioUdpServerChannel; +import org.redkale.util.Utility; /** * @@ -149,7 +150,25 @@ class AsyncNioUdpConnection extends AsyncNioConnection { @Override protected int implWrite(ByteBuffer src) throws IOException { - return this.channel.send(src, remoteAddress); + long now = System.currentTimeMillis(); + //发送过频会丢包 + if (clientMode) { + if (this.writeTime + 1 > now) { + Utility.sleep(1); + this.writeTime = System.currentTimeMillis(); + } else { + this.writeTime = now; + } + return this.channel.send(src, remoteAddress); + } else { + if (udpServerChannel.writeTime + 1 > now) { + Utility.sleep(1); + udpServerChannel.writeTime = System.currentTimeMillis(); + } else { + udpServerChannel.writeTime = now; + } + return this.channel.send(src, remoteAddress); + } } @Override @@ -158,7 +177,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection { for (int i = offset; i < end; i++) { ByteBuffer buf = srcs[i]; if (buf.hasRemaining()) { - return this.channel.send(buf, remoteAddress); + return implWrite(buf); } } return 0; diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 0844bf702..35c03a31d 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -240,6 +240,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { ByteBufferPool unsafeBufferPool; + volatile long writeTime; + ConcurrentHashMap connections = new ConcurrentHashMap<>(); public AsyncNioUdpServerChannel(DatagramChannel serverChannel) { diff --git a/src/test/java/org/redkale/test/sncp/SncpTest.java b/src/test/java/org/redkale/test/sncp/SncpTest.java index ee98a6d3d..428064670 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTest.java +++ b/src/test/java/org/redkale/test/sncp/SncpTest.java @@ -26,7 +26,7 @@ public class SncpTest { private static final String myhost = "127.0.0.1"; - private static int port = 63877; + private static int port = 0; private static int port2 = 4240; @@ -95,11 +95,11 @@ public class SncpTest { System.out.println("bean: " + callbean); System.out.println("---------------------------------------------------"); Thread.sleep(200); - final int count = 1; + final int count = 10; final CountDownLatch cld = new CountDownLatch(count); final AtomicInteger ai = new AtomicInteger(); long s = System.currentTimeMillis(); - for (int i = 0; i < count; i++) { + for (int i = 10; i < count + 10; i++) { final int k = i + 1; new Thread() { @Override @@ -108,11 +108,11 @@ public class SncpTest { //Thread.sleep(k); SncpTestBean bean = new SncpTestBean(); bean.setId(k); - bean.setContent("数据: " + (k < 10 ? "0" : "") + k); + bean.setContent("数据: " + k); StringBuilder sb = new StringBuilder(); - sb.append(k).append("------"); - for (int i = 0; i < 900; i++) { - sb.append("_").append(i).append("_").append(k).append("_0123456789"); + sb.append(k).append("--------"); + for (int j = 0; j < 2000; j++) { + sb.append("_").append(j).append("_").append(k).append("_0123456789"); } bean.setContent(sb.toString()); diff --git a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java index 8837f40d2..e84447def 100644 --- a/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java +++ b/src/test/java/org/redkale/test/sncp/SncpTestServiceImpl.java @@ -64,7 +64,7 @@ public class SncpTestServiceImpl implements SncpTestIService { @Override public String queryResult(SncpTestBean bean) { - System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法"); + System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法 content-length: " + bean.getContent().length()); return "result: " + bean.getContent(); }