同步:redkale 升级

This commit is contained in:
2023-10-25 17:50:31 +08:00
parent 747b165e77
commit 2e0d9af014
21 changed files with 1783 additions and 1546 deletions

View File

@@ -8,6 +8,7 @@ package org.redkalex.cache.redis;
import org.redkale.net.client.ClientCodec;
import org.redkale.net.client.ClientConnection;
import org.redkale.util.ByteArray;
import org.redkale.util.RedkaleException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -16,39 +17,42 @@ import java.util.List;
import java.util.logging.Logger;
/**
*
* @author zhangjx
*/
public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheResult> {
protected static final byte TYPE_STRING = '+'; //简单字符串(不包含CRLF)类型
protected static final byte TYPE_BULK = '$'; //字符串块类型, 例如:$6\r\n\abcdef\r\nNULL字符串$-1\r\n
protected static final byte TYPE_ERROR = '-'; //错误(不包含CRLF)类型
protected static final byte TYPE_MULTI = '*'; //数组,紧接的数字为数组长度
protected static final byte TYPE_NUMBER = ':'; //整型
protected static final byte TYPE_STRING = '+'; //字符串值类型,字符串以\r\n结尾, 例如:+OK\r\n
protected static final byte TYPE_BULK = '$'; //字符串
protected static final byte TYPE_ERROR = '-'; //错误字符串类型,字符串以\r\n结尾, 例如:-ERR unknown command 'red'\r\n
protected static final byte TYPE_ARRAY = '*'; //数组
protected static final byte TYPE_NUMBER = ':'; //整型, 例如::2\r\n
protected static final Logger logger = Logger.getLogger(RedisCacheCodec.class.getSimpleName());
private static final Logger logger = Logger.getLogger(RedisCacheCodec.class.getSimpleName());
protected byte halfFrameCmd;
private ByteArray halfFrameBytes;
protected int halfFrameBulkLength = -10;
private int halfFrameBulkLength = Integer.MIN_VALUE;
protected int halfFrameArraySize = -10;
private int halfFrameMultiSize = Integer.MIN_VALUE;
protected int halfFrameArrayIndex; //从0开始
private int halfFrameMultiItemIndex; //从0开始
protected int halfFrameArrayItemLength = -10;
private byte halfFrameMultiItemType;
protected ByteArray halfFrameBytes;
private int halfFrameMultiItemLength = Integer.MIN_VALUE;
protected byte frameType;
private byte frameType;
protected byte[] frameValue; //(不包含CRLF)
private byte[] frameCursor;
protected List<byte[]> frameList; //(不包含CRLF)
private byte[] frameValue;
private List<byte[]> frameList;
private ByteArray recyclableArray;
@@ -56,148 +60,142 @@ public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheRe
super(connection);
}
protected ByteArray pollArray(ByteArray array) {
private ByteArray pollArray(ByteArray array) {
if (recyclableArray == null) {
recyclableArray = new ByteArray();
} else {
recyclableArray.clear();
}
recyclableArray.clear();
if (array != null) {
recyclableArray.put(array, 0, array.length());
recyclableArray.put(array);
}
return recyclableArray;
}
private boolean checkBytesFrame(RedisCacheConnection conn, ByteBuffer buffer, ByteArray array) {
private boolean readFrames(RedisCacheConnection conn, ByteBuffer buffer, ByteArray array) {
// byte[] dbs = new byte[buffer.remaining()];
// for (int i = 0; i < dbs.length; i++) {
// dbs[i] = buffer.get(buffer.position() + i);
// }
// ArrayDeque<ClientFuture> deque = (ArrayDeque) responseQueue(conn);
// logger.log(Level.FINEST, "[" + Utility.nowMillis() + "] [" + Thread.currentThread().getName() + "]: " + conn + ", 原始数据: " + new String(dbs).replace("\r\n", " ") + ", req=" + deque.getFirst().getRequest());
// (System. out).println("[" + Utility.nowMillis() + "] [" + Thread.currentThread().getName() + "]: " + conn + ", 原始数据: " + new String(dbs).replace("\r\n", " "));
array.clear();
byte type = halfFrameCmd == 0 ? buffer.get() : halfFrameCmd;
if (halfFrameBytes != null) {
array.put(halfFrameBytes, 0, halfFrameBytes.length());
if (this.frameType == 0) {
this.frameType = buffer.get();
} else if (halfFrameBytes != null) {
array.put(halfFrameBytes);
halfFrameBytes = null;
}
frameType = type;
if (type == TYPE_STRING || type == TYPE_ERROR || type == TYPE_NUMBER) {
if (readComplete(buffer, array)) {
frameValue = array.getBytes();
} else {
halfFrameCmd = type;
if (frameType == TYPE_STRING || frameType == TYPE_ERROR || frameType == TYPE_NUMBER) {
if (!readComplete(buffer, array)) {
halfFrameBytes = pollArray(array);
return false;
}
} else if (type == TYPE_BULK) {
int bulkLength = halfFrameBulkLength;
if (bulkLength < -2) {
frameValue = array.getBytes();
} else if (frameType == TYPE_BULK) {
if (halfFrameBulkLength == Integer.MIN_VALUE) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameCmd = type;
halfFrameBulkLength = -10;
halfFrameBytes = pollArray(array);
return false;
}
bulkLength = Integer.parseInt(array.toString(StandardCharsets.UTF_8));
halfFrameBulkLength = readInt(array);
array.clear();
}
if (bulkLength == -1) {
if (halfFrameBulkLength == -1) {
frameValue = null;
} else if (readComplete(buffer, array)) {
frameValue = array.getBytes();
} else {
halfFrameCmd = type;
halfFrameBulkLength = bulkLength;
halfFrameBytes = pollArray(array);
return false;
}
} else if (type == TYPE_ARRAY) {
int arraySize = halfFrameArraySize;
if (arraySize < -2) {
if (!readComplete(buffer, array)) { //没有读到arraySize
halfFrameCmd = type;
halfFrameArraySize = -10;
halfFrameArrayIndex = 0;
halfFrameArrayItemLength = -10;
int expect = halfFrameBulkLength + 2 - array.length();
if (buffer.remaining() < expect) {
array.put(buffer);
halfFrameBytes = pollArray(array);
return false;
}
arraySize = Integer.parseInt(array.toString(StandardCharsets.UTF_8));
array.clear();
array.put(buffer, expect);
array.removeLastByte(); //移除\n
array.removeLastByte(); //移除\r
frameValue = array.getBytes();
}
int arrayIndex = halfFrameArrayIndex;
for (int i = arrayIndex; i < arraySize; i++) {
int itemLength = halfFrameArrayItemLength;
halfFrameArrayItemLength = -10;
if (itemLength < -2) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameCmd = type;
halfFrameArraySize = arraySize;
halfFrameArrayIndex = i;
halfFrameArrayItemLength = -10;
halfFrameBytes = pollArray(array);
} else if (frameType == TYPE_MULTI) {
int size = halfFrameMultiSize;
if (size == Integer.MIN_VALUE) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameBytes = pollArray(array);
return false;
}
size = readInt(array);
halfFrameMultiSize = size;
array.clear();
frameValue = null;
}
if (frameList == null) {
frameList = new ArrayList<>();
}
if (size > 0) {
int index = halfFrameMultiItemIndex;
for (int i = index; i < size; i++) {
if (!buffer.hasRemaining()) {
return false;
}
byte sign = array.get(0);
itemLength = Integer.parseInt(array.toString(1, StandardCharsets.UTF_8));
array.clear();
if (sign == TYPE_ARRAY) { //数组中嵌套数组,目前有 HSCAN
frameValue = null;
if (frameList != null) {
frameList.clear();
}
clearHalfFrame();
if (itemLength == 0) {
return true;
}
halfFrameCmd = sign;
halfFrameArraySize = itemLength;
if (!buffer.hasRemaining()) {
if (halfFrameMultiItemType == 0) {
halfFrameMultiItemType = buffer.get();
}
halfFrameMultiItemIndex = i;
final byte itemType = halfFrameMultiItemType;
if (itemType == TYPE_STRING || itemType == TYPE_ERROR || itemType == TYPE_NUMBER) {
if (!readComplete(buffer, array)) {
halfFrameBytes = pollArray(array);
return false;
}
return checkBytesFrame(conn, buffer, array);
frameList.add(array.getBytes());
} else if (itemType == TYPE_BULK) {
if (halfFrameMultiItemLength == Integer.MIN_VALUE) {
if (!readComplete(buffer, array)) { //没有读到bulkLength
halfFrameBytes = pollArray(array);
return false;
}
halfFrameMultiItemLength = readInt(array);
array.clear();
}
if (halfFrameMultiItemLength == -1) {
frameList.add(null);
} else {
int expect = halfFrameMultiItemLength + 2 - array.length();
if (buffer.remaining() < expect) {
array.put(buffer);
halfFrameBytes = pollArray(array);
return false;
}
array.put(buffer, expect);
array.removeLastByte(); //移除\n
array.removeLastByte(); //移除\r
frameList.add(array.getBytes());
}
} else if (itemType == TYPE_MULTI) { //数组中嵌套数组,例如: SCAN、HSCAN
if (size == 2 && frameList != null && frameList.size() == 1) {
//读游标 数据例如: *2 $1 0 *4 $4 key1 $2 10 $4 key2 $2 30
frameCursor = frameList.get(0);
frameList.clear();
clearHalfFrame();
return readFrames(conn, buffer, array);
} else {
throw new RedkaleException("Not support multi type in array data");
}
}
}
int cha = itemLength - array.length();
if (itemLength == -1) {
if (frameList == null) {
frameList = new ArrayList<>();
}
frameList.add(null);
halfFrameMultiItemType = 0;
halfFrameMultiItemLength = Integer.MIN_VALUE;
array.clear();
} else if (buffer.remaining() >= cha + 2) {
for (int j = 0; j < cha; j++) array.put(buffer.get());
buffer.get(); //\r
buffer.get(); //\n
if (frameList == null) {
frameList = new ArrayList<>();
}
frameList.add(array.getBytes());
array.clear();
} else {
while (buffer.hasRemaining()) array.put(buffer.get());
halfFrameCmd = type;
halfFrameArraySize = arraySize;
halfFrameArrayIndex = i;
halfFrameArrayItemLength = itemLength;
halfFrameBytes = pollArray(array);
return false;
}
}
}
clearHalfFrame();
return true;
}
protected void clearHalfFrame() {
halfFrameCmd = 0;
halfFrameBulkLength = -10;
halfFrameArraySize = -10;
halfFrameArrayIndex = 0;
halfFrameArrayItemLength = -10;
private void clearHalfFrame() {
halfFrameBytes = null;
halfFrameBulkLength = Integer.MIN_VALUE;
halfFrameMultiSize = Integer.MIN_VALUE;
halfFrameMultiItemLength = Integer.MIN_VALUE;
halfFrameMultiItemIndex = 0; //从0开始
halfFrameMultiItemType = 0;
}
@Override
@@ -207,35 +205,31 @@ public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheRe
return;
}
ByteBuffer buffer = realbuf;
if (!checkBytesFrame(conn, buffer, array)) {
return;
}
//buffer必然包含一个完整的frame数据
boolean first = true;
RedisCacheRequest request = null;
while (first || buffer.hasRemaining()) {
if (request == null) {
request = nextRequest();
}
if (!first && !checkBytesFrame(conn, buffer, array)) {
while (buffer.hasRemaining()) {
if (!readFrames(conn, buffer, array)) {
break;
}
RedisCacheRequest request = nextRequest();
if (frameType == TYPE_ERROR) {
addMessage(request, new RuntimeException(new String(frameValue, StandardCharsets.UTF_8)));
addMessage(request, new RedkaleException(new String(frameValue, StandardCharsets.UTF_8)));
} else {
addMessage(request, conn.pollResultSet(request).prepare(frameType, frameValue, frameList));
addMessage(request, conn.pollResultSet(request).prepare(frameType, frameCursor, frameValue, frameList));
}
frameType = 0;
frameCursor = null;
frameValue = null;
frameList = null;
halfFrameCmd = 0;
halfFrameBytes = null;
first = false;
clearHalfFrame();
buffer = realbuf;
}
}
protected boolean readComplete(ByteBuffer buffer, ByteArray array) {
protected RedisCacheRequest nextRequest() {
return super.nextRequest();
}
private boolean readComplete(ByteBuffer buffer, ByteArray array) {
while (buffer.hasRemaining()) {
byte b = buffer.get();
if (b == '\n') {
@@ -246,4 +240,16 @@ public class RedisCacheCodec extends ClientCodec<RedisCacheRequest, RedisCacheRe
}
return false;
}
private int readInt(ByteArray array) {
String val = array.toString(StandardCharsets.ISO_8859_1);
if (val.length() == 1 && val.charAt(0) == '0') {
return 0;
}
if (val.length() == 2 && val.charAt(0) == '-' && val.charAt(1) == '1') {
return -1;
}
return Integer.parseInt(val);
}
}