AsyncConnection优化

This commit is contained in:
redkale
2023-07-04 22:11:38 +08:00
parent 5390147985
commit fbea655e96
5 changed files with 37 additions and 28 deletions

View File

@@ -224,7 +224,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
public abstract <A> void clientWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> void fastWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler);
protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler); protected abstract void readRegisterImpl(CompletionHandler<Integer, ByteBuffer> handler);
@@ -240,8 +240,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
read(handler); read(handler);
} }
public final <A> void clientWrite(byte[] data, CompletionHandler<Integer, ? super A> handler) { public final <A> void fastWrite(byte[] data, CompletionHandler<Integer, ? super A> handler) {
clientWrite(data, null, handler); fastWrite(data, null, handler);
} }
public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) { public final void startReadInIOThread(CompletionHandler<Integer, ByteBuffer> handler) {

View File

@@ -153,7 +153,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioReadThread.register(selector -> { ioReadThread.register(selector -> {
try { try {
if (readKey == null) { if (readKey == null) {
readKey = implRegister(selector, SelectionKey.OP_READ); SelectionKey oldKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps());
readKey = implRegister(selector, ops);
readKey.attach(this); readKey.attach(this);
} else { } else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
@@ -287,7 +289,7 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
@Override @Override
public <A> void clientWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler) { public <A> void fastWrite(byte[] data, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (!this.isConnected()) { if (!this.isConnected()) {
handler.failed(new NotYetConnectedException(), null); handler.failed(new NotYetConnectedException(), null);
return; return;
@@ -303,7 +305,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioWriteThread.register(selector -> { ioWriteThread.register(selector -> {
try { try {
if (writeKey == null) { if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE); SelectionKey oldKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps());
writeKey = implRegister(selector, ops);
writeKey.attach(this); writeKey.attach(this);
} else { } else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
@@ -340,7 +344,9 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioReadThread.register(selector -> { ioReadThread.register(selector -> {
try { try {
if (readKey == null) { if (readKey == null) {
readKey = implRegister(selector, SelectionKey.OP_READ); SelectionKey oldKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps());
readKey = implRegister(selector, ops);
readKey.attach(this); readKey.attach(this);
} else { } else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
@@ -467,27 +473,20 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeCompleted && (totalCount != 0 || !hasRemain)) { if (writeCompleted && (totalCount != 0 || !hasRemain)) {
handleWrite(writeTotal + totalCount, null); handleWrite(writeTotal + totalCount, null);
} else if (writeKey == null) { } else if (writeKey == null) {
if (inCurrWriteThread()) { ioWriteThread.register(selector -> {
try { try {
writeKey = implRegister(ioWriteThread.selector, SelectionKey.OP_WRITE); if (writeKey == null) {
writeKey.attach(this); SelectionKey oldKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps());
writeKey = implRegister(selector, ops);
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
}
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
handleWrite(0, e); handleWrite(0, e);
} }
} else { });
ioWriteThread.register(selector -> {
try {
if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
}
} catch (ClosedChannelException e) {
handleWrite(0, e);
}
});
}
} else { } else {
ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE); ioWriteThread.interestOpsOr(writeKey, SelectionKey.OP_WRITE);
} }
@@ -641,6 +640,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
}; };
} }
protected abstract SelectionKey keyFor(Selector sel);
protected abstract SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException; protected abstract SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException;
protected abstract int implRead(ByteBuffer dst) throws IOException; protected abstract int implRead(ByteBuffer dst) throws IOException;

View File

@@ -203,6 +203,11 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
return this.channel.isConnected(); return this.channel.isConnected();
} }
@Override
protected SelectionKey keyFor(Selector sel) {
return this.channel.keyFor(sel);
}
@Override @Override
protected SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException { protected SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException {
return this.channel.register(sel, ops); return this.channel.register(sel, ops);

View File

@@ -114,6 +114,11 @@ class AsyncNioUdpConnection extends AsyncNioConnection {
} }
} }
@Override
protected SelectionKey keyFor(Selector sel) {
return this.channel.keyFor(sel);
}
@Override @Override
protected SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException { protected SelectionKey implRegister(Selector sel, int ops) throws ClosedChannelException {
return this.channel.register(sel, ops); return this.channel.register(sel, ops);

View File

@@ -59,9 +59,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
@Override @Override
public void completed(Integer result, ClientConnection attachment) { public void completed(Integer result, ClientConnection attachment) {
if (attachment == null) { //新方式
channel.readRegister(getCodec());
}
} }
@Override @Override
@@ -148,7 +146,7 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
pauseWriting.set(true); pauseWriting.set(true);
currHalfWriteFuture = respFuture; currHalfWriteFuture = respFuture;
} }
channel.clientWrite(array.getBytes(), writeHandler); channel.fastWrite(array.getBytes(), writeHandler);
} else { //旧方式 } else { //旧方式
//发送请求数据包 //发送请求数据包
writeArray.clear(); writeArray.clear();