From e0411a94f6aedd7f3d9e082d79ae0969690f3d83 Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Sat, 27 May 2017 20:44:47 +0800 Subject: [PATCH] --- src/org/redkale/convert/ConvertFactory.java | 5 + src/org/redkale/convert/StreamDecoder.java | 94 +++++++++++++++++++ src/org/redkale/convert/StreamEncoder.java | 90 ++++++++++++++++++ src/org/redkale/net/http/WebSocketRunner.java | 22 +---- src/org/redkale/source/CacheMemorySource.java | 11 +++ src/org/redkale/source/CacheSource.java | 4 + 6 files changed, 209 insertions(+), 17 deletions(-) create mode 100644 src/org/redkale/convert/StreamDecoder.java create mode 100644 src/org/redkale/convert/StreamEncoder.java diff --git a/src/org/redkale/convert/ConvertFactory.java b/src/org/redkale/convert/ConvertFactory.java index f3a13f7fd..88350dca9 100644 --- a/src/org/redkale/convert/ConvertFactory.java +++ b/src/org/redkale/convert/ConvertFactory.java @@ -14,6 +14,7 @@ import java.nio.channels.CompletionHandler; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; +import java.util.stream.Stream; import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder; import org.redkale.convert.ext.*; import org.redkale.util.*; @@ -397,6 +398,8 @@ public abstract class ConvertFactory { decoder = new ArrayDecoder(this, type); } else if (Collection.class.isAssignableFrom(clazz)) { decoder = new CollectionDecoder(this, type); + } else if (Stream.class.isAssignableFrom(clazz)) { + decoder = new StreamDecoder(this, type); } else if (Map.class.isAssignableFrom(clazz)) { decoder = new MapDecoder(this, type); } else if (clazz == Object.class) { @@ -480,6 +483,8 @@ public abstract class ConvertFactory { encoder = new ArrayEncoder(this, type); } else if (Collection.class.isAssignableFrom(clazz)) { encoder = new CollectionEncoder(this, type); + } else if (Stream.class.isAssignableFrom(clazz)) { + encoder = new StreamEncoder(this, type); } else if (Map.class.isAssignableFrom(clazz)) { encoder = new MapEncoder(this, type); } else if (clazz == Object.class) { diff --git a/src/org/redkale/convert/StreamDecoder.java b/src/org/redkale/convert/StreamDecoder.java new file mode 100644 index 000000000..5e1e35d62 --- /dev/null +++ b/src/org/redkale/convert/StreamDecoder.java @@ -0,0 +1,94 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.convert; + +import org.redkale.util.Creator; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.*; +import java.util.stream.Stream; + +/** + * Stream的反序列化操作类
+ * 支持一定程度的泛型。
+ * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 反解析的集合元素类型 + */ +@SuppressWarnings("unchecked") +public final class StreamDecoder implements Decodeable> { + + private final Type type; + + private final Type componentType; + + protected Creator> creator; + + protected final Decodeable decoder; + + private boolean inited = false; + + private final Object lock = new Object(); + + public StreamDecoder(final ConvertFactory factory, final Type type) { + this.type = type; + try { + if (type instanceof ParameterizedType) { + final ParameterizedType pt = (ParameterizedType) type; + this.componentType = pt.getActualTypeArguments()[0]; + this.creator = factory.loadCreator((Class) pt.getRawType()); + factory.register(type, this); + this.decoder = factory.loadDecoder(this.componentType); + } else { + throw new ConvertException("StreamDecoder not support the type (" + type + ")"); + } + } finally { + inited = true; + synchronized (lock) { + lock.notifyAll(); + } + } + } + + @Override + public Stream convertFrom(Reader in) { + final int len = in.readArrayB(); + if (len == Reader.SIGN_NULL) return null; + if (this.decoder == null) { + if (!this.inited) { + synchronized (lock) { + try { + lock.wait(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + final Decodeable localdecoder = this.decoder; + final List result = new ArrayList(); + if (len == Reader.SIGN_NOLENGTH) { + while (in.hasNext()) { + result.add(localdecoder.convertFrom(in)); + } + } else { + for (int i = 0; i < len; i++) { + result.add(localdecoder.convertFrom(in)); + } + } + in.readArrayE(); + return result.stream(); + } + + @Override + public Type getType() { + return type; + } + +} diff --git a/src/org/redkale/convert/StreamEncoder.java b/src/org/redkale/convert/StreamEncoder.java new file mode 100644 index 000000000..553d35e63 --- /dev/null +++ b/src/org/redkale/convert/StreamEncoder.java @@ -0,0 +1,90 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package org.redkale.convert; + +import java.lang.reflect.*; +import java.util.stream.Stream; + +/** + * Stream的序列化操作类
+ * 支持一定程度的泛型。
+ * + *

+ * 详情见: https://redkale.org + * + * @author zhangjx + * @param 序列化的集合元素类型 + */ +@SuppressWarnings("unchecked") +public final class StreamEncoder implements Encodeable> { + + private final Type type; + + private final Encodeable encoder; + + private boolean inited = false; + + private final Object lock = new Object(); + + public StreamEncoder(final ConvertFactory factory, final Type type) { + this.type = type; + try { + if (type instanceof ParameterizedType) { + Type t = ((ParameterizedType) type).getActualTypeArguments()[0]; + if (t instanceof TypeVariable) { + this.encoder = factory.getAnyEncoder(); + } else { + this.encoder = factory.loadEncoder(t); + } + } else { + this.encoder = factory.getAnyEncoder(); + } + } finally { + inited = true; + synchronized (lock) { + lock.notifyAll(); + } + } + } + + @Override + public void convertTo(Writer out, Stream value) { + if (value == null) { + out.writeNull(); + return; + } + Object[] array = value.toArray(); + if (array.length == 0) { + out.writeArrayB(0); + out.writeArrayE(); + return; + } + if (this.encoder == null) { + if (!this.inited) { + synchronized (lock) { + try { + lock.wait(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + out.writeArrayB(array.length); + boolean first = true; + for (Object v : array) { + if (!first) out.writeArrayMark(); + encoder.convertTo(out, v); + if (first) first = false; + } + out.writeArrayE(); + } + + @Override + public Type getType() { + return type; + } +} diff --git a/src/org/redkale/net/http/WebSocketRunner.java b/src/org/redkale/net/http/WebSocketRunner.java index b68390ae7..519421af5 100644 --- a/src/org/redkale/net/http/WebSocketRunner.java +++ b/src/org/redkale/net/http/WebSocketRunner.java @@ -38,8 +38,6 @@ class WebSocketRunner implements Runnable { private ByteBuffer readBuffer; - private ByteBuffer[] writeBuffers; - protected volatile boolean closed = false; private AtomicBoolean writing = new AtomicBoolean(); @@ -136,7 +134,7 @@ class WebSocketRunner implements Runnable { context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); } } else if (packet.type == FrameType.BINARY) { - byte[] message =packet.getReceiveBytes(); + byte[] message = packet.getReceiveBytes(); if (readBuffer != null) { readBuffer.clear(); channel.read(readBuffer, null, this); @@ -218,7 +216,6 @@ class WebSocketRunner implements Runnable { return futureResult; } ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier()); - this.writeBuffers = buffers; try { channel.write(buffers, buffers, new CompletionHandler() { @@ -230,11 +227,10 @@ class WebSocketRunner implements Runnable { if (future != null) { future.complete(RETCODE_WSOCKET_CLOSED); future = null; - if (writeBuffers != null) { - for (ByteBuffer buf : writeBuffers) { + if (attachments != null) { + for (ByteBuffer buf : attachments) { context.offerBuffer(buf); } - writeBuffers = null; } } return; @@ -254,18 +250,16 @@ class WebSocketRunner implements Runnable { if (future != null) { future.complete(0); future = null; - if (writeBuffers != null) { - for (ByteBuffer buf : writeBuffers) { + if (attachments != null) { + for (ByteBuffer buf : attachments) { context.offerBuffer(buf); } - writeBuffers = null; } } QueueEntry entry = queue.poll(); if (entry != null) { future = entry.future; ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier()); - writeBuffers = buffers; channel.write(buffers, buffers, this); } } catch (Exception e) { @@ -304,12 +298,6 @@ class WebSocketRunner implements Runnable { } context.offerBuffer(readBuffer); readBuffer = null; - if (writeBuffers != null) { - for (ByteBuffer buf : writeBuffers) { - context.offerBuffer(buf); - } - writeBuffers = null; - } engine.remove(webSocket); webSocket.onClose(0, null); } diff --git a/src/org/redkale/source/CacheMemorySource.java b/src/org/redkale/source/CacheMemorySource.java index f94d3d7e3..9a73b3126 100644 --- a/src/org/redkale/source/CacheMemorySource.java +++ b/src/org/redkale/source/CacheMemorySource.java @@ -324,6 +324,17 @@ public class CacheMemorySource extends return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor()); } + @Override + public long getCollectionSize(final K key) { + Collection collection = (Collection) get(key); + return collection == null ? 0 : collection.size(); + } + + @Override + public CompletableFuture getCollectionSizeAsync(final K key) { + return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor()); + } + @Override public Collection getCollectionAndRefresh(final K key, final int expireSeconds) { return (Collection) getAndRefresh(key, expireSeconds); diff --git a/src/org/redkale/source/CacheSource.java b/src/org/redkale/source/CacheSource.java index bb7200b49..f455db6ca 100644 --- a/src/org/redkale/source/CacheSource.java +++ b/src/org/redkale/source/CacheSource.java @@ -42,6 +42,8 @@ public interface CacheSource { public Collection getCollection(final K key); + public long getCollectionSize(final K key); + public Collection getCollectionAndRefresh(final K key, final int expireSeconds); public void appendListItem(final K key, final V value); @@ -71,6 +73,8 @@ public interface CacheSource { public CompletableFuture> getCollectionAsync(final K key); + public CompletableFuture getCollectionSizeAsync(final K key); + public CompletableFuture> getCollectionAndRefreshAsync(final K key, final int expireSeconds); public CompletableFuture appendListItemAsync(final K key, final V value);