diff --git a/src-plugin/com/wentch/redkale/net/socks/SocksPrepareServlet.java b/src-plugin/com/wentch/redkale/net/socks/SocksPrepareServlet.java index badada858..eba2d5040 100644 --- a/src-plugin/com/wentch/redkale/net/socks/SocksPrepareServlet.java +++ b/src-plugin/com/wentch/redkale/net/socks/SocksPrepareServlet.java @@ -15,25 +15,31 @@ import java.io.*; */ public final class SocksPrepareServlet extends PrepareServlet { - private SocksServlet servlet = new DefaultSocksServlet(); + private SocksServlet socksServlet = new DefaultSocksServlet(); + + private SocksProxyServlet proxyServlet = new SocksProxyServlet(); public SocksPrepareServlet() { } @Override public void init(Context context, AnyValue config) { - if (servlet != null) servlet.init(context, servlet.conf == null ? config : servlet.conf); + if (socksServlet != null) socksServlet.init(context, socksServlet.conf == null ? config : socksServlet.conf); } public void setSocksServlet(SocksServlet servlet, AnyValue conf) { servlet.conf = conf; - if (servlet != null) this.servlet = servlet; + if (servlet != null) this.socksServlet = servlet; } // @Override public void execute(SocksRequest request, SocksResponse response) throws IOException { - servlet.execute(request, response); + if (request.isHttp()) { + proxyServlet.execute(request, response); + } else { + socksServlet.execute(request, response); + } } } diff --git a/src-plugin/com/wentch/redkale/net/socks/SocksProxyServlet.java b/src-plugin/com/wentch/redkale/net/socks/SocksProxyServlet.java new file mode 100644 index 000000000..18a73311f --- /dev/null +++ b/src-plugin/com/wentch/redkale/net/socks/SocksProxyServlet.java @@ -0,0 +1,196 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.socks; + +import com.wentch.redkale.net.*; +import com.wentch.redkale.util.*; +import java.io.*; +import java.net.*; +import java.nio.*; +import java.nio.channels.*; + +/** + * 在appliation.xml中的HTTP类型的server节点加上forwardproxy="true"表示该HttpServer支持正向代理 + * + * @author zhangjx + */ +public final class SocksProxyServlet extends SocksServlet { + + protected static final byte[] LINE = new byte[]{'\r', '\n'}; + + @Override + public void execute(SocksRequest request, SocksResponse response) throws IOException { + response.skipHeader(); + if ("CONNECT".equalsIgnoreCase(request.getMethod())) { + connect(request, response); + return; + } + String url = request.getRequestURI(); + url = url.substring(url.indexOf("://") + 3); + url = url.substring(url.indexOf('/')); + final ByteBuffer buffer = response.getContext().pollBuffer(); + buffer.put((request.getMethod() + " " + url + " HTTP/1.1\r\n").getBytes()); + for (String header : request.getHeaderNames()) { + if (!header.startsWith("Proxy-")) { + buffer.put((header + ": " + request.getHeader(header) + "\r\n").getBytes()); + } + } + if (request.getHost() != null) { + buffer.put(("Host: " + request.getHost() + "\r\n").getBytes()); + } + if (request.getContentType() != null) { + buffer.put(("Content-Type: " + request.getContentType() + "\r\n").getBytes()); + } + if (request.getContentLength() > 0) { + buffer.put(("Content-Length: " + request.getContentLength() + "\r\n").getBytes()); + } + buffer.put(LINE); + buffer.flip(); + final AsyncConnection remote = AsyncConnection.create("TCP", request.getHostSocketAddress(), 6, 6); + remote.write(buffer, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + if (buffer.hasRemaining()) { + remote.write(buffer, attachment, this); + return; + } + response.getContext().offerBuffer(buffer); + new ProxyCompletionHandler(remote, request, response).completed(0, null); + } + + @Override + public void failed(Throwable exc, Void attachment) { + response.getContext().offerBuffer(buffer); + response.finish(true); + try { + remote.close(); + } catch (IOException ex) { + } + } + }); + } + + private void connect(SocksRequest request, SocksResponse response) throws IOException { + final InetSocketAddress remoteAddress = request.parseSocketAddress(); + final AsyncConnection remote = remoteAddress.getPort() == 443 + ? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", remoteAddress, 6, 6); + final ByteBuffer buffer0 = response.getContext().pollBuffer(); + buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes()); + buffer0.flip(); + response.sendBody(buffer0, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + new ProxyCompletionHandler(remote, request, response).completed(0, null); + } + + @Override + public void failed(Throwable exc, Void attachment) { + response.finish(true); + try { + remote.close(); + } catch (IOException ex) { + } + } + }); + + } + + private static class ProxyCompletionHandler implements CompletionHandler { + + private AsyncConnection remote; + + private SocksRequest request; + + private SocksResponse response; + + public ProxyCompletionHandler(AsyncConnection remote, SocksRequest request, SocksResponse response) { + this.remote = remote; + this.request = request; + this.response = response; + } + + @Override + public void completed(Integer result0, Void v0) { + final ByteBuffer rbuffer = request.getContext().pollBuffer(); + remote.read(rbuffer, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + rbuffer.flip(); + CompletionHandler parent = this; + response.sendBody(rbuffer.duplicate().asReadOnlyBuffer(), null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + rbuffer.clear(); + remote.read(rbuffer, attachment, parent); + } + + @Override + public void failed(Throwable exc, Void attachment) { + parent.failed(exc, attachment); + } + }); + } + + @Override + public void failed(Throwable exc, Void attachment) { + response.getContext().offerBuffer(rbuffer); + response.finish(true); + try { + remote.close(); + } catch (IOException ex) { + } + } + }); + + final ByteBuffer qbuffer = request.getContext().pollBuffer(); + request.getChannel().read(qbuffer, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + qbuffer.flip(); + CompletionHandler parent = this; + remote.write(qbuffer, null, new CompletionHandler() { + + @Override + public void completed(Integer result, Void attachment) { + qbuffer.clear(); + request.getChannel().read(qbuffer, null, parent); + } + + @Override + public void failed(Throwable exc, Void attachment) { + parent.failed(exc, attachment); + } + }); + } + + @Override + public void failed(Throwable exc, Void attachment) { + response.getContext().offerBuffer(qbuffer); + response.finish(true); + try { + remote.close(); + } catch (IOException ex) { + } + } + }); + } + + @Override + public void failed(Throwable exc, Void v) { + response.finish(true); + try { + remote.close(); + } catch (IOException ex) { + } + } + } + +} diff --git a/src-plugin/com/wentch/redkale/net/socks/SocksRequest.java b/src-plugin/com/wentch/redkale/net/socks/SocksRequest.java index 065d26d1c..a3eef3f17 100644 --- a/src-plugin/com/wentch/redkale/net/socks/SocksRequest.java +++ b/src-plugin/com/wentch/redkale/net/socks/SocksRequest.java @@ -5,40 +5,66 @@ */ package com.wentch.redkale.net.socks; +import com.wentch.redkale.convert.json.*; import com.wentch.redkale.net.*; +import com.wentch.redkale.net.http.*; +import java.net.*; import java.nio.*; /** * * @author zhangjx */ -public class SocksRequest extends Request { +public class SocksRequest extends HttpRequest { + + private boolean http; private short requestid; protected SocksRequest(SocksContext context) { - super(context); + super(context, JsonFactory.root(), null); } @Override protected int readHeader(ByteBuffer buffer) { + if (buffer.remaining() > 3) { + this.http = true; + return super.readHeader(buffer); + } + this.http = false; if (buffer.get() != 0x05) return -1; if (buffer.get() != 0x01) return -1; if (buffer.get() != 0x00) return -1; return 0; } + protected InetSocketAddress parseSocketAddress() { + return HttpRequest.parseSocketAddress(getRequestURI()); + } + + @Override + protected InetSocketAddress getHostSocketAddress() { + return super.getHostSocketAddress(); + } + + @Override + protected AsyncConnection getChannel() { + return super.getChannel(); + } + @Override protected void readBody(ByteBuffer buffer) { } @Override protected void prepare() { + super.prepare(); } @Override protected void recycle() { this.requestid = 0; + this.http = false; super.recycle(); } @@ -50,4 +76,12 @@ public class SocksRequest extends Request { this.requestid = requestid; } + public boolean isHttp() { + return http; + } + + public void setHttp(boolean http) { + this.http = http; + } + } diff --git a/src-plugin/com/wentch/redkale/net/socks/SocksResponse.java b/src-plugin/com/wentch/redkale/net/socks/SocksResponse.java index 1c4558bd0..fca6c3306 100644 --- a/src-plugin/com/wentch/redkale/net/socks/SocksResponse.java +++ b/src-plugin/com/wentch/redkale/net/socks/SocksResponse.java @@ -6,6 +6,7 @@ package com.wentch.redkale.net.socks; import com.wentch.redkale.net.*; +import com.wentch.redkale.net.http.*; import com.wentch.redkale.util.*; import java.util.concurrent.atomic.*; @@ -13,10 +14,10 @@ import java.util.concurrent.atomic.*; * * @author zhangjx */ -public class SocksResponse extends Response { +public class SocksResponse extends HttpResponse { protected SocksResponse(Context context, SocksRequest request) { - super(context, request); + super(context, request, (String[][]) null, (String[][]) null, null); } public static ObjectPool createPool(AtomicLong creatCounter, AtomicLong cycleCounter, int max, Creator creator) { @@ -27,4 +28,5 @@ public class SocksResponse extends Response { public AsyncConnection removeChannel() { return super.removeChannel(); } + }