This commit is contained in:
地平线
2015-10-30 16:09:43 +08:00
parent 3833975e1f
commit 1db4f70651
3 changed files with 172 additions and 13 deletions

View File

@@ -206,6 +206,131 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
} }
} }
private static class BIOUDPAsyncConnection extends AsyncConnection {
private int readTimeoutSecond;
private int writeTimeoutSecond;
private final DatagramChannel channel;
private final SocketAddress remoteAddress;
private final boolean client;
public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
this.channel = ch;
if (client0) {
try {
this.channel.configureBlocking(true);
} catch (IOException e) {
e.printStackTrace();
}
}
this.client = client0;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
this.remoteAddress = addr;
}
@Override
public void setReadTimeoutSecond(int readTimeoutSecond) {
this.readTimeoutSecond = readTimeoutSecond;
}
@Override
public void setWriteTimeoutSecond(int writeTimeoutSecond) {
this.writeTimeoutSecond = writeTimeoutSecond;
}
@Override
public int getReadTimeoutSecond() {
return this.readTimeoutSecond;
}
@Override
public int getWriteTimeoutSecond() {
return this.writeTimeoutSecond;
}
@Override
public final SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
protected <A> void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = 0;
for (int i = offset; i < offset + length; i++) {
rs += channel.write(srcs[i]);
}
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.read(dst);
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> read(ByteBuffer dst) {
try {
int rs = channel.read(dst);
return new SimpleFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
try {
int rs = channel.write(src);
if (handler != null) handler.completed(rs, attachment);
} catch (IOException e) {
if (handler != null) handler.failed(e, attachment);
}
}
@Override
public Future<Integer> write(ByteBuffer src) {
try {
int rs = channel.write(src);
return new SimpleFuture(rs);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public final void close() throws IOException {
super.close();
if (client) {
channel.close();
}
}
@Override
public final boolean isOpen() {
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return false;
}
}
public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) { public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) {
return create(ch, addr, client0, 0, 0); return create(ch, addr, client0, 0, 0);
} }
@@ -215,6 +340,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0); return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0);
} }
public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr,
final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0);
}
private static class SimpleFuture implements Future<Integer> { private static class SimpleFuture implements Future<Integer> {
private final int rs; private final int rs;

View File

@@ -23,6 +23,8 @@ public final class Transport {
protected static final int MAX_POOL_LIMIT = 16; protected static final int MAX_POOL_LIMIT = 16;
protected final boolean aio;
protected final String name; protected final String name;
protected final int bufferPoolSize; protected final int bufferPoolSize;
@@ -40,12 +42,17 @@ public final class Transport {
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>(); protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(Transport transport, InetSocketAddress localAddress, Collection<Transport> transports) { public Transport(Transport transport, InetSocketAddress localAddress, Collection<Transport> transports) {
this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports)); this(transport.name, transport.protocol, transport.aio, null, transport.bufferPoolSize, parse(localAddress, transports));
} }
public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) { public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this(name, protocol, false, watch, bufferPoolSize, addresses);
}
public Transport(String name, String protocol, boolean aio, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this.name = name; this.name = name;
this.protocol = protocol; this.protocol = protocol;
this.aio = aio;
this.bufferPoolSize = bufferPoolSize; this.bufferPoolSize = bufferPoolSize;
AsynchronousChannelGroup g = null; AsynchronousChannelGroup g = null;
try { try {
@@ -120,8 +127,9 @@ public final class Transport {
final boolean rand = addr == null; final boolean rand = addr == null;
try { try {
if ("TCP".equalsIgnoreCase(protocol)) { if ("TCP".equalsIgnoreCase(protocol)) {
Socket socket = null;
AsynchronousSocketChannel channel = null; AsynchronousSocketChannel channel = null;
if (rand) { if (rand) { //随机取地址
int p = 0; int p = 0;
for (int i = index.get(); i < remoteAddres.length; i++) { for (int i = index.get(); i < remoteAddres.length; i++) {
p = i; p = i;
@@ -133,29 +141,51 @@ public final class Transport {
if (conn.isOpen()) return conn; if (conn.isOpen()) return conn;
} }
} }
if (channel == null) channel = AsynchronousSocketChannel.open(group); if (aio) {
if (channel == null) channel = AsynchronousSocketChannel.open(group);
} else {
if (socket == null) socket = new Socket();
}
try { try {
channel.connect(addr).get(1, TimeUnit.SECONDS); if (aio) {
channel.connect(addr).get(1, TimeUnit.SECONDS);
} else {
socket.connect(addr, 1000);
}
break; break;
} catch (Exception iex) { } catch (Exception iex) {
if (i == remoteAddres.length - 1) { if (i == remoteAddres.length - 1) {
p = 0; p = 0;
socket = null;
channel = null; channel = null;
} }
} }
} }
index.set(p); index.set(p);
} else { } else {
channel = AsynchronousSocketChannel.open(group); if (aio) {
channel.connect(addr).get(2, TimeUnit.SECONDS); channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(1, TimeUnit.SECONDS);
} else {
socket = new Socket();
socket.connect(addr, 1000);
}
} }
if (channel == null) return null; if (aio && channel == null) return null;
return AsyncConnection.create(channel, addr, 0, 0); if (!aio && socket == null) return null;
} else { return aio ? AsyncConnection.create(channel, addr, 3000, 3000) : AsyncConnection.create(socket, addr, 3000, 3000);
} else { // UDP
if (rand) addr = remoteAddres[0]; if (rand) addr = remoteAddres[0];
AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); if (aio) {
channel.connect(addr); AsyncDatagramChannel channel = AsyncDatagramChannel.open(group);
return AsyncConnection.create(channel, addr, true, 0, 0); channel.connect(addr);
return AsyncConnection.create(channel, addr, true, 3000, 3000);
} else {
DatagramChannel socket = DatagramChannel.open();
socket.configureBlocking(true);
socket.connect(addr);
return AsyncConnection.create(socket, addr, true, 3000, 3000);
}
} }
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException("transport address = " + addr, ex); throw new RuntimeException("transport address = " + addr, ex);

View File

@@ -244,7 +244,6 @@ public final class SncpClient {
buffer.put(all, pos, len); buffer.put(all, pos, len);
pos += len; pos += len;
buffer.flip(); buffer.flip();
Thread.sleep(10);
conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS);
buffer.clear(); buffer.clear();
} }