diff --git a/src/com/wentch/redkale/net/AsyncConnection.java b/src/com/wentch/redkale/net/AsyncConnection.java index 16fc4e9bd..84e99053a 100644 --- a/src/com/wentch/redkale/net/AsyncConnection.java +++ b/src/com/wentch/redkale/net/AsyncConnection.java @@ -70,110 +70,418 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + private static class AIOUDPAsyncConnection extends AsyncConnection { + + private int readTimeoutSecond; + + private int writeTimeoutSecond; + + private final AsyncDatagramChannel channel; + + private final SocketAddress remoteAddress; + + private final boolean client; + + public AIOUDPAsyncConnection(final AsyncDatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + this.channel = ch; + this.client = client0; + this.readTimeoutSecond = readTimeoutSecond0; + this.writeTimeoutSecond = writeTimeoutSecond0; + this.remoteAddress = addr; + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + if (readTimeoutSecond > 0) { + channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler); + } else { + channel.read(dst, attachment, handler); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + channel.send(src, remoteAddress, attachment, handler); + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + channel.send(srcs, offset, length, remoteAddress, attachment, handler); + } + + @Override + public void setReadTimeoutSecond(int readTimeoutSecond) { + this.readTimeoutSecond = readTimeoutSecond; + } + + @Override + public void setWriteTimeoutSecond(int writeTimeoutSecond) { + this.writeTimeoutSecond = writeTimeoutSecond; + } + + @Override + public int getReadTimeoutSecond() { + return this.readTimeoutSecond; + } + + @Override + public int getWriteTimeoutSecond() { + return this.writeTimeoutSecond; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public final Future read(ByteBuffer dst) { + return channel.read(dst); + } + + @Override + public final Future write(ByteBuffer src) { + return channel.write(src); + } + + @Override + public final void close() throws IOException { + if (client) { + channel.close(); + } + } + + @Override + public void dispose() { + try { + this.close(); + } catch (IOException io) { + } + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return false; + } + } + public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) { return create(ch, addr, client0, 0, 0); } public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - return new AsyncConnection() { - private int readTimeoutSecond; + return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0); + } - private int writeTimeoutSecond; + private static class SimpleFuture implements Future { - private final AsyncDatagramChannel channel; + private final int rs; - private final SocketAddress remoteAddress; + public SimpleFuture(int rs) { + this.rs = rs; + } - private final boolean client; + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return true; + } - { - this.channel = ch; - this.client = client0; - this.readTimeoutSecond = readTimeoutSecond0; - this.writeTimeoutSecond = writeTimeoutSecond0; - this.remoteAddress = addr; + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public Integer get() throws InterruptedException, ExecutionException { + return rs; + } + + @Override + public Integer get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return rs; + } + + } + + private static class BIOTCPAsyncConnection extends AsyncConnection { + + private int readTimeoutSecond; + + private int writeTimeoutSecond; + + private final Socket socket; + + private final ReadableByteChannel readChannel; + + private final WritableByteChannel writeChannel; + + private final SocketAddress remoteAddress; + + public BIOTCPAsyncConnection(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + this.socket = socket; + ReadableByteChannel rc = null; + WritableByteChannel wc = null; + try { + socket.setSoTimeout(Math.max(readTimeoutSecond0, writeTimeoutSecond0)); + rc = Channels.newChannel(socket.getInputStream()); + wc = Channels.newChannel(socket.getOutputStream()); + } catch (IOException e) { } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - if (readTimeoutSecond > 0) { - channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler); - } else { - channel.read(dst, attachment, handler); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - channel.send(src, remoteAddress, attachment, handler); - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - channel.send(srcs, offset, length, remoteAddress, attachment, handler); - } - - @Override - public void setReadTimeoutSecond(int readTimeoutSecond) { - this.readTimeoutSecond = readTimeoutSecond; - } - - @Override - public void setWriteTimeoutSecond(int writeTimeoutSecond) { - this.writeTimeoutSecond = writeTimeoutSecond; - } - - @Override - public int getReadTimeoutSecond() { - return this.readTimeoutSecond; - } - - @Override - public int getWriteTimeoutSecond() { - return this.writeTimeoutSecond; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public final Future read(ByteBuffer dst) { - return channel.read(dst); - } - - @Override - public final Future write(ByteBuffer src) { - return channel.write(src); - } - - @Override - public final void close() throws IOException { - if (client) { - channel.close(); - } - } - - @Override - public void dispose() { + this.readChannel = rc; + this.writeChannel = wc; + this.readTimeoutSecond = readTimeoutSecond0; + this.writeTimeoutSecond = writeTimeoutSecond0; + SocketAddress addr = addr0; + if (addr == null) { try { - this.close(); - } catch (IOException io) { + addr = socket.getRemoteSocketAddress(); + } catch (Exception e) { + //do nothing } } + this.remoteAddress = addr; + } - @Override - public final boolean isOpen() { - return channel.isOpen(); - } + @Override + public boolean isTCP() { + return true; + } - @Override - public final boolean isTCP() { - return false; + @Override + public SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public int getReadTimeoutSecond() { + return readTimeoutSecond; + } + + @Override + public int getWriteTimeoutSecond() { + return writeTimeoutSecond; + } + + @Override + public void setReadTimeoutSecond(int readTimeoutSecond) { + this.readTimeoutSecond = readTimeoutSecond; + } + + @Override + public void setWriteTimeoutSecond(int writeTimeoutSecond) { + this.writeTimeoutSecond = writeTimeoutSecond; + } + + @Override + protected void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + try { + int rs = 0; + for (int i = offset; i < offset + length; i++) { + rs += writeChannel.write(srcs[i]); + } + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); } - }; + } + + @Override + public void dispose() { + try { + this.close(); + } catch (IOException io) { + } + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + try { + int rs = readChannel.read(dst); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future read(ByteBuffer dst) { + try { + int rs = readChannel.read(dst); + return new SimpleFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + try { + int rs = writeChannel.write(src); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future write(ByteBuffer src) { + try { + int rs = writeChannel.write(src); + return new SimpleFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + this.socket.close(); + } + + @Override + public boolean isOpen() { + return !socket.isClosed(); + } + } + + public static AsyncConnection create(final Socket socket) { + return create(socket, null, 0, 0); + } + + public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + return new BIOTCPAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0); + } + + private static class AIOTCPAsyncConnection extends AsyncConnection { + + private int readTimeoutSecond; + + private int writeTimeoutSecond; + + private final AsynchronousSocketChannel channel; + + private final SocketAddress remoteAddress; + + public AIOTCPAsyncConnection(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + this.channel = ch; + this.readTimeoutSecond = readTimeoutSecond0; + this.writeTimeoutSecond = writeTimeoutSecond0; + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } + } + this.remoteAddress = addr; + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + if (readTimeoutSecond > 0) { + channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler); + } else { + channel.read(dst, attachment, handler); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + if (writeTimeoutSecond > 0) { + channel.write(src, writeTimeoutSecond, TimeUnit.SECONDS, attachment, handler); + } else { + channel.write(src, attachment, handler); + } + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS, + attachment, new CompletionHandler() { + + @Override + public void completed(Long result, A attachment) { + handler.completed(result.intValue(), attachment); + } + + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } + + }); + } + + @Override + public void setReadTimeoutSecond(int readTimeoutSecond) { + this.readTimeoutSecond = readTimeoutSecond; + } + + @Override + public void setWriteTimeoutSecond(int writeTimeoutSecond) { + this.writeTimeoutSecond = writeTimeoutSecond; + } + + @Override + public int getReadTimeoutSecond() { + return this.readTimeoutSecond; + } + + @Override + public int getWriteTimeoutSecond() { + return this.writeTimeoutSecond; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public final Future read(ByteBuffer dst) { + return channel.read(dst); + } + + @Override + public final Future write(ByteBuffer src) { + return channel.write(src); + } + + @Override + public final void close() throws IOException { + channel.close(); + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return true; + } + + @Override + public void dispose() { + try { + this.close(); + } catch (IOException io) { + } + } } public static AsyncConnection create(final AsynchronousSocketChannel ch) { @@ -181,125 +489,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { - return new AsyncConnection() { - private int readTimeoutSecond; - - private int writeTimeoutSecond; - - private final AsynchronousSocketChannel channel; - - private final SocketAddress remoteAddress; - - { - this.channel = ch; - this.readTimeoutSecond = readTimeoutSecond0; - this.writeTimeoutSecond = writeTimeoutSecond0; - SocketAddress addr = addr0; - if (addr == null) { - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing - } - } - this.remoteAddress = addr; - } - - @Override - public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { - if (readTimeoutSecond > 0) { - channel.read(dst, readTimeoutSecond, TimeUnit.SECONDS, attachment, handler); - } else { - channel.read(dst, attachment, handler); - } - } - - @Override - public void write(ByteBuffer src, A attachment, CompletionHandler handler) { - if (writeTimeoutSecond > 0) { - channel.write(src, writeTimeoutSecond, TimeUnit.SECONDS, attachment, handler); - } else { - channel.write(src, attachment, handler); - } - } - - @Override - public void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { - channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS, - attachment, new CompletionHandler() { - - @Override - public void completed(Long result, A attachment) { - handler.completed(result.intValue(), attachment); - } - - @Override - public void failed(Throwable exc, A attachment) { - handler.failed(exc, attachment); - } - - }); - } - - @Override - public void setReadTimeoutSecond(int readTimeoutSecond) { - this.readTimeoutSecond = readTimeoutSecond; - } - - @Override - public void setWriteTimeoutSecond(int writeTimeoutSecond) { - this.writeTimeoutSecond = writeTimeoutSecond; - } - - @Override - public int getReadTimeoutSecond() { - return this.readTimeoutSecond; - } - - @Override - public int getWriteTimeoutSecond() { - return this.writeTimeoutSecond; - } - - @Override - public final SocketAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public final Future read(ByteBuffer dst) { - return channel.read(dst); - } - - @Override - public final Future write(ByteBuffer src) { - return channel.write(src); - } - - @Override - public final void close() throws IOException { - channel.close(); - } - - @Override - public final boolean isOpen() { - return channel.isOpen(); - } - - @Override - public final boolean isTCP() { - return true; - } - - @Override - public void dispose() { - try { - this.close(); - } catch (IOException io) { - } - } - - }; + return new AIOTCPAsyncConnection(ch, addr0, readTimeoutSecond0, writeTimeoutSecond0); } } diff --git a/src/com/wentch/redkale/net/http/HttpProxyServlet.java b/src/com/wentch/redkale/net/http/HttpProxyServlet.java index e7be2b861..aedf57f28 100644 --- a/src/com/wentch/redkale/net/http/HttpProxyServlet.java +++ b/src/com/wentch/redkale/net/http/HttpProxyServlet.java @@ -8,6 +8,7 @@ package com.wentch.redkale.net.http; import com.wentch.redkale.net.*; import com.wentch.redkale.util.*; import java.io.*; +import java.net.*; import java.nio.*; import java.nio.channels.*; @@ -72,7 +73,9 @@ public final class HttpProxyServlet extends HttpServlet { } private void connect(HttpRequest request, HttpResponse response) throws IOException { - final AsyncConnection remote = AsyncConnection.create("TCP", HttpRequest.parseSocketAddress(request.getRequestURI()), 6, 6); + final InetSocketAddress remoteAddress = HttpRequest.parseSocketAddress(request.getRequestURI()); + final AsyncConnection remote = remoteAddress.getPort() == 443 + ? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", remoteAddress, 6, 6); final ByteBuffer buffer0 = response.getContext().pollBuffer(); buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes()); buffer0.flip(); diff --git a/src/com/wentch/redkale/util/Utility.java b/src/com/wentch/redkale/util/Utility.java index 717a86c2d..82dc11fed 100644 --- a/src/com/wentch/redkale/util/Utility.java +++ b/src/com/wentch/redkale/util/Utility.java @@ -376,6 +376,16 @@ public final class Utility { } //----------------------------------------------------------------------------- + public static Socket createDefaultSSLSocket(InetSocketAddress address) throws IOException { + return createDefaultSSLSocket(address.getAddress(), address.getPort()); + } + + public static Socket createDefaultSSLSocket(InetAddress host, int port) throws IOException { + Socket socket = DEFAULTSSL_CONTEXT.getSocketFactory().createSocket(host, port); + + return socket; + } + public static String postHttpContent(String url) throws IOException { return remoteHttpContent(null, "POST", url, null).toString("UTF-8"); }