diff --git a/src/main/java/org/redkale/net/AsyncConnection.java b/src/main/java/org/redkale/net/AsyncConnection.java index ee6819a57..6ace04dde 100644 --- a/src/main/java/org/redkale/net/AsyncConnection.java +++ b/src/main/java/org/redkale/net/AsyncConnection.java @@ -355,11 +355,15 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } else { try { int remain = src.remaining(); - sslWriteImpl(false, src, t -> { - if (t == null) { + sslWriteImpl(false, src, new CompletionHandler() { + @Override + public void completed(Integer result, Void attach) { handler.completed(remain - src.remaining(), attachment); - } else { - handler.failed(t, attachment); + } + + @Override + public void failed(Throwable exc, Void attach) { + handler.failed(exc, attachment); } }); } catch (SSLException e) { @@ -375,11 +379,15 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } else { try { int remain = ByteBufferReader.remaining(srcs, offset, length); - sslWriteImpl(false, srcs, offset, length, t -> { - if (t == null) { + sslWriteImpl(false, srcs, offset, length, new CompletionHandler() { + @Override + public void completed(Integer result, Void attach) { handler.completed(remain - ByteBufferReader.remaining(srcs, offset, length), attachment); - } else { - handler.failed(t, attachment); + } + + @Override + public void failed(Throwable exc, Void attach) { + handler.failed(exc, attachment); } }); } catch (SSLException e) { @@ -839,22 +847,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - protected void startHandshake(final Consumer callback) { + protected void startHandshake(CompletionHandler handler) { if (sslEngine == null) { - callback.accept(null); + handler.completed(0, null); return; } SSLEngine engine = this.sslEngine; try { engine.beginHandshake(); - doHandshake(callback); + doHandshake(handler); } catch (Throwable t) { - callback.accept(t); + handler.failed(t, null); } } // 解密ssl网络数据, 返回null表示CLOSED - protected ByteBuffer sslUnwrap(final boolean handshake, ByteBuffer netBuffer) throws SSLException { + protected ByteBuffer sslUnwrap(boolean handshake, ByteBuffer netBuffer) throws SSLException { ByteBuffer appBuffer = pollReadBuffer(); SSLEngine engine = this.sslEngine; HandshakeStatus hss; @@ -885,64 +893,21 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return appBuffer; } - protected void sslReadImpl(final boolean handshake, final CompletionHandler handler) { + protected void sslReadImpl(boolean handshake, CompletionHandler handler) { readImpl(createSslCompletionHandler(handshake, handler)); } - protected void sslReadRegisterImpl(final boolean handshake, final CompletionHandler handler) { + protected void sslReadRegisterImpl(boolean handshake, CompletionHandler handler) { readRegisterImpl(createSslCompletionHandler(handshake, handler)); } private CompletionHandler createSslCompletionHandler( - final boolean handshake, final CompletionHandler handler) { - return new CompletionHandler() { - - @Override - public void completed(Integer count, ByteBuffer attachment) { - // System.out.println(AsyncConnection.this + " 进来了读到的字节数: " + count); - if (count < 0) { - handler.completed(count, attachment); - return; - } - ByteBuffer netBuffer = attachment; - netBuffer.flip(); - try { - ByteBuffer appBuffer = sslUnwrap(handshake, netBuffer); - if (appBuffer == null) { - return; // CLOSED,netBuffer已被回收 - } - if (AsyncConnection.this.readSSLHalfBuffer != netBuffer) { - offerReadBuffer(netBuffer); - } - if (AsyncConnection.this.readBuffer != null) { - ByteBuffer rsBuffer = AsyncConnection.this.readBuffer; - AsyncConnection.this.readBuffer = null; - appBuffer.flip(); - if (rsBuffer.remaining() >= appBuffer.remaining()) { - rsBuffer.put(appBuffer); - offerReadBuffer(appBuffer); - appBuffer = rsBuffer; - } else { - while (rsBuffer.hasRemaining()) rsBuffer.put(appBuffer.get()); - AsyncConnection.this.readBuffer = appBuffer.compact(); - appBuffer = rsBuffer; - } - } - handler.completed(count, appBuffer); - } catch (SSLException e) { - failed(e, attachment); - } - } - - @Override - public void failed(Throwable t, ByteBuffer attachment) { - handler.failed(t, attachment); - } - }; + boolean handshake, CompletionHandler handler) { + return new SslReadCompletionHandler(handshake, handler); } // 加密ssl内容数据 - protected ByteBuffer[] sslWrap(final boolean handshake, ByteBuffer appBuffer) throws SSLException { + protected ByteBuffer[] sslWrap(boolean handshake, ByteBuffer appBuffer) throws SSLException { final SSLEngine engine = this.sslEngine; final int netSize = engine.getSession().getPacketBufferSize(); ByteBuffer netBuffer = pollWriteBuffer(); @@ -980,7 +945,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } // 加密ssl内容数据 - protected ByteBuffer[] sslWrap(final boolean handshake, ByteBuffer[] appBuffers, int offset, int length) + protected ByteBuffer[] sslWrap(boolean handshake, ByteBuffer[] appBuffers, int offset, int length) throws SSLException { final SSLEngine engine = this.sslEngine; final int netSize = engine.getSession().getPacketBufferSize(); @@ -1016,38 +981,14 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { return netBuffers; } - protected boolean sslWriteImpl(final boolean handshake, ByteBuffer appBuffer, final Consumer callback) + protected boolean sslWriteImpl(boolean handshake, ByteBuffer appBuffer, CompletionHandler handler) throws SSLException { ByteBuffer[] netBuffers = sslWrap(handshake, appBuffer); if (netBuffers.length > 0) { if (netBuffers.length == 1) { - writeImpl(netBuffers[0], null, new CompletionHandler() { - @Override - public void completed(Integer count, Void attachment) { - offerWriteBuffer(netBuffers[0]); - callback.accept(null); - } - - @Override - public void failed(Throwable t, Void attachment) { - offerWriteBuffer(netBuffers[0]); - callback.accept(t); - } - }); + writeImpl(netBuffers[0], writeBufferConsumer, null, handler); } else { - writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler() { - @Override - public void completed(Integer count, Void attachment) { - offerWriteBuffers(netBuffers); - callback.accept(null); - } - - @Override - public void failed(Throwable t, Void attachment) { - offerWriteBuffers(netBuffers); - callback.accept(t); - } - }); + writeImpl(netBuffers, 0, netBuffers.length, writeBufferConsumer, null, handler); } return true; } else { @@ -1057,42 +998,18 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } protected boolean sslWriteImpl( - final boolean handshake, + boolean handshake, ByteBuffer[] appBuffers, int offset, int length, - final Consumer callback) + CompletionHandler handler) throws SSLException { ByteBuffer[] netBuffers = sslWrap(handshake, appBuffers, offset, length); if (netBuffers.length > 0) { if (netBuffers.length == 1) { - writeImpl(netBuffers[0], null, new CompletionHandler() { - @Override - public void completed(Integer count, Void attachment) { - offerWriteBuffer(netBuffers[0]); - callback.accept(null); - } - - @Override - public void failed(Throwable t, Void attachment) { - offerWriteBuffer(netBuffers[0]); - callback.accept(t); - } - }); + writeImpl(netBuffers[0], writeBufferConsumer, null, handler); } else { - writeImpl(netBuffers, 0, netBuffers.length, null, new CompletionHandler() { - @Override - public void completed(Integer count, Void attachment) { - offerWriteBuffers(netBuffers); - callback.accept(null); - } - - @Override - public void failed(Throwable t, Void attachment) { - offerWriteBuffers(netBuffers); - callback.accept(t); - } - }); + writeImpl(netBuffers, 0, netBuffers.length, writeBufferConsumer, null, handler); } return true; } else { @@ -1101,7 +1018,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } } - private void doHandshake(final Consumer callback) { + private void doHandshake(CompletionHandler handler) { HandshakeStatus handshakeStatus; final SSLEngine engine = this.sslEngine; while ((handshakeStatus = engine.getHandshakeStatus()) != null) { @@ -1110,7 +1027,7 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { case FINISHED: case NOT_HANDSHAKING: // System.out.println(AsyncConnection.this + " doHandshakde完毕,开始进入读写操作-----"); - callback.accept(null); + handler.completed(0, null); return; case NEED_TASK: { Runnable task; @@ -1121,18 +1038,22 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { } case NEED_WRAP: { try { // - boolean rs = sslWriteImpl(true, EMPTY_BUFFER, t -> { - if (t == null) { - doHandshake(callback); - } else { - callback.accept(t); + boolean rs = sslWriteImpl(true, EMPTY_BUFFER, new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { + doHandshake(handler); + } + + @Override + public void failed(Throwable exc, Void attachment) { + handler.failed(exc, attachment); } }); if (rs) { return; } } catch (SSLException e) { - callback.accept(e); + handler.failed(e, null); return; } break; @@ -1142,16 +1063,19 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { @Override public void completed(Integer count, ByteBuffer attachment) { if (count < 1) { - callback.accept(new IOException("read data error")); + handler.failed(new IOException("read data error"), null); } else { offerReadBuffer(attachment); - doHandshake(callback); + doHandshake(handler); } } @Override public void failed(Throwable t, ByteBuffer attachment) { - callback.accept(t); + if (attachment != null) { + offerReadBuffer(attachment); + } + handler.failed(t, null); } }); return; @@ -1176,4 +1100,59 @@ public abstract class AsyncConnection implements Channel, AutoCloseable { for (int i = 0; i < cha; i++) s += ' '; return s; } + + protected class SslReadCompletionHandler implements CompletionHandler { + + private boolean handshake; + + private CompletionHandler handler; + + public SslReadCompletionHandler(boolean handshake, CompletionHandler handler) { + this.handshake = handshake; + this.handler = handler; + } + + @Override + public void completed(Integer count, ByteBuffer attachment) { + // System.out.println(AsyncConnection.this + " 进来了读到的字节数: " + count); + if (count < 0) { + handler.completed(count, attachment); + return; + } + ByteBuffer netBuffer = attachment; + netBuffer.flip(); + try { + ByteBuffer appBuffer = sslUnwrap(handshake, netBuffer); + if (appBuffer == null) { + failed(new SSLException("appBuffer is null"), attachment); + return; // CLOSED,netBuffer已被回收 + } + if (readSSLHalfBuffer != netBuffer) { // unwap完整 + offerReadBuffer(netBuffer); + } + if (readBuffer != null) { + ByteBuffer rsBuffer = readBuffer; + readBuffer = null; + appBuffer.flip(); + if (rsBuffer.remaining() >= appBuffer.remaining()) { + rsBuffer.put(appBuffer); + offerReadBuffer(appBuffer); + appBuffer = rsBuffer; + } else { + while (rsBuffer.hasRemaining()) rsBuffer.put(appBuffer.get()); + readBuffer = appBuffer.compact(); + appBuffer = rsBuffer; + } + } + handler.completed(count, appBuffer); + } catch (SSLException e) { + failed(e, attachment); + } + } + + @Override + public void failed(Throwable t, ByteBuffer attachment) { + handler.failed(t, attachment); + } + } } diff --git a/src/main/java/org/redkale/net/AsyncIOGroup.java b/src/main/java/org/redkale/net/AsyncIOGroup.java index 9101e6bce..669d7307e 100644 --- a/src/main/java/org/redkale/net/AsyncIOGroup.java +++ b/src/main/java/org/redkale/net/AsyncIOGroup.java @@ -267,10 +267,14 @@ public class AsyncIOGroup extends AsyncGroup { if (conn.sslEngine == null) { future.complete(conn); } else { - conn.startHandshake(t -> { - if (t == null) { + conn.startHandshake(new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { future.complete(conn); - } else { + } + + @Override + public void failed(Throwable t, Void attachment) { future.completeExceptionally(t); } }); @@ -344,10 +348,14 @@ public class AsyncIOGroup extends AsyncGroup { if (conn.sslEngine == null) { future.complete(conn); } else { - conn.startHandshake(t -> { - if (t == null) { + conn.startHandshake(new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { future.complete(conn); - } else { + } + + @Override + public void failed(Throwable t, Void attachment) { future.completeExceptionally(t); } }); diff --git a/src/main/java/org/redkale/net/AsyncNioConnection.java b/src/main/java/org/redkale/net/AsyncNioConnection.java index d40e02167..da15c3415 100644 --- a/src/main/java/org/redkale/net/AsyncNioConnection.java +++ b/src/main/java/org/redkale/net/AsyncNioConnection.java @@ -85,8 +85,8 @@ abstract class AsyncNioConnection extends AsyncConnection { } @Override - protected void startHandshake(final Consumer callback) { - ioReadThread.register(t -> super.startHandshake(callback)); + protected void startHandshake(CompletionHandler handler) { + ioReadThread.register(t -> super.startHandshake(handler)); } @Override @@ -102,7 +102,7 @@ abstract class AsyncNioConnection extends AsyncConnection { return; } if (handler != readCompletionHandler) { // 如果是Codec无需重复赋值 - if (this.readPending) { + if (this.readPending && handler.getClass() != SslReadCompletionHandler.class) { handler.failed(new ReadPendingException(), null); return; } diff --git a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java index 2614a5378..70bf3857c 100644 --- a/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioTcpProtocolServer.java @@ -197,13 +197,19 @@ class AsyncNioTcpProtocolServer extends ProtocolServer { if (conn.sslEngine == null) { codec.start(null); } else { - conn.startHandshake(t -> { - if (t == null) { + conn.startHandshake(new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { codec.start(null); - } else if (t instanceof RuntimeException) { - throw (RuntimeException) t; - } else { - throw new RedkaleException(t); + } + + @Override + public void failed(Throwable t, Void attachment) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RedkaleException(t); + } } }); } diff --git a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java index 49c4734ce..0156f4572 100644 --- a/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java +++ b/src/main/java/org/redkale/net/AsyncNioUdpProtocolServer.java @@ -209,13 +209,19 @@ class AsyncNioUdpProtocolServer extends ProtocolServer { if (conn.sslEngine == null) { codec.start(buffer); } else { - conn.startHandshake(t -> { - if (t == null) { + conn.startHandshake(new CompletionHandler() { + @Override + public void completed(Integer result, Void attachment) { codec.start(buffer); - } else if (t instanceof RuntimeException) { - throw (RuntimeException) t; - } else { - throw new RedkaleException(t); + } + + @Override + public void failed(Throwable t, Void attachment) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RedkaleException(t); + } } }); } diff --git a/src/main/java/org/redkale/net/ProtocolCodec.java b/src/main/java/org/redkale/net/ProtocolCodec.java index d43f9a022..8ad19cb6e 100644 --- a/src/main/java/org/redkale/net/ProtocolCodec.java +++ b/src/main/java/org/redkale/net/ProtocolCodec.java @@ -179,7 +179,7 @@ class ProtocolCodec implements CompletionHandler { final Response pipelineResponse = createResponse(); try { decode(buffer, pipelineResponse, pindex + 1, plength); - } catch (Throwable t) { // 此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复 offerBuffer + } catch (Throwable t) { // 此处不可 offerBuffer(buffer); 以免dispatcher.dispatch内部异常导致重复offerBuffer context.logger.log(Level.WARNING, "dispatch pipeline servlet abort, force to close channel ", t); pipelineResponse.codecError(t); }