优化ByteBufferWriter

This commit is contained in:
redkale
2023-07-05 06:38:17 +08:00
parent c80bb876a3
commit 7b7fb33b7b
2 changed files with 295 additions and 277 deletions

View File

@@ -14,7 +14,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.util.*; import org.redkale.util.ByteBufferWriter;
/** /**
* *
@@ -68,14 +68,14 @@ abstract class AsyncNioConnection extends AsyncConnection {
protected int writeByteTuple2Length; protected int writeByteTuple2Length;
//写操作, 二选一要么writeByteBuffer有值要么writeByteBuffers、writeOffset、writeLength有值 //写操作, 二选一要么writeByteBuffer有值要么writeByteBuffers、writeBuffersOffset、writeBuffersLength有值
protected ByteBuffer writeByteBuffer; protected ByteBuffer writeByteBuffer;
protected ByteBuffer[] writeByteBuffers; protected ByteBuffer[] writeByteBuffers;
protected int writeOffset; protected int writeBuffersOffset;
protected int writeLength; protected int writeBuffersLength;
protected int writeTotal; protected int writeTotal;
@@ -154,9 +154,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioReadThread.register(selector -> { ioReadThread.register(selector -> {
try { try {
if (readKey == null) { if (readKey == null) {
SelectionKey oldKey = keyFor(selector); readKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); }
readKey = implRegister(selector, ops); if (readKey == null) {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this); readKey.attach(this);
} else { } else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
@@ -275,8 +276,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
this.writePending = true; this.writePending = true;
this.writeByteBuffers = srcs; this.writeByteBuffers = srcs;
this.writeOffset = offset; this.writeBuffersOffset = offset;
this.writeLength = length; this.writeBuffersLength = length;
this.writeAttachment = attachment; this.writeAttachment = attachment;
if (this.writeTimeoutSeconds > 0) { if (this.writeTimeoutSeconds > 0) {
AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler; AsyncNioCompletionHandler newHandler = this.writeTimeoutCompletionHandler;
@@ -307,9 +308,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioWriteThread.register(selector -> { ioWriteThread.register(selector -> {
try { try {
if (writeKey == null) { if (writeKey == null) {
SelectionKey oldKey = keyFor(selector); writeKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); }
writeKey = implRegister(selector, ops); if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this); writeKey.attach(this);
} else { } else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
@@ -346,9 +348,10 @@ abstract class AsyncNioConnection extends AsyncConnection {
ioReadThread.register(selector -> { ioReadThread.register(selector -> {
try { try {
if (readKey == null) { if (readKey == null) {
SelectionKey oldKey = keyFor(selector); readKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_READ : (SelectionKey.OP_READ | oldKey.interestOps()); }
readKey = implRegister(selector, ops); if (readKey == null) {
readKey = implRegister(selector, SelectionKey.OP_READ);
readKey.attach(this); readKey.attach(this);
} else { } else {
readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ); readKey.interestOps(readKey.interestOps() | SelectionKey.OP_READ);
@@ -372,25 +375,39 @@ abstract class AsyncNioConnection extends AsyncConnection {
boolean hasRemain = true; boolean hasRemain = true;
boolean writeCompleted = true; boolean writeCompleted = true;
if (writeByteTuple1Array == null && fastWriteCount.get() > 0) { if (writeByteBuffer == null && writeByteBuffers == null && writeByteTuple1Array == null && fastWriteCount.get() > 0) {
byte[] bs = null; final ByteBuffer buffer = pollWriteBuffer();
ByteBufferWriter writer = null;
byte[] item; byte[] item;
while ((item = fastWriteQueue.poll()) != null) { while ((item = fastWriteQueue.poll()) != null) {
fastWriteCount.decrementAndGet(); fastWriteCount.decrementAndGet();
bs = Utility.append(bs, item); if (writer != null) {
writer.put(item);
} else if (buffer.remaining() >= item.length) {
buffer.put(item);
} else {
writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
writer.put(item);
}
} }
this.writeByteTuple1Array = bs; this.writeBuffersOffset = 0;
if (writer == null) {
this.writeByteBuffer = buffer.flip();
this.writeBuffersLength = 0;
} else {
this.writeByteBuffers = writer.toBuffers();
this.writeBuffersLength = this.writeByteBuffers.length;
}
this.writeByteTuple1Array = null;
this.writeByteTuple1Offset = 0; this.writeByteTuple1Offset = 0;
this.writeByteTuple1Length = bs == null ? 0 : bs.length; this.writeByteTuple1Length = 0;
this.writeByteTuple2Array = null; this.writeByteTuple2Array = null;
this.writeByteTuple2Offset = 0; this.writeByteTuple2Offset = 0;
this.writeByteTuple2Length = 0; this.writeByteTuple2Length = 0;
this.writeOffset = 0;
this.writeLength = this.writeByteTuple1Length;
} }
int batchOffset = writeOffset; int batchOffset = writeBuffersOffset;
int batchLength = writeLength; int batchLength = writeBuffersLength;
while (hasRemain) { //必须要将buffer写完为止 while (hasRemain) { //必须要将buffer写完为止
if (writeByteTuple1Array != null) { if (writeByteTuple1Array != null) {
final ByteBuffer buffer = pollWriteBuffer(); final ByteBuffer buffer = pollWriteBuffer();
@@ -399,14 +416,13 @@ abstract class AsyncNioConnection extends AsyncConnection {
if (writeByteTuple2Length > 0) { if (writeByteTuple2Length > 0) {
buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); buffer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
} }
buffer.flip(); this.writeByteBuffer = buffer.flip();
writeByteBuffer = buffer; this.writeByteTuple1Array = null;
writeByteTuple1Array = null; this.writeByteTuple1Offset = 0;
writeByteTuple1Offset = 0; this.writeByteTuple1Length = 0;
writeByteTuple1Length = 0; this.writeByteTuple2Array = null;
writeByteTuple2Array = null; this.writeByteTuple2Offset = 0;
writeByteTuple2Offset = 0; this.writeByteTuple2Length = 0;
writeByteTuple2Length = 0;
} else { } else {
ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer); ByteBufferWriter writer = ByteBufferWriter.create(getWriteBufferSupplier(), buffer);
writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length); writer.put(writeByteTuple1Array, writeByteTuple1Offset, writeByteTuple1Length);
@@ -414,17 +430,17 @@ abstract class AsyncNioConnection extends AsyncConnection {
writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length); writer.put(writeByteTuple2Array, writeByteTuple2Offset, writeByteTuple2Length);
} }
final ByteBuffer[] buffers = writer.toBuffers(); final ByteBuffer[] buffers = writer.toBuffers();
writeByteBuffers = buffers; this.writeByteBuffers = buffers;
writeOffset = 0; this.writeBuffersOffset = 0;
writeLength = buffers.length; this.writeBuffersLength = buffers.length;
batchOffset = writeOffset; batchOffset = writeBuffersOffset;
batchLength = writeLength; batchLength = writeBuffersLength;
writeByteTuple1Array = null; this.writeByteTuple1Array = null;
writeByteTuple1Offset = 0; this.writeByteTuple1Offset = 0;
writeByteTuple1Length = 0; this.writeByteTuple1Length = 0;
writeByteTuple2Array = null; this.writeByteTuple2Array = null;
writeByteTuple2Offset = 0; this.writeByteTuple2Offset = 0;
writeByteTuple2Length = 0; this.writeByteTuple2Length = 0;
} }
if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) { if (this.writeCompletionHandler == this.writeTimeoutCompletionHandler) {
if (writeByteBuffer == null) { if (writeByteBuffer == null) {
@@ -473,14 +489,15 @@ abstract class AsyncNioConnection extends AsyncConnection {
} }
if (writeCompleted && (totalCount != 0 || !hasRemain)) { if (writeCompleted && (totalCount != 0 || !hasRemain)) {
handleWrite(writeTotal + totalCount, null); handleWrite(this.writeTotal + totalCount, null);
} else if (writeKey == null) { } else if (writeKey == null) {
ioWriteThread.register(selector -> { ioWriteThread.register(selector -> {
try { try {
if (writeKey == null) { if (writeKey == null) {
SelectionKey oldKey = keyFor(selector); writeKey = keyFor(selector);
int ops = oldKey == null ? SelectionKey.OP_WRITE : (SelectionKey.OP_WRITE | oldKey.interestOps()); }
writeKey = implRegister(selector, ops); if (writeKey == null) {
writeKey = implRegister(selector, SelectionKey.OP_WRITE);
writeKey.attach(this); writeKey.attach(this);
} else { } else {
writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE); writeKey.interestOps(writeKey.interestOps() | SelectionKey.OP_WRITE);
@@ -549,8 +566,8 @@ abstract class AsyncNioConnection extends AsyncConnection {
this.writeAttachment = null; this.writeAttachment = null;
this.writeByteBuffer = null; this.writeByteBuffer = null;
this.writeByteBuffers = null; this.writeByteBuffers = null;
this.writeOffset = 0; this.writeBuffersOffset = 0;
this.writeLength = 0; this.writeBuffersLength = 0;
this.writeTotal = 0; this.writeTotal = 0;
this.writePending = false; //必须放最后 this.writePending = false; //必须放最后

View File

@@ -1,228 +1,229 @@
/* /*
* To change this license header, choose License Headers in Project Properties. * To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates * To change this template file, choose Tools | Templates
* and open the template in the editor. * and open the template in the editor.
*/ */
package org.redkale.util; package org.redkale.util;
import java.nio.*; import java.nio.*;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
* 以ByteBuffer为数据载体的Writer * 以ByteBuffer为数据载体的Writer
* *
* <p> * <p>
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
*/ */
public class ByteBufferWriter { public class ByteBufferWriter {
private final Supplier<ByteBuffer> supplier; private final Supplier<ByteBuffer> supplier;
private ByteBuffer[] buffers; private ByteBuffer[] buffers;
private int position; private int position;
private int writeBytesCounter = 0; //put(byte[] src, int offset, int length) 调用的次数 private int writeBytesCounter = 0; //put(byte[] src, int offset, int length) 调用的次数
private boolean bigEndian = true; private boolean bigEndian = true;
protected ByteBufferWriter(Supplier<ByteBuffer> supplier) { protected ByteBufferWriter(Supplier<ByteBuffer> supplier) {
this.supplier = supplier; this.supplier = supplier;
} }
public static ByteBufferWriter create(Supplier<ByteBuffer> supplier) { public static ByteBufferWriter create(Supplier<ByteBuffer> supplier) {
return new ByteBufferWriter(supplier); return new ByteBufferWriter(supplier);
} }
public static ByteBufferWriter create(Supplier<ByteBuffer> supplier, ByteBuffer one) { public static ByteBufferWriter create(Supplier<ByteBuffer> supplier, ByteBuffer one) {
ByteBufferWriter writer = new ByteBufferWriter(supplier); ByteBufferWriter writer = new ByteBufferWriter(supplier);
writer.bigEndian = one.order() == ByteOrder.BIG_ENDIAN; writer.bigEndian = one.order() == ByteOrder.BIG_ENDIAN;
writer.buffers = new ByteBuffer[]{one}; writer.buffers = new ByteBuffer[]{one};
return writer; writer.position = one.position();
} return writer;
}
public static ByteBuffer[] toBuffers(Supplier<ByteBuffer> supplier, byte[] content) {
ByteBufferWriter writer = new ByteBufferWriter(supplier); public static ByteBuffer[] toBuffers(Supplier<ByteBuffer> supplier, byte[] content) {
writer.put(false, content, 0, content.length); ByteBufferWriter writer = new ByteBufferWriter(supplier);
return writer.toBuffers(); writer.put(false, content, 0, content.length);
} return writer.toBuffers();
}
public static ByteBuffer[] toBuffers(Supplier<ByteBuffer> supplier, byte[] content, int offset, int length) {
ByteBufferWriter writer = new ByteBufferWriter(supplier); public static ByteBuffer[] toBuffers(Supplier<ByteBuffer> supplier, byte[] content, int offset, int length) {
writer.put(false, content, offset, length); ByteBufferWriter writer = new ByteBufferWriter(supplier);
return writer.toBuffers(); writer.put(false, content, offset, length);
} return writer.toBuffers();
}
private ByteBuffer getLastBuffer(int size) {
if (this.buffers == null) { private ByteBuffer getLastBuffer(int size) {
ByteBuffer buf = supplier.get(); if (this.buffers == null) {
this.bigEndian = buf.order() == ByteOrder.BIG_ENDIAN; ByteBuffer buf = supplier.get();
this.buffers = Utility.append(this.buffers, buf); this.bigEndian = buf.order() == ByteOrder.BIG_ENDIAN;
return buf; this.buffers = Utility.append(this.buffers, buf);
} else if (this.buffers[this.buffers.length - 1].remaining() < size) { return buf;
ByteBuffer buf = supplier.get(); } else if (this.buffers[this.buffers.length - 1].remaining() < size) {
this.buffers = Utility.append(this.buffers, buf); ByteBuffer buf = supplier.get();
return buf; this.buffers = Utility.append(this.buffers, buf);
} return buf;
return this.buffers[this.buffers.length - 1]; }
} return this.buffers[this.buffers.length - 1];
}
public ByteBuffer[] toBuffers() {
if (buffers == null) { public ByteBuffer[] toBuffers() {
return new ByteBuffer[0]; if (buffers == null) {
} return new ByteBuffer[0];
for (ByteBuffer buf : this.buffers) { }
buf.flip(); for (ByteBuffer buf : this.buffers) {
} buf.flip();
return this.buffers; }
} return this.buffers;
}
public int position() {
return position; public int position() {
} return position;
}
public ByteBufferWriter put(byte b) {
getLastBuffer(1).put(b); public ByteBufferWriter put(byte b) {
position++; getLastBuffer(1).put(b);
return this; position++;
} return this;
}
public ByteBufferWriter putShort(short value) {
getLastBuffer(2).putShort(value); public ByteBufferWriter putShort(short value) {
position += 2; getLastBuffer(2).putShort(value);
return this; position += 2;
} return this;
}
public ByteBufferWriter putInt(int value) {
getLastBuffer(4).putInt(value); public ByteBufferWriter putInt(int value) {
position += 4; getLastBuffer(4).putInt(value);
return this; position += 4;
} return this;
}
//重新设置指定位置的值
public ByteBufferWriter putInt(final int index, int value) { //重新设置指定位置的值
int start = 0; public ByteBufferWriter putInt(final int index, int value) {
ByteBuffer[] buffs = this.buffers; int start = 0;
for (int i = 0; i < buffs.length; i++) { ByteBuffer[] buffs = this.buffers;
int pos = buffs[i].position(); for (int i = 0; i < buffs.length; i++) {
if (pos + start > index) { int pos = buffs[i].position();
int r = pos + start - index; if (pos + start > index) {
if (r >= 4) { int r = pos + start - index;
buffs[i].putInt(index - start, value); if (r >= 4) {
return this; buffs[i].putInt(index - start, value);
} else { return this;
byte b1 = bigEndian ? (byte) ((value >> 24) & 0xFF) : (byte) (value & 0xFF); } else {
byte b2 = bigEndian ? (byte) ((value >> 16) & 0xFF) : (byte) ((value >> 8) & 0xFF); byte b1 = bigEndian ? (byte) ((value >> 24) & 0xFF) : (byte) (value & 0xFF);
byte b3 = bigEndian ? (byte) ((value >> 8) & 0xFF) : (byte) ((value >> 16) & 0xFF); byte b2 = bigEndian ? (byte) ((value >> 16) & 0xFF) : (byte) ((value >> 8) & 0xFF);
byte b4 = bigEndian ? (byte) (value & 0xFF) : (byte) ((value >> 24) & 0xFF); byte b3 = bigEndian ? (byte) ((value >> 8) & 0xFF) : (byte) ((value >> 16) & 0xFF);
if (r == 3) { byte b4 = bigEndian ? (byte) (value & 0xFF) : (byte) ((value >> 24) & 0xFF);
buffs[i].put(index - start, b1); if (r == 3) {
buffs[i].put(index - start + 1, b2); buffs[i].put(index - start, b1);
buffs[i].put(index - start + 2, b3); buffs[i].put(index - start + 1, b2);
buffs[i + 1].put(0, b4); buffs[i].put(index - start + 2, b3);
} else if (r == 2) { buffs[i + 1].put(0, b4);
buffs[i].put(index - start, b1); } else if (r == 2) {
buffs[i].put(index - start + 1, b2); buffs[i].put(index - start, b1);
buffs[i + 1].put(0, b3); buffs[i].put(index - start + 1, b2);
buffs[i + 1].put(1, b4); buffs[i + 1].put(0, b3);
} else if (r == 1) { buffs[i + 1].put(1, b4);
buffs[i].put(index - start, b1); } else if (r == 1) {
buffs[i + 1].put(0, b2); buffs[i].put(index - start, b1);
buffs[i + 1].put(1, b3); buffs[i + 1].put(0, b2);
buffs[i + 1].put(2, b4); buffs[i + 1].put(1, b3);
} buffs[i + 1].put(2, b4);
return this; }
} return this;
} else { }
start += pos; } else {
} start += pos;
} }
throw new ArrayIndexOutOfBoundsException(index); }
} throw new ArrayIndexOutOfBoundsException(index);
}
// public static void main(String[] args) throws Throwable {
// ObjectPool<ByteBuffer> pool = new ObjectPool<>(20, (p) -> ByteBuffer.allocate(10), (ByteBuffer t) -> t.clear(), (ByteBuffer t) -> false); // public static void main(String[] args) throws Throwable {
// ByteBufferWriter writer = ByteBufferWriter.create(pool); // ObjectPool<ByteBuffer> pool = new ObjectPool<>(20, (p) -> ByteBuffer.allocate(10), (ByteBuffer t) -> t.clear(), (ByteBuffer t) -> false);
// for (int i = 1; i <= 18; i++) { // ByteBufferWriter writer = ByteBufferWriter.create(pool);
// writer.put((byte) i); // for (int i = 1; i <= 18; i++) {
// } // writer.put((byte) i);
// System.out.println(Arrays.toString(toBytes(writer.toBuffers()))); // }
// // System.out.println(Arrays.toString(toBytes(writer.toBuffers())));
// writer = ByteBufferWriter.create(pool); //
// for (int i = 1; i <= 18; i++) { // writer = ByteBufferWriter.create(pool);
// writer.put((byte) i); // for (int i = 1; i <= 18; i++) {
// } // writer.put((byte) i);
// int value = 0x223344; // }
// byte[] b4 = new byte[]{(byte) ((value >> 24) & 0xFF), (byte) ((value >> 16) & 0xFF), (byte) ((value >> 8) & 0xFF), (byte) (value & 0xFF)}; // int value = 0x223344;
// writer.putInt(9, value); // byte[] b4 = new byte[]{(byte) ((value >> 24) & 0xFF), (byte) ((value >> 16) & 0xFF), (byte) ((value >> 8) & 0xFF), (byte) (value & 0xFF)};
// System.out.println(Arrays.toString(b4)); // writer.putInt(9, value);
// System.out.println(Arrays.toString(toBytes(writer.toBuffers()))); // System.out.println(Arrays.toString(b4));
// } // System.out.println(Arrays.toString(toBytes(writer.toBuffers())));
public ByteBufferWriter putFloat(float value) { // }
getLastBuffer(4).putFloat(value); public ByteBufferWriter putFloat(float value) {
position += 4; getLastBuffer(4).putFloat(value);
return this; position += 4;
} return this;
}
public ByteBufferWriter putLong(long value) {
getLastBuffer(8).putLong(value); public ByteBufferWriter putLong(long value) {
position += 8; getLastBuffer(8).putLong(value);
return this; position += 8;
} return this;
}
public ByteBufferWriter putDouble(double value) {
getLastBuffer(8).putDouble(value); public ByteBufferWriter putDouble(double value) {
position += 8; getLastBuffer(8).putDouble(value);
return this; position += 8;
} return this;
}
public int put(byte[] src) {
return put(true, src, 0, src.length); public int put(byte[] src) {
} return put(true, src, 0, src.length);
}
public int put(byte[] src, int offset, int length) {
return put(true, src, offset, length); public int put(byte[] src, int offset, int length) {
} return put(true, src, offset, length);
}
public int put(byte[] src, int offset, int length, byte[] src2, int offset2, int length2) {
ByteBuffer buf = getLastBuffer(1); public int put(byte[] src, int offset, int length, byte[] src2, int offset2, int length2) {
int remain = buf.remaining(); ByteBuffer buf = getLastBuffer(1);
if (remain >= length + length2) { int remain = buf.remaining();
buf.put(src, offset, length); if (remain >= length + length2) {
if (src2 != null) { buf.put(src, offset, length);
buf.put(src2, offset2, length2); if (src2 != null) {
} buf.put(src2, offset2, length2);
position += length + length2; }
this.writeBytesCounter++; position += length + length2;
} else { this.writeBytesCounter++;
put(true, src, offset, length); } else {
if (src2 != null) { put(true, src, offset, length);
put(false, src2, offset2, length2); if (src2 != null) {
} put(false, src2, offset2, length2);
} }
return writeBytesCounter; }
} return writeBytesCounter;
}
private int put(boolean outside, byte[] src, int offset, int length) {
ByteBuffer buf = getLastBuffer(1); private int put(boolean outside, byte[] src, int offset, int length) {
int remain = buf.remaining(); ByteBuffer buf = getLastBuffer(1);
if (remain >= length) { int remain = buf.remaining();
buf.put(src, offset, length); if (remain >= length) {
position += length; buf.put(src, offset, length);
} else { position += length;
buf.put(src, offset, remain); } else {
position += remain; buf.put(src, offset, remain);
put(false, src, offset + remain, length - remain); position += remain;
} put(false, src, offset + remain, length - remain);
if (outside) { }
this.writeBytesCounter++; if (outside) {
} this.writeBytesCounter++;
return writeBytesCounter; }
} return writeBytesCounter;
}
public int getWriteBytesCounter() {
return this.writeBytesCounter; public int getWriteBytesCounter() {
} return this.writeBytesCounter;
} }
}