PipelinePacket

This commit is contained in:
redkale
2024-10-21 12:41:17 +08:00
parent 7c9c3822e0
commit 5da88a7916
4 changed files with 132 additions and 73 deletions

View File

@@ -65,7 +65,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
private Consumer<ByteBuffer> writeBufferConsumer; private Consumer<ByteBuffer> writeBufferConsumer;
private final ReentrantLock pipelineLock = new ReentrantLock(); final ReentrantLock pipelineLock = new ReentrantLock();
private ByteBufferWriter pipelineWriter; private ByteBufferWriter pipelineWriter;
@@ -205,15 +205,13 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
return ioWriteThread; return ioWriteThread;
} }
public abstract AsyncConnection pipelineHandler(CompletionHandler handler);
/** /**
* 快速发送 * 快速发送
* *
* @see org.redkale.net.AsyncNioConnection#pipelineWrite(java.util.function.Consumer) * @see org.redkale.net.AsyncNioConnection#pipelineWrite(org.redkale.net.PipelinePacket)
* @param consumer Consumer * @param packet PipelinePacket
*/ */
public abstract void pipelineWrite(Consumer<ByteArray> consumer); public abstract void pipelineWrite(PipelinePacket packet);
public abstract boolean isTCP(); public abstract boolean isTCP();

View File

@@ -138,10 +138,6 @@ public class AsyncIOThread extends WorkThread {
selector.wakeup(); selector.wakeup();
} }
public final void wakeup() {
selector.wakeup();
}
public Supplier<ByteBuffer> getBufferSupplier() { public Supplier<ByteBuffer> getBufferSupplier() {
return bufferSupplier; return bufferSupplier;
} }

View File

@@ -10,12 +10,10 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer; import java.util.function.Consumer;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.ByteArray;
import org.redkale.util.ByteBufferWriter; import org.redkale.util.ByteBufferWriter;
import org.redkale.util.RedkaleException;
/** /**
* 详情见: https://redkale.org * 详情见: https://redkale.org
@@ -42,11 +40,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected SelectionKey readKey; protected SelectionKey readKey;
// ------------------------------ pipeline写操作 ------------------------------------ // ------------------------------ pipeline写操作 ------------------------------------
protected ByteArray pipelineWriteArray; protected Queue<PipelinePacket> pipelineWriteQueue;
protected Queue<Consumer<ByteArray>> pipelineWriteQueue;
protected CompletionHandler pipelineWriteHandler;
// -------------------------------- 写操作 -------------------------------------- // -------------------------------- 写操作 --------------------------------------
protected byte[] writeByteTuple1Array; protected byte[] writeByteTuple1Array;
@@ -163,63 +158,32 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
@Override @Override
public final AsyncConnection pipelineHandler(CompletionHandler handler) { public final void pipelineWrite(PipelinePacket packet) {
if (!clientMode) { if (pipelineWriteQueue == null) {
throw new RedkaleException("fast-writer only for client connection"); pipelineLock.lock();
}
this.pipelineWriteHandler = Objects.requireNonNull(handler);
this.pipelineWriteArray = new ByteArray();
this.pipelineWriteQueue = new ConcurrentLinkedQueue<>();
return this;
}
@Override
public final void pipelineWrite(Consumer<ByteArray> consumer) {
if (pipelineWriteHandler == null) {
throw new RedkaleException("fast-writer handler is null");
}
this.pipelineWriteQueue.offer(consumer);
if (writeKey == null) {
this.ioWriteThread.register(this::pipelineWriteRegister);
} else {
this.writeCompletionHandler = this.pipelineWriteHandler;
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
this.ioWriteThread.wakeup();
}
}
private void pipelineWriteRegister(Selector selector) {
try { try {
if (writeKey == null) { if (pipelineWriteQueue == null) {
writeKey = keyFor(selector); pipelineWriteQueue = new ConcurrentLinkedDeque<>();
} }
if (writeKey == null) { } finally {
writeKey = implRegister(selector, SelectionKey.OP_WRITE); pipelineLock.unlock();
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
} }
// writeCompletionHandler必须赋值不然会跳过doWrite
this.writeCompletionHandler = this.pipelineWriteHandler;
} catch (ClosedChannelException e) {
e.printStackTrace();
this.pipelineWriteQueue.clear();
handleWrite(0, e);
} }
this.pipelineWriteQueue.offer(packet);
this.ioWriteThread.execute(this::pipelineDoWrite);
} }
private void pipelineWritePrepare() { private void pipelineDoWrite() {
ByteArray array = this.pipelineWriteArray.clear(); PipelinePacket packet;
Consumer<ByteArray> func; while ((packet = pipelineWriteQueue.poll()) != null) {
while ((func = pipelineWriteQueue.poll()) != null) {
func.accept(array);
}
this.writePending = true; this.writePending = true;
this.writeAttachment = null; this.writeByteTuple1Array = packet.tupleBytes;
this.writeByteTuple1Array = array.content(); this.writeByteTuple1Offset = packet.tupleOffset;
this.writeByteTuple1Offset = array.offset(); this.writeByteTuple1Length = packet.tupleLength;
this.writeByteTuple1Length = array.length(); this.writeCompletionHandler = packet.handler;
this.writeCompletionHandler = this.pipelineWriteHandler; this.writeAttachment = packet.attach;
doWrite();
}
} }
@Override @Override
@@ -364,10 +328,6 @@ abstract class AsyncNioConnection extends AsyncConnection {
boolean hasRemain = true; boolean hasRemain = true;
boolean writeCompleted = true; boolean writeCompleted = true;
boolean error = false; boolean error = false;
// pipelineWrite
if (clientMode && pipelineWriteArray != null && writeByteBuffer == null && writeByteBuffers == null) {
pipelineWritePrepare();
}
int batchOffset = writeBuffersOffset; int batchOffset = writeBuffersOffset;
int batchLength = writeBuffersLength; int batchLength = writeBuffersLength;
while (hasRemain) { // 必须要将buffer写完为止 while (hasRemain) { // 必须要将buffer写完为止

View File

@@ -0,0 +1,105 @@
/*
* Copyright (c) 2016-2116 Redkale
* All rights reserved.
*/
package org.redkale.net;
import java.nio.channels.CompletionHandler;
import org.redkale.convert.ConvertColumn;
import org.redkale.util.ByteTuple;
/**
* pipelineWrite写入包
*
* @author zhangjx
* @param <T> attachment类型
* @since 2.8.0
*/
public class PipelinePacket<T> {
@ConvertColumn(index = 1)
protected byte[] tupleBytes;
@ConvertColumn(index = 2)
protected int tupleOffset;
@ConvertColumn(index = 3)
protected int tupleLength;
@ConvertColumn(index = 4)
protected CompletionHandler<Integer, T> handler;
@ConvertColumn(index = 5)
protected T attach;
public PipelinePacket() {}
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, T> handler) {
this(data, handler, null);
}
public PipelinePacket(ByteTuple data, CompletionHandler<Integer, T> handler, T attach) {
this(data.content(), data.offset(), data.length(), handler, attach);
}
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, T> handler) {
this(tupleBytes, 0, tupleBytes.length, handler, null);
}
public PipelinePacket(byte[] tupleBytes, CompletionHandler<Integer, T> handler, T attach) {
this(tupleBytes, 0, tupleBytes.length, handler, attach);
}
public PipelinePacket(byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, T> handler) {
this(tupleBytes, tupleOffset, tupleLength, handler, null);
}
public PipelinePacket(
byte[] tupleBytes, int tupleOffset, int tupleLength, CompletionHandler<Integer, T> handler, T attach) {
this.tupleBytes = tupleBytes;
this.tupleOffset = tupleOffset;
this.tupleLength = tupleLength;
this.handler = handler;
this.attach = attach;
}
public byte[] getTupleBytes() {
return tupleBytes;
}
public void setTupleBytes(byte[] tupleBytes) {
this.tupleBytes = tupleBytes;
}
public int getTupleOffset() {
return tupleOffset;
}
public void setTupleOffset(int tupleOffset) {
this.tupleOffset = tupleOffset;
}
public int getTupleLength() {
return tupleLength;
}
public void setTupleLength(int tupleLength) {
this.tupleLength = tupleLength;
}
public CompletionHandler<Integer, T> getHandler() {
return handler;
}
public void setHandler(CompletionHandler<Integer, T> handler) {
this.handler = handler;
}
public T getAttach() {
return attach;
}
public void setAttach(T attach) {
this.attach = attach;
}
}