diff --git a/src/com/wentch/redkale/net/ProtocolServer.java b/src/com/wentch/redkale/net/ProtocolServer.java index e6aaa001d..93af4682e 100644 --- a/src/com/wentch/redkale/net/ProtocolServer.java +++ b/src/com/wentch/redkale/net/ProtocolServer.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.*; +import java.util.concurrent.*; /** * @@ -16,6 +17,8 @@ import java.nio.channels.*; */ public abstract class ProtocolServer { + protected static final boolean winos = System.getProperty("os.name").contains("Window"); + public abstract void open() throws IOException; public abstract void bind(SocketAddress local, int backlog) throws IOException; @@ -31,7 +34,7 @@ public abstract class ProtocolServer { //--------------------------------------------------------------------- public static ProtocolServer create(String protocol, Context context) { if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolTCPServer(context); - if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context); + if ("UDP".equalsIgnoreCase(protocol)) return winos ? new ProtocolUDPWinServer(context) : new ProtocolUDPServer(context); throw new RuntimeException("ProtocolServer not support protocol " + protocol); } @@ -98,6 +101,79 @@ public abstract class ProtocolServer { } + private static final class ProtocolUDPWinServer extends ProtocolServer { + + private boolean running; + + private final Context context; + + private DatagramChannel serverChannel; + + public ProtocolUDPWinServer(Context context) { + this.context = context; + } + + @Override + public void open() throws IOException { + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(true); + this.serverChannel = ch; + } + + @Override + public void bind(SocketAddress local, int backlog) throws IOException { + this.serverChannel.bind(local); + } + + @Override + public void setOption(SocketOption name, T value) throws IOException { + this.serverChannel.setOption(name, value); + } + + @Override + public void accept() { + final DatagramChannel serchannel = this.serverChannel; + final int readTimeoutSecond = this.context.readTimeoutSecond; + final int writeTimeoutSecond = this.context.writeTimeoutSecond; + final CountDownLatch cdl = new CountDownLatch(1); + this.running = true; + new Thread() { + @Override + public void run() { + cdl.countDown(); + while (running) { + final ByteBuffer buffer = context.pollBuffer(); + try { + SocketAddress address = serchannel.receive(buffer); + buffer.flip(); + AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSecond, writeTimeoutSecond); + context.submit(new PrepareRunner(context, conn, buffer)); + } catch (Exception e) { + context.offerBuffer(buffer); + } + } + } + }.start(); + try { + cdl.await(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + this.running = false; + this.serverChannel.close(); + } + + @Override + public AsynchronousChannelGroup getChannelGroup() { + return null; + } + + } + private static final class ProtocolTCPServer extends ProtocolServer { private final Context context;