This commit is contained in:
Redkale
2018-05-09 16:19:00 +08:00
parent 8416826827
commit 13ffa2a3e5
2 changed files with 10 additions and 3 deletions

View File

@@ -469,6 +469,10 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); return new NIOTCPAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null);
} }
public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) {
return new NIOTCPAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null);
}
public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) { final AtomicLong livingCounter, final AtomicLong closedCounter) {

View File

@@ -420,7 +420,7 @@ public abstract class ProtocolServer {
} }
public void addChannel(SocketChannel channel) throws IOException { public void addChannel(SocketChannel channel) throws IOException {
AsyncConnection conn = AsyncConnection.create(channel, null, this.selector, 0, 0); AsyncConnection conn = AsyncConnection.create(channel, null, this.selector, context);
context.runAsync(new PrepareRunner(context, conn, null, null)); context.runAsync(new PrepareRunner(context, conn, null, null));
} }
@@ -463,7 +463,8 @@ public abstract class ProtocolServer {
try { try {
final int rs = socket.read(conn.readBuffer); final int rs = socket.read(conn.readBuffer);
key.interestOps(SelectionKey.OP_CONNECT); key.interestOps(SelectionKey.OP_CONNECT);
if (rs <= 0) return; //System.out.println(conn + "------readbuf:" + conn.readBuffer + "-------handler:" + conn.readHandler + "-------read: " + rs);
if(conn.readHandler == null) return;
context.runAsync(() -> conn.completeRead(rs)); context.runAsync(() -> conn.completeRead(rs));
} catch (Throwable t) { } catch (Throwable t) {
context.runAsync(() -> conn.faileRead(t)); context.runAsync(() -> conn.faileRead(t));
@@ -478,7 +479,8 @@ public abstract class ProtocolServer {
int offset = conn.writeOffset; int offset = conn.writeOffset;
int length = conn.writeLength; int length = conn.writeLength;
for (;;) { for (;;) {
rs += socket.write(buffers, offset, length); long sr = socket.write(buffers, offset, length);
if (sr > 0) rs += sr;
boolean over = true; boolean over = true;
int end = offset + length; int end = offset + length;
for (int i = offset; i < end; i++) { for (int i = offset; i < end; i++) {
@@ -496,6 +498,7 @@ public abstract class ProtocolServer {
} }
key.interestOps(SelectionKey.OP_CONNECT); key.interestOps(SelectionKey.OP_CONNECT);
final int rs0 = rs; final int rs0 = rs;
//System.out.println(conn + "------buffers:" + conn.writeBuffers + "---onebuf:" + conn.writeOneBuffer + "-------handler:" + conn.writeHandler + "-------write: " + rs);
context.runAsync(() -> conn.completeWrite(rs0)); context.runAsync(() -> conn.completeWrite(rs0));
} catch (Throwable t) { } catch (Throwable t) {
context.runAsync(() -> conn.faileWrite(t)); context.runAsync(() -> conn.faileWrite(t));