From 1db4f706516934aeb4bfba820b3fa0a9fec13d9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Fri, 30 Oct 2015 16:09:43 +0800 Subject: [PATCH] --- .../wentch/redkale/net/AsyncConnection.java | 130 ++++++++++++++++++ src/com/wentch/redkale/net/Transport.java | 54 ++++++-- .../wentch/redkale/net/sncp/SncpClient.java | 1 - 3 files changed, 172 insertions(+), 13 deletions(-) diff --git a/src/com/wentch/redkale/net/AsyncConnection.java b/src/com/wentch/redkale/net/AsyncConnection.java index 7c648c6d9..dcbca565b 100644 --- a/src/com/wentch/redkale/net/AsyncConnection.java +++ b/src/com/wentch/redkale/net/AsyncConnection.java @@ -206,6 +206,131 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } } + private static class BIOUDPAsyncConnection extends AsyncConnection { + + private int readTimeoutSecond; + + private int writeTimeoutSecond; + + private final DatagramChannel channel; + + private final SocketAddress remoteAddress; + + private final boolean client; + + public BIOUDPAsyncConnection(final DatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + this.channel = ch; + if (client0) { + try { + this.channel.configureBlocking(true); + } catch (IOException e) { + e.printStackTrace(); + } + } + this.client = client0; + this.readTimeoutSecond = readTimeoutSecond0; + this.writeTimeoutSecond = writeTimeoutSecond0; + this.remoteAddress = addr; + } + + @Override + public void setReadTimeoutSecond(int readTimeoutSecond) { + this.readTimeoutSecond = readTimeoutSecond; + } + + @Override + public void setWriteTimeoutSecond(int writeTimeoutSecond) { + this.writeTimeoutSecond = writeTimeoutSecond; + } + + @Override + public int getReadTimeoutSecond() { + return this.readTimeoutSecond; + } + + @Override + public int getWriteTimeoutSecond() { + return this.writeTimeoutSecond; + } + + @Override + public final SocketAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + protected void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler) { + try { + int rs = 0; + for (int i = offset; i < offset + length; i++) { + rs += channel.write(srcs[i]); + } + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public void read(ByteBuffer dst, A attachment, CompletionHandler handler) { + try { + int rs = channel.read(dst); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future read(ByteBuffer dst) { + try { + int rs = channel.read(dst); + return new SimpleFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(ByteBuffer src, A attachment, CompletionHandler handler) { + try { + int rs = channel.write(src); + if (handler != null) handler.completed(rs, attachment); + } catch (IOException e) { + if (handler != null) handler.failed(e, attachment); + } + } + + @Override + public Future write(ByteBuffer src) { + try { + int rs = channel.write(src); + return new SimpleFuture(rs); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public final void close() throws IOException { + super.close(); + if (client) { + channel.close(); + } + } + + @Override + public final boolean isOpen() { + return channel.isOpen(); + } + + @Override + public final boolean isTCP() { + return false; + } + } + public static AsyncConnection create(final AsyncDatagramChannel ch, SocketAddress addr, final boolean client0) { return create(ch, addr, client0, 0, 0); } @@ -215,6 +340,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return new AIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0); } + public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + return new BIOUDPAsyncConnection(ch, addr, client0, readTimeoutSecond0, writeTimeoutSecond0); + } + private static class SimpleFuture implements Future { private final int rs; diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index 5368af39c..0a1ddd3cc 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -23,6 +23,8 @@ public final class Transport { protected static final int MAX_POOL_LIMIT = 16; + protected final boolean aio; + protected final String name; protected final int bufferPoolSize; @@ -40,12 +42,17 @@ public final class Transport { protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); public Transport(Transport transport, InetSocketAddress localAddress, Collection transports) { - this(transport.name, transport.protocol, null, transport.bufferPoolSize, parse(localAddress, transports)); + this(transport.name, transport.protocol, transport.aio, null, transport.bufferPoolSize, parse(localAddress, transports)); } public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection addresses) { + this(name, protocol, false, watch, bufferPoolSize, addresses); + } + + public Transport(String name, String protocol, boolean aio, WatchFactory watch, int bufferPoolSize, Collection addresses) { this.name = name; this.protocol = protocol; + this.aio = aio; this.bufferPoolSize = bufferPoolSize; AsynchronousChannelGroup g = null; try { @@ -120,8 +127,9 @@ public final class Transport { final boolean rand = addr == null; try { if ("TCP".equalsIgnoreCase(protocol)) { + Socket socket = null; AsynchronousSocketChannel channel = null; - if (rand) { + if (rand) { //随机取地址 int p = 0; for (int i = index.get(); i < remoteAddres.length; i++) { p = i; @@ -133,29 +141,51 @@ public final class Transport { if (conn.isOpen()) return conn; } } - if (channel == null) channel = AsynchronousSocketChannel.open(group); + if (aio) { + if (channel == null) channel = AsynchronousSocketChannel.open(group); + } else { + if (socket == null) socket = new Socket(); + } try { - channel.connect(addr).get(1, TimeUnit.SECONDS); + if (aio) { + channel.connect(addr).get(1, TimeUnit.SECONDS); + } else { + socket.connect(addr, 1000); + } break; } catch (Exception iex) { if (i == remoteAddres.length - 1) { p = 0; + socket = null; channel = null; } } } index.set(p); } else { - channel = AsynchronousSocketChannel.open(group); - channel.connect(addr).get(2, TimeUnit.SECONDS); + if (aio) { + channel = AsynchronousSocketChannel.open(group); + channel.connect(addr).get(1, TimeUnit.SECONDS); + } else { + socket = new Socket(); + socket.connect(addr, 1000); + } } - if (channel == null) return null; - return AsyncConnection.create(channel, addr, 0, 0); - } else { + if (aio && channel == null) return null; + if (!aio && socket == null) return null; + return aio ? AsyncConnection.create(channel, addr, 3000, 3000) : AsyncConnection.create(socket, addr, 3000, 3000); + } else { // UDP if (rand) addr = remoteAddres[0]; - AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); - channel.connect(addr); - return AsyncConnection.create(channel, addr, true, 0, 0); + if (aio) { + AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); + channel.connect(addr); + return AsyncConnection.create(channel, addr, true, 3000, 3000); + } else { + DatagramChannel socket = DatagramChannel.open(); + socket.configureBlocking(true); + socket.connect(addr); + return AsyncConnection.create(socket, addr, true, 3000, 3000); + } } } catch (Exception ex) { throw new RuntimeException("transport address = " + addr, ex); diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index b89f463e9..68feeeac7 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -244,7 +244,6 @@ public final class SncpClient { buffer.put(all, pos, len); pos += len; buffer.flip(); - Thread.sleep(10); conn.write(buffer).get(writeto > 0 ? writeto : 5, TimeUnit.SECONDS); buffer.clear(); }