diff --git a/src/main/java/org/redkale/net/client/ClientCodec.java b/src/main/java/org/redkale/net/client/ClientCodec.java index b82be5e27..ceb64d52c 100644 --- a/src/main/java/org/redkale/net/client/ClientCodec.java +++ b/src/main/java/org/redkale/net/client/ClientCodec.java @@ -9,6 +9,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import org.redkale.convert.json.JsonConvert; import org.redkale.net.*; @@ -62,7 +63,7 @@ public abstract class ClientCodec implements Complet private void decodeResponse(ByteBuffer buffer) { AsyncConnection channel = connection.channel; - Deque responseQueue = connection.responseQueue; + ConcurrentLinkedQueue responseQueue = connection.responseQueue; Map responseMap = connection.responseMap; if (decodeMessages(buffer, readArray)) { //成功了 readArray.clear(); @@ -157,6 +158,10 @@ public abstract class ClientCodec implements Complet return connection.responseQueue.iterator(); } + protected ClientFuture responseByRequestid(Serializable requestid) { + return (ClientFuture) connection.responseMap.get(requestid); + } + protected List> pollMessages() { List> rs = new ArrayList<>(respResults); this.respResults.clear(); diff --git a/src/main/java/org/redkale/net/client/ClientConnection.java b/src/main/java/org/redkale/net/client/ClientConnection.java index 9e226b612..4d51789eb 100644 --- a/src/main/java/org/redkale/net/client/ClientConnection.java +++ b/src/main/java/org/redkale/net/client/ClientConnection.java @@ -7,7 +7,7 @@ package org.redkale.net.client; import java.io.Serializable; import java.nio.channels.ClosedChannelException; -import java.util.*; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.function.*; @@ -46,10 +46,10 @@ public abstract class ClientConnection implements Co private final ClientWriteIOThread writeThread; //responseQueue、responseMap二选一 - final Deque responseQueue = new LinkedBlockingDeque<>(); + final ConcurrentLinkedQueue responseQueue = new ConcurrentLinkedQueue<>(); //responseQueue、responseMap二选一, key: requestid - final Map responseMap = new ConcurrentHashMap<>(); + final ConcurrentHashMap responseMap = new ConcurrentHashMap<>(); private int maxPipelines; //最大并行处理数 diff --git a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java index ed4792a62..dc8eab2df 100644 --- a/src/main/java/org/redkale/net/client/ClientWriteIOThread.java +++ b/src/main/java/org/redkale/net/client/ClientWriteIOThread.java @@ -24,7 +24,7 @@ import org.redkale.util.*; */ public class ClientWriteIOThread extends AsyncIOThread { - private final BlockingDeque requestQueue = new LinkedBlockingDeque<>(); + private final BlockingQueue requestQueue = new LinkedBlockingQueue<>(); public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads, ExecutorService workExecutor, ObjectPool safeBufferPool) throws IOException { diff --git a/src/main/java/org/redkale/source/DataSqlSource.java b/src/main/java/org/redkale/source/DataSqlSource.java index e3731b308..4b48dd63f 100644 --- a/src/main/java/org/redkale/source/DataSqlSource.java +++ b/src/main/java/org/redkale/source/DataSqlSource.java @@ -624,6 +624,15 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi @Local protected Serializable getEntityAttrValue(EntityInfo info, Attribute attr, T entity) { Serializable val = info.getSQLValue(attr, entity); + Class clazz = attr.type(); + if (clazz == String.class + || clazz == int.class || clazz == long.class + || clazz == Integer.class || clazz == Long.class + || clazz == short.class || clazz == Short.class + || clazz == float.class || clazz == Float.class + || clazz == double.class || clazz == Double.class) { + return val; + } return getSQLAttrValue(info, attr, val); }