From f9d250b43c16d7b91aa35b4c97f0a66bcb815c55 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Mon, 17 Dec 2018 10:29:23 +0800 Subject: [PATCH] --- src/org/redkale/net/AsyncConnection2.java | 365 ++++++++++++++++++++++ 1 file changed, 365 insertions(+) create mode 100644 src/org/redkale/net/AsyncConnection2.java diff --git a/src/org/redkale/net/AsyncConnection2.java b/src/org/redkale/net/AsyncConnection2.java new file mode 100644 index 000000000..469d29a71 --- /dev/null +++ b/src/org/redkale/net/AsyncConnection2.java @@ -0,0 +1,365 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.net; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.*; +import javax.net.ssl.SSLContext; + +/** + * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + */ +public abstract class AsyncConnection2 implements ReadableByteChannel, WritableByteChannel, AutoCloseable { + + protected SSLContext sslContext; + + protected Map attributes; //用于存储绑定在Connection上的对象集合 + + protected Object subobject; //用于存储绑定在Connection上的对象, 同attributes, 只绑定单个对象时尽量使用subobject而非attributes + + protected volatile long readtime; + + protected volatile long writetime; + + protected final Supplier bufferSupplier; + + protected final Consumer bufferConsumer; + + protected ByteBuffer readBuffer; + + //在线数 + protected AtomicLong livingCounter; + + //关闭数 + protected AtomicLong closedCounter; + + protected Consumer beforeCloseListener; + + //关联的事件数, 小于1表示没有事件 + protected final AtomicInteger eventing = new AtomicInteger(); + + protected AsyncConnection2(Supplier bufferSupplier, Consumer bufferConsumer) { + Objects.requireNonNull(bufferSupplier); + Objects.requireNonNull(bufferConsumer); + this.bufferSupplier = bufferSupplier; + this.bufferConsumer = bufferConsumer; + } + + public final long getLastReadTime() { + return readtime; + } + + public final long getLastWriteTime() { + return writetime; + } + + public final int increEventing() { + return eventing.incrementAndGet(); + } + + public final int decreEventing() { + return eventing.decrementAndGet(); + } + + @Override + public abstract boolean isOpen(); + + public abstract boolean isTCP(); + + public abstract boolean shutdownInput(); + + public abstract boolean shutdownOutput(); + + public abstract boolean setOption(SocketOption name, T value); + + public abstract Set> supportedOptions(); + + public abstract SocketAddress getRemoteAddress(); + + public abstract SocketAddress getLocalAddress(); + + public abstract int getReadTimeoutSeconds(); + + public abstract int getWriteTimeoutSeconds(); + + public abstract void setReadTimeoutSeconds(int readTimeoutSeconds); + + public abstract void setWriteTimeoutSeconds(int writeTimeoutSeconds); + + @Override + public abstract int read(ByteBuffer dst) throws IOException; + + public abstract void read(CompletionHandler handler); + + public abstract void read(long timeout, TimeUnit unit, CompletionHandler handler); + + @Override + public abstract int write(ByteBuffer src) throws IOException; + + public abstract void write(ByteBuffer src, A attachment, CompletionHandler handler); + + public final void write(ByteBuffer[] srcs, A attachment, CompletionHandler handler) { + write(srcs, 0, srcs.length, attachment, handler); + } + + public abstract void write(ByteBuffer[] srcs, int offset, int length, A attachment, CompletionHandler handler); + + public void setReadBuffer(ByteBuffer buffer) { + if (this.readBuffer != null) throw new RuntimeException("repeat AsyncConnection.currentReadBuffer"); + this.readBuffer = buffer; + } + + public ByteBuffer pollReadBuffer() { + ByteBuffer rs = this.readBuffer; + if (rs != null) { + this.readBuffer = null; + return rs; + } + return bufferSupplier.get(); + } + + public void offerReadBuffer(ByteBuffer buffer) { + if (buffer == null) return; + bufferConsumer.accept(buffer); + } + + public void offerReadBuffer(ByteBuffer... buffers) { + if (buffers == null) return; + for (ByteBuffer buffer : buffers) { + bufferConsumer.accept(buffer); + } + } + + public ByteBuffer pollWriteBuffer() { + return bufferSupplier.get(); + } + + public void offerWriteBuffer(ByteBuffer buffer) { + if (buffer == null) return; + bufferConsumer.accept(buffer); + } + + public void offerWriteBuffer(ByteBuffer... buffers) { + if (buffers == null) return; + for (ByteBuffer buffer : buffers) { + bufferConsumer.accept(buffer); + } + } + + public void dispose() {//同close, 只是去掉throws IOException + try { + this.close(); + } catch (IOException io) { + } + } + + public AsyncConnection2 beforeCloseListener(Consumer beforeCloseListener) { + this.beforeCloseListener = beforeCloseListener; + return this; + } + + @Override + public void close() throws IOException { + if (closedCounter != null) { + closedCounter.incrementAndGet(); + closedCounter = null; + } + if (livingCounter != null) { + livingCounter.decrementAndGet(); + livingCounter = null; + } + if (beforeCloseListener != null) { + try { + beforeCloseListener.accept(this); + } catch (Exception io) { + } + } + if (this.readBuffer != null) { + bufferConsumer.accept(this.readBuffer); + } + if (attributes == null) return; + try { + for (Object obj : attributes.values()) { + if (obj instanceof AutoCloseable) ((AutoCloseable) obj).close(); + } + } catch (Exception io) { + } + } + + @SuppressWarnings("unchecked") + public final T getSubobject() { + return (T) this.subobject; + } + + public void setSubobject(Object value) { + this.subobject = value; + } + + public void setAttribute(String name, Object value) { + if (this.attributes == null) this.attributes = new HashMap<>(); + this.attributes.put(name, value); + } + + @SuppressWarnings("unchecked") + public final T getAttribute(String name) { + return (T) (this.attributes == null ? null : this.attributes.get(name)); + } + + public final void removeAttribute(String name) { + if (this.attributes != null) this.attributes.remove(name); + } + + public final Map getAttributes() { + return this.attributes; + } + + public final void clearAttribute() { + if (this.attributes != null) this.attributes.clear(); + } + + //------------------------------------------------------------------------------------------------------------------------------ + /** + * 创建TCP协议客户端连接 + * + * @param address 连接点子 + * @param group 连接AsynchronousChannelGroup + * @param readTimeoutSeconds 读取超时秒数 + * @param writeTimeoutSeconds 写入超时秒数 + * + * @return 连接CompletableFuture + */ + public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SocketAddress address, + final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return createTCP(group, null, address, readTimeoutSeconds, writeTimeoutSeconds); + } + + /** + * 创建TCP协议客户端连接 + * + * @param address 连接点子 + * @param sslContext SSLContext + * @param group 连接AsynchronousChannelGroup + * @param readTimeoutSeconds 读取超时秒数 + * @param writeTimeoutSeconds 写入超时秒数 + * + * @return 连接CompletableFuture + */ + public static CompletableFuture createTCP(final AsynchronousChannelGroup group, final SSLContext sslContext, + final SocketAddress address, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + final CompletableFuture future = new CompletableFuture<>(); + try { + final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); + try { + channel.setOption(StandardSocketOptions.TCP_NODELAY, true); + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + } catch (IOException e) { + } + channel.connect(address, null, new CompletionHandler() { + @Override + public void completed(Void result, Void attachment) { + future.complete(create(channel, sslContext, address, readTimeoutSeconds, writeTimeoutSeconds)); + } + + @Override + public void failed(Throwable exc, Void attachment) { + future.completeExceptionally(exc); + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + /** + * 通常用于 ssl socket + * + * @param socket Socket对象 + * + * @return 连接对象 + */ + public static AsyncConnection create(final Socket socket) { + return create(socket, null, 0, 0); + } + + public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, null, null); + } + + public static AsyncConnection create(final Socket socket, final SocketAddress addr0, final int readTimeoutSecond0, + final int writeTimeoutSecond0, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpBioAsyncConnection(socket, addr0, readTimeoutSecond0, writeTimeoutSecond0, livingCounter, closedCounter); + } + + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { + return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + } + + public static AsyncConnection create(final SocketChannel ch, final SocketAddress addr0, final Selector selector, final Context context) { + return new TcpNioAsyncConnection(ch, addr0, selector, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + } + + public static AsyncConnection create(final SocketChannel ch, SocketAddress addr, final Selector selector, + final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpNioAsyncConnection(ch, addr, selector, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + } + + public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0) { + return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, null, null); + } + + public static AsyncConnection create(final DatagramChannel ch, SocketAddress addr, + final boolean client0, final int readTimeoutSeconds0, final int writeTimeoutSeconds0, + final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new UdpBioAsyncConnection(ch, addr, client0, readTimeoutSeconds0, writeTimeoutSeconds0, livingCounter, closedCounter); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch) { + return create(ch, null, 0, 0); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, final int writeTimeoutSeconds) { + return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, null, null); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final Context context) { + return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, null, null); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSeconds, + final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(ch, null, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch, SSLContext sslContext, final SocketAddress addr0, final int readTimeoutSeconds, + final int writeTimeoutSeconds, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(ch, sslContext, addr0, readTimeoutSeconds, writeTimeoutSeconds, livingCounter, closedCounter); + } + + public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, + final Context context, final AtomicLong livingCounter, final AtomicLong closedCounter) { + return new TcpAioAsyncConnection(ch, context.sslContext, addr0, context.readTimeoutSeconds, context.writeTimeoutSeconds, livingCounter, closedCounter); + } + +}