This commit is contained in:
wentch
2015-12-21 15:06:18 +08:00
parent 754974b85f
commit 94b158a976
8 changed files with 56 additions and 41 deletions

View File

@@ -52,7 +52,7 @@ public class SocksConnectServlet extends SocksServlet {
@Override
public void execute(SocksRequest request, SocksResponse response) throws IOException {
response.getContext().submit(new SocksRunner(response.getContext(), response.removeChannel(), bindAddressBytes));
response.getContext().submit(new SocksRunner((SocksContext) response.getContext(), response.removeChannel(), bindAddressBytes));
response.finish(true);
}

View File

@@ -52,7 +52,7 @@ public final class SocksProxyServlet extends SocksServlet {
}
buffer.put(LINE);
buffer.flip();
final AsyncConnection remote = AsyncConnection.create("TCP", request.getHostSocketAddress(), 6, 6);
final AsyncConnection remote = AsyncConnection.create("TCP", request.getAsynchronousChannelGroup(), request.getHostSocketAddress(), 6, 6);
remote.write(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
@@ -80,7 +80,7 @@ public final class SocksProxyServlet extends SocksServlet {
private void connect(SocksRequest request, SocksResponse response) throws IOException {
final InetSocketAddress remoteAddress = request.parseSocketAddress();
final AsyncConnection remote = remoteAddress.getPort() == 443
? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", remoteAddress, 6, 6);
? AsyncConnection.create(Utility.createDefaultSSLSocket(remoteAddress)) : AsyncConnection.create("TCP", request.getAsynchronousChannelGroup(), remoteAddress, 6, 6);
final ByteBuffer buffer0 = response.getContext().pollBuffer();
buffer0.put("HTTP/1.1 200 Connection established\r\nConnection: close\r\n\r\n".getBytes());
buffer0.flip();

View File

@@ -6,10 +6,10 @@
package org.redkale.net.socks;
import org.redkale.net.AsyncConnection;
import org.redkale.net.http.HttpContext;
import org.redkale.net.http.HttpRequest;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
/**
*
@@ -22,7 +22,7 @@ public class SocksRequest extends HttpRequest {
private short requestid;
protected SocksRequest(HttpContext context) {
protected SocksRequest(SocksContext context) {
super(context, null);
}
@@ -43,6 +43,10 @@ public class SocksRequest extends HttpRequest {
return HttpRequest.parseSocketAddress(getRequestURI());
}
public AsynchronousChannelGroup getAsynchronousChannelGroup() {
return ((SocksContext) context).getAsynchronousChannelGroup();
}
@Override
protected InetSocketAddress getHostSocketAddress() {
return super.getHostSocketAddress();

View File

@@ -7,7 +7,6 @@ package org.redkale.net.socks;
import org.redkale.net.AsyncConnection;
import org.redkale.util.ObjectPool;
import org.redkale.net.Context;
import org.redkale.net.http.HttpResponse;
import org.redkale.util.Creator;
import org.redkale.net.Response;
@@ -20,7 +19,7 @@ import java.util.concurrent.atomic.*;
*/
public class SocksResponse extends HttpResponse<SocksRequest> {
protected SocksResponse(Context context, SocksRequest request) {
protected SocksResponse(SocksContext context, SocksRequest request) {
super(context, request, (String[][]) null, (String[][]) null, null);
}

View File

@@ -6,7 +6,6 @@
package org.redkale.net.socks;
import org.redkale.net.AsyncConnection;
import org.redkale.net.Context;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
@@ -25,7 +24,7 @@ public class SocksRunner implements Runnable {
private final boolean finest;
private final Context context;
private final SocksContext context;
private final byte[] bindAddressBytes;
@@ -37,7 +36,7 @@ public class SocksRunner implements Runnable {
private AsyncConnection remoteChannel;
public SocksRunner(Context context, AsyncConnection channel, final byte[] bindAddressBytes) {
public SocksRunner(SocksContext context, AsyncConnection channel, final byte[] bindAddressBytes) {
this.context = context;
this.logger = context.getLogger();
this.finest = this.context.getLogger().isLoggable(Level.FINEST);
@@ -102,12 +101,14 @@ public class SocksRunner implements Runnable {
return;
}
try {
remoteChannel = AsyncConnection.create("TCP", remoteAddress, 6, 6);
remoteChannel = AsyncConnection.create("TCP", context.getAsynchronousChannelGroup(), remoteAddress, 6, 6);
buffer.clear();
buffer.putChar((char) 0x0500);
buffer.put((byte) 0x00); //rsv
buffer.put(bindAddressBytes);
buffer.flip();
final ByteBuffer rbuffer = context.pollBuffer();
final ByteBuffer wbuffer = context.pollBuffer();
channel.write(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
@@ -121,6 +122,8 @@ public class SocksRunner implements Runnable {
@Override
public void failed(Throwable exc, Void attachment) {
context.offerBuffer(rbuffer);
context.offerBuffer(wbuffer);
closeRunner(exc);
}
});
@@ -155,8 +158,8 @@ public class SocksRunner implements Runnable {
}
private void stream() {
new StreamCompletionHandler(channel, remoteChannel).completed(0, null);
new StreamCompletionHandler(remoteChannel, channel).completed(0, null);
new StreamCompletionHandler(channel, remoteChannel).completed(1, null);
new StreamCompletionHandler(remoteChannel, channel).completed(1, null);
}
public void closeRunner(final Throwable e) {
@@ -178,15 +181,15 @@ public class SocksRunner implements Runnable {
private class StreamCompletionHandler implements CompletionHandler<Integer, Void> {
private final AsyncConnection conn1;
private final AsyncConnection readconn;
private final AsyncConnection conn2;
private final AsyncConnection writeconn;
private final ByteBuffer rbuffer;
public StreamCompletionHandler(AsyncConnection conn1, AsyncConnection conn2) {
this.conn1 = conn1;
this.conn2 = conn2;
this.readconn = conn1;
this.writeconn = conn2;
this.rbuffer = context.pollBuffer();
this.rbuffer.flip();
}
@@ -195,16 +198,24 @@ public class SocksRunner implements Runnable {
public void completed(Integer result0, Void v0) {
final CompletionHandler self = this;
if (rbuffer.hasRemaining()) {
conn2.write(rbuffer, null, self);
writeconn.write(rbuffer, null, self);
return;
}
if (result0 < 1) {
self.failed(null, v0);
return;
}
rbuffer.clear();
conn1.read(rbuffer, null, new CompletionHandler<Integer, Void>() {
readconn.read(rbuffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (result < 1) {
self.failed(null, attachment);
return;
}
rbuffer.flip();
conn2.write(rbuffer, attachment, self);
writeconn.write(rbuffer, attachment, self);
}
@Override
@@ -217,8 +228,8 @@ public class SocksRunner implements Runnable {
@Override
public void failed(Throwable exc, Void v) {
context.offerBuffer(rbuffer);
conn1.dispose();
conn2.dispose();
readconn.dispose();
writeconn.dispose();
if (finest) logger.log(Level.FINEST, "StreamCompletionHandler closed", exc);
}
}

View File

@@ -7,7 +7,6 @@ package org.redkale.net.socks;
import org.redkale.util.AnyValue;
import org.redkale.net.Server;
import org.redkale.net.http.HttpContext;
import org.redkale.util.ObjectPool;
import org.redkale.net.Context;
import org.redkale.watch.WatchFactory;
@@ -57,7 +56,7 @@ public final class SocksServer extends Server {
AtomicLong createResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SOCKS_" + port + ".Response.creatCounter");
AtomicLong cycleResponseCounter = watch == null ? new AtomicLong() : watch.createWatchNumber("SOCKS_" + port + ".Response.cycleCounter");
ObjectPool<Response> responsePool = SocksResponse.createPool(createResponseCounter, cycleResponseCounter, this.responsePoolSize, null);
HttpContext localcontext = new HttpContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
SocksContext localcontext = new SocksContext(this.serverStartTime, this.logger, executor, rcapacity, bufferPool, responsePool,
this.maxbody, this.charset, this.address, this.prepare, this.watch, this.readTimeoutSecond, this.writeTimeoutSecond, "");
responsePool.setCreator((Object... params) -> new SocksResponse(localcontext, new SocksRequest(localcontext)));
return localcontext;