This commit is contained in:
@@ -14,6 +14,7 @@ import java.nio.channels.CompletionHandler;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder;
|
import org.redkale.convert.ext.InetAddressSimpledCoder.InetSocketAddressSimpledCoder;
|
||||||
import org.redkale.convert.ext.*;
|
import org.redkale.convert.ext.*;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
@@ -397,6 +398,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
|
|||||||
decoder = new ArrayDecoder(this, type);
|
decoder = new ArrayDecoder(this, type);
|
||||||
} else if (Collection.class.isAssignableFrom(clazz)) {
|
} else if (Collection.class.isAssignableFrom(clazz)) {
|
||||||
decoder = new CollectionDecoder(this, type);
|
decoder = new CollectionDecoder(this, type);
|
||||||
|
} else if (Stream.class.isAssignableFrom(clazz)) {
|
||||||
|
decoder = new StreamDecoder(this, type);
|
||||||
} else if (Map.class.isAssignableFrom(clazz)) {
|
} else if (Map.class.isAssignableFrom(clazz)) {
|
||||||
decoder = new MapDecoder(this, type);
|
decoder = new MapDecoder(this, type);
|
||||||
} else if (clazz == Object.class) {
|
} else if (clazz == Object.class) {
|
||||||
@@ -480,6 +483,8 @@ public abstract class ConvertFactory<R extends Reader, W extends Writer> {
|
|||||||
encoder = new ArrayEncoder(this, type);
|
encoder = new ArrayEncoder(this, type);
|
||||||
} else if (Collection.class.isAssignableFrom(clazz)) {
|
} else if (Collection.class.isAssignableFrom(clazz)) {
|
||||||
encoder = new CollectionEncoder(this, type);
|
encoder = new CollectionEncoder(this, type);
|
||||||
|
} else if (Stream.class.isAssignableFrom(clazz)) {
|
||||||
|
encoder = new StreamEncoder(this, type);
|
||||||
} else if (Map.class.isAssignableFrom(clazz)) {
|
} else if (Map.class.isAssignableFrom(clazz)) {
|
||||||
encoder = new MapEncoder(this, type);
|
encoder = new MapEncoder(this, type);
|
||||||
} else if (clazz == Object.class) {
|
} else if (clazz == Object.class) {
|
||||||
|
|||||||
94
src/org/redkale/convert/StreamDecoder.java
Normal file
94
src/org/redkale/convert/StreamDecoder.java
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
/*
|
||||||
|
* To change this license header, choose License Headers in Project Properties.
|
||||||
|
* To change this template file, choose Tools | Templates
|
||||||
|
* and open the template in the editor.
|
||||||
|
*/
|
||||||
|
package org.redkale.convert;
|
||||||
|
|
||||||
|
import org.redkale.util.Creator;
|
||||||
|
import java.lang.reflect.ParameterizedType;
|
||||||
|
import java.lang.reflect.Type;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream的反序列化操作类 <br>
|
||||||
|
* 支持一定程度的泛型。 <br>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* 详情见: https://redkale.org
|
||||||
|
*
|
||||||
|
* @author zhangjx
|
||||||
|
* @param <T> 反解析的集合元素类型
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public final class StreamDecoder<T> implements Decodeable<Reader, Stream<T>> {
|
||||||
|
|
||||||
|
private final Type type;
|
||||||
|
|
||||||
|
private final Type componentType;
|
||||||
|
|
||||||
|
protected Creator<Stream<T>> creator;
|
||||||
|
|
||||||
|
protected final Decodeable<Reader, T> decoder;
|
||||||
|
|
||||||
|
private boolean inited = false;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
public StreamDecoder(final ConvertFactory factory, final Type type) {
|
||||||
|
this.type = type;
|
||||||
|
try {
|
||||||
|
if (type instanceof ParameterizedType) {
|
||||||
|
final ParameterizedType pt = (ParameterizedType) type;
|
||||||
|
this.componentType = pt.getActualTypeArguments()[0];
|
||||||
|
this.creator = factory.loadCreator((Class) pt.getRawType());
|
||||||
|
factory.register(type, this);
|
||||||
|
this.decoder = factory.loadDecoder(this.componentType);
|
||||||
|
} else {
|
||||||
|
throw new ConvertException("StreamDecoder not support the type (" + type + ")");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
inited = true;
|
||||||
|
synchronized (lock) {
|
||||||
|
lock.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<T> convertFrom(Reader in) {
|
||||||
|
final int len = in.readArrayB();
|
||||||
|
if (len == Reader.SIGN_NULL) return null;
|
||||||
|
if (this.decoder == null) {
|
||||||
|
if (!this.inited) {
|
||||||
|
synchronized (lock) {
|
||||||
|
try {
|
||||||
|
lock.wait();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final Decodeable<Reader, T> localdecoder = this.decoder;
|
||||||
|
final List<T> result = new ArrayList();
|
||||||
|
if (len == Reader.SIGN_NOLENGTH) {
|
||||||
|
while (in.hasNext()) {
|
||||||
|
result.add(localdecoder.convertFrom(in));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
result.add(localdecoder.convertFrom(in));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
in.readArrayE();
|
||||||
|
return result.stream();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Type getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
90
src/org/redkale/convert/StreamEncoder.java
Normal file
90
src/org/redkale/convert/StreamEncoder.java
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
/*
|
||||||
|
* To change this license header, choose License Headers in Project Properties.
|
||||||
|
* To change this template file, choose Tools | Templates
|
||||||
|
* and open the template in the editor.
|
||||||
|
*/
|
||||||
|
package org.redkale.convert;
|
||||||
|
|
||||||
|
import java.lang.reflect.*;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream的序列化操作类 <br>
|
||||||
|
* 支持一定程度的泛型。 <br>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* 详情见: https://redkale.org
|
||||||
|
*
|
||||||
|
* @author zhangjx
|
||||||
|
* @param <T> 序列化的集合元素类型
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public final class StreamEncoder<T> implements Encodeable<Writer, Stream<T>> {
|
||||||
|
|
||||||
|
private final Type type;
|
||||||
|
|
||||||
|
private final Encodeable<Writer, Object> encoder;
|
||||||
|
|
||||||
|
private boolean inited = false;
|
||||||
|
|
||||||
|
private final Object lock = new Object();
|
||||||
|
|
||||||
|
public StreamEncoder(final ConvertFactory factory, final Type type) {
|
||||||
|
this.type = type;
|
||||||
|
try {
|
||||||
|
if (type instanceof ParameterizedType) {
|
||||||
|
Type t = ((ParameterizedType) type).getActualTypeArguments()[0];
|
||||||
|
if (t instanceof TypeVariable) {
|
||||||
|
this.encoder = factory.getAnyEncoder();
|
||||||
|
} else {
|
||||||
|
this.encoder = factory.loadEncoder(t);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.encoder = factory.getAnyEncoder();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
inited = true;
|
||||||
|
synchronized (lock) {
|
||||||
|
lock.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void convertTo(Writer out, Stream<T> value) {
|
||||||
|
if (value == null) {
|
||||||
|
out.writeNull();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Object[] array = value.toArray();
|
||||||
|
if (array.length == 0) {
|
||||||
|
out.writeArrayB(0);
|
||||||
|
out.writeArrayE();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (this.encoder == null) {
|
||||||
|
if (!this.inited) {
|
||||||
|
synchronized (lock) {
|
||||||
|
try {
|
||||||
|
lock.wait();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.writeArrayB(array.length);
|
||||||
|
boolean first = true;
|
||||||
|
for (Object v : array) {
|
||||||
|
if (!first) out.writeArrayMark();
|
||||||
|
encoder.convertTo(out, v);
|
||||||
|
if (first) first = false;
|
||||||
|
}
|
||||||
|
out.writeArrayE();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Type getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -38,8 +38,6 @@ class WebSocketRunner implements Runnable {
|
|||||||
|
|
||||||
private ByteBuffer readBuffer;
|
private ByteBuffer readBuffer;
|
||||||
|
|
||||||
private ByteBuffer[] writeBuffers;
|
|
||||||
|
|
||||||
protected volatile boolean closed = false;
|
protected volatile boolean closed = false;
|
||||||
|
|
||||||
private AtomicBoolean writing = new AtomicBoolean();
|
private AtomicBoolean writing = new AtomicBoolean();
|
||||||
@@ -136,7 +134,7 @@ class WebSocketRunner implements Runnable {
|
|||||||
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
|
||||||
}
|
}
|
||||||
} else if (packet.type == FrameType.BINARY) {
|
} else if (packet.type == FrameType.BINARY) {
|
||||||
byte[] message =packet.getReceiveBytes();
|
byte[] message = packet.getReceiveBytes();
|
||||||
if (readBuffer != null) {
|
if (readBuffer != null) {
|
||||||
readBuffer.clear();
|
readBuffer.clear();
|
||||||
channel.read(readBuffer, null, this);
|
channel.read(readBuffer, null, this);
|
||||||
@@ -218,7 +216,6 @@ class WebSocketRunner implements Runnable {
|
|||||||
return futureResult;
|
return futureResult;
|
||||||
}
|
}
|
||||||
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier());
|
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(this.context.getBufferSupplier());
|
||||||
this.writeBuffers = buffers;
|
|
||||||
try {
|
try {
|
||||||
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
|
channel.write(buffers, buffers, new CompletionHandler<Integer, ByteBuffer[]>() {
|
||||||
|
|
||||||
@@ -230,11 +227,10 @@ class WebSocketRunner implements Runnable {
|
|||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.complete(RETCODE_WSOCKET_CLOSED);
|
future.complete(RETCODE_WSOCKET_CLOSED);
|
||||||
future = null;
|
future = null;
|
||||||
if (writeBuffers != null) {
|
if (attachments != null) {
|
||||||
for (ByteBuffer buf : writeBuffers) {
|
for (ByteBuffer buf : attachments) {
|
||||||
context.offerBuffer(buf);
|
context.offerBuffer(buf);
|
||||||
}
|
}
|
||||||
writeBuffers = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@@ -254,18 +250,16 @@ class WebSocketRunner implements Runnable {
|
|||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.complete(0);
|
future.complete(0);
|
||||||
future = null;
|
future = null;
|
||||||
if (writeBuffers != null) {
|
if (attachments != null) {
|
||||||
for (ByteBuffer buf : writeBuffers) {
|
for (ByteBuffer buf : attachments) {
|
||||||
context.offerBuffer(buf);
|
context.offerBuffer(buf);
|
||||||
}
|
}
|
||||||
writeBuffers = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
QueueEntry entry = queue.poll();
|
QueueEntry entry = queue.poll();
|
||||||
if (entry != null) {
|
if (entry != null) {
|
||||||
future = entry.future;
|
future = entry.future;
|
||||||
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier());
|
ByteBuffer[] buffers = packet.sendBuffers != null ? packet.duplicateSendBuffers() : packet.encode(context.getBufferSupplier());
|
||||||
writeBuffers = buffers;
|
|
||||||
channel.write(buffers, buffers, this);
|
channel.write(buffers, buffers, this);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@@ -304,12 +298,6 @@ class WebSocketRunner implements Runnable {
|
|||||||
}
|
}
|
||||||
context.offerBuffer(readBuffer);
|
context.offerBuffer(readBuffer);
|
||||||
readBuffer = null;
|
readBuffer = null;
|
||||||
if (writeBuffers != null) {
|
|
||||||
for (ByteBuffer buf : writeBuffers) {
|
|
||||||
context.offerBuffer(buf);
|
|
||||||
}
|
|
||||||
writeBuffers = null;
|
|
||||||
}
|
|
||||||
engine.remove(webSocket);
|
engine.remove(webSocket);
|
||||||
webSocket.onClose(0, null);
|
webSocket.onClose(0, null);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -324,6 +324,17 @@ public class CacheMemorySource<K extends Serializable, V extends Object> extends
|
|||||||
return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor());
|
return CompletableFuture.supplyAsync(() -> getCollection(key), getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getCollectionSize(final K key) {
|
||||||
|
Collection<V> collection = (Collection<V>) get(key);
|
||||||
|
return collection == null ? 0 : collection.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Long> getCollectionSizeAsync(final K key) {
|
||||||
|
return CompletableFuture.supplyAsync(() -> getCollectionSize(key), getExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds) {
|
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds) {
|
||||||
return (Collection<V>) getAndRefresh(key, expireSeconds);
|
return (Collection<V>) getAndRefresh(key, expireSeconds);
|
||||||
|
|||||||
@@ -42,6 +42,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
|||||||
|
|
||||||
public Collection<V> getCollection(final K key);
|
public Collection<V> getCollection(final K key);
|
||||||
|
|
||||||
|
public long getCollectionSize(final K key);
|
||||||
|
|
||||||
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds);
|
public Collection<V> getCollectionAndRefresh(final K key, final int expireSeconds);
|
||||||
|
|
||||||
public void appendListItem(final K key, final V value);
|
public void appendListItem(final K key, final V value);
|
||||||
@@ -71,6 +73,8 @@ public interface CacheSource<K extends Serializable, V extends Object> {
|
|||||||
|
|
||||||
public CompletableFuture<Collection<V>> getCollectionAsync(final K key);
|
public CompletableFuture<Collection<V>> getCollectionAsync(final K key);
|
||||||
|
|
||||||
|
public CompletableFuture<Long> getCollectionSizeAsync(final K key);
|
||||||
|
|
||||||
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds);
|
public CompletableFuture<Collection<V>> getCollectionAndRefreshAsync(final K key, final int expireSeconds);
|
||||||
|
|
||||||
public CompletableFuture<Void> appendListItemAsync(final K key, final V value);
|
public CompletableFuture<Void> appendListItemAsync(final K key, final V value);
|
||||||
|
|||||||
Reference in New Issue
Block a user