修复HttpMessageResponse

This commit is contained in:
redkale
2023-10-11 20:55:29 +08:00
parent 52d7a7a3f7
commit c783c1ef31
8 changed files with 60 additions and 36 deletions

View File

@@ -114,17 +114,6 @@ public class HttpMessageResponse extends HttpResponse {
return rs; return rs;
} }
@Override
public void finishJson(final Convert convert, final Type type, final Object obj) {
if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return;
}
finishHttpResult(convert, type, new HttpResult(obj));
}
@Override @Override
public void finish(final Convert convert, Type type, RetResult ret) { public void finish(final Convert convert, Type type, RetResult ret) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
@@ -136,8 +125,32 @@ public class HttpMessageResponse extends HttpResponse {
finishHttpResult(convert, type, new HttpResult(ret)); finishHttpResult(convert, type, new HttpResult(ret));
} }
@Override
public void finish(final Convert convert, Type type, HttpResult result) {
if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return;
}
if (convert != null) {
result.convert(convert);
}
finishHttpResult(type, result);
}
@Override
public void finishJson(final Convert convert, final Type type, final Object obj) {
finish(convert, type, obj);
}
@Override @Override
public void finish(final Convert convert, final Type type, Object obj) { public void finish(final Convert convert, final Type type, Object obj) {
if (obj instanceof HttpResult) {
finish(convert, type, (HttpResult) obj);
} else if (obj instanceof RetResult) {
finish(convert, type, (RetResult) obj);
} else {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) { if (callback != null) {
callback.run(); callback.run();
@@ -146,6 +159,7 @@ public class HttpMessageResponse extends HttpResponse {
} }
finishHttpResult(convert, type, new HttpResult(obj)); finishHttpResult(convert, type, new HttpResult(obj));
} }
}
@Override @Override
public void finish(String obj) { public void finish(String obj) {
@@ -194,20 +208,6 @@ public class HttpMessageResponse extends HttpResponse {
finishHttpResult(String.class, new HttpResult(msg == null ? "" : msg).status(status)); finishHttpResult(String.class, new HttpResult(msg == null ? "" : msg).status(status));
} }
@Override
public void finish(final Convert convert, Type type, HttpResult result) {
if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return;
}
if (convert != null) {
result.convert(convert);
}
finishHttpResult(type, result);
}
@Override @Override
public void finish(boolean kill, final byte[] bs, int offset, int length) { public void finish(boolean kill, final byte[] bs, int offset, int length) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {

View File

@@ -33,6 +33,12 @@ public class HttpResultCoder implements MessageCoder<HttpResult> {
return instance; return instance;
} }
//消息内容的类型
@Override
public byte ctype() {
return MessageRecord.CTYPE_HTTP_RESULT;
}
@Override @Override
public byte[] encode(HttpResult data) { public byte[] encode(HttpResult data) {
if (data == null) { if (data == null) {

View File

@@ -28,6 +28,12 @@ public class HttpSimpleRequestCoder implements MessageCoder<HttpSimpleRequest> {
return instance; return instance;
} }
//消息内容的类型
@Override
public byte ctype() {
return MessageRecord.CTYPE_HTTP_REQUEST;
}
@Override @Override
public byte[] encode(HttpSimpleRequest data) { public byte[] encode(HttpSimpleRequest data) {
byte[] traceid = MessageCoder.getBytes(data.getTraceid());//short-string byte[] traceid = MessageCoder.getBytes(data.getTraceid());//short-string

View File

@@ -86,7 +86,7 @@ public abstract class MessageAgent implements Resourcable {
protected final ReentrantLock serviceLock = new ReentrantLock(); protected final ReentrantLock serviceLock = new ReentrantLock();
protected MessageCoder<MessageRecord> clientMessageCoder = MessageRecordCoder.getInstance(); protected MessageCoder<MessageRecord> clientMessageCoder = MessageRecordSerializer.getInstance();
//本地Service消息接收处理器 key:consumerid //本地Service消息接收处理器 key:consumerid
protected HashMap<String, MessageClientConsumerNode> clientConsumerNodes = new LinkedHashMap<>(); protected HashMap<String, MessageClientConsumerNode> clientConsumerNodes = new LinkedHashMap<>();

View File

@@ -33,6 +33,9 @@ public interface MessageCoder<T> {
//解码 //解码
public T decode(byte[] data); public T decode(byte[] data);
//消息内容的类型
public byte ctype();
//type: 1:string, 2:int, 3:long //type: 1:string, 2:int, 3:long
public static byte[] encodeUserid(Serializable value) { public static byte[] encodeUserid(Serializable value) {
if (value == null) { if (value == null) {

View File

@@ -12,6 +12,7 @@ import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.HttpSimpleRequest; import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.net.sncp.SncpHeader; import org.redkale.net.sncp.SncpHeader;
import org.redkale.util.RedkaleException;
/** /**
* 存在MQ里面的数据结构<p> * 存在MQ里面的数据结构<p>
@@ -152,6 +153,9 @@ public class MessageRecord implements Serializable {
if (this.content == null || this.content.length == 0) { if (this.content == null || this.content.length == 0) {
return null; return null;
} }
if (this.ctype != coder.ctype()) {
throw new RedkaleException("record.ctype is " + this.ctype + ", but coder.ctype is " + coder.ctype());
}
return (T) coder.decode(this.content); return (T) coder.decode(this.content);
} }

View File

@@ -18,14 +18,20 @@ import java.nio.ByteBuffer;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class MessageRecordCoder implements MessageCoder<MessageRecord> { public class MessageRecordSerializer implements MessageCoder<MessageRecord> {
private static final MessageRecordCoder instance = new MessageRecordCoder(); private static final MessageRecordSerializer instance = new MessageRecordSerializer();
public static MessageRecordCoder getInstance() { public static MessageRecordSerializer getInstance() {
return instance; return instance;
} }
//消息内容的类型
@Override
public byte ctype() {
return 0;
}
@Override @Override
public byte[] encode(MessageRecord data) { public byte[] encode(MessageRecord data) {
if (data == null) { if (data == null) {

View File

@@ -11,9 +11,8 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Function;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.zip.*; import java.util.zip.*;
@@ -946,7 +945,7 @@ public abstract class WebSocket<G extends Serializable, T> {
if (_channel == null) { if (_channel == null) {
return null; return null;
} }
Supplier<CompletableFuture> compose = () -> { Function<String, CompletableFuture> compose = t -> {
_channel.dispose(); _channel.dispose();
if (_readHandler != null) { if (_readHandler != null) {
_readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes); _readHandler.byteArrayPool.accept(_readHandler.halfFrameBytes);
@@ -957,8 +956,8 @@ public abstract class WebSocket<G extends Serializable, T> {
return onClose(code, reason); return onClose(code, reason);
}; };
CompletableFuture<Void> future = _engine.removeLocalThenDisconnect(this); CompletableFuture<Void> future = _engine.removeLocalThenDisconnect(this);
return future == null ? compose.get() return future == null ? compose.apply(null)
: future.exceptionally(t -> null).thenCompose(v -> (CompletionStage) compose); : future.exceptionally(t -> null).thenCompose((Function) compose);
} else { } else {
return null; return null;
} }