ClientConnection优化

This commit is contained in:
Redkale
2023-01-17 22:40:00 +08:00
parent 4d41b2afda
commit 961ff7fa22
4 changed files with 19 additions and 5 deletions

View File

@@ -9,6 +9,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.*;
@@ -62,7 +63,7 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
private void decodeResponse(ByteBuffer buffer) {
AsyncConnection channel = connection.channel;
Deque<ClientFuture> responseQueue = connection.responseQueue;
ConcurrentLinkedQueue<ClientFuture> responseQueue = connection.responseQueue;
Map<Serializable, ClientFuture> responseMap = connection.responseMap;
if (decodeMessages(buffer, readArray)) { //成功了
readArray.clear();
@@ -157,6 +158,10 @@ public abstract class ClientCodec<R extends ClientRequest, P> implements Complet
return connection.responseQueue.iterator();
}
protected ClientFuture responseByRequestid(Serializable requestid) {
return (ClientFuture) connection.responseMap.get(requestid);
}
protected List<ClientResponse<P>> pollMessages() {
List<ClientResponse<P>> rs = new ArrayList<>(respResults);
this.respResults.clear();

View File

@@ -7,7 +7,7 @@ package org.redkale.net.client;
import java.io.Serializable;
import java.nio.channels.ClosedChannelException;
import java.util.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
@@ -46,10 +46,10 @@ public abstract class ClientConnection<R extends ClientRequest, P> implements Co
private final ClientWriteIOThread writeThread;
//responseQueue、responseMap二选一
final Deque<ClientFuture> responseQueue = new LinkedBlockingDeque<>();
final ConcurrentLinkedQueue<ClientFuture> responseQueue = new ConcurrentLinkedQueue<>();
//responseQueue、responseMap二选一, key: requestid
final Map<Serializable, ClientFuture> responseMap = new ConcurrentHashMap<>();
final ConcurrentHashMap<Serializable, ClientFuture> responseMap = new ConcurrentHashMap<>();
private int maxPipelines; //最大并行处理数

View File

@@ -24,7 +24,7 @@ import org.redkale.util.*;
*/
public class ClientWriteIOThread extends AsyncIOThread {
private final BlockingDeque<ClientFuture> requestQueue = new LinkedBlockingDeque<>();
private final BlockingQueue<ClientFuture> requestQueue = new LinkedBlockingQueue<>();
public ClientWriteIOThread(ThreadGroup g, String name, int index, int threads,
ExecutorService workExecutor, ObjectPool<ByteBuffer> safeBufferPool) throws IOException {

View File

@@ -624,6 +624,15 @@ public abstract class DataSqlSource extends AbstractDataSource implements Functi
@Local
protected <T> Serializable getEntityAttrValue(EntityInfo info, Attribute attr, T entity) {
Serializable val = info.getSQLValue(attr, entity);
Class clazz = attr.type();
if (clazz == String.class
|| clazz == int.class || clazz == long.class
|| clazz == Integer.class || clazz == Long.class
|| clazz == short.class || clazz == Short.class
|| clazz == float.class || clazz == Float.class
|| clazz == double.class || clazz == Double.class) {
return val;
}
return getSQLAttrValue(info, attr, val);
}