This commit is contained in:
Redkale
2020-06-24 18:04:42 +08:00
parent 1d030183bb
commit 7eb2a405d3
5 changed files with 29 additions and 25 deletions

View File

@@ -25,43 +25,47 @@ import org.redkale.util.*;
*/ */
public abstract class AsyncConnection implements AutoCloseable { public abstract class AsyncConnection implements AutoCloseable {
protected SSLContext sslContext; private SSLContext sslContext;
protected Map<String, Object> attributes; //用于存储绑定在Connection上的对象集合 private Map<String, Object> attributes; //用于存储绑定在Connection上的对象集合
protected Object subobject; //用于存储绑定在Connection上的对象 同attributes 只绑定单个对象时尽量使用subobject而非attributes private Object subobject; //用于存储绑定在Connection上的对象 同attributes 只绑定单个对象时尽量使用subobject而非attributes
protected volatile long readtime; protected volatile long readtime;
protected volatile long writetime; protected volatile long writetime;
protected final Supplier<ByteBuffer> bufferSupplier; private final Supplier<ByteBuffer> bufferSupplier;
protected final Consumer<ByteBuffer> bufferConsumer; private final Consumer<ByteBuffer> bufferConsumer;
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
//在线数 //在线数
protected AtomicLong livingCounter; private AtomicLong livingCounter;
//关闭数 //关闭数
protected AtomicLong closedCounter; private AtomicLong closedCounter;
protected Consumer<AsyncConnection> beforeCloseListener; private Consumer<AsyncConnection> beforeCloseListener;
//关联的事件数, 小于1表示没有事件 //关联的事件数, 小于1表示没有事件
protected final AtomicInteger eventing = new AtomicInteger(); private final AtomicInteger eventing = new AtomicInteger();
protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext) { protected AsyncConnection(ObjectPool<ByteBuffer> bufferPool, SSLContext sslContext,
this(bufferPool, bufferPool, sslContext); final AtomicLong livingCounter, final AtomicLong closedCounter) {
this(bufferPool, bufferPool, sslContext, livingCounter, closedCounter);
} }
protected AsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer, SSLContext sslContext) { protected AsyncConnection(Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer, SSLContext sslContext,
final AtomicLong livingCounter, final AtomicLong closedCounter) {
Objects.requireNonNull(bufferSupplier); Objects.requireNonNull(bufferSupplier);
Objects.requireNonNull(bufferConsumer); Objects.requireNonNull(bufferConsumer);
this.bufferSupplier = bufferSupplier; this.bufferSupplier = bufferSupplier;
this.bufferConsumer = bufferConsumer; this.bufferConsumer = bufferConsumer;
this.sslContext = sslContext; this.sslContext = sslContext;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
} }
public Supplier<ByteBuffer> getBufferSupplier() { public Supplier<ByteBuffer> getBufferSupplier() {

View File

@@ -223,7 +223,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
public void finish(final byte[] bs) { public void finish(final byte[] bs) {
if (!this.inited) return; //避免重复关闭 if (!this.inited) return; //避免重复关闭
if (this.context.bufferCapacity == bs.length) { if (this.context.bufferCapacity == bs.length) {
ByteBuffer buffer = channel.bufferSupplier.get(); ByteBuffer buffer = channel.getBufferSupplier().get();
buffer.put(bs); buffer.put(bs);
buffer.flip(); buffer.flip();
this.finish(buffer); this.finish(buffer);

View File

@@ -39,7 +39,7 @@ class TcpAioAsyncConnection extends AsyncConnection {
final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0, final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0,
final int readTimeoutSeconds, final int writeTimeoutSeconds, final int readTimeoutSeconds, final int writeTimeoutSeconds,
final AtomicLong livingCounter, final AtomicLong closedCounter) { final AtomicLong livingCounter, final AtomicLong closedCounter) {
super(bufferSupplier, bufferConsumer, sslContext); super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter);
this.channel = ch; this.channel = ch;
this.readTimeoutSeconds = readTimeoutSeconds; this.readTimeoutSeconds = readTimeoutSeconds;
this.writeTimeoutSeconds = writeTimeoutSeconds; this.writeTimeoutSeconds = writeTimeoutSeconds;
@@ -52,8 +52,6 @@ class TcpAioAsyncConnection extends AsyncConnection {
} }
} }
this.remoteAddress = addr; this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
} }
@Override @Override

View File

@@ -37,7 +37,7 @@ class UdpBioAsyncConnection extends AsyncConnection {
final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0, final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0,
final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0,
final AtomicLong livingCounter, final AtomicLong closedCounter) { final AtomicLong livingCounter, final AtomicLong closedCounter) {
super(bufferSupplier, bufferConsumer, sslContext); super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter);
this.channel = ch; this.channel = ch;
this.client = client0; this.client = client0;
this.readTimeoutSeconds = readTimeoutSeconds0; this.readTimeoutSeconds = readTimeoutSeconds0;
@@ -51,8 +51,6 @@ class UdpBioAsyncConnection extends AsyncConnection {
} }
} }
this.remoteAddress = addr; this.remoteAddress = addr;
this.livingCounter = livingCounter;
this.closedCounter = closedCounter;
} }
@Override @Override

View File

@@ -11,6 +11,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*; import java.util.function.*;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.redkale.net.AsyncConnection; import org.redkale.net.AsyncConnection;
@@ -67,9 +68,10 @@ class TcpNioAsyncConnection extends AsyncConnection {
private SelectionKey writeKey; private SelectionKey writeKey;
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, ObjectPool<ByteBuffer> bufferPool, SocketChannel ch, public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor,
SSLContext sslContext, final SocketAddress addr0) { ObjectPool<ByteBuffer> bufferPool, SocketChannel ch,
super(bufferPool, sslContext); SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) {
super(bufferPool, sslContext, livingCounter, closedCounter);
this.ioGroup = ioGroup; this.ioGroup = ioGroup;
this.ioThread = ioThread; this.ioThread = ioThread;
this.workExecutor = workExecutor; this.workExecutor = workExecutor;
@@ -85,9 +87,11 @@ class TcpNioAsyncConnection extends AsyncConnection {
this.remoteAddress = addr; this.remoteAddress = addr;
} }
public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer, public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor,
SocketChannel ch, SSLContext sslContext, final SocketAddress addr0) { Supplier<ByteBuffer> bufferSupplier, Consumer<ByteBuffer> bufferConsumer,
super(bufferSupplier, bufferConsumer, sslContext); SocketChannel ch, SSLContext sslContext, final SocketAddress addr0,
AtomicLong livingCounter, AtomicLong closedCounter) {
super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter);
this.ioGroup = ioGroup; this.ioGroup = ioGroup;
this.ioThread = ioThread; this.ioThread = ioThread;
this.workExecutor = workExecutor; this.workExecutor = workExecutor;