From 730fc0a9113254b9e788c9080d31bf951eb1ac1d Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Tue, 23 Jun 2020 10:28:25 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection.java | 4 +- .../redkale/net/TcpAioAsyncConnection.java | 50 +++++++++++++++++-- .../redkale/net/UdpBioAsyncConnection.java | 8 +-- src/org/redkale/net/http/HttpRequest.java | 3 +- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 9d447ee50..5886a1290 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -112,11 +112,11 @@ public abstract class AsyncConnection implements AutoCloseable { public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); - public abstract InputStream newInputStream(); + public abstract ReadableByteChannel readableByteChannel(); public abstract void read(CompletionHandler handler); - public abstract OutputStream newOutputStream(); + public abstract WritableByteChannel rritableByteChannel(); public abstract void write(ByteBuffer src, A attachment, CompletionHandler handler); diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index 6cf5bca81..c3305c62b 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -230,13 +230,55 @@ public class TcpAioAsyncConnection extends AsyncConnection { } @Override - public final InputStream newInputStream() { - return Channels.newInputStream(this.channel); + public final ReadableByteChannel readableByteChannel() { + return new ReadableByteChannel() { + @Override + public int read(ByteBuffer dst) throws IOException { + try { + return channel.read(dst).get(readTimeoutSeconds > 0 ? readTimeoutSeconds : 6, TimeUnit.SECONDS); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + @SuppressWarnings("InfiniteRecursion") + public boolean isOpen() { + return isOpen(); + } + + @Override + @SuppressWarnings("InfiniteRecursion") + public void close() throws IOException { + close(); + } + }; } @Override - public final OutputStream newOutputStream() { - return Channels.newOutputStream(this.channel); + public final WritableByteChannel rritableByteChannel() { + return new WritableByteChannel() { + @Override + public int write(ByteBuffer src) throws IOException { + try { + return channel.write(src).get(readTimeoutSeconds > 0 ? readTimeoutSeconds : 6, TimeUnit.SECONDS); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + @SuppressWarnings("InfiniteRecursion") + public boolean isOpen() { + return isOpen(); + } + + @Override + @SuppressWarnings("InfiniteRecursion") + public void close() throws IOException { + close(); + } + }; } @Override diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/UdpBioAsyncConnection.java index 3456f0024..40b6d8fa0 100644 --- a/src/org/redkale/net/UdpBioAsyncConnection.java +++ b/src/org/redkale/net/UdpBioAsyncConnection.java @@ -142,13 +142,13 @@ public class UdpBioAsyncConnection extends AsyncConnection { } @Override - public final InputStream newInputStream() { - return Channels.newInputStream(this.channel); + public final ReadableByteChannel readableByteChannel() { + return this.channel; } @Override - public final OutputStream newOutputStream() { - return Channels.newOutputStream(this.channel); + public final WritableByteChannel rritableByteChannel() { + return this.channel; } @Override diff --git a/src/org/redkale/net/http/HttpRequest.java b/src/org/redkale/net/http/HttpRequest.java index c224ecdcf..fa5a74bb8 100644 --- a/src/org/redkale/net/http/HttpRequest.java +++ b/src/org/redkale/net/http/HttpRequest.java @@ -10,6 +10,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Array; import java.net.*; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.charset.*; import java.util.*; import java.util.logging.Level; @@ -581,7 +582,7 @@ public class HttpRequest extends Request { @ConvertDisabled public final MultiContext getMultiContext() { return new MultiContext(context.getCharset(), this.getContentType(), this.params, - new BufferedInputStream(this.channel.newInputStream(), Math.max(array.size(), 8192)) { + new BufferedInputStream(Channels.newInputStream(this.channel.readableByteChannel()), Math.max(array.size(), 8192)) { { array.copyTo(this.buf); this.count = array.size();