diff --git a/src/main/java/org/redkale/mq/HttpMessageResponse.java b/src/main/java/org/redkale/mq/HttpMessageResponse.java index f1a7dbe9d..b88cc1a81 100644 --- a/src/main/java/org/redkale/mq/HttpMessageResponse.java +++ b/src/main/java/org/redkale/mq/HttpMessageResponse.java @@ -114,17 +114,6 @@ public class HttpMessageResponse extends HttpResponse { return rs; } - @Override - public void finishJson(final Convert convert, final Type type, final Object obj) { - if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } - return; - } - finishHttpResult(convert, type, new HttpResult(obj)); - } - @Override public void finish(final Convert convert, Type type, RetResult ret) { if (message.isEmptyRespTopic()) { @@ -137,14 +126,39 @@ public class HttpMessageResponse extends HttpResponse { } @Override - public void finish(final Convert convert, final Type type, Object obj) { + public void finish(final Convert convert, Type type, HttpResult result) { if (message.isEmptyRespTopic()) { if (callback != null) { callback.run(); } return; } - finishHttpResult(convert, type, new HttpResult(obj)); + if (convert != null) { + result.convert(convert); + } + finishHttpResult(type, result); + } + + @Override + public void finishJson(final Convert convert, final Type type, final Object obj) { + finish(convert, type, obj); + } + + @Override + public void finish(final Convert convert, final Type type, Object obj) { + if (obj instanceof HttpResult) { + finish(convert, type, (HttpResult) obj); + } else if (obj instanceof RetResult) { + finish(convert, type, (RetResult) obj); + } else { + if (message.isEmptyRespTopic()) { + if (callback != null) { + callback.run(); + } + return; + } + finishHttpResult(convert, type, new HttpResult(obj)); + } } @Override @@ -194,20 +208,6 @@ public class HttpMessageResponse extends HttpResponse { finishHttpResult(String.class, new HttpResult(msg == null ? "" : msg).status(status)); } - @Override - public void finish(final Convert convert, Type type, HttpResult result) { - if (message.isEmptyRespTopic()) { - if (callback != null) { - callback.run(); - } - return; - } - if (convert != null) { - result.convert(convert); - } - finishHttpResult(type, result); - } - @Override public void finish(boolean kill, final byte[] bs, int offset, int length) { if (message.isEmptyRespTopic()) { diff --git a/src/main/java/org/redkale/mq/HttpResultCoder.java b/src/main/java/org/redkale/mq/HttpResultCoder.java index 90c42b5c6..b6a4f95f2 100644 --- a/src/main/java/org/redkale/mq/HttpResultCoder.java +++ b/src/main/java/org/redkale/mq/HttpResultCoder.java @@ -33,6 +33,12 @@ public class HttpResultCoder implements MessageCoder { return instance; } + //消息内容的类型 + @Override + public byte ctype() { + return MessageRecord.CTYPE_HTTP_RESULT; + } + @Override public byte[] encode(HttpResult data) { if (data == null) { diff --git a/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java b/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java index 1c3391d9b..e297bcd25 100644 --- a/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java +++ b/src/main/java/org/redkale/mq/HttpSimpleRequestCoder.java @@ -28,6 +28,12 @@ public class HttpSimpleRequestCoder implements MessageCoder { return instance; } + //消息内容的类型 + @Override + public byte ctype() { + return MessageRecord.CTYPE_HTTP_REQUEST; + } + @Override public byte[] encode(HttpSimpleRequest data) { byte[] traceid = MessageCoder.getBytes(data.getTraceid());//short-string diff --git a/src/main/java/org/redkale/mq/MessageAgent.java b/src/main/java/org/redkale/mq/MessageAgent.java index 32717cdeb..cacf4491c 100644 --- a/src/main/java/org/redkale/mq/MessageAgent.java +++ b/src/main/java/org/redkale/mq/MessageAgent.java @@ -86,7 +86,7 @@ public abstract class MessageAgent implements Resourcable { protected final ReentrantLock serviceLock = new ReentrantLock(); - protected MessageCoder clientMessageCoder = MessageRecordCoder.getInstance(); + protected MessageCoder clientMessageCoder = MessageRecordSerializer.getInstance(); //本地Service消息接收处理器, key:consumerid protected HashMap clientConsumerNodes = new LinkedHashMap<>(); diff --git a/src/main/java/org/redkale/mq/MessageCoder.java b/src/main/java/org/redkale/mq/MessageCoder.java index a2687186e..c798c59f8 100644 --- a/src/main/java/org/redkale/mq/MessageCoder.java +++ b/src/main/java/org/redkale/mq/MessageCoder.java @@ -33,6 +33,9 @@ public interface MessageCoder { //解码 public T decode(byte[] data); + //消息内容的类型 + public byte ctype(); + //type: 1:string, 2:int, 3:long public static byte[] encodeUserid(Serializable value) { if (value == null) { diff --git a/src/main/java/org/redkale/mq/MessageRecord.java b/src/main/java/org/redkale/mq/MessageRecord.java index dae5feb3d..725fd3cdc 100644 --- a/src/main/java/org/redkale/mq/MessageRecord.java +++ b/src/main/java/org/redkale/mq/MessageRecord.java @@ -12,6 +12,7 @@ import org.redkale.convert.*; import org.redkale.convert.json.JsonConvert; import org.redkale.net.http.HttpSimpleRequest; import org.redkale.net.sncp.SncpHeader; +import org.redkale.util.RedkaleException; /** * 存在MQ里面的数据结构

@@ -152,6 +153,9 @@ public class MessageRecord implements Serializable { if (this.content == null || this.content.length == 0) { return null; } + if (this.ctype != coder.ctype()) { + throw new RedkaleException("record.ctype is " + this.ctype + ", but coder.ctype is " + coder.ctype()); + } return (T) coder.decode(this.content); } diff --git a/src/main/java/org/redkale/mq/MessageRecordCoder.java b/src/main/java/org/redkale/mq/MessageRecordSerializer.java similarity index 88% rename from src/main/java/org/redkale/mq/MessageRecordCoder.java rename to src/main/java/org/redkale/mq/MessageRecordSerializer.java index 6eb336d0b..5871f6107 100644 --- a/src/main/java/org/redkale/mq/MessageRecordCoder.java +++ b/src/main/java/org/redkale/mq/MessageRecordSerializer.java @@ -18,14 +18,20 @@ import java.nio.ByteBuffer; * * @since 2.1.0 */ -public class MessageRecordCoder implements MessageCoder { +public class MessageRecordSerializer implements MessageCoder { - private static final MessageRecordCoder instance = new MessageRecordCoder(); + private static final MessageRecordSerializer instance = new MessageRecordSerializer(); - public static MessageRecordCoder getInstance() { + public static MessageRecordSerializer getInstance() { return instance; } + //消息内容的类型 + @Override + public byte ctype() { + return 0; + } + @Override public byte[] encode(MessageRecord data) { if (data == null) { diff --git a/src/main/java/org/redkale/net/http/WebSocket.java b/src/main/java/org/redkale/net/http/WebSocket.java index 1cfb3931a..948ea98ce 100644 --- a/src/main/java/org/redkale/net/http/WebSocket.java +++ b/src/main/java/org/redkale/net/http/WebSocket.java @@ -11,9 +11,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; +import java.util.function.Function; import java.util.logging.*; import java.util.stream.Stream; import java.util.zip.*; @@ -946,7 +945,7 @@ public abstract class WebSocket { if (_channel == null) { return null; } - Supplier compose = () -> { + Function compose = t -> { _channel.dispose(); if (_readHandler != null) { _readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes); @@ -957,8 +956,8 @@ public abstract class WebSocket { return onClose(code, reason); }; CompletableFuture future = _engine.removeLocalThenDisconnect(this); - return future == null ? compose.get() - : future.exceptionally(t -> null).thenCompose(v -> (CompletionStage) compose); + return future == null ? compose.apply(null) + : future.exceptionally(t -> null).thenCompose((Function) compose); } else { return null; }