This commit is contained in:
Redkale
2020-06-23 10:28:25 +08:00
parent 1ea92d7165
commit 730fc0a911
4 changed files with 54 additions and 11 deletions

View File

@@ -112,11 +112,11 @@ public abstract class AsyncConnection implements AutoCloseable {
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
public abstract InputStream newInputStream(); public abstract ReadableByteChannel readableByteChannel();
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler); public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
public abstract OutputStream newOutputStream(); public abstract WritableByteChannel rritableByteChannel();
public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler);

View File

@@ -230,13 +230,55 @@ public class TcpAioAsyncConnection extends AsyncConnection {
} }
@Override @Override
public final InputStream newInputStream() { public final ReadableByteChannel readableByteChannel() {
return Channels.newInputStream(this.channel); return new ReadableByteChannel() {
@Override
public int read(ByteBuffer dst) throws IOException {
try {
return channel.read(dst).get(readTimeoutSeconds > 0 ? readTimeoutSeconds : 6, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
@SuppressWarnings("InfiniteRecursion")
public boolean isOpen() {
return isOpen();
}
@Override
@SuppressWarnings("InfiniteRecursion")
public void close() throws IOException {
close();
}
};
} }
@Override @Override
public final OutputStream newOutputStream() { public final WritableByteChannel rritableByteChannel() {
return Channels.newOutputStream(this.channel); return new WritableByteChannel() {
@Override
public int write(ByteBuffer src) throws IOException {
try {
return channel.write(src).get(readTimeoutSeconds > 0 ? readTimeoutSeconds : 6, TimeUnit.SECONDS);
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
@SuppressWarnings("InfiniteRecursion")
public boolean isOpen() {
return isOpen();
}
@Override
@SuppressWarnings("InfiniteRecursion")
public void close() throws IOException {
close();
}
};
} }
@Override @Override

View File

@@ -142,13 +142,13 @@ public class UdpBioAsyncConnection extends AsyncConnection {
} }
@Override @Override
public final InputStream newInputStream() { public final ReadableByteChannel readableByteChannel() {
return Channels.newInputStream(this.channel); return this.channel;
} }
@Override @Override
public final OutputStream newOutputStream() { public final WritableByteChannel rritableByteChannel() {
return Channels.newOutputStream(this.channel); return this.channel;
} }
@Override @Override

View File

@@ -10,6 +10,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.*; import java.nio.charset.*;
import java.util.*; import java.util.*;
import java.util.logging.Level; import java.util.logging.Level;
@@ -581,7 +582,7 @@ public class HttpRequest extends Request<HttpContext> {
@ConvertDisabled @ConvertDisabled
public final MultiContext getMultiContext() { public final MultiContext getMultiContext() {
return new MultiContext(context.getCharset(), this.getContentType(), this.params, return new MultiContext(context.getCharset(), this.getContentType(), this.params,
new BufferedInputStream(this.channel.newInputStream(), Math.max(array.size(), 8192)) { new BufferedInputStream(Channels.newInputStream(this.channel.readableByteChannel()), Math.max(array.size(), 8192)) {
{ {
array.copyTo(this.buf); array.copyTo(this.buf);
this.count = array.size(); this.count = array.size();