diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java
index f72917a6d..d56eff441 100644
--- a/src/main/java/org/redkale/net/AsyncNioConnection.java
+++ b/src/main/java/org/redkale/net/AsyncNioConnection.java
@@ -14,7 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
-import org.redkale.util.*;
+import org.redkale.util.ByteBufferWriter;
/**
*
@@ -68,14 +68,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected int writeByteTuple2Length;
- //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeOffset、writeLength有值
+ //写操作, 二选一,要么writeByteBuffer有值,要么writeByteBuffers、writeBuffersOffset、writeBuffersLength有值
protected ByteBuffer writeByteBuffer;
protected ByteBuffer[] writeByteBuffers;
- protected int writeOffset;
+ protected int writeBuffersOffset;
- protected int writeLength;
+ protected int writeBuffersLength;
protected int writeTotal;
@@ -154,9 +154,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioReadThread.register(selector -> {
try {
if (readKey == null) {
- SelectionKey oldKey = keyFor(selector);
- int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps());
- readKey = implRegister(selector, ops);
+ readKey = keyFor(selector);
+ }
+ if (readKey == null) {
+ readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
@@ -275,8 +276,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
this.writePending = true;
this.writeByteBuffers = srcs;
- this.writeOffset = offset;
- this.writeLength = length;
+ this.writeBuffersOffset = offset;
+ this.writeBuffersLength = length;
this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
@@ -307,9 +308,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioWriteThread.register(selector -> {
try {
if (writeKey == null) {
- SelectionKey oldKey = keyFor(selector);
- int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps());
- writeKey = implRegister(selector, ops);
+ writeKey = keyFor(selector);
+ }
+ if (writeKey == null) {
+ writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
@@ -346,9 +348,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioReadThread.register(selector -> {
try {
if (readKey == null) {
- SelectionKey oldKey = keyFor(selector);
- int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps());
- readKey = implRegister(selector, ops);
+ readKey = keyFor(selector);
+ }
+ if (readKey == null) {
+ readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this);
} else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
@@ -372,25 +375,39 @@ abstract class AsyncNioConnection extends AsyncConnection {
boolean hasRemain = true;
boolean writeCompleted = true;
- if (writeByteTuple1Array == null && fastWriteCount.get() > 0) {
- byte[] bs = null;
+ if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) {
+ final ByteBuffer buffer = pollWriteBuffer();
+ ByteBufferWriter writer = null;
byte[] item;
while ((item = fastWriteQueue.poll()) != null) {
fastWriteCount.decrementAndGet();
- bs = Utility.append(bs, item);
+ if (writer != null) {
+ writer.put(item);
+ } else if (buffer.remaining() >= item.length) {
+ buffer.put(item);
+ } else {
+ writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
+ writer.put(item);
+ }
}
- this.writeByteTuple1Array = bs;
+ this.writeBuffersOffset = 0;
+ if (writer == null) {
+ this.writeByteBuffer = buffer.flip();
+ this.writeBuffersLength = 0;
+ } else {
+ this.writeByteBuffers = writer.toBuffers();
+ this.writeBuffersLength = this.writeByteBuffers.length;
+ }
+ this.writeByteTuple1Array = null;
this.writeByteTuple1Offset = 0;
- this.writeByteTuple1Length = bs == null ? 0 : bs.length;
+ this.writeByteTuple1Length = 0;
this.writeByteTuple2Array = null;
this.writeByteTuple2Offset = 0;
this.writeByteTuple2Length = 0;
- this.writeOffset = 0;
- this.writeLength = this.writeByteTuple1Length;
}
- int batchOffset = writeOffset;
- int batchLength = writeLength;
+ int batchOffset = writeBuffersOffset;
+ int batchLength = writeBuffersLength;
while (hasRemain) { //必须要将buffer写完为止
if (writeByteTuple1Array != null) {
final ByteBuffer buffer = pollWriteBuffer();
@@ -399,14 +416,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeByteTuple2Length > 0) {
buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
}
- buffer.flip();
- writeByteBuffer = buffer;
- writeByteTuple1Array = null;
- writeByteTuple1Offset = 0;
- writeByteTuple1Length = 0;
- writeByteTuple2Array = null;
- writeByteTuple2Offset = 0;
- writeByteTuple2Length = 0;
+ this.writeByteBuffer = buffer.flip();
+ this.writeByteTuple1Array = null;
+ this.writeByteTuple1Offset = 0;
+ this.writeByteTuple1Length = 0;
+ this.writeByteTuple2Array = null;
+ this.writeByteTuple2Offset = 0;
+ this.writeByteTuple2Length = 0;
} else {
ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
@@ -414,17 +430,17 @@ abstract class AsyncNioConnection extends AsyncConnection {
writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
}
final ByteBuffer[] buffers = writer.toBuffers();
- writeByteBuffers = buffers;
- writeOffset = 0;
- writeLength = buffers.length;
- batchOffset = writeOffset;
- batchLength = writeLength;
- writeByteTuple1Array = null;
- writeByteTuple1Offset = 0;
- writeByteTuple1Length = 0;
- writeByteTuple2Array = null;
- writeByteTuple2Offset = 0;
- writeByteTuple2Length = 0;
+ this.writeByteBuffers = buffers;
+ this.writeBuffersOffset = 0;
+ this.writeBuffersLength = buffers.length;
+ batchOffset = writeBuffersOffset;
+ batchLength = writeBuffersLength;
+ this.writeByteTuple1Array = null;
+ this.writeByteTuple1Offset = 0;
+ this.writeByteTuple1Length = 0;
+ this.writeByteTuple2Array = null;
+ this.writeByteTuple2Offset = 0;
+ this.writeByteTuple2Length = 0;
}
if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) {
if (writeByteBuffer == null) {
@@ -473,14 +489,15 @@ abstract class AsyncNioConnection extends AsyncConnection {
}
if (writeCompleted && (totalCount != 0 || !hasRemain)) {
- handleWrite(writeTotal + totalCount, null);
+ handleWrite(this.writeTotal + totalCount, null);
} else if (writeKey == null) {
ioWriteThread.register(selector -> {
try {
if (writeKey == null) {
- SelectionKey oldKey = keyFor(selector);
- int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps());
- writeKey = implRegister(selector, ops);
+ writeKey = keyFor(selector);
+ }
+ if (writeKey == null) {
+ writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this);
} else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
@@ -549,8 +566,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeAttachment = null;
this.writeByteBuffer = null;
this.writeByteBuffers = null;
- this.writeOffset = 0;
- this.writeLength = 0;
+ this.writeBuffersOffset = 0;
+ this.writeBuffersLength = 0;
this.writeTotal = 0;
this.writePending = false; //必须放最后
diff --git a/src/main/java/org/redkale/util/ByteBufferWriter.java b/src/main/java/org/redkale/util/ByteBufferWriter.java
index 98c74b266..051f5070d 100644
--- a/src/main/java/org/redkale/util/ByteBufferWriter.java
+++ b/src/main/java/org/redkale/util/ByteBufferWriter.java
@@ -1,228 +1,229 @@
-/*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.redkale.util;
-
-import java.nio.*;
-import java.util.function.Supplier;
-
-/**
- * 以ByteBuffer为数据载体的Writer
- *
- *
- * 详情见: https://redkale.org
- *
- * @author zhangjx
- */
-public class ByteBufferWriter {
-
- private final Supplier supplier;
-
- private ByteBuffer[] buffers;
-
- private int position;
-
- private int writeBytesCounter = 0; //put(byte[] src, int offset, int length) 调用的次数
-
- private boolean bigEndian = true;
-
- protected ByteBufferWriter(Supplier supplier) {
- this.supplier = supplier;
- }
-
- public static ByteBufferWriter create(Supplier supplier) {
- return new ByteBufferWriter(supplier);
- }
-
- public static ByteBufferWriter create(Supplier supplier, ByteBuffer one) {
- ByteBufferWriter writer = new ByteBufferWriter(supplier);
- writer.bigEndian = one.order() == ByteOrder.BIG_ENDIAN;
- writer.buffers = new ByteBuffer[]{one};
- return writer;
- }
-
- public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content) {
- ByteBufferWriter writer = new ByteBufferWriter(supplier);
- writer.put(false, content, 0, content.length);
- return writer.toBuffers();
- }
-
- public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content, int offset, int length) {
- ByteBufferWriter writer = new ByteBufferWriter(supplier);
- writer.put(false, content, offset, length);
- return writer.toBuffers();
- }
-
- private ByteBuffer getLastBuffer(int size) {
- if (this.buffers == null) {
- ByteBuffer buf = supplier.get();
- this.bigEndian = buf.order() == ByteOrder.BIG_ENDIAN;
- this.buffers = Utility.append(this.buffers, buf);
- return buf;
- } else if (this.buffers[this.buffers.length - 1].remaining() < size) {
- ByteBuffer buf = supplier.get();
- this.buffers = Utility.append(this.buffers, buf);
- return buf;
- }
- return this.buffers[this.buffers.length - 1];
- }
-
- public ByteBuffer[] toBuffers() {
- if (buffers == null) {
- return new ByteBuffer[0];
- }
- for (ByteBuffer buf : this.buffers) {
- buf.flip();
- }
- return this.buffers;
- }
-
- public int position() {
- return position;
- }
-
- public ByteBufferWriter put(byte b) {
- getLastBuffer(1).put(b);
- position++;
- return this;
- }
-
- public ByteBufferWriter putShort(short value) {
- getLastBuffer(2).putShort(value);
- position += 2;
- return this;
- }
-
- public ByteBufferWriter putInt(int value) {
- getLastBuffer(4).putInt(value);
- position += 4;
- return this;
- }
-
- //重新设置指定位置的值
- public ByteBufferWriter putInt(final int index, int value) {
- int start = 0;
- ByteBuffer[] buffs = this.buffers;
- for (int i = 0; i < buffs.length; i++) {
- int pos = buffs[i].position();
- if (pos + start > index) {
- int r = pos + start - index;
- if (r >= 4) {
- buffs[i].putInt(index - start, value);
- return this;
- } else {
- byte b1 = bigEndian ? (byte) ((value >> 24) & 0xFF) : (byte) (value & 0xFF);
- byte b2 = bigEndian ? (byte) ((value >> 16) & 0xFF) : (byte) ((value >> 8) & 0xFF);
- byte b3 = bigEndian ? (byte) ((value >> 8) & 0xFF) : (byte) ((value >> 16) & 0xFF);
- byte b4 = bigEndian ? (byte) (value & 0xFF) : (byte) ((value >> 24) & 0xFF);
- if (r == 3) {
- buffs[i].put(index - start, b1);
- buffs[i].put(index - start + 1, b2);
- buffs[i].put(index - start + 2, b3);
- buffs[i + 1].put(0, b4);
- } else if (r == 2) {
- buffs[i].put(index - start, b1);
- buffs[i].put(index - start + 1, b2);
- buffs[i + 1].put(0, b3);
- buffs[i + 1].put(1, b4);
- } else if (r == 1) {
- buffs[i].put(index - start, b1);
- buffs[i + 1].put(0, b2);
- buffs[i + 1].put(1, b3);
- buffs[i + 1].put(2, b4);
- }
- return this;
- }
- } else {
- start += pos;
- }
- }
- throw new ArrayIndexOutOfBoundsException(index);
- }
-
-// public static void main(String[] args) throws Throwable {
-// ObjectPool pool = new ObjectPool<>(20, (p) -> ByteBuffer.allocate(10), (ByteBuffer t) -> t.clear(), (ByteBuffer t) -> false);
-// ByteBufferWriter writer = ByteBufferWriter.create(pool);
-// for (int i = 1; i <= 18; i++) {
-// writer.put((byte) i);
-// }
-// System.out.println(Arrays.toString(toBytes(writer.toBuffers())));
-//
-// writer = ByteBufferWriter.create(pool);
-// for (int i = 1; i <= 18; i++) {
-// writer.put((byte) i);
-// }
-// int value = 0x223344;
-// byte[] b4 = new byte[]{(byte) ((value >> 24) & 0xFF), (byte) ((value >> 16) & 0xFF), (byte) ((value >> 8) & 0xFF), (byte) (value & 0xFF)};
-// writer.putInt(9, value);
-// System.out.println(Arrays.toString(b4));
-// System.out.println(Arrays.toString(toBytes(writer.toBuffers())));
-// }
- public ByteBufferWriter putFloat(float value) {
- getLastBuffer(4).putFloat(value);
- position += 4;
- return this;
- }
-
- public ByteBufferWriter putLong(long value) {
- getLastBuffer(8).putLong(value);
- position += 8;
- return this;
- }
-
- public ByteBufferWriter putDouble(double value) {
- getLastBuffer(8).putDouble(value);
- position += 8;
- return this;
- }
-
- public int put(byte[] src) {
- return put(true, src, 0, src.length);
- }
-
- public int put(byte[] src, int offset, int length) {
- return put(true, src, offset, length);
- }
-
- public int put(byte[] src, int offset, int length, byte[] src2, int offset2, int length2) {
- ByteBuffer buf = getLastBuffer(1);
- int remain = buf.remaining();
- if (remain >= length + length2) {
- buf.put(src, offset, length);
- if (src2 != null) {
- buf.put(src2, offset2, length2);
- }
- position += length + length2;
- this.writeBytesCounter++;
- } else {
- put(true, src, offset, length);
- if (src2 != null) {
- put(false, src2, offset2, length2);
- }
- }
- return writeBytesCounter;
- }
-
- private int put(boolean outside, byte[] src, int offset, int length) {
- ByteBuffer buf = getLastBuffer(1);
- int remain = buf.remaining();
- if (remain >= length) {
- buf.put(src, offset, length);
- position += length;
- } else {
- buf.put(src, offset, remain);
- position += remain;
- put(false, src, offset + remain, length - remain);
- }
- if (outside) {
- this.writeBytesCounter++;
- }
- return writeBytesCounter;
- }
-
- public int getWriteBytesCounter() {
- return this.writeBytesCounter;
- }
-}
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.redkale.util;
+
+import java.nio.*;
+import java.util.function.Supplier;
+
+/**
+ * 以ByteBuffer为数据载体的Writer
+ *
+ *
+ * 详情见: https://redkale.org
+ *
+ * @author zhangjx
+ */
+public class ByteBufferWriter {
+
+ private final Supplier supplier;
+
+ private ByteBuffer[] buffers;
+
+ private int position;
+
+ private int writeBytesCounter = 0; //put(byte[] src, int offset, int length) 调用的次数
+
+ private boolean bigEndian = true;
+
+ protected ByteBufferWriter(Supplier supplier) {
+ this.supplier = supplier;
+ }
+
+ public static ByteBufferWriter create(Supplier supplier) {
+ return new ByteBufferWriter(supplier);
+ }
+
+ public static ByteBufferWriter create(Supplier supplier, ByteBuffer one) {
+ ByteBufferWriter writer = new ByteBufferWriter(supplier);
+ writer.bigEndian = one.order() == ByteOrder.BIG_ENDIAN;
+ writer.buffers = new ByteBuffer[]{one};
+ writer.position = one.position();
+ return writer;
+ }
+
+ public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content) {
+ ByteBufferWriter writer = new ByteBufferWriter(supplier);
+ writer.put(false, content, 0, content.length);
+ return writer.toBuffers();
+ }
+
+ public static ByteBuffer[] toBuffers(Supplier supplier, byte[] content, int offset, int length) {
+ ByteBufferWriter writer = new ByteBufferWriter(supplier);
+ writer.put(false, content, offset, length);
+ return writer.toBuffers();
+ }
+
+ private ByteBuffer getLastBuffer(int size) {
+ if (this.buffers == null) {
+ ByteBuffer buf = supplier.get();
+ this.bigEndian = buf.order() == ByteOrder.BIG_ENDIAN;
+ this.buffers = Utility.append(this.buffers, buf);
+ return buf;
+ } else if (this.buffers[this.buffers.length - 1].remaining() < size) {
+ ByteBuffer buf = supplier.get();
+ this.buffers = Utility.append(this.buffers, buf);
+ return buf;
+ }
+ return this.buffers[this.buffers.length - 1];
+ }
+
+ public ByteBuffer[] toBuffers() {
+ if (buffers == null) {
+ return new ByteBuffer[0];
+ }
+ for (ByteBuffer buf : this.buffers) {
+ buf.flip();
+ }
+ return this.buffers;
+ }
+
+ public int position() {
+ return position;
+ }
+
+ public ByteBufferWriter put(byte b) {
+ getLastBuffer(1).put(b);
+ position++;
+ return this;
+ }
+
+ public ByteBufferWriter putShort(short value) {
+ getLastBuffer(2).putShort(value);
+ position += 2;
+ return this;
+ }
+
+ public ByteBufferWriter putInt(int value) {
+ getLastBuffer(4).putInt(value);
+ position += 4;
+ return this;
+ }
+
+ //重新设置指定位置的值
+ public ByteBufferWriter putInt(final int index, int value) {
+ int start = 0;
+ ByteBuffer[] buffs = this.buffers;
+ for (int i = 0; i < buffs.length; i++) {
+ int pos = buffs[i].position();
+ if (pos + start > index) {
+ int r = pos + start - index;
+ if (r >= 4) {
+ buffs[i].putInt(index - start, value);
+ return this;
+ } else {
+ byte b1 = bigEndian ? (byte) ((value >> 24) & 0xFF) : (byte) (value & 0xFF);
+ byte b2 = bigEndian ? (byte) ((value >> 16) & 0xFF) : (byte) ((value >> 8) & 0xFF);
+ byte b3 = bigEndian ? (byte) ((value >> 8) & 0xFF) : (byte) ((value >> 16) & 0xFF);
+ byte b4 = bigEndian ? (byte) (value & 0xFF) : (byte) ((value >> 24) & 0xFF);
+ if (r == 3) {
+ buffs[i].put(index - start, b1);
+ buffs[i].put(index - start + 1, b2);
+ buffs[i].put(index - start + 2, b3);
+ buffs[i + 1].put(0, b4);
+ } else if (r == 2) {
+ buffs[i].put(index - start, b1);
+ buffs[i].put(index - start + 1, b2);
+ buffs[i + 1].put(0, b3);
+ buffs[i + 1].put(1, b4);
+ } else if (r == 1) {
+ buffs[i].put(index - start, b1);
+ buffs[i + 1].put(0, b2);
+ buffs[i + 1].put(1, b3);
+ buffs[i + 1].put(2, b4);
+ }
+ return this;
+ }
+ } else {
+ start += pos;
+ }
+ }
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+
+// public static void main(String[] args) throws Throwable {
+// ObjectPool pool = new ObjectPool<>(20, (p) -> ByteBuffer.allocate(10), (ByteBuffer t) -> t.clear(), (ByteBuffer t) -> false);
+// ByteBufferWriter writer = ByteBufferWriter.create(pool);
+// for (int i = 1; i <= 18; i++) {
+// writer.put((byte) i);
+// }
+// System.out.println(Arrays.toString(toBytes(writer.toBuffers())));
+//
+// writer = ByteBufferWriter.create(pool);
+// for (int i = 1; i <= 18; i++) {
+// writer.put((byte) i);
+// }
+// int value = 0x223344;
+// byte[] b4 = new byte[]{(byte) ((value >> 24) & 0xFF), (byte) ((value >> 16) & 0xFF), (byte) ((value >> 8) & 0xFF), (byte) (value & 0xFF)};
+// writer.putInt(9, value);
+// System.out.println(Arrays.toString(b4));
+// System.out.println(Arrays.toString(toBytes(writer.toBuffers())));
+// }
+ public ByteBufferWriter putFloat(float value) {
+ getLastBuffer(4).putFloat(value);
+ position += 4;
+ return this;
+ }
+
+ public ByteBufferWriter putLong(long value) {
+ getLastBuffer(8).putLong(value);
+ position += 8;
+ return this;
+ }
+
+ public ByteBufferWriter putDouble(double value) {
+ getLastBuffer(8).putDouble(value);
+ position += 8;
+ return this;
+ }
+
+ public int put(byte[] src) {
+ return put(true, src, 0, src.length);
+ }
+
+ public int put(byte[] src, int offset, int length) {
+ return put(true, src, offset, length);
+ }
+
+ public int put(byte[] src, int offset, int length, byte[] src2, int offset2, int length2) {
+ ByteBuffer buf = getLastBuffer(1);
+ int remain = buf.remaining();
+ if (remain >= length + length2) {
+ buf.put(src, offset, length);
+ if (src2 != null) {
+ buf.put(src2, offset2, length2);
+ }
+ position += length + length2;
+ this.writeBytesCounter++;
+ } else {
+ put(true, src, offset, length);
+ if (src2 != null) {
+ put(false, src2, offset2, length2);
+ }
+ }
+ return writeBytesCounter;
+ }
+
+ private int put(boolean outside, byte[] src, int offset, int length) {
+ ByteBuffer buf = getLastBuffer(1);
+ int remain = buf.remaining();
+ if (remain >= length) {
+ buf.put(src, offset, length);
+ position += length;
+ } else {
+ buf.put(src, offset, remain);
+ position += remain;
+ put(false, src, offset + remain, length - remain);
+ }
+ if (outside) {
+ this.writeBytesCounter++;
+ }
+ return writeBytesCounter;
+ }
+
+ public int getWriteBytesCounter() {
+ return this.writeBytesCounter;
+ }
+}