This commit is contained in:
Redkale
2020-06-23 10:13:12 +08:00
parent 0dfc0ca853
commit 1ea92d7165
4 changed files with 23 additions and 41 deletions

View File

@@ -5,7 +5,7 @@
*/ */
package org.redkale.net; package org.redkale.net;
import java.io.IOException; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
@@ -23,7 +23,7 @@ import org.redkale.util.*;
* *
* @author zhangjx * @author zhangjx
*/ */
public abstract class AsyncConnection implements ReadableByteChannel, WritableByteChannel, AutoCloseable { public abstract class AsyncConnection implements AutoCloseable {
protected SSLContext sslContext; protected SSLContext sslContext;
@@ -88,7 +88,6 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
return eventing.decrementAndGet(); return eventing.decrementAndGet();
} }
@Override
public abstract boolean isOpen(); public abstract boolean isOpen();
public abstract boolean isTCP(); public abstract boolean isTCP();
@@ -113,13 +112,11 @@ public abstract class AsyncConnection implements ReadableByteChannel, WritableBy
public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds);
@Override public abstract InputStream newInputStream();
public abstract int read(ByteBuffer dst) throws IOException;
public abstract void read(CompletionHandler<Integer, ByteBuffer> handler); public abstract void read(CompletionHandler<Integer, ByteBuffer> handler);
@Override public abstract OutputStream newOutputStream();
public abstract int write(ByteBuffer src) throws IOException;
public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler); public abstract <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler);

View File

@@ -5,7 +5,7 @@
*/ */
package org.redkale.net; package org.redkale.net;
import java.io.IOException; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
@@ -25,7 +25,6 @@ import javax.net.ssl.SSLContext;
public class TcpAioAsyncConnection extends AsyncConnection { public class TcpAioAsyncConnection extends AsyncConnection {
//private final Semaphore semaphore = new Semaphore(1); //private final Semaphore semaphore = new Semaphore(1);
private int readTimeoutSeconds; private int readTimeoutSeconds;
private int writeTimeoutSeconds; private int writeTimeoutSeconds;
@@ -132,7 +131,6 @@ public class TcpAioAsyncConnection extends AsyncConnection {
// semaphore.release(); // semaphore.release();
// } // }
// } // }
@Override @Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) { public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
write(true, src, attachment, handler); write(true, src, attachment, handler);
@@ -232,21 +230,13 @@ public class TcpAioAsyncConnection extends AsyncConnection {
} }
@Override @Override
public final int read(ByteBuffer dst) throws IOException { public final InputStream newInputStream() {
try { return Channels.newInputStream(this.channel);
return channel.read(dst).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
} }
@Override @Override
public final int write(ByteBuffer src) throws IOException { public final OutputStream newOutputStream() {
try { return Channels.newOutputStream(this.channel);
return channel.write(src).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
} }
@Override @Override
@@ -318,13 +308,13 @@ public class TcpAioAsyncConnection extends AsyncConnection {
return; return;
} }
// try { // try {
writeHandler.completed(writeCount, attachment); writeHandler.completed(writeCount, attachment);
// } finally { // } finally {
// nextWrite(null, attachment); // nextWrite(null, attachment);
// } // }
} else { } else {
// try { // try {
writeHandler.completed(result.intValue(), attachment); writeHandler.completed(result.intValue(), attachment);
// } finally { // } finally {
// nextWrite(null, attachment); // nextWrite(null, attachment);
// } // }
@@ -334,7 +324,7 @@ public class TcpAioAsyncConnection extends AsyncConnection {
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
// try { // try {
writeHandler.failed(exc, attachment); writeHandler.failed(exc, attachment);
// } finally { // } finally {
// nextWrite(exc, attachment); // nextWrite(exc, attachment);
// } // }
@@ -365,7 +355,7 @@ public class TcpAioAsyncConnection extends AsyncConnection {
return; return;
} }
// try { // try {
writeHandler.completed(result, attachment); writeHandler.completed(result, attachment);
// } finally { // } finally {
// nextWrite(null, attachment); // nextWrite(null, attachment);
// } // }
@@ -375,7 +365,7 @@ public class TcpAioAsyncConnection extends AsyncConnection {
@Override @Override
public void failed(Throwable exc, A attachment) { public void failed(Throwable exc, A attachment) {
// try { // try {
writeHandler.failed(exc, attachment); writeHandler.failed(exc, attachment);
// } finally { // } finally {
// nextWrite(exc, attachment); // nextWrite(exc, attachment);
// } // }

View File

@@ -5,7 +5,7 @@
*/ */
package org.redkale.net; package org.redkale.net;
import java.io.IOException; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
@@ -142,10 +142,13 @@ public class UdpBioAsyncConnection extends AsyncConnection {
} }
@Override @Override
public int read(ByteBuffer dst) throws IOException { public final InputStream newInputStream() {
int rs = channel.read(dst); return Channels.newInputStream(this.channel);
this.readtime = System.currentTimeMillis(); }
return rs;
@Override
public final OutputStream newOutputStream() {
return Channels.newOutputStream(this.channel);
} }
@Override @Override
@@ -159,13 +162,6 @@ public class UdpBioAsyncConnection extends AsyncConnection {
} }
} }
@Override
public int write(ByteBuffer src) throws IOException {
int rs = channel.send(src, remoteAddress);
this.writetime = System.currentTimeMillis();
return rs;
}
@Override @Override
public final void close() throws IOException { public final void close() throws IOException {
super.close(); super.close();

View File

@@ -10,7 +10,6 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.*; import java.nio.charset.*;
import java.util.*; import java.util.*;
import java.util.logging.Level; import java.util.logging.Level;
@@ -582,7 +581,7 @@ public class HttpRequest extends Request<HttpContext> {
@ConvertDisabled @ConvertDisabled
public final MultiContext getMultiContext() { public final MultiContext getMultiContext() {
return new MultiContext(context.getCharset(), this.getContentType(), this.params, return new MultiContext(context.getCharset(), this.getContentType(), this.params,
new BufferedInputStream(Channels.newInputStream(this.channel), Math.max(array.size(), 8192)) { new BufferedInputStream(this.channel.newInputStream(), Math.max(array.size(), 8192)) {
{ {
array.copyTo(this.buf); array.copyTo(this.buf);
this.count = array.size(); this.count = array.size();