This commit is contained in:
地平线
2015-09-06 15:00:08 +08:00
parent 7f6bc74796
commit 3ac9d251a7
3 changed files with 409 additions and 206 deletions

View File

@@ -70,110 +70,418 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
}
private static class AIOUDPAsyncConnection extends AsyncConnection {
private int readTimeoutSecond;
private int writeTimeoutSecond;
private final AsyncDatagramChannel channel;
private final SocketAddress remoteAddress;
private final boolean client;
public AIOUDPAsyncConnection(final AsyncDatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
this.channel = ch;
this.client = client0;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
this.remoteAddress = addr;
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (readTimeoutSecond > 0) {
channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(src, remoteAddress, attachment, handler);
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(srcs, offset, length, remoteAddress, attachment, handler);
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
public int getReadTimeoutSecond() {
return this.readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return this.writeTimeoutSecond;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
if (client) {
channel.close();
}
}
@Override
public void dispose() {
try {
this.close();
} catch (IOException io) {
}
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return false;
}
}
public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) {
return create(ch, addr, client0, 0, 0);
}
public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new AsyncConnection() {
private int readTimeoutSecond;
return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0);
}
private int writeTimeoutSecond;
private static class SimpleFuture implements Future<Integer> {
private final AsyncDatagramChannel channel;
private final int rs;
private final SocketAddress remoteAddress;
public SimpleFuture(int rs) {
this.rs = rs;
}
private final boolean client;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}
{
this.channel = ch;
this.client = client0;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
this.remoteAddress = addr;
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public Integer get() throws InterruptedException, ExecutionException {
return rs;
}
@Override
public Integer get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return rs;
}
}
private static class BIOTCPAsyncConnection extends AsyncConnection {
private int readTimeoutSecond;
private int writeTimeoutSecond;
private final Socket socket;
private final ReadableByteChannel readChannel;
private final WritableByteChannel writeChannel;
private final SocketAddress remoteAddress;
public BIOTCPAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
this.socket = socket;
ReadableByteChannel rc = null;
WritableByteChannel wc = null;
try {
socket.setSoTimeout(Math.max(readTimeoutSecond0, writeTimeoutSecond0));
rc = Channels.newChannel(socket.getInputStream());
wc = Channels.newChannel(socket.getOutputStream());
} catch (IOException e) {
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (readTimeoutSecond > 0) {
channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(src, remoteAddress, attachment, handler);
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.send(srcs, offset, length, remoteAddress, attachment, handler);
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
public int getReadTimeoutSecond() {
return this.readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return this.writeTimeoutSecond;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
if (client) {
channel.close();
}
}
@Override
public void dispose() {
this.readChannel = rc;
this.writeChannel = wc;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
SocketAddress addr = addr0;
if (addr == null) {
try {
this.close();
} catch (IOException io) {
addr = socket.getRemoteSocketAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public boolean isTCP() {
return true;
}
@Override
public final boolean isTCP() {
return false;
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public int getReadTimeoutSecond() {
return readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return writeTimeoutSecond;
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
protected <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += writeChannel.write(srcs[i]);
}
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
};
}
@Override
public void dispose() {
try {
this.close();
} catch (IOException io) {
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = readChannel.read(dst);
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = readChannel.read(dst);
return new SimpleFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = writeChannel.write(src);
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = writeChannel.write(src);
return new SimpleFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
this.socket.close();
}
@Override
public boolean isOpen() {
return !socket.isClosed();
}
}
public static AsyncConnection create(final Socket socket) {
return create(socket, null, 0, 0);
}
public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new BIOTCPAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0);
}
private static class AIOTCPAsyncConnection extends AsyncConnection {
private int readTimeoutSecond;
private int writeTimeoutSecond;
private final AsynchronousSocketChannel channel;
private final SocketAddress remoteAddress;
public AIOTCPAsyncConnection(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
this.channel = ch;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (readTimeoutSecond > 0) {
channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (writeTimeoutSecond > 0) {
channel.write(src, writeTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.write(src, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS,
attachment, new CompletionHandler<Long, A>() {
@Override
public void completed(Long result, A attachment) {
handler.completed(result.intValue(), attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
public int getReadTimeoutSecond() {
return this.readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return this.writeTimeoutSecond;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
@Override
public void dispose() {
try {
this.close();
} catch (IOException io) {
}
}
}
public static AsyncConnection create(final AsynchronousSocketChannel ch) {
@@ -181,125 +489,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new AsyncConnection() {
private int readTimeoutSecond;
private int writeTimeoutSecond;
private final AsynchronousSocketChannel channel;
private final SocketAddress remoteAddress;
{
this.channel = ch;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (readTimeoutSecond > 0) {
channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.read(dst, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
if (writeTimeoutSecond > 0) {
channel.write(src, writeTimeoutSecond, TimeUnit.SECONDS, attachment, handler);
} else {
channel.write(src, attachment, handler);
}
}
@Override
public <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS,
attachment, new CompletionHandler<Long, A>() {
@Override
public void completed(Long result, A attachment) {
handler.completed(result.intValue(), attachment);
}
@Override
public void failed(Throwable exc, A attachment) {
handler.failed(exc, attachment);
}
});
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
public int getReadTimeoutSecond() {
return this.readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return this.writeTimeoutSecond;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public final Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return channel.write(src);
}
@Override
public final void close() throws IOException {
channel.close();
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
@Override
public void dispose() {
try {
this.close();
} catch (IOException io) {
}
}
};
return new AIOTCPAsyncConnection(ch, addr0, readTimeoutSecond0, writeTimeoutSecond0);
}
}

View File

@@ -8,6 +8,7 @@ package com.wentch.redkale.net.http;
import com.wentch.redkale.net.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
@@ -72,7 +73,9 @@ public final class HttpProxyServlet extends HttpServlet {
}
private void connect(HttpRequest request, HttpResponse response) throws IOException {
final AsyncConnection remote = AsyncConnection.create("TCP", HttpRequest.parseSocketAddress(request.getRequestURI()), 6, 6);
final InetSocketAddress remoteAddress = HttpRequest.parseSocketAddress(request.getRequestURI());
final AsyncConnection remote = remoteAddress.getPort() == 443
? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", remoteAddress, 6, 6);
final ByteBuffer buffer0 = response.getContext().pollBuffer();
buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes());
buffer0.flip();

View File

@@ -376,6 +376,16 @@ public final class Utility {
}
//-----------------------------------------------------------------------------
public static Socket createDefaultSSLSocket(InetSocketAddress address) throws IOException {
return createDefaultSSLSocket(address.getAddress(), address.getPort());
}
public static Socket createDefaultSSLSocket(InetAddress host, int port) throws IOException {
Socket socket = DEFAULTSSL_CONTEXT.getSocketFactory().createSocket(host, port);
return socket;
}
public static String postHttpContent(String url) throws IOException {
return remoteHttpContent(null, "POST", url, null).toString("UTF-8");
}