diff --git a/src/org/redkale/net/http/WebSocket.java b/src/org/redkale/net/http/WebSocket.java index 5363eb6ab..73bf073ce 100644 --- a/src/org/redkale/net/http/WebSocket.java +++ b/src/org/redkale/net/http/WebSocket.java @@ -12,6 +12,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.function.Supplier; +import java.util.logging.*; import java.util.stream.Stream; import org.redkale.convert.Convert; import org.redkale.util.Comment; @@ -482,6 +483,25 @@ public abstract class WebSocket { public void onClose(int code, String reason) { } + /** + * 发生异常时调用 + * + * @param t 异常 + * @param buffers ByteBuffer[] + */ + public void onOccurException(Throwable t, ByteBuffer[] buffers) { + this.getLogger().log(Level.SEVERE, "WebSocket receive or send Message error", t); + } + + /** + * 获取Logger + * + * @return Logger Logger + */ + public Logger getLogger() { + return this._engine.logger; + } + /** * 获取最后一次发送消息的时间 * diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index e4e31c14d..df3467433 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.logging.*; import org.redkale.convert.Convert; +import org.redkale.util.Utility; /** * WebSocket的消息接收发送器, 一个WebSocket对应一个WebSocketRunner @@ -99,8 +100,17 @@ class WebSocketRunner implements Runnable { } try { - WebSocketPacket packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers); - + WebSocketPacket packet; + try { + packet = new WebSocketPacket().decode(context.getLogger(), readBuffer, exBuffers); + } catch (Exception e) { //接收的消息体解析失败 + webSocket.onOccurException(e, Utility.append(new ByteBuffer[]{readBuffer}, exBuffers == null ? new ByteBuffer[0] : exBuffers)); + if (readBuffer != null) { + readBuffer.clear(); + channel.read(readBuffer, null, this); + } + return; + } if (packet == null) { failed(null, attachment1); if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on decode WebSocketPacket, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds"); @@ -121,7 +131,17 @@ class WebSocketRunner implements Runnable { context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); } } else { - Object message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); + Object message; + try { + message = textConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); + } catch (Exception e) { //接收的消息体解析失败 + webSocket.onOccurException(e, packet.receiveBuffers); + if (readBuffer != null) { + readBuffer.clear(); + channel.read(readBuffer, null, this); + } + return; + } if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this); @@ -150,7 +170,17 @@ class WebSocketRunner implements Runnable { context.getLogger().log(Level.SEVERE, "WebSocket onBinaryMessage error (" + packet + ")", e); } } else { - Object message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); + Object message; + try { + message = binaryConvert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers); + } catch (Exception e) { //接收的消息体解析失败 + webSocket.onOccurException(e, packet.receiveBuffers); + if (readBuffer != null) { + readBuffer.clear(); + channel.read(readBuffer, null, this); + } + return; + } if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this);