移除pipelineWrite

This commit is contained in:
redkale
2024-11-09 08:37:51 +08:00
parent 243bd95b44
commit 65b4e66677
3 changed files with 23 additions and 156 deletions

View File

@@ -213,14 +213,6 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
writeLock.unlock(); writeLock.unlock();
} }
/**
* 快速发送
*
* @see org.redkale.net.AsyncNioConnection#pipelineWrite(org.redkale.net.PipelinePacket...)
* @param packets PipelinePacket[]
*/
public abstract void pipelineWrite(PipelinePacket... packets);
public abstract boolean isTCP(); public abstract boolean isTCP();
public abstract boolean shutdownInput(); public abstract boolean shutdownInput();
@@ -300,16 +292,31 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
/**
* src写完才会回调
*
* @see #lockWrite()
* @see #unlockWrite()
* @param array 内容
* @param handler 回调函数
*/
public final void write(ByteTuple array, CompletionHandler<Integer, Void> handler) { public final void write(ByteTuple array, CompletionHandler<Integer, Void> handler) {
write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler); write(array.content(), array.offset(), array.length(), (byte[]) null, 0, 0, handler);
} }
/**
* src写完才会回调
*
* @see #lockWrite()
* @see #unlockWrite()
* @param buffer 内容
* @param handler 回调函数
*/
public final void write(ByteBuffer buffer, CompletionHandler<Integer, Void> handler) { public final void write(ByteBuffer buffer, CompletionHandler<Integer, Void> handler) {
write(buffer, null, handler); write(buffer, null, handler);
} }
// src写完才会回调 <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) { if (sslEngine == null) {
writeImpl(src, attachment, handler); writeImpl(src, attachment, handler);
} else { } else {
@@ -328,7 +335,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
final <A> void write( <A> void write(
ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) { ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (sslEngine == null) { if (sslEngine == null) {
writeImpl(srcs, offset, length, attachment, handler); writeImpl(srcs, offset, length, attachment, handler);
@@ -348,15 +355,15 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
} }
} }
final <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) { <A> void write(ByteBuffer[] srcs, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(srcs, 0, srcs.length, attachment, handler); write(srcs, 0, srcs.length, attachment, handler);
} }
final void write(byte[] bytes, CompletionHandler<Integer, Void> handler) { void write(byte[] bytes, CompletionHandler<Integer, Void> handler) {
write(bytes, 0, bytes.length, (byte[]) null, 0, 0, handler); write(bytes, 0, bytes.length, (byte[]) null, 0, 0, handler);
} }
final void write(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) { void write(byte[] bytes, int offset, int length, CompletionHandler<Integer, Void> handler) {
write(bytes, offset, length, (byte[]) null, 0, 0, handler); write(bytes, offset, length, (byte[]) null, 0, 0, handler);
} }
@@ -505,11 +512,11 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return writer != null && writer.position() > 0; return writer != null && writer.position() > 0;
} }
public final void writePipeline(CompletionHandler<Integer, Void> handler) { void writePipeline(CompletionHandler<Integer, Void> handler) {
writePipeline(null, handler); writePipeline(null, handler);
} }
public <A> void writePipeline(A attachment, CompletionHandler<Integer, ? super A> handler) { <A> void writePipeline(A attachment, CompletionHandler<Integer, ? super A> handler) {
ByteBufferWriter writer = this.pipelineWriter; ByteBufferWriter writer = this.pipelineWriter;
this.pipelineWriter = null; this.pipelineWriter = null;
if (writer == null) { if (writer == null) {

View File

@@ -10,7 +10,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer; import java.util.function.Consumer;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.ByteBufferWriter; import org.redkale.util.ByteBufferWriter;
@@ -39,9 +38,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey readKey; protected SelectionKey readKey;
// ------------------------------ pipeline写操作 ------------------------------------
protected Queue<PipelinePacket> pipelineWriteQueue;
// -------------------------------- 写操作 -------------------------------------- // -------------------------------- 写操作 --------------------------------------
protected byte[] writeByteTuple1Array; protected byte[] writeByteTuple1Array;
@@ -72,7 +68,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey writeKey; protected SelectionKey writeKey;
// protected CompletionHandler<Integer, Object> writeFastHandler;
public AsyncNioConnection( public AsyncNioConnection(
boolean clientMode, boolean clientMode,
AsyncIOGroup ioGroup, AsyncIOGroup ioGroup,
@@ -157,37 +152,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
doRead(this.ioReadThread.inCurrThread()); doRead(this.ioReadThread.inCurrThread());
} }
@Override
public final void pipelineWrite(PipelinePacket... packets) {
if (pipelineWriteQueue == null) {
lockWrite();
try {
if (pipelineWriteQueue == null) {
pipelineWriteQueue = new ConcurrentLinkedDeque<>();
}
} finally {
unlockWrite();
}
}
for (PipelinePacket packet : packets) {
this.pipelineWriteQueue.offer(packet);
}
this.ioWriteThread.execute(this::pipelineDoWrite);
}
private void pipelineDoWrite() {
PipelinePacket packet;
while ((packet = pipelineWriteQueue.poll()) != null) {
this.writePending = true;
this.writeByteTuple1Array = packet.tupleBytes;
this.writeByteTuple1Offset = packet.tupleOffset;
this.writeByteTuple1Length = packet.tupleLength;
this.writeCompletionHandler = packet.handler;
this.writeAttachment = packet.attach;
doWrite();
}
}
@Override @Override
public void write( public void write(
byte[] headerContent, byte[] headerContent,

View File

@@ -1,104 +0,0 @@
/*
* Copyright (c) 2016-2116 Redkale
* All rights reserved.
*/
package org.redkale.net;
import java.nio.channels.CompletionHandler;
import org.redkale.convert.ConvertColumn;
import org.redkale.util.ByteTuple;
/**
* pipelineWrite写入包
*
* @author zhangjx
* @since 2.8.0
*/
public class PipelinePacket {
@ConvertColumn(index = 1)
protected byte[] tupleBytes;
@ConvertColumn(index = 2)
protected int tupleOffset;
@ConvertColumn(index = 3)
protected int tupleLength;
@ConvertColumn(index = 4)
protected CompletionHandler<Integer, ?> handler;
@ConvertColumn(index = 5)
protected Object attach;
public PipelinePacket() {}
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, ?> handler) {
this(data, handler, null);
}
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, ?> handler, Object attach) {
this(data.content(), data.offset(), data.length(), handler, attach);
}
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, ?> handler) {
this(tupleBytes, 0, tupleBytes.length, handler, null);
}
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, ?> handler, Object attach) {
this(tupleBytes, 0, tupleBytes.length, handler, attach);
}
public PipelinePacket(byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, ?> handler) {
this(tupleBytes, tupleOffset, tupleLength, handler, null);
}
public PipelinePacket(
byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, ?> handler, Object attach) {
this.tupleBytes = tupleBytes;
this.tupleOffset = tupleOffset;
this.tupleLength = tupleLength;
this.handler = handler;
this.attach = attach;
}
public byte[] getTupleBytes() {
return tupleBytes;
}
public void setTupleBytes(byte[] tupleBytes) {
this.tupleBytes = tupleBytes;
}
public int getTupleOffset() {
return tupleOffset;
}
public void setTupleOffset(int tupleOffset) {
this.tupleOffset = tupleOffset;
}
public int getTupleLength() {
return tupleLength;
}
public void setTupleLength(int tupleLength) {
this.tupleLength = tupleLength;
}
public CompletionHandler<Integer, ?> getHandler() {
return handler;
}
public void setHandler(CompletionHandler<Integer, ?> handler) {
this.handler = handler;
}
public Object getAttach() {
return attach;
}
public void setAttach(Object attach) {
this.attach = attach;
}
}