修改:主题消息内容多行文本支持

This commit is contained in:
lxy 2021-07-28 18:16:39 +08:00
parent d7058907d6
commit 9be0721de0

View File

@ -8,10 +8,7 @@ import org.redkale.service.Service;
import org.redkale.util.*;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
@ -108,8 +105,22 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
reader.readLine(); //$n len(key)
String topic = reader.readLine(); // topic
reader.readLine(); //$n len(value)
String value = reader.readLine(); // value
String lenStr = reader.readLine();//$n len(value)
int clen = 0;
if (lenStr.startsWith("$")) {
clen = Integer.parseInt(lenStr.replace("$", ""));
}
String value = "";
do {
if (value.length() > 0) {
value += "\r\n";
}
String s = reader.readLine();
value += s; // value
} while (clen > 0 && clen > strLength(value));
// lock msg
if ("lock".equals(topic)) {
@ -154,6 +165,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
if (e instanceof SocketException) {
initSocket(Integer.MAX_VALUE);
}
e.printStackTrace();
}
}
}).start();
@ -261,7 +273,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
StringBuffer buf = new StringBuffer();
buf.append("*" + data.length + "\r\n");
for (String d : data) {
buf.append("$" + d.length() + "\r\n");
buf.append("$" + strLength(d) + "\r\n");
buf.append(d + "\r\n");
}
sendMsgQueue.add(buf.toString());
@ -291,6 +303,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer
return false;
}*/
private int strLength(String str) {
str = str.replaceAll("[^\\x00-\\xff]", "*");
return str.length();
}
private String toStr(Object v) {
if (v instanceof String) {
return (String) v;