udp优化

This commit is contained in:
redkale
2023-02-07 17:27:55 +08:00
parent 1d6b76b182
commit 848f33bd2b
6 changed files with 37 additions and 16 deletions

View File

@@ -33,9 +33,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
//SSL //SSL
protected SSLEngine sslEngine; protected SSLEngine sslEngine;
protected volatile long readtime; protected volatile long readTime;
protected volatile long writetime; protected volatile long writeTime;
private Map<String, Object> attributes; //用于存储绑定在Connection上的对象集合 private Map<String, Object> attributes; //用于存储绑定在Connection上的对象集合
@@ -145,11 +145,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
public final long getLastReadTime() { public final long getLastReadTime() {
return readtime; return readTime;
} }
public final long getLastWriteTime() { public final long getLastWriteTime() {
return writetime; return writeTime;
} }
public final boolean ssl() { public final boolean ssl() {

View File

@@ -251,7 +251,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
public void doRead(boolean direct) { public void doRead(boolean direct) {
try { try {
this.readtime = System.currentTimeMillis(); this.readTime = System.currentTimeMillis();
int readCount = 0; int readCount = 0;
if (direct) { if (direct) {
if (this.readByteBuffer == null) { if (this.readByteBuffer == null) {
@@ -284,7 +284,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
public void doWrite(boolean direct) { public void doWrite(boolean direct) {
try { try {
this.writetime = System.currentTimeMillis(); this.writeTime = System.currentTimeMillis();
int totalCount = 0; int totalCount = 0;
boolean hasRemain = true; boolean hasRemain = true;
boolean writeCompleted = true; boolean writeCompleted = true;

View File

@@ -13,6 +13,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.net.AsyncNioUdpProtocolServer.AsyncNioUdpServerChannel; import org.redkale.net.AsyncNioUdpProtocolServer.AsyncNioUdpServerChannel;
import org.redkale.util.Utility;
/** /**
* *
@@ -149,7 +150,25 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
@Override @Override
protected int implWrite(ByteBuffer src) throws IOException { protected int implWrite(ByteBuffer src) throws IOException {
return this.channel.send(src, remoteAddress); long now = System.currentTimeMillis();
//发送过频会丢包
if (clientMode) {
if (this.writeTime + 1 > now) {
Utility.sleep(1);
this.writeTime = System.currentTimeMillis();
} else {
this.writeTime = now;
}
return this.channel.send(src, remoteAddress);
} else {
if (udpServerChannel.writeTime + 1 > now) {
Utility.sleep(1);
udpServerChannel.writeTime = System.currentTimeMillis();
} else {
udpServerChannel.writeTime = now;
}
return this.channel.send(src, remoteAddress);
}
} }
@Override @Override
@@ -158,7 +177,7 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
for (int i = offset; i < end; i++) { for (int i = offset; i < end; i++) {
ByteBuffer buf = srcs[i]; ByteBuffer buf = srcs[i];
if (buf.hasRemaining()) { if (buf.hasRemaining()) {
return this.channel.send(buf, remoteAddress); return implWrite(buf);
} }
} }
return 0; return 0;

View File

@@ -240,6 +240,8 @@ class AsyncNioUdpProtocolServer extends ProtocolServer {
ByteBufferPool unsafeBufferPool; ByteBufferPool unsafeBufferPool;
volatile long writeTime;
ConcurrentHashMap<SocketAddress, AsyncNioUdpConnection> connections = new ConcurrentHashMap<>(); ConcurrentHashMap<SocketAddress, AsyncNioUdpConnection> connections = new ConcurrentHashMap<>();
public AsyncNioUdpServerChannel(DatagramChannel serverChannel) { public AsyncNioUdpServerChannel(DatagramChannel serverChannel) {

View File

@@ -26,7 +26,7 @@ public class SncpTest {
private static final String myhost = "127.0.0.1"; private static final String myhost = "127.0.0.1";
private static int port = 63877; private static int port = 0;
private static int port2 = 4240; private static int port2 = 4240;
@@ -95,11 +95,11 @@ public class SncpTest {
System.out.println("bean " + callbean); System.out.println("bean " + callbean);
System.out.println("---------------------------------------------------"); System.out.println("---------------------------------------------------");
Thread.sleep(200); Thread.sleep(200);
final int count = 1; final int count = 10;
final CountDownLatch cld = new CountDownLatch(count); final CountDownLatch cld = new CountDownLatch(count);
final AtomicInteger ai = new AtomicInteger(); final AtomicInteger ai = new AtomicInteger();
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
for (int i = 0; i < count; i++) { for (int i = 10; i < count + 10; i++) {
final int k = i + 1; final int k = i + 1;
new Thread() { new Thread() {
@Override @Override
@@ -108,11 +108,11 @@ public class SncpTest {
//Thread.sleep(k); //Thread.sleep(k);
SncpTestBean bean = new SncpTestBean(); SncpTestBean bean = new SncpTestBean();
bean.setId(k); bean.setId(k);
bean.setContent("数据: " + (k < 10 ? "0" : "") + k); bean.setContent("数据: " + k);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(k).append("------"); sb.append(k).append("--------");
for (int i = 0; i < 900; i++) { for (int j = 0; j < 2000; j++) {
sb.append("_").append(i).append("_").append(k).append("_0123456789"); sb.append("_").append(j).append("_").append(k).append("_0123456789");
} }
bean.setContent(sb.toString()); bean.setContent(sb.toString());

View File

@@ -64,7 +64,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
@Override @Override
public String queryResult(SncpTestBean bean) { public String queryResult(SncpTestBean bean) {
System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法"); System.out.println(Thread.currentThread().getName() + " 运行了queryResult方法 content-length: " + bean.getContent().length());
return "result: " + bean.getContent(); return "result: " + bean.getContent();
} }