This commit is contained in:
Redkale
2017-05-25 16:49:01 +08:00
parent 3504d735c1
commit 31fca5630b
4 changed files with 85 additions and 3 deletions

View File

@@ -9,6 +9,7 @@ import java.io.File;
import java.lang.reflect.*;
import java.math.BigInteger;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -98,6 +99,7 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
this.register(URL.class, URLSimpledCoder.instance);
this.register(URI.class, URISimpledCoder.instance);
//---------------------------------------------------------
this.register(ByteBuffer.class, ByteBufferSimpledCoder.instance);
this.register(boolean[].class, BoolArraySimpledCoder.instance);
this.register(byte[].class, ByteArraySimpledCoder.instance);
this.register(short[].class, ShortArraySimpledCoder.instance);

View File

@@ -0,0 +1,70 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.convert.ext;
import java.nio.ByteBuffer;
import org.redkale.convert.Reader;
import org.redkale.convert.SimpledCoder;
import org.redkale.convert.Writer;
/**
* ByteBuffer 的SimpledCoder实现
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
* @param <R> Reader输入的子类型
* @param <W> Writer输出的子类型
*/
public final class ByteBufferSimpledCoder<R extends Reader, W extends Writer> extends SimpledCoder<R, W, ByteBuffer> {
public static final ByteBufferSimpledCoder instance = new ByteBufferSimpledCoder();
@Override
public void convertTo(W out, ByteBuffer value) {
if (value == null) {
out.writeNull();
return;
}
out.writeArrayB(value.remaining());
boolean flag = false;
for (byte v : value.array()) {
if (flag) out.writeArrayMark();
out.writeByte(v);
flag = true;
}
out.writeArrayE();
}
@Override
public ByteBuffer convertFrom(R in) {
int len = in.readArrayB();
if (len == Reader.SIGN_NULL) return null;
if (len == Reader.SIGN_NOLENGTH) {
int size = 0;
byte[] data = new byte[8];
while (in.hasNext()) {
if (size >= data.length) {
byte[] newdata = new byte[data.length + 4];
System.arraycopy(data, 0, newdata, 0, size);
data = newdata;
}
data[size++] = in.readByte();
}
in.readArrayE();
return ByteBuffer.wrap(data, 0, size);
} else {
byte[] values = new byte[len];
for (int i = 0; i < values.length; i++) {
values[i] = in.readByte();
}
in.readArrayE();
return ByteBuffer.wrap(values);
}
}
}

View File

@@ -62,6 +62,8 @@ public final class WebSocketPacket {
JsonConvert sendConvert;
ByteBuffer[] sendBuffers;
ConvertMask receiveMasker;
ByteBuffer[] receiveBuffers;
@@ -98,6 +100,15 @@ public final class WebSocketPacket {
this.last = fin;
}
WebSocketPacket(ByteBuffer[] sendBuffers, FrameType type, boolean fin) {
this.type = type;
this.sendBuffers = new ByteBuffer[sendBuffers.length];
for (int i = 0; i < sendBuffers.length; i++) {
this.sendBuffers[i] = sendBuffers[i].duplicate();
}
this.last = fin;
}
public WebSocketPacket(byte[] data) {
this(FrameType.BINARY, data, true);
}
@@ -247,7 +258,6 @@ public final class WebSocketPacket {
// String rs = JsonConvert.root().convertFrom(String.class, masker, buffer);
// System.out.println(rs);
// }
/**
* 消息解码 <br>
*

View File

@@ -217,7 +217,7 @@ class WebSocketRunner implements Runnable {
queue.add(new QueueEntry(futureResult, packet));
return futureResult;
}
ByteBuffer[] buffers = packet.encode(this.context.getBufferSupplier());
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.sendBuffers : packet.encode(this.context.getBufferSupplier());
this.writeBuffers = buffers;
try {
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@@ -264,7 +264,7 @@ class WebSocketRunner implements Runnable {
QueueEntry entry = queue.poll();
if (entry != null) {
future = entry.future;
ByteBuffer[] buffers = packet.encode(context.getBufferSupplier());
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.sendBuffers : packet.encode(context.getBufferSupplier());
writeBuffers = buffers;
channel.write(buffers, buffers, this);
}