ProtobufBytesWriter

This commit is contained in:
redkale
2024-10-03 10:03:35 +08:00
parent 88417f35a4
commit b7a604ebee
4 changed files with 307 additions and 284 deletions

View File

@@ -21,7 +21,7 @@ public class ProtobufByteBufferWriter extends ProtobufWriter {
private int currBufIndex;
public ProtobufByteBufferWriter(int features, boolean enumtostring, Supplier<ByteBuffer> supplier) {
super((byte[]) null);
super();
this.features = features;
this.enumtostring = enumtostring;
this.supplier = supplier;
@@ -37,30 +37,31 @@ public class ProtobufByteBufferWriter extends ProtobufWriter {
@Override
public final ProtobufWriter pollChild() {
ProtobufWriter rs = super.pollChild();
this.delegate = null;
this.child = null;
rs.parent = null;
return rs;
ProtobufBytesWriter result = new ProtobufBytesWriter();
return result.configFieldFunc(this);
}
@Override
public final void offerChild(ProtobufWriter child) {
int total = child.length();
ProtobufWriter next = child;
ProtobufBytesWriter bw = (ProtobufBytesWriter) child;
int total = bw.length();
ProtobufBytesWriter next = bw;
while ((next = next.child) != null) {
total += next.length();
}
writeLength(total);
writeTo(child.content(), 0, child.length());
next = child;
writeTo(bw.content(), 0, bw.length());
next = bw;
while ((next = next.child) != null) {
writeTo(next.content(), 0, next.length());
}
offerPool(child);
}
@Override
protected final void writeSelfLength(int value) {
this.writeLength(value);
}
public ByteBuffer[] toBuffers() {
if (buffers == null) {
return new ByteBuffer[0];
@@ -183,19 +184,4 @@ public class ProtobufByteBufferWriter extends ProtobufWriter {
public String toString() {
return Objects.toString(this);
}
@Override
public final ProtobufWriter clear() {
throw new UnsupportedOperationException("Not supported yet."); // 无需实现
}
@Override
public final byte[] toArray() {
return toByteArray().getBytes();
}
@Override
public final byte[] content() {
throw new UnsupportedOperationException("Not supported yet."); // 无需实现
}
}

View File

@@ -0,0 +1,258 @@
/*
* Copyright (c) 2016-2116 Redkale
* All rights reserved.
*/
package org.redkale.convert.pb;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import org.redkale.convert.ConvertBytesHandler;
import static org.redkale.convert.pb.ProtobufWriter.CHILD_SIZE;
import org.redkale.util.ByteArray;
import org.redkale.util.ByteTuple;
/**
*
* @author zhangjx
*/
public class ProtobufBytesWriter extends ProtobufWriter implements ByteTuple {
private byte[] content;
// 链表结构
private ProtobufBytesWriter delegate;
private ArrayDeque<ProtobufBytesWriter> pool;
protected ProtobufBytesWriter(byte[] bs) {
this.content = bs;
}
ProtobufBytesWriter(byte[] bs, int count) {
this.content = bs;
this.count = count;
}
public ProtobufBytesWriter() {
this(DEFAULT_SIZE);
}
public ProtobufBytesWriter(int size) {
this.content = new byte[Math.max(size, DEFAULT_SIZE)];
}
public ProtobufBytesWriter(ByteTuple tuple) {
this.content = tuple.content();
this.count = tuple.length();
}
@Override
protected boolean recycle() {
super.recycle();
if (this.delegate != null && this.pool != null) {
ProtobufWriter s;
ProtobufWriter p = this.delegate;
do {
s = p;
p = p.parent;
offerPool(s);
} while (p != this);
}
this.delegate = null;
if (this.content.length > DEFAULT_SIZE) {
this.content = new byte[DEFAULT_SIZE];
}
return true;
}
private void offerPool(ProtobufWriter item) {
if (this.pool != null && this.pool.size() < CHILD_SIZE) {
item.recycle();
this.pool.offer((ProtobufBytesWriter) item);
}
}
@Override
public ProtobufWriter pollChild() {
Queue<ProtobufBytesWriter> queue = this.pool;
if (queue == null) {
this.pool = new ArrayDeque<>(CHILD_SIZE);
queue = this.pool;
}
ProtobufBytesWriter result = queue.poll();
if (result == null) {
result = new ProtobufBytesWriter(new byte[256], 0);
}
if (delegate == null) {
result.parent = this;
this.child = result;
delegate = result;
} else {
result.parent = delegate;
delegate.child = result;
delegate = result;
}
result.configFieldFunc(result.parent);
return result;
}
@Override
public void offerChild(ProtobufWriter child) {
if (child != null) {
int len = child.length();
ProtobufWriter next = child;
while ((next = next.child) != null) {
len += next.length();
}
child.parent.writeSelfLength(len);
}
}
@Override
protected final void writeSelfLength(int value) {
ProtobufBytesWriter old = this.delegate;
this.delegate = null;
if (value < 128) {
writeTo((byte) value);
} else {
writeUInt32(value);
}
this.delegate = old;
}
/**
* 将本对象的内容引用复制给array
*
* @param array ByteArray
*/
public void directTo(ByteArray array) {
array.directFrom(content, count);
}
public void completed(ConvertBytesHandler handler, Consumer<ProtobufWriter> callback) {
handler.completed(content, 0, count, callback, this);
}
@Override
public byte[] toArray() {
if (delegate == null) {
byte[] copy = new byte[count];
System.arraycopy(content, 0, copy, 0, count);
return copy;
} else {
int total = count;
ProtobufBytesWriter next = this;
while ((next = next.child) != null) {
total += next.length();
}
byte[] data = new byte[total];
System.arraycopy(content, 0, data, 0, count);
next = this;
int pos = count;
while ((next = next.child) != null) {
System.arraycopy(next.content(), 0, data, pos, next.length());
pos += next.length();
}
return data;
}
}
@Override
protected int expand(int len) {
int newcount = count + len;
if (newcount > content.length) {
byte[] newdata = new byte[Math.max(content.length * 2, newcount)];
System.arraycopy(content, 0, newdata, 0, count);
this.content = newdata;
}
return 0;
}
@Override
public void writeTo(final byte ch) {
if (delegate == null) {
expand(1);
content[count++] = ch;
} else {
delegate.writeTo(ch);
}
}
@Override
public void writeTo(final byte[] chs, final int start, final int len) {
if (delegate == null) {
expand(len);
System.arraycopy(chs, start, content, count, len);
count += len;
} else {
delegate.writeTo(chs, start, len);
}
}
@Override
protected void writeUInt32(int value) {
if (value >= 0 && value < TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES[value]);
return;
} else if (value < 0 && value > TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES2[-value]);
return;
}
if (delegate == null) {
expand(5);
int curr = this.count;
byte[] data = this.content;
while (true) {
if ((value & ~0x7F) == 0) {
data[curr++] = (byte) value;
this.count = curr;
return;
} else {
data[curr++] = (byte) ((value & 0x7F) | 0x80);
value >>>= 7;
}
}
} else {
delegate.writeUInt32(value);
}
}
@Override
protected void writeUInt64(long value) {
if (value >= 0 && value < TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES[(int) value]);
return;
} else if (value < 0 && value > TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES2[(int) -value]);
return;
}
if (delegate == null) {
expand(10);
int curr = this.count;
byte[] data = this.content;
while (true) {
if ((value & ~0x7FL) == 0) {
data[curr++] = (byte) value;
this.count = curr;
return;
} else {
data[curr++] = (byte) (((int) value & 0x7F) | 0x80);
value >>>= 7;
}
}
} else {
delegate.writeUInt64(value);
}
}
@Override
public byte[] content() {
return content;
}
@Override
public int offset() {
return 0;
}
}

View File

@@ -27,7 +27,8 @@ import org.redkale.util.*;
*/
public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWriter> {
private final ThreadLocal<ProtobufWriter> writerPool = Utility.withInitialThreadLocal(ProtobufWriter::new);
private final ThreadLocal<ProtobufBytesWriter> writerPool =
Utility.withInitialThreadLocal(ProtobufBytesWriter::new);
private final Consumer<ProtobufWriter> writerConsumer = this::offerWriter;
@@ -105,7 +106,8 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
// ------------------------------ writer -----------------------------------------------------------
@Override
protected <S extends ProtobufWriter> S configWrite(S writer) {
return (S) writer.configWrite();
writer.withFeatures(features).enumtostring(((ProtobufFactory) factory).enumtostring);
return writer;
}
public ProtobufByteBufferWriter pollProtobufWriter(final Supplier<ByteBuffer> supplier) {
@@ -117,21 +119,21 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
}
@Override
public ProtobufWriter pollWriter() {
ProtobufWriter writer = writerPool.get();
public ProtobufBytesWriter pollWriter() {
ProtobufBytesWriter writer = writerPool.get();
if (writer == null) {
writer = new ProtobufWriter();
writer = new ProtobufBytesWriter();
} else {
writerPool.set(null);
}
return configWrite(writer.withFeatures(features).enumtostring(((ProtobufFactory) factory).enumtostring));
return configWrite(writer);
}
@Override
public void offerWriter(final ProtobufWriter out) {
if (out != null) {
out.recycle();
writerPool.set(out);
writerPool.set((ProtobufBytesWriter) out);
}
}
@@ -666,14 +668,14 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
@Override
public byte[] convertTo(final Type type, final Object value) {
if (value == null) {
final ProtobufWriter writer = pollWriter();
final ProtobufBytesWriter writer = pollWriter();
writer.writeNull();
byte[] result = writer.toArray();
offerWriter(writer);
return result;
}
final Type t = type == null ? value.getClass() : type;
final ProtobufWriter writer = pollWriter();
final ProtobufBytesWriter writer = pollWriter();
Encodeable encoder = this.lastEncodeable;
if (encoder == null || encoder.getType() != t) {
encoder = factory.loadEncoder(t);
@@ -699,7 +701,7 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
if (type == null) {
return null;
}
final ProtobufWriter writer = pollWriter();
final ProtobufBytesWriter writer = pollWriter();
Encodeable encoder = this.lastEncodeable;
if (encoder == null || encoder.getType() != type) {
encoder = factory.loadEncoder(type);
@@ -727,7 +729,7 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
@Override
public void convertToBytes(final Type type, final Object value, final ConvertBytesHandler handler) {
final ProtobufWriter writer = pollWriter();
final ProtobufBytesWriter writer = pollWriter();
if (value == null) {
writer.writeNull();
} else {
@@ -751,9 +753,7 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
@Override
public void convertToBytes(final ByteArray array, final Type type, final Object value) {
Objects.requireNonNull(array);
final ProtobufWriter writer = configWrite(new ProtobufWriter(array)
.withFeatures(features)
.enumtostring(((ProtobufFactory) factory).enumtostring));
final ProtobufBytesWriter writer = configWrite(new ProtobufBytesWriter(array));
if (value == null) {
writer.writeNull();
} else {
@@ -825,7 +825,7 @@ public class ProtobufConvert extends BinaryConvert<ProtobufReader, ProtobufWrite
writer.writeNull();
return;
}
writer.configWrite();
configWrite(writer);
final Type t = type == null ? value.getClass() : type;
Encodeable encoder = this.lastEncodeable;
if (encoder == null || encoder.getType() != t) {

View File

@@ -6,7 +6,6 @@
package org.redkale.convert.pb;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.*;
@@ -17,13 +16,14 @@ import org.redkale.convert.*;
import org.redkale.util.*;
/** @author zhangjx */
public class ProtobufWriter extends Writer implements ByteTuple {
public abstract class ProtobufWriter extends Writer {
private static final int DEFAULT_SIZE = Integer.getInteger(
protected static final int DEFAULT_SIZE = Integer.getInteger(
"redkale.convert.protobuf.writer.buffer.defsize",
Integer.getInteger("redkale.convert.writer.buffer.defsize", 1024));
private static final int CHILD_SIZE = 32;
protected static final int CHILD_SIZE = 32;
protected static final byte[] EMPTY_BYTES = new byte[0];
protected static final int TENTHOUSAND_MAX = 10001;
@@ -68,36 +68,9 @@ public class ProtobufWriter extends Writer implements ByteTuple {
protected ProtobufWriter parent;
protected ProtobufWriter child;
protected ProtobufBytesWriter child;
// 链表结构
protected ProtobufWriter delegate;
private byte[] content;
private ArrayDeque<ProtobufWriter> pool;
protected ProtobufWriter(byte[] bs) {
this.content = bs;
}
private ProtobufWriter(byte[] bs, int count) {
this.content = bs;
this.count = count;
}
public ProtobufWriter() {
this(DEFAULT_SIZE);
}
public ProtobufWriter(int size) {
this.content = new byte[Math.max(size, DEFAULT_SIZE)];
}
public ProtobufWriter(ByteTuple tuple) {
this.content = tuple.content();
this.count = tuple.length();
}
protected ProtobufWriter() {}
@Override
public final ProtobufWriter withFeatures(int features) {
@@ -105,10 +78,6 @@ public class ProtobufWriter extends Writer implements ByteTuple {
return this;
}
protected final ProtobufWriter configWrite() {
return this;
}
protected final ProtobufWriter configFieldFunc(ProtobufWriter out) {
this.mapFieldFunc = out.mapFieldFunc;
this.objFieldFunc = out.objFieldFunc;
@@ -126,172 +95,47 @@ public class ProtobufWriter extends Writer implements ByteTuple {
return objExtFunc;
}
public ProtobufWriter enumtostring(boolean enumtostring) {
this.enumtostring = enumtostring;
return this;
}
@Override
protected boolean recycle() {
super.recycle();
if (this.delegate != null && this.pool != null) {
ProtobufWriter s;
ProtobufWriter p = this.delegate;
do {
s = p;
p = p.parent;
offerPool(s);
} while (p != this);
}
this.parent = null;
this.child = null;
this.delegate = null;
this.parent = null;
this.mapFieldFunc = null;
this.objFieldFunc = null;
this.objExtFunc = null;
this.features = 0;
this.enumtostring = false;
this.count = 0;
if (this.content.length > DEFAULT_SIZE) {
this.content = new byte[DEFAULT_SIZE];
}
return true;
}
protected final void offerPool(ProtobufWriter item) {
if (this.pool != null && this.pool.size() < CHILD_SIZE) {
item.recycle();
this.pool.offer(item);
}
}
public abstract ProtobufWriter pollChild();
public ProtobufWriter pollChild() {
Queue<ProtobufWriter> queue = this.pool;
if (queue == null) {
this.pool = new ArrayDeque<>(CHILD_SIZE);
queue = this.pool;
}
ProtobufWriter result = queue.poll();
if (result == null) {
result = new ProtobufWriter(new byte[256], 0);
}
if (delegate == null) {
result.parent = this;
this.child = result;
delegate = result;
} else {
result.parent = delegate;
delegate.child = result;
delegate = result;
}
result.configFieldFunc(result.parent);
return result;
}
public abstract void offerChild(ProtobufWriter child);
public void offerChild(ProtobufWriter child) {
if (child != null) {
int len = child.length();
ProtobufWriter next = child;
while ((next = next.child) != null) {
len += next.length();
}
child.parent.writeSelfLength(len);
}
}
protected abstract int expand(int len);
@Override
public final int length() {
return count;
}
public abstract void writeTo(final byte ch);
@Override
public byte[] content() {
return content;
}
public abstract void writeTo(final byte[] chs, final int start, final int len);
@Override
public final int offset() {
return 0;
}
protected abstract void writeSelfLength(int value);
/**
* 将本对象的内容引用复制给array
*
* @param array ByteArray
*/
public void directTo(ByteArray array) {
array.directFrom(content, count);
}
protected abstract void writeUInt32(int value);
public ByteBuffer[] toBuffers() {
return new ByteBuffer[] {ByteBuffer.wrap(content, 0, count)};
}
public void completed(ConvertBytesHandler handler, Consumer<ProtobufWriter> callback) {
handler.completed(content, 0, count, callback, this);
}
@Override
public byte[] toArray() {
if (delegate == null) {
byte[] copy = new byte[count];
System.arraycopy(content, 0, copy, 0, count);
return copy;
} else {
int total = count;
ProtobufWriter next = this;
while ((next = next.child) != null) {
total += next.length();
}
byte[] data = new byte[total];
System.arraycopy(content, 0, data, 0, count);
next = this;
int pos = count;
while ((next = next.child) != null) {
System.arraycopy(next.content(), 0, data, pos, next.length());
pos += next.length();
}
return data;
}
}
public ProtobufWriter enumtostring(boolean enumtostring) {
this.enumtostring = enumtostring;
return this;
}
protected int expand(int len) {
int newcount = count + len;
if (newcount > content.length) {
byte[] newdata = new byte[Math.max(content.length * 2, newcount)];
System.arraycopy(content, 0, newdata, 0, count);
this.content = newdata;
}
return 0;
}
public void writeTo(final byte ch) {
if (delegate == null) {
expand(1);
content[count++] = ch;
} else {
delegate.writeTo(ch);
}
}
protected abstract void writeUInt64(long value);
public final void writeTo(final byte... chs) {
writeTo(chs, 0, chs.length);
}
public void writeTo(final byte[] chs, final int start, final int len) {
if (delegate == null) {
expand(len);
System.arraycopy(chs, start, content, count, len);
count += len;
} else {
delegate.writeTo(chs, start, len);
}
}
public ProtobufWriter clear() {
this.count = 0;
this.delegate = null;
return this;
public final int length() {
return count;
}
@Override
@@ -1212,71 +1056,6 @@ public class ProtobufWriter extends Writer implements ByteTuple {
}
}
protected final void writeSelfLength(int value) {
ProtobufWriter old = this.delegate;
this.delegate = null;
if (value < 128) {
writeTo((byte) value);
} else {
writeUInt32(value);
}
this.delegate = old;
}
protected void writeUInt32(int value) {
if (value >= 0 && value < TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES[value]);
return;
} else if (value < 0 && value > TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES2[-value]);
return;
}
if (delegate == null) {
expand(5);
int curr = this.count;
byte[] data = this.content;
while (true) {
if ((value & ~0x7F) == 0) {
data[curr++] = (byte) value;
this.count = curr;
return;
} else {
data[curr++] = (byte) ((value & 0x7F) | 0x80);
value >>>= 7;
}
}
} else {
delegate.writeUInt32(value);
}
}
protected void writeUInt64(long value) {
if (value >= 0 && value < TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES[(int) value]);
return;
} else if (value < 0 && value > TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_UINT_BYTES2[(int) -value]);
return;
}
if (delegate == null) {
expand(10);
int curr = this.count;
byte[] data = this.content;
while (true) {
if ((value & ~0x7FL) == 0) {
data[curr++] = (byte) value;
this.count = curr;
return;
} else {
data[curr++] = (byte) (((int) value & 0x7F) | 0x80);
value >>>= 7;
}
}
} else {
delegate.writeUInt64(value);
}
}
protected final void writeFixed32(int value) {
if (value >= 0 && value < TENTHOUSAND_MAX) {
writeTo(TENTHOUSAND_FIXED32_BYTES[value]);