From 7b7fb33b7b268ddd84038327f55bb4499142cec9 Mon Sep 17 00:00:00 2001 From: redkale Date: Wed, 5 Jul 2023 06:38:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ByteBufferWriter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/redkale/net/AsyncNioConnection.java | 115 +++-- .../org/redkale/util/ByteBufferWriter.java | 457 +++++++++--------- 2 files changed, 295 insertions(+), 277 deletions(-) diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index f72917a6d..d56eff441 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import javax.net.ssl.SSLContext; -import org.redkale.util.*; +import org.redkale.util.ByteBufferWriter; /** * @@ -68,14 +68,14 @@ abstract class AsyncNioConnection extends AsyncConnection { protected int writeByteTuple2Length; - //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeOffset、writeLength有值 + //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeBuffersOffset、writeBuffersLength有值 protected ByteBuffer writeByteBuffer; protected ByteBuffer[] writeByteBuffers; - protected int writeOffset; + protected int writeBuffersOffset; - protected int writeLength; + protected int writeBuffersLength; protected int writeTotal; @@ -154,9 +154,10 @@ abstract class AsyncNioConnection extends AsyncConnection { ioReadThread.register(selector -> { try { if (readKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); - readKey = implRegister(selector, ops); + readKey = keyFor(selector); + } + if (readKey == null) { + readKey = implRegister(selector, SelectionKey.OP_READ); readKey.attach(this); } else { readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); @@ -275,8 +276,8 @@ abstract class AsyncNioConnection extends AsyncConnection { } this.writePending = true; this.writeByteBuffers = srcs; - this.writeOffset = offset; - this.writeLength = length; + this.writeBuffersOffset = offset; + this.writeBuffersLength = length; this.writeAttachment = attachment; if (this.writeTimeoutSeconds > 0) { AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; @@ -307,9 +308,10 @@ abstract class AsyncNioConnection extends AsyncConnection { ioWriteThread.register(selector -> { try { if (writeKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); - writeKey = implRegister(selector, ops); + writeKey = keyFor(selector); + } + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); writeKey.attach(this); } else { writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); @@ -346,9 +348,10 @@ abstract class AsyncNioConnection extends AsyncConnection { ioReadThread.register(selector -> { try { if (readKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); - readKey = implRegister(selector, ops); + readKey = keyFor(selector); + } + if (readKey == null) { + readKey = implRegister(selector, SelectionKey.OP_READ); readKey.attach(this); } else { readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); @@ -372,25 +375,39 @@ abstract class AsyncNioConnection extends AsyncConnection { boolean hasRemain = true; boolean writeCompleted = true; - if (writeByteTuple1Array == null && fastWriteCount.get() > 0) { - byte[] bs = null; + if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) { + final ByteBuffer buffer = pollWriteBuffer(); + ByteBufferWriter writer = null; byte[] item; while ((item = fastWriteQueue.poll()) != null) { fastWriteCount.decrementAndGet(); - bs = Utility.append(bs, item); + if (writer != null) { + writer.put(item); + } else if (buffer.remaining() >= item.length) { + buffer.put(item); + } else { + writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); + writer.put(item); + } } - this.writeByteTuple1Array = bs; + this.writeBuffersOffset = 0; + if (writer == null) { + this.writeByteBuffer = buffer.flip(); + this.writeBuffersLength = 0; + } else { + this.writeByteBuffers = writer.toBuffers(); + this.writeBuffersLength = this.writeByteBuffers.length; + } + this.writeByteTuple1Array = null; this.writeByteTuple1Offset = 0; - this.writeByteTuple1Length = bs == null ? 0 : bs.length; + this.writeByteTuple1Length = 0; this.writeByteTuple2Array = null; this.writeByteTuple2Offset = 0; this.writeByteTuple2Length = 0; - this.writeOffset = 0; - this.writeLength = this.writeByteTuple1Length; } - int batchOffset = writeOffset; - int batchLength = writeLength; + int batchOffset = writeBuffersOffset; + int batchLength = writeBuffersLength; while (hasRemain) { //必须要将buffer写完为止 if (writeByteTuple1Array != null) { final ByteBuffer buffer = pollWriteBuffer(); @@ -399,14 +416,13 @@ abstract class AsyncNioConnection extends AsyncConnection { if (writeByteTuple2Length > 0) { buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); } - buffer.flip(); - writeByteBuffer = buffer; - writeByteTuple1Array = null; - writeByteTuple1Offset = 0; - writeByteTuple1Length = 0; - writeByteTuple2Array = null; - writeByteTuple2Offset = 0; - writeByteTuple2Length = 0; + this.writeByteBuffer = buffer.flip(); + this.writeByteTuple1Array = null; + this.writeByteTuple1Offset = 0; + this.writeByteTuple1Length = 0; + this.writeByteTuple2Array = null; + this.writeByteTuple2Offset = 0; + this.writeByteTuple2Length = 0; } else { ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); @@ -414,17 +430,17 @@ abstract class AsyncNioConnection extends AsyncConnection { writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); } final ByteBuffer[] buffers = writer.toBuffers(); - writeByteBuffers = buffers; - writeOffset = 0; - writeLength = buffers.length; - batchOffset = writeOffset; - batchLength = writeLength; - writeByteTuple1Array = null; - writeByteTuple1Offset = 0; - writeByteTuple1Length = 0; - writeByteTuple2Array = null; - writeByteTuple2Offset = 0; - writeByteTuple2Length = 0; + this.writeByteBuffers = buffers; + this.writeBuffersOffset = 0; + this.writeBuffersLength = buffers.length; + batchOffset = writeBuffersOffset; + batchLength = writeBuffersLength; + this.writeByteTuple1Array = null; + this.writeByteTuple1Offset = 0; + this.writeByteTuple1Length = 0; + this.writeByteTuple2Array = null; + this.writeByteTuple2Offset = 0; + this.writeByteTuple2Length = 0; } if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) { if (writeByteBuffer == null) { @@ -473,14 +489,15 @@ abstract class AsyncNioConnection extends AsyncConnection { } if (writeCompleted && (totalCount != 0 || !hasRemain)) { - handleWrite(writeTotal + totalCount, null); + handleWrite(this.writeTotal + totalCount, null); } else if (writeKey == null) { ioWriteThread.register(selector -> { try { if (writeKey == null) { - SelectionKey oldKey = keyFor(selector); - int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); - writeKey = implRegister(selector, ops); + writeKey = keyFor(selector); + } + if (writeKey == null) { + writeKey = implRegister(selector, SelectionKey.OP_WRITE); writeKey.attach(this); } else { writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); @@ -549,8 +566,8 @@ abstract class AsyncNioConnection extends AsyncConnection { this.writeAttachment = null; this.writeByteBuffer = null; this.writeByteBuffers = null; - this.writeOffset = 0; - this.writeLength = 0; + this.writeBuffersOffset = 0; + this.writeBuffersLength = 0; this.writeTotal = 0; this.writePending = false; //必须放最后 diff --git a/src/main/java/org/redkale/util/ByteBufferWriter.java b/src/main/java/org/redkale/util/ByteBufferWriter.java index 98c74b266..051f5070d 100644 --- a/src/main/java/org/redkale/util/ByteBufferWriter.java +++ b/src/main/java/org/redkale/util/ByteBufferWriter.java @@ -1,228 +1,229 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.util; - -import java.nio.*; -import java.util.function.Supplier; - -/** - * 以ByteBuffer为数据载体的Writer - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public class ByteBufferWriter { - - private final Supplier supplier; - - private ByteBuffer[] buffers; - - private int position; - - private int writeBytesCounter = 0; //put(byte[] src, int offset, int length) 调用的次数 - - private boolean bigEndian = true; - - protected ByteBufferWriter(Supplier supplier) { - this.supplier = supplier; - } - - public static ByteBufferWriter create(Supplier supplier) { - return new ByteBufferWriter(supplier); - } - - public static ByteBufferWriter create(Supplier supplier, ByteBuffer one) { - ByteBufferWriter writer = new ByteBufferWriter(supplier); - writer.bigEndian = one.order() == ByteOrder.BIG_ENDIAN; - writer.buffers = new ByteBuffer[]{one}; - return writer; - } - - public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content) { - ByteBufferWriter writer = new ByteBufferWriter(supplier); - writer.put(false, content, 0, content.length); - return writer.toBuffers(); - } - - public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content, int offset, int length) { - ByteBufferWriter writer = new ByteBufferWriter(supplier); - writer.put(false, content, offset, length); - return writer.toBuffers(); - } - - private ByteBuffer getLastBuffer(int size) { - if (this.buffers == null) { - ByteBuffer buf = supplier.get(); - this.bigEndian = buf.order() == ByteOrder.BIG_ENDIAN; - this.buffers = Utility.append(this.buffers, buf); - return buf; - } else if (this.buffers[this.buffers.length - 1].remaining() < size) { - ByteBuffer buf = supplier.get(); - this.buffers = Utility.append(this.buffers, buf); - return buf; - } - return this.buffers[this.buffers.length - 1]; - } - - public ByteBuffer[] toBuffers() { - if (buffers == null) { - return new ByteBuffer[0]; - } - for (ByteBuffer buf : this.buffers) { - buf.flip(); - } - return this.buffers; - } - - public int position() { - return position; - } - - public ByteBufferWriter put(byte b) { - getLastBuffer(1).put(b); - position++; - return this; - } - - public ByteBufferWriter putShort(short value) { - getLastBuffer(2).putShort(value); - position += 2; - return this; - } - - public ByteBufferWriter putInt(int value) { - getLastBuffer(4).putInt(value); - position += 4; - return this; - } - - //重新设置指定位置的值 - public ByteBufferWriter putInt(final int index, int value) { - int start = 0; - ByteBuffer[] buffs = this.buffers; - for (int i = 0; i < buffs.length; i++) { - int pos = buffs[i].position(); - if (pos + start > index) { - int r = pos + start - index; - if (r >= 4) { - buffs[i].putInt(index - start, value); - return this; - } else { - byte b1 = bigEndian ? (byte) ((value >> 24) & 0xFF) : (byte) (value & 0xFF); - byte b2 = bigEndian ? (byte) ((value >> 16) & 0xFF) : (byte) ((value >> 8) & 0xFF); - byte b3 = bigEndian ? (byte) ((value >> 8) & 0xFF) : (byte) ((value >> 16) & 0xFF); - byte b4 = bigEndian ? (byte) (value & 0xFF) : (byte) ((value >> 24) & 0xFF); - if (r == 3) { - buffs[i].put(index - start, b1); - buffs[i].put(index - start + 1, b2); - buffs[i].put(index - start + 2, b3); - buffs[i + 1].put(0, b4); - } else if (r == 2) { - buffs[i].put(index - start, b1); - buffs[i].put(index - start + 1, b2); - buffs[i + 1].put(0, b3); - buffs[i + 1].put(1, b4); - } else if (r == 1) { - buffs[i].put(index - start, b1); - buffs[i + 1].put(0, b2); - buffs[i + 1].put(1, b3); - buffs[i + 1].put(2, b4); - } - return this; - } - } else { - start += pos; - } - } - throw new ArrayIndexOutOfBoundsException(index); - } - -// public static void main(String[] args) throws Throwable { -// ObjectPool pool = new ObjectPool<>(20, (p) -> ByteBuffer.allocate(10), (ByteBuffer t) -> t.clear(), (ByteBuffer t) -> false); -// ByteBufferWriter writer = ByteBufferWriter.create(pool); -// for (int i = 1; i <= 18; i++) { -// writer.put((byte) i); -// } -// System.out.println(Arrays.toString(toBytes(writer.toBuffers()))); -// -// writer = ByteBufferWriter.create(pool); -// for (int i = 1; i <= 18; i++) { -// writer.put((byte) i); -// } -// int value = 0x223344; -// byte[] b4 = new byte[]{(byte) ((value >> 24) & 0xFF), (byte) ((value >> 16) & 0xFF), (byte) ((value >> 8) & 0xFF), (byte) (value & 0xFF)}; -// writer.putInt(9, value); -// System.out.println(Arrays.toString(b4)); -// System.out.println(Arrays.toString(toBytes(writer.toBuffers()))); -// } - public ByteBufferWriter putFloat(float value) { - getLastBuffer(4).putFloat(value); - position += 4; - return this; - } - - public ByteBufferWriter putLong(long value) { - getLastBuffer(8).putLong(value); - position += 8; - return this; - } - - public ByteBufferWriter putDouble(double value) { - getLastBuffer(8).putDouble(value); - position += 8; - return this; - } - - public int put(byte[] src) { - return put(true, src, 0, src.length); - } - - public int put(byte[] src, int offset, int length) { - return put(true, src, offset, length); - } - - public int put(byte[] src, int offset, int length, byte[] src2, int offset2, int length2) { - ByteBuffer buf = getLastBuffer(1); - int remain = buf.remaining(); - if (remain >= length + length2) { - buf.put(src, offset, length); - if (src2 != null) { - buf.put(src2, offset2, length2); - } - position += length + length2; - this.writeBytesCounter++; - } else { - put(true, src, offset, length); - if (src2 != null) { - put(false, src2, offset2, length2); - } - } - return writeBytesCounter; - } - - private int put(boolean outside, byte[] src, int offset, int length) { - ByteBuffer buf = getLastBuffer(1); - int remain = buf.remaining(); - if (remain >= length) { - buf.put(src, offset, length); - position += length; - } else { - buf.put(src, offset, remain); - position += remain; - put(false, src, offset + remain, length - remain); - } - if (outside) { - this.writeBytesCounter++; - } - return writeBytesCounter; - } - - public int getWriteBytesCounter() { - return this.writeBytesCounter; - } -} +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.util; + +import java.nio.*; +import java.util.function.Supplier; + +/** + * 以ByteBuffer为数据载体的Writer + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public class ByteBufferWriter { + + private final Supplier supplier; + + private ByteBuffer[] buffers; + + private int position; + + private int writeBytesCounter = 0; //put(byte[] src, int offset, int length) 调用的次数 + + private boolean bigEndian = true; + + protected ByteBufferWriter(Supplier supplier) { + this.supplier = supplier; + } + + public static ByteBufferWriter create(Supplier supplier) { + return new ByteBufferWriter(supplier); + } + + public static ByteBufferWriter create(Supplier supplier, ByteBuffer one) { + ByteBufferWriter writer = new ByteBufferWriter(supplier); + writer.bigEndian = one.order() == ByteOrder.BIG_ENDIAN; + writer.buffers = new ByteBuffer[]{one}; + writer.position = one.position(); + return writer; + } + + public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content) { + ByteBufferWriter writer = new ByteBufferWriter(supplier); + writer.put(false, content, 0, content.length); + return writer.toBuffers(); + } + + public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content, int offset, int length) { + ByteBufferWriter writer = new ByteBufferWriter(supplier); + writer.put(false, content, offset, length); + return writer.toBuffers(); + } + + private ByteBuffer getLastBuffer(int size) { + if (this.buffers == null) { + ByteBuffer buf = supplier.get(); + this.bigEndian = buf.order() == ByteOrder.BIG_ENDIAN; + this.buffers = Utility.append(this.buffers, buf); + return buf; + } else if (this.buffers[this.buffers.length - 1].remaining() < size) { + ByteBuffer buf = supplier.get(); + this.buffers = Utility.append(this.buffers, buf); + return buf; + } + return this.buffers[this.buffers.length - 1]; + } + + public ByteBuffer[] toBuffers() { + if (buffers == null) { + return new ByteBuffer[0]; + } + for (ByteBuffer buf : this.buffers) { + buf.flip(); + } + return this.buffers; + } + + public int position() { + return position; + } + + public ByteBufferWriter put(byte b) { + getLastBuffer(1).put(b); + position++; + return this; + } + + public ByteBufferWriter putShort(short value) { + getLastBuffer(2).putShort(value); + position += 2; + return this; + } + + public ByteBufferWriter putInt(int value) { + getLastBuffer(4).putInt(value); + position += 4; + return this; + } + + //重新设置指定位置的值 + public ByteBufferWriter putInt(final int index, int value) { + int start = 0; + ByteBuffer[] buffs = this.buffers; + for (int i = 0; i < buffs.length; i++) { + int pos = buffs[i].position(); + if (pos + start > index) { + int r = pos + start - index; + if (r >= 4) { + buffs[i].putInt(index - start, value); + return this; + } else { + byte b1 = bigEndian ? (byte) ((value >> 24) & 0xFF) : (byte) (value & 0xFF); + byte b2 = bigEndian ? (byte) ((value >> 16) & 0xFF) : (byte) ((value >> 8) & 0xFF); + byte b3 = bigEndian ? (byte) ((value >> 8) & 0xFF) : (byte) ((value >> 16) & 0xFF); + byte b4 = bigEndian ? (byte) (value & 0xFF) : (byte) ((value >> 24) & 0xFF); + if (r == 3) { + buffs[i].put(index - start, b1); + buffs[i].put(index - start + 1, b2); + buffs[i].put(index - start + 2, b3); + buffs[i + 1].put(0, b4); + } else if (r == 2) { + buffs[i].put(index - start, b1); + buffs[i].put(index - start + 1, b2); + buffs[i + 1].put(0, b3); + buffs[i + 1].put(1, b4); + } else if (r == 1) { + buffs[i].put(index - start, b1); + buffs[i + 1].put(0, b2); + buffs[i + 1].put(1, b3); + buffs[i + 1].put(2, b4); + } + return this; + } + } else { + start += pos; + } + } + throw new ArrayIndexOutOfBoundsException(index); + } + +// public static void main(String[] args) throws Throwable { +// ObjectPool pool = new ObjectPool<>(20, (p) -> ByteBuffer.allocate(10), (ByteBuffer t) -> t.clear(), (ByteBuffer t) -> false); +// ByteBufferWriter writer = ByteBufferWriter.create(pool); +// for (int i = 1; i <= 18; i++) { +// writer.put((byte) i); +// } +// System.out.println(Arrays.toString(toBytes(writer.toBuffers()))); +// +// writer = ByteBufferWriter.create(pool); +// for (int i = 1; i <= 18; i++) { +// writer.put((byte) i); +// } +// int value = 0x223344; +// byte[] b4 = new byte[]{(byte) ((value >> 24) & 0xFF), (byte) ((value >> 16) & 0xFF), (byte) ((value >> 8) & 0xFF), (byte) (value & 0xFF)}; +// writer.putInt(9, value); +// System.out.println(Arrays.toString(b4)); +// System.out.println(Arrays.toString(toBytes(writer.toBuffers()))); +// } + public ByteBufferWriter putFloat(float value) { + getLastBuffer(4).putFloat(value); + position += 4; + return this; + } + + public ByteBufferWriter putLong(long value) { + getLastBuffer(8).putLong(value); + position += 8; + return this; + } + + public ByteBufferWriter putDouble(double value) { + getLastBuffer(8).putDouble(value); + position += 8; + return this; + } + + public int put(byte[] src) { + return put(true, src, 0, src.length); + } + + public int put(byte[] src, int offset, int length) { + return put(true, src, offset, length); + } + + public int put(byte[] src, int offset, int length, byte[] src2, int offset2, int length2) { + ByteBuffer buf = getLastBuffer(1); + int remain = buf.remaining(); + if (remain >= length + length2) { + buf.put(src, offset, length); + if (src2 != null) { + buf.put(src2, offset2, length2); + } + position += length + length2; + this.writeBytesCounter++; + } else { + put(true, src, offset, length); + if (src2 != null) { + put(false, src2, offset2, length2); + } + } + return writeBytesCounter; + } + + private int put(boolean outside, byte[] src, int offset, int length) { + ByteBuffer buf = getLastBuffer(1); + int remain = buf.remaining(); + if (remain >= length) { + buf.put(src, offset, length); + position += length; + } else { + buf.put(src, offset, remain); + position += remain; + put(false, src, offset + remain, length - remain); + } + if (outside) { + this.writeBytesCounter++; + } + return writeBytesCounter; + } + + public int getWriteBytesCounter() { + return this.writeBytesCounter; + } +}