diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index c703b5089..ee6819a57 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -583,10 +583,8 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { dataNode.pipelineCount = pipelineCount; } dataNode.put(pipelineIndex, bytes, offset, length); - if (writer.getWriteBytesCounter() + dataNode.itemsize == dataNode.pipelineCount) { - for (PipelineDataItem item : dataNode.arrayItems()) { - writer.put(item.data); - } + if (writer.getWriteBytesCounter() + dataNode.size == dataNode.pipelineCount) { // pipeline全部完成 + dataNode.write(writer); this.pipelineDataNode = null; return true; } @@ -601,40 +599,56 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { public int pipelineCount; - public int itemsize; + public int size; private PipelineDataItem head; private PipelineDataItem tail; - public PipelineDataItem[] arrayItems() { - PipelineDataItem[] items = new PipelineDataItem[itemsize]; + public void write(ByteBufferWriter writer) { PipelineDataItem item = head; - int i = 0; while (item != null) { - items[i] = item; + writer.put(item.data); item = item.next; - items[i].next = null; - i++; } - Arrays.sort(items); - return items; } public void put(int pipelineIndex, byte[] bs, int offset, int length) { + size++; if (tail == null) { head = new PipelineDataItem(pipelineIndex, bs, offset, length); tail = head; } else { PipelineDataItem item = new PipelineDataItem(pipelineIndex, bs, offset, length); - tail.next = item; - tail = item; + if (item.index > tail.index) { // 追加到最后 + tail.next = item; + tail = item; + } else if (head.index > item.index) { // 插入前面 + item.next = head; + head = item; + } else { // 中间插队 + PipelineDataItem l = head; + PipelineDataItem d = head.next; + while (d != null) { + if (d.index > item.index) { + l.next = item; + item.next = d; + break; + } + l = d; + d = d.next; + } + } } - itemsize++; + } + + @Override + public String toString() { + return "{\"size\":" + size + ", \"item\":" + head + "}"; } } - private static class PipelineDataItem implements Comparable { + private static class PipelineDataItem { final byte[] data; @@ -647,14 +661,9 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { this.data = Arrays.copyOfRange(bs, offset, offset + length); } - @Override - public int compareTo(PipelineDataItem o) { - return this.index - o.index; - } - @Override public String toString() { - return "{\"index\":" + index + "}"; + return "{\"index\":" + index + (next == null ? "" : (", \"next\":" + next)) + "}"; } }