This commit is contained in:
地平线
2015-11-02 11:02:58 +08:00
parent 4dd706f491
commit 314746d2cb

View File

@@ -9,6 +9,7 @@ import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.concurrent.*;
/** /**
* *
@@ -16,6 +17,8 @@ import java.nio.channels.*;
*/ */
public abstract class ProtocolServer { public abstract class ProtocolServer {
protected static final boolean winos = System.getProperty("os.name").contains("Window");
public abstract void open() throws IOException; public abstract void open() throws IOException;
public abstract void bind(SocketAddress local, int backlog) throws IOException; public abstract void bind(SocketAddress local, int backlog) throws IOException;
@@ -31,7 +34,7 @@ public abstract class ProtocolServer {
//--------------------------------------------------------------------- //---------------------------------------------------------------------
public static ProtocolServer create(String protocol, Context context) { public static ProtocolServer create(String protocol, Context context) {
if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolTCPServer(context); if ("TCP".equalsIgnoreCase(protocol)) return new ProtocolTCPServer(context);
if ("UDP".equalsIgnoreCase(protocol)) return new ProtocolUDPServer(context); if ("UDP".equalsIgnoreCase(protocol)) return winos ? new ProtocolUDPWinServer(context) : new ProtocolUDPServer(context);
throw new RuntimeException("ProtocolServer not support protocol " + protocol); throw new RuntimeException("ProtocolServer not support protocol " + protocol);
} }
@@ -98,6 +101,79 @@ public abstract class ProtocolServer {
} }
private static final class ProtocolUDPWinServer extends ProtocolServer {
private boolean running;
private final Context context;
private DatagramChannel serverChannel;
public ProtocolUDPWinServer(Context context) {
this.context = context;
}
@Override
public void open() throws IOException {
DatagramChannel ch = DatagramChannel.open();
ch.configureBlocking(true);
this.serverChannel = ch;
}
@Override
public void bind(SocketAddress local, int backlog) throws IOException {
this.serverChannel.bind(local);
}
@Override
public <T> void setOption(SocketOption<T> name, T value) throws IOException {
this.serverChannel.setOption(name, value);
}
@Override
public void accept() {
final DatagramChannel serchannel = this.serverChannel;
final int readTimeoutSecond = this.context.readTimeoutSecond;
final int writeTimeoutSecond = this.context.writeTimeoutSecond;
final CountDownLatch cdl = new CountDownLatch(1);
this.running = true;
new Thread() {
@Override
public void run() {
cdl.countDown();
while (running) {
final ByteBuffer buffer = context.pollBuffer();
try {
SocketAddress address = serchannel.receive(buffer);
buffer.flip();
AsyncConnection conn = AsyncConnection.create(serchannel, address, false, readTimeoutSecond, writeTimeoutSecond);
context.submit(new PrepareRunner(context, conn, buffer));
} catch (Exception e) {
context.offerBuffer(buffer);
}
}
}
}.start();
try {
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.running = false;
this.serverChannel.close();
}
@Override
public AsynchronousChannelGroup getChannelGroup() {
return null;
}
}
private static final class ProtocolTCPServer extends ProtocolServer { private static final class ProtocolTCPServer extends ProtocolServer {
private final Context context; private final Context context;