AsyncConnection

This commit is contained in:
redkale
2024-11-10 01:07:00 +08:00
parent 08e79dfde2
commit 6ade0fdb77

View File

@@ -583,10 +583,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
dataNode.pipelineCount = pipelineCount; dataNode.pipelineCount = pipelineCount;
} }
dataNode.put(pipelineIndex, bytes, offset, length); dataNode.put(pipelineIndex, bytes, offset, length);
if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { if (writer.getWriteBytesCounter() + dataNode.size == dataNode.pipelineCount) { // pipeline全部完成
for (PipelineDataItem item : dataNode.arrayItems()) { dataNode.write(writer);
writer.put(item.data);
}
this.pipelineDataNode = null; this.pipelineDataNode = null;
return true; return true;
} }
@@ -601,40 +599,56 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
public int pipelineCount; public int pipelineCount;
public int itemsize; public int size;
private PipelineDataItem head; private PipelineDataItem head;
private PipelineDataItem tail; private PipelineDataItem tail;
public PipelineDataItem[] arrayItems() { public void write(ByteBufferWriter writer) {
PipelineDataItem[] items = new PipelineDataItem[itemsize];
PipelineDataItem item = head; PipelineDataItem item = head;
int i = 0;
while (item != null) { while (item != null) {
items[i] = item; writer.put(item.data);
item = item.next; item = item.next;
items[i].next = null;
i++;
} }
Arrays.sort(items);
return items;
} }
public void put(int pipelineIndex, byte[] bs, int offset, int length) { public void put(int pipelineIndex, byte[] bs, int offset, int length) {
size++;
if (tail == null) { if (tail == null) {
head = new PipelineDataItem(pipelineIndex, bs, offset, length); head = new PipelineDataItem(pipelineIndex, bs, offset, length);
tail = head; tail = head;
} else { } else {
PipelineDataItem item = new PipelineDataItem(pipelineIndex, bs, offset, length); PipelineDataItem item = new PipelineDataItem(pipelineIndex, bs, offset, length);
tail.next = item; if (item.index > tail.index) { // 追加到最后
tail = item; tail.next = item;
tail = item;
} else if (head.index > item.index) { // 插入前面
item.next = head;
head = item;
} else { // 中间插队
PipelineDataItem l = head;
PipelineDataItem d = head.next;
while (d != null) {
if (d.index > item.index) {
l.next = item;
item.next = d;
break;
}
l = d;
d = d.next;
}
}
} }
itemsize++; }
@Override
public String toString() {
return "{\"size\":" + size + ", \"item\":" + head + "}";
} }
} }
private static class PipelineDataItem implements Comparable<PipelineDataItem> { private static class PipelineDataItem {
final byte[] data; final byte[] data;
@@ -647,14 +661,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable {
this.data = Arrays.copyOfRange(bs, offset, offset + length); this.data = Arrays.copyOfRange(bs, offset, offset + length);
} }
@Override
public int compareTo(PipelineDataItem o) {
return this.index - o.index;
}
@Override @Override
public String toString() { public String toString() {
return "{\"index\":" + index + "}"; return "{\"index\":" + index + (next == null ? "" : (", \"next\":" + next)) + "}";
} }
} }