From 94b158a9766858898f618d5b9f8d5f13bcd0a1d7 Mon Sep 17 00:00:00 2001 From: wentch <22250530@qq.com> Date: Mon, 21 Dec 2015 15:06:18 +0800 Subject: [PATCH] --- .../net/socks/SocksConnectServlet.java | 2 +- .../redkale/net/socks/SocksProxyServlet.java | 4 +- .../org/redkale/net/socks/SocksRequest.java | 8 +++- .../org/redkale/net/socks/SocksResponse.java | 3 +- .../org/redkale/net/socks/SocksRunner.java | 41 ++++++++++++------- .../org/redkale/net/socks/SocksServer.java | 3 +- src/org/redkale/net/AsyncConnection.java | 34 +++++++-------- src/org/redkale/net/http/HttpContext.java | 2 +- 8 files changed, 56 insertions(+), 41 deletions(-) diff --git a/src-plugin/org/redkale/net/socks/SocksConnectServlet.java b/src-plugin/org/redkale/net/socks/SocksConnectServlet.java index 34f0c620c..9da7e7e74 100644 --- a/src-plugin/org/redkale/net/socks/SocksConnectServlet.java +++ b/src-plugin/org/redkale/net/socks/SocksConnectServlet.java @@ -52,7 +52,7 @@ public class SocksConnectServlet extends SocksServlet { @Override public void execute(SocksRequest request, SocksResponse response) throws IOException { - response.getContext().submit(new SocksRunner(response.getContext(), response.removeChannel(), bindAddressBytes)); + response.getContext().submit(new SocksRunner((SocksContext) response.getContext(), response.removeChannel(), bindAddressBytes)); response.finish(true); } diff --git a/src-plugin/org/redkale/net/socks/SocksProxyServlet.java b/src-plugin/org/redkale/net/socks/SocksProxyServlet.java index 670b7f57c..5c32efa70 100644 --- a/src-plugin/org/redkale/net/socks/SocksProxyServlet.java +++ b/src-plugin/org/redkale/net/socks/SocksProxyServlet.java @@ -52,7 +52,7 @@ public final class SocksProxyServlet extends SocksServlet { } buffer.put(LINE); buffer.flip(); - final AsyncConnection remote = AsyncConnection.create("TCP", request.getHostSocketAddress(), 6, 6); + final AsyncConnection remote = AsyncConnection.create("TCP", request.getAsynchronousChannelGroup(), request.getHostSocketAddress(), 6, 6); remote.write(buffer, null, new CompletionHandler() { @Override @@ -80,7 +80,7 @@ public final class SocksProxyServlet extends SocksServlet { private void connect(SocksRequest request, SocksResponse response) throws IOException { final InetSocketAddress remoteAddress = request.parseSocketAddress(); final AsyncConnection remote = remoteAddress.getPort() == 443 - ? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", remoteAddress, 6, 6); + ? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", request.getAsynchronousChannelGroup(), remoteAddress, 6, 6); final ByteBuffer buffer0 = response.getContext().pollBuffer(); buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes()); buffer0.flip(); diff --git a/src-plugin/org/redkale/net/socks/SocksRequest.java b/src-plugin/org/redkale/net/socks/SocksRequest.java index b17726d3c..c906f6449 100644 --- a/src-plugin/org/redkale/net/socks/SocksRequest.java +++ b/src-plugin/org/redkale/net/socks/SocksRequest.java @@ -6,10 +6,10 @@ package org.redkale.net.socks; import org.redkale.net.AsyncConnection; -import org.redkale.net.http.HttpContext; import org.redkale.net.http.HttpRequest; import java.net.*; import java.nio.*; +import java.nio.channels.*; /** * @@ -22,7 +22,7 @@ public class SocksRequest extends HttpRequest { private short requestid; - protected SocksRequest(HttpContext context) { + protected SocksRequest(SocksContext context) { super(context, null); } @@ -43,6 +43,10 @@ public class SocksRequest extends HttpRequest { return HttpRequest.parseSocketAddress(getRequestURI()); } + public AsynchronousChannelGroup getAsynchronousChannelGroup() { + return ((SocksContext) context).getAsynchronousChannelGroup(); + } + @Override protected InetSocketAddress getHostSocketAddress() { return super.getHostSocketAddress(); diff --git a/src-plugin/org/redkale/net/socks/SocksResponse.java b/src-plugin/org/redkale/net/socks/SocksResponse.java index 9b734b94a..6c46d2190 100644 --- a/src-plugin/org/redkale/net/socks/SocksResponse.java +++ b/src-plugin/org/redkale/net/socks/SocksResponse.java @@ -7,7 +7,6 @@ package org.redkale.net.socks; import org.redkale.net.AsyncConnection; import org.redkale.util.ObjectPool; -import org.redkale.net.Context; import org.redkale.net.http.HttpResponse; import org.redkale.util.Creator; import org.redkale.net.Response; @@ -20,7 +19,7 @@ import java.util.concurrent.atomic.*; */ public class SocksResponse extends HttpResponse { - protected SocksResponse(Context context, SocksRequest request) { + protected SocksResponse(SocksContext context, SocksRequest request) { super(context, request, (String[][]) null, (String[][]) null, null); } diff --git a/src-plugin/org/redkale/net/socks/SocksRunner.java b/src-plugin/org/redkale/net/socks/SocksRunner.java index 196596cb6..91523b9d0 100644 --- a/src-plugin/org/redkale/net/socks/SocksRunner.java +++ b/src-plugin/org/redkale/net/socks/SocksRunner.java @@ -6,7 +6,6 @@ package org.redkale.net.socks; import org.redkale.net.AsyncConnection; -import org.redkale.net.Context; import java.net.*; import java.nio.*; import java.nio.channels.*; @@ -25,7 +24,7 @@ public class SocksRunner implements Runnable { private final boolean finest; - private final Context context; + private final SocksContext context; private final byte[] bindAddressBytes; @@ -37,7 +36,7 @@ public class SocksRunner implements Runnable { private AsyncConnection remoteChannel; - public SocksRunner(Context context, AsyncConnection channel, final byte[] bindAddressBytes) { + public SocksRunner(SocksContext context, AsyncConnection channel, final byte[] bindAddressBytes) { this.context = context; this.logger = context.getLogger(); this.finest = this.context.getLogger().isLoggable(Level.FINEST); @@ -102,12 +101,14 @@ public class SocksRunner implements Runnable { return; } try { - remoteChannel = AsyncConnection.create("TCP", remoteAddress, 6, 6); + remoteChannel = AsyncConnection.create("TCP", context.getAsynchronousChannelGroup(), remoteAddress, 6, 6); buffer.clear(); buffer.putChar((char) 0x0500); buffer.put((byte) 0x00); //rsv buffer.put(bindAddressBytes); buffer.flip(); + final ByteBuffer rbuffer = context.pollBuffer(); + final ByteBuffer wbuffer = context.pollBuffer(); channel.write(buffer, null, new CompletionHandler() { @Override @@ -121,6 +122,8 @@ public class SocksRunner implements Runnable { @Override public void failed(Throwable exc, Void attachment) { + context.offerBuffer(rbuffer); + context.offerBuffer(wbuffer); closeRunner(exc); } }); @@ -155,8 +158,8 @@ public class SocksRunner implements Runnable { } private void stream() { - new StreamCompletionHandler(channel, remoteChannel).completed(0, null); - new StreamCompletionHandler(remoteChannel, channel).completed(0, null); + new StreamCompletionHandler(channel, remoteChannel).completed(1, null); + new StreamCompletionHandler(remoteChannel, channel).completed(1, null); } public void closeRunner(final Throwable e) { @@ -178,15 +181,15 @@ public class SocksRunner implements Runnable { private class StreamCompletionHandler implements CompletionHandler { - private final AsyncConnection conn1; + private final AsyncConnection readconn; - private final AsyncConnection conn2; + private final AsyncConnection writeconn; private final ByteBuffer rbuffer; public StreamCompletionHandler(AsyncConnection conn1, AsyncConnection conn2) { - this.conn1 = conn1; - this.conn2 = conn2; + this.readconn = conn1; + this.writeconn = conn2; this.rbuffer = context.pollBuffer(); this.rbuffer.flip(); } @@ -195,16 +198,24 @@ public class SocksRunner implements Runnable { public void completed(Integer result0, Void v0) { final CompletionHandler self = this; if (rbuffer.hasRemaining()) { - conn2.write(rbuffer, null, self); + writeconn.write(rbuffer, null, self); + return; + } + if (result0 < 1) { + self.failed(null, v0); return; } rbuffer.clear(); - conn1.read(rbuffer, null, new CompletionHandler() { + readconn.read(rbuffer, null, new CompletionHandler() { @Override public void completed(Integer result, Void attachment) { + if (result < 1) { + self.failed(null, attachment); + return; + } rbuffer.flip(); - conn2.write(rbuffer, attachment, self); + writeconn.write(rbuffer, attachment, self); } @Override @@ -217,8 +228,8 @@ public class SocksRunner implements Runnable { @Override public void failed(Throwable exc, Void v) { context.offerBuffer(rbuffer); - conn1.dispose(); - conn2.dispose(); + readconn.dispose(); + writeconn.dispose(); if (finest) logger.log(Level.FINEST, "StreamCompletionHandler closed", exc); } } diff --git a/src-plugin/org/redkale/net/socks/SocksServer.java b/src-plugin/org/redkale/net/socks/SocksServer.java index 4799b39ed..1392d3b41 100644 --- a/src-plugin/org/redkale/net/socks/SocksServer.java +++ b/src-plugin/org/redkale/net/socks/SocksServer.java @@ -7,7 +7,6 @@ package org.redkale.net.socks; import org.redkale.util.AnyValue; import org.redkale.net.Server; -import org.redkale.net.http.HttpContext; import org.redkale.util.ObjectPool; import org.redkale.net.Context; import org.redkale.watch.WatchFactory; @@ -57,7 +56,7 @@ public final class SocksServer extends Server { AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SOCKS_" + port + ".Response.creatCounter"); AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SOCKS_" + port + ".Response.cycleCounter"); ObjectPool responsePool = SocksResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null); - HttpContext localcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool, + SocksContext localcontext = new SocksContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool, this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, ""); responsePool.setCreator((Object... params) -> new SocksResponse(localcontext, new SocksRequest(localcontext))); return localcontext; diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index 959a92c4e..24ea98616 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -82,8 +82,8 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } //------------------------------------------------------------------------------------------------------------------------------ - public static AsyncConnection create(final String protocol, final SocketAddress address) throws IOException { - return create(protocol, address, 0, 0); + public static AsyncConnection create(final String protocol, final AsynchronousChannelGroup group, final SocketAddress address) throws IOException { + return create(protocol, group, address, 0, 0); } /** @@ -91,15 +91,16 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl * * @param protocol * @param address + * @param group * @param readTimeoutSecond0 * @param writeTimeoutSecond0 * @return * @throws java.io.IOException */ - public static AsyncConnection create(final String protocol, final SocketAddress address, + public static AsyncConnection create(final String protocol, final AsynchronousChannelGroup group, final SocketAddress address, final int readTimeoutSecond0, final int writeTimeoutSecond0) throws IOException { if ("TCP".equalsIgnoreCase(protocol)) { - AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); + AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); try { channel.connect(address).get(3, TimeUnit.SECONDS); } catch (Exception e) { @@ -177,7 +178,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl int rs = 0; for (int i = offset; i < offset + length; i++) { rs += channel.send(srcs[i], remoteAddress); - if(i != offset) Thread.sleep(10); + if (i != offset) Thread.sleep(10); } if (handler != null) handler.completed(rs, attachment); } catch (Exception e) { @@ -425,9 +426,10 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } /** - * 通常用于 ssl socket + * 通常用于 ssl socket + * * @param socket - * @return + * @return */ public static AsyncConnection create(final Socket socket) { return create(socket, null, 0, 0); @@ -485,17 +487,17 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl channel.write(srcs, offset, length, writeTimeoutSecond > 0 ? writeTimeoutSecond : 60, TimeUnit.SECONDS, attachment, new CompletionHandler() { - @Override - public void completed(Long result, A attachment) { - handler.completed(result.intValue(), attachment); - } + @Override + public void completed(Long result, A attachment) { + handler.completed(result.intValue(), attachment); + } - @Override - public void failed(Throwable exc, A attachment) { - handler.failed(exc, attachment); - } + @Override + public void failed(Throwable exc, A attachment) { + handler.failed(exc, attachment); + } - }); + }); } @Override diff --git a/src/org/redkale/net/http/HttpContext.java b/src/org/redkale/net/http/HttpContext.java index 95612378e..7815d8e68 100644 --- a/src/org/redkale/net/http/HttpContext.java +++ b/src/org/redkale/net/http/HttpContext.java @@ -20,7 +20,7 @@ import org.redkale.watch.*; * @see http://www.redkale.org * @author zhangjx */ -public final class HttpContext extends Context { +public class HttpContext extends Context { protected final String contextPath;