From 7eb2a405d3a2125ddda9407697b208a578b7df38 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Wed, 24 Jun 2020 18:04:42 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection.java | 28 +++++++++++-------- src/org/redkale/net/Response.java | 2 +- .../redkale/net/TcpAioAsyncConnection.java | 4 +-- .../redkale/net/UdpBioAsyncConnection.java | 4 +-- .../net/nio/TcpNioAsyncConnection.java | 16 +++++++---- 5 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/org/redkale/net/AsyncConnection.java b/src/org/redkale/net/AsyncConnection.java index ab89fc9b8..89e16561a 100644 --- a/src/org/redkale/net/AsyncConnection.java +++ b/src/org/redkale/net/AsyncConnection.java @@ -25,43 +25,47 @@ import org.redkale.util.*; */ public abstract class AsyncConnection implements AutoCloseable { - protected SSLContext sslContext; + private SSLContext sslContext; - protected Map attributes; //用于存储绑定在Connection上的对象集合 + private Map attributes; //用于存储绑定在Connection上的对象集合 - protected Object subobject; //用于存储绑定在Connection上的对象, 同attributes, 只绑定单个对象时尽量使用subobject而非attributes + private Object subobject; //用于存储绑定在Connection上的对象, 同attributes, 只绑定单个对象时尽量使用subobject而非attributes protected volatile long readtime; protected volatile long writetime; - protected final Supplier bufferSupplier; + private final Supplier bufferSupplier; - protected final Consumer bufferConsumer; + private final Consumer bufferConsumer; private ByteBuffer readBuffer; //在线数 - protected AtomicLong livingCounter; + private AtomicLong livingCounter; //关闭数 - protected AtomicLong closedCounter; + private AtomicLong closedCounter; - protected Consumer beforeCloseListener; + private Consumer beforeCloseListener; //关联的事件数, 小于1表示没有事件 - protected final AtomicInteger eventing = new AtomicInteger(); + private final AtomicInteger eventing = new AtomicInteger(); - protected AsyncConnection(ObjectPool bufferPool, SSLContext sslContext) { - this(bufferPool, bufferPool, sslContext); + protected AsyncConnection(ObjectPool bufferPool, SSLContext sslContext, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + this(bufferPool, bufferPool, sslContext, livingCounter, closedCounter); } - protected AsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, SSLContext sslContext) { + protected AsyncConnection(Supplier bufferSupplier, Consumer bufferConsumer, SSLContext sslContext, + final AtomicLong livingCounter, final AtomicLong closedCounter) { Objects.requireNonNull(bufferSupplier); Objects.requireNonNull(bufferConsumer); this.bufferSupplier = bufferSupplier; this.bufferConsumer = bufferConsumer; this.sslContext = sslContext; + this.livingCounter = livingCounter; + this.closedCounter = closedCounter; } public Supplier getBufferSupplier() { diff --git a/src/org/redkale/net/Response.java b/src/org/redkale/net/Response.java index 7a824660d..95521632c 100644 --- a/src/org/redkale/net/Response.java +++ b/src/org/redkale/net/Response.java @@ -223,7 +223,7 @@ public abstract class Response> { public void finish(final byte[] bs) { if (!this.inited) return; //避免重复关闭 if (this.context.bufferCapacity == bs.length) { - ByteBuffer buffer = channel.bufferSupplier.get(); + ByteBuffer buffer = channel.getBufferSupplier().get(); buffer.put(bs); buffer.flip(); this.finish(buffer); diff --git a/src/org/redkale/net/TcpAioAsyncConnection.java b/src/org/redkale/net/TcpAioAsyncConnection.java index d2399b20c..608522751 100644 --- a/src/org/redkale/net/TcpAioAsyncConnection.java +++ b/src/org/redkale/net/TcpAioAsyncConnection.java @@ -39,7 +39,7 @@ class TcpAioAsyncConnection extends AsyncConnection { final AsynchronousSocketChannel ch, final SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { - super(bufferSupplier, bufferConsumer, sslContext); + super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter); this.channel = ch; this.readTimeoutSeconds = readTimeoutSeconds; this.writeTimeoutSeconds = writeTimeoutSeconds; @@ -52,8 +52,6 @@ class TcpAioAsyncConnection extends AsyncConnection { } } this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; } @Override diff --git a/src/org/redkale/net/UdpBioAsyncConnection.java b/src/org/redkale/net/UdpBioAsyncConnection.java index f3f18591f..65ee60bab 100644 --- a/src/org/redkale/net/UdpBioAsyncConnection.java +++ b/src/org/redkale/net/UdpBioAsyncConnection.java @@ -37,7 +37,7 @@ class UdpBioAsyncConnection extends AsyncConnection { final DatagramChannel ch, final SSLContext sslContext, SocketAddress addr0, final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, final AtomicLong livingCounter, final AtomicLong closedCounter) { - super(bufferSupplier, bufferConsumer, sslContext); + super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter); this.channel = ch; this.client = client0; this.readTimeoutSeconds = readTimeoutSeconds0; @@ -51,8 +51,6 @@ class UdpBioAsyncConnection extends AsyncConnection { } } this.remoteAddress = addr; - this.livingCounter = livingCounter; - this.closedCounter = closedCounter; } @Override diff --git a/src/org/redkale/net/nio/TcpNioAsyncConnection.java b/src/org/redkale/net/nio/TcpNioAsyncConnection.java index bf99cd098..264caa635 100644 --- a/src/org/redkale/net/nio/TcpNioAsyncConnection.java +++ b/src/org/redkale/net/nio/TcpNioAsyncConnection.java @@ -11,6 +11,7 @@ import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.*; import javax.net.ssl.SSLContext; import org.redkale.net.AsyncConnection; @@ -67,9 +68,10 @@ class TcpNioAsyncConnection extends AsyncConnection { private SelectionKey writeKey; - public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, ObjectPool bufferPool, SocketChannel ch, - SSLContext sslContext, final SocketAddress addr0) { - super(bufferPool, sslContext); + public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, + ObjectPool bufferPool, SocketChannel ch, + SSLContext sslContext, final SocketAddress addr0, AtomicLong livingCounter, AtomicLong closedCounter) { + super(bufferPool, sslContext, livingCounter, closedCounter); this.ioGroup = ioGroup; this.ioThread = ioThread; this.workExecutor = workExecutor; @@ -85,9 +87,11 @@ class TcpNioAsyncConnection extends AsyncConnection { this.remoteAddress = addr; } - public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, Supplier bufferSupplier, Consumer bufferConsumer, - SocketChannel ch, SSLContext sslContext, final SocketAddress addr0) { - super(bufferSupplier, bufferConsumer, sslContext); + public TcpNioAsyncConnection(NioThreadGroup ioGroup, NioThread ioThread, ExecutorService workExecutor, + Supplier bufferSupplier, Consumer bufferConsumer, + SocketChannel ch, SSLContext sslContext, final SocketAddress addr0, + AtomicLong livingCounter, AtomicLong closedCounter) { + super(bufferSupplier, bufferConsumer, sslContext, livingCounter, closedCounter); this.ioGroup = ioGroup; this.ioThread = ioThread; this.workExecutor = workExecutor;