diff --git a/src/com/zdemo/zhub/ZHubClient.java b/src/com/zdemo/zhub/ZHubClient.java index 55448ff..bd4d86f 100644 --- a/src/com/zdemo/zhub/ZHubClient.java +++ b/src/com/zdemo/zhub/ZHubClient.java @@ -8,10 +8,7 @@ import org.redkale.service.Service; import org.redkale.util.*; import javax.annotation.Resource; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; +import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; @@ -108,8 +105,22 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer reader.readLine(); //$n len(key) String topic = reader.readLine(); // topic - reader.readLine(); //$n len(value) - String value = reader.readLine(); // value + String lenStr = reader.readLine();//$n len(value) + int clen = 0; + if (lenStr.startsWith("$")) { + clen = Integer.parseInt(lenStr.replace("$", "")); + } + + String value = ""; + do { + if (value.length() > 0) { + value += "\r\n"; + } + String s = reader.readLine(); + value += s; // value + } while (clen > 0 && clen > strLength(value)); + + // lock msg if ("lock".equals(topic)) { @@ -154,6 +165,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer if (e instanceof SocketException) { initSocket(Integer.MAX_VALUE); } + e.printStackTrace(); } } }).start(); @@ -261,7 +273,7 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer StringBuffer buf = new StringBuffer(); buf.append("*" + data.length + "\r\n"); for (String d : data) { - buf.append("$" + d.length() + "\r\n"); + buf.append("$" + strLength(d) + "\r\n"); buf.append(d + "\r\n"); } sendMsgQueue.add(buf.toString()); @@ -291,6 +303,11 @@ public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer return false; }*/ + private int strLength(String str) { + str = str.replaceAll("[^\\x00-\\xff]", "*"); + return str.length(); + } + private String toStr(Object v) { if (v instanceof String) { return (String) v;