This commit is contained in:
Redkale
2017-05-25 23:51:27 +08:00
parent c326b7ed05
commit 59b08684a8
5 changed files with 38 additions and 10 deletions

View File

@@ -130,6 +130,9 @@ public final class Rest {
if (field.getAnnotation(Resource.class) == null) continue;
if (Modifier.isStatic(webSocketType.getModifiers())) throw new RuntimeException(field + " cannot static on createRestWebSocketServlet");
if (Modifier.isFinal(webSocketType.getModifiers())) throw new RuntimeException(field + " cannot final on createRestWebSocketServlet");
if (!Modifier.isPublic(webSocketType.getModifiers()) && !Modifier.isProtected(webSocketType.getModifiers())) {
throw new RuntimeException(field + " must be public or protected on createRestWebSocketServlet");
}
resourcesFieldSet.add(field);
}
} while ((clzz = clzz.getSuperclass()) != Object.class);

View File

@@ -11,7 +11,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* 只能依附在WebSocket类上name默认为Service的类名小写并去掉Service字样及后面的字符串 (如HelloWebSocket/HelloWebSocketImpl的默认路径为 hello)。 <br>
* <b>注意: </b> 被标记&#64;RestWebSocket的WebSocket不能被修饰为abstract或final且必须要有一个protected或public的空参数构造函数。 <br>
* <b>注意: </b> 被标记&#64;RestWebSocket的WebSocket不能被修饰为abstract或final其内部标记为&#64;Resource的字段只能是protected或public必须要有一个protected或public的空参数构造函数。 <br>
* <p>
* 详情见: https://redkale.org
*

View File

@@ -117,7 +117,7 @@ public final class WebSocketGroup {
future = future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b);
}
}
if (more && future != null) future = future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
if (more && future != null) future.whenComplete((rs, ex) -> context.offerBuffer(packet.sendBuffers));
return future == null ? CompletableFuture.completedFuture(0) : future;
}

View File

@@ -107,10 +107,15 @@ public final class WebSocketPacket {
}
void setSendBuffers(ByteBuffer[] sendBuffers) {
this.sendBuffers = new ByteBuffer[sendBuffers.length];
for (int i = 0; i < sendBuffers.length; i++) {
this.sendBuffers[i] = sendBuffers[i].duplicate();
this.sendBuffers = sendBuffers;
}
ByteBuffer[] duplicateSendBuffers() {
ByteBuffer[] rs = new ByteBuffer[this.sendBuffers.length];
for (int i = 0; i < this.sendBuffers.length; i++) {
rs[i] = this.sendBuffers[i].duplicate();
}
return rs;
}
public WebSocketPacket(byte[] data) {
@@ -356,4 +361,24 @@ public final class WebSocketPacket {
return this;
}
byte[] getReceiveBytes() {
int count = 0;
for (ByteBuffer buf : this.receiveBuffers) {
count += buf.remaining();
}
byte[] bs = new byte[count];
int index = 0;
for (ByteBuffer buf : this.receiveBuffers) {
int r = buf.remaining();
buf.get(bs, index, r);
index += r;
}
ConvertMask mask = this.receiveMasker;
if (mask != null) {
for (int i = 0; i < bs.length; i++) {
bs[i] = mask.unmask(bs[i]);
}
}
return bs;
}
}

View File

@@ -136,7 +136,7 @@ class WebSocketRunner implements Runnable {
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.BINARY) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
byte[] message =packet.getReceiveBytes();
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
@@ -147,7 +147,7 @@ class WebSocketRunner implements Runnable {
context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e);
}
} else if (packet.type == FrameType.PONG) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
byte[] message = packet.getReceiveBytes();
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
@@ -158,7 +158,7 @@ class WebSocketRunner implements Runnable {
context.getLogger().log(Level.SEVERE, "WebSocket onPong error (" + packet + ")", e);
}
} else if (packet.type == FrameType.PING) {
byte[] message = convert.convertFrom(byte[].class, packet.receiveMasker, packet.receiveBuffers);
byte[] message = packet.getReceiveBytes();
if (readBuffer != null) {
readBuffer.clear();
channel.read(readBuffer, null, this);
@@ -217,7 +217,7 @@ class WebSocketRunner implements Runnable {
queue.add(new QueueEntry(futureResult, packet));
return futureResult;
}
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.sendBuffers : packet.encode(this.context.getBufferSupplier());
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier());
this.writeBuffers = buffers;
try {
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
@@ -264,7 +264,7 @@ class WebSocketRunner implements Runnable {
QueueEntry entry = queue.poll();
if (entry != null) {
future = entry.future;
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.sendBuffers : packet.encode(context.getBufferSupplier());
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier());
writeBuffers = buffers;
channel.write(buffers, buffers, this);
}