From 6329722f1735675f0ef64054eccc9251ffa34fc8 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Mon, 1 Jun 2020 11:04:03 +0800 Subject: [PATCH] --- src/org/redkale/mq/HttpMessageRequest.java | 7 +- src/org/redkale/mq/HttpMessageResponse.java | 99 ++++++++++++++------- src/org/redkale/mq/MessageRecord.java | 2 +- src/org/redkale/net/Response.java | 2 +- src/org/redkale/net/http/HttpResponse.java | 33 ++++--- src/org/redkale/net/sncp/SncpRequest.java | 12 +-- src/org/redkale/service/RetResult.java | 6 ++ 7 files changed, 105 insertions(+), 56 deletions(-) diff --git a/src/org/redkale/mq/HttpMessageRequest.java b/src/org/redkale/mq/HttpMessageRequest.java index 7b036e291..977ace086 100644 --- a/src/org/redkale/mq/HttpMessageRequest.java +++ b/src/org/redkale/mq/HttpMessageRequest.java @@ -18,8 +18,11 @@ import org.redkale.net.http.*; */ public class HttpMessageRequest extends HttpRequest { - public HttpMessageRequest(HttpContext context, HttpSimpleRequest req) { - super(context, req); + protected MessageRecord reqMessage; + + public HttpMessageRequest(HttpContext context, MessageRecord reqMessage) { + super(context, reqMessage.decodeContent(HttpSimpleRequestCoder.getInstance())); + this.reqMessage = reqMessage; } } diff --git a/src/org/redkale/mq/HttpMessageResponse.java b/src/org/redkale/mq/HttpMessageResponse.java index 133f8ba27..549bdbf02 100644 --- a/src/org/redkale/mq/HttpMessageResponse.java +++ b/src/org/redkale/mq/HttpMessageResponse.java @@ -6,8 +6,7 @@ package org.redkale.mq; import java.nio.ByteBuffer; -import java.util.function.*; -import org.redkale.convert.Convert; +import org.redkale.convert.*; import org.redkale.net.Response; import org.redkale.net.http.*; import org.redkale.util.ObjectPool; @@ -24,57 +23,89 @@ import org.redkale.util.ObjectPool; */ public class HttpMessageResponse extends HttpResponse { - protected MessageRecord message; + protected MessageRecord reqMessage; - protected BiConsumer resultConsumer; + protected MessageProducer producer; public HttpMessageResponse(HttpContext context, HttpMessageRequest request, - ObjectPool responsePool, HttpResponseConfig config) { + ObjectPool responsePool, HttpResponseConfig config, MessageProducer producer) { super(context, request, responsePool, config); + this.reqMessage = request.reqMessage; + this.producer = producer; } - public HttpMessageResponse(HttpContext context, HttpSimpleRequest req, HttpResponseConfig config) { - super(context, new HttpMessageRequest(context, req), null, config); + public HttpMessageResponse(HttpContext context, MessageRecord reqMessage, HttpResponseConfig config, MessageProducer producer) { + super(context, new HttpMessageRequest(context, reqMessage), null, config); + this.reqMessage = reqMessage; + this.producer = producer; } - public HttpMessageResponse resultConsumer(MessageRecord message, BiConsumer resultConsumer) { - this.message = message; - this.resultConsumer = resultConsumer; - return this; + public void finishHttpResult(HttpResult result) { + ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType(); + byte[] content = HttpResultCoder.getInstance().encode(result); + this.producer.apply(new MessageRecord(format, reqMessage.getResptopic(), null, content)); } @Override public void finishJson(org.redkale.service.RetResult ret) { - + if (reqMessage.isEmptyResptopic()) return; + finishHttpResult(new HttpResult(ret.clearConvert(), ret)); } @Override public void finish(String obj) { - - } - - @Override - public void finish(final Convert convert, final Object obj) { - - } - - @Override - public void finish(final byte[] bs) { - - } - - @Override - public void finish(ByteBuffer buffer) { - - } - - @Override - public void finish(ByteBuffer... buffers) { - + if (reqMessage.isEmptyResptopic()) return; + finishHttpResult(new HttpResult(obj == null ? "" : obj)); } @Override public void finish(int status, String message) { - + if (reqMessage.isEmptyResptopic()) return; + finishHttpResult(new HttpResult(message == null ? "" : message).status(status)); } + + @Override + public void finish(final Convert convert, HttpResult result) { + if (reqMessage.isEmptyResptopic()) return; + if (convert != null) result.convert(convert); + finishHttpResult(result); + } + + @Override + public void finish(final byte[] bs) { + if (reqMessage.isEmptyResptopic()) return; + finishHttpResult(new HttpResult(bs)); + } + + @Override + public void finish(final String contentType, final byte[] bs) { + if (reqMessage.isEmptyResptopic()) return; + finishHttpResult(new HttpResult(bs).contentType(contentType)); + } + + @Override + public void finish(boolean kill, ByteBuffer buffer) { + if (reqMessage.isEmptyResptopic()) return; + byte[] bs = new byte[buffer.remaining()]; + buffer.get(bs); + finishHttpResult(new HttpResult(bs)); + } + + @Override + public void finish(boolean kill, ByteBuffer... buffers) { + if (reqMessage.isEmptyResptopic()) return; + int size = 0; + for (ByteBuffer buf : buffers) { + size += buf.remaining(); + } + byte[] bs = new byte[size]; + int index = 0; + for (ByteBuffer buf : buffers) { + int r = buf.remaining(); + buf.get(bs, index, r); + index += r; + } + finishHttpResult(new HttpResult(bs)); + } + } diff --git a/src/org/redkale/mq/MessageRecord.java b/src/org/redkale/mq/MessageRecord.java index 58f34fb15..29dfc64a2 100644 --- a/src/org/redkale/mq/MessageRecord.java +++ b/src/org/redkale/mq/MessageRecord.java @@ -144,7 +144,7 @@ public class MessageRecord implements Serializable { return (T) convert.convertFrom(type, this.content); } - public T decodeContent(MessageCoder coder, java.lang.reflect.Type type) { + public T decodeContent(MessageCoder coder) { if (this.content == null || this.content.length == 0) return null; return (T) coder.decode(this.content); } diff --git a/src/org/redkale/net/Response.java b/src/org/redkale/net/Response.java index 46e2b1088..7a824660d 100644 --- a/src/org/redkale/net/Response.java +++ b/src/org/redkale/net/Response.java @@ -27,7 +27,7 @@ public abstract class Response> { protected final C context; - protected final ObjectPool responsePool; + protected final ObjectPool responsePool; //虚拟构建的Response可能不存在responsePool protected final R request; diff --git a/src/org/redkale/net/http/HttpResponse.java b/src/org/redkale/net/http/HttpResponse.java index 639d3a6bd..10d1bac7f 100644 --- a/src/org/redkale/net/http/HttpResponse.java +++ b/src/org/redkale/net/http/HttpResponse.java @@ -430,6 +430,26 @@ public class HttpResponse extends Response { finish(convert.convertTo(getBodyBufferSupplier(), ret)); } + /** + * 将HttpResult对象输出 + * + * @param convert 指定的Convert + * @param result HttpResult输出对象 + */ + public void finish(final Convert convert, HttpResult result) { + if (result.getContentType() != null) setContentType(result.getContentType()); + addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus()); + if (result.getResult() == null) { + finish(""); + } else if (result.getResult() instanceof CharSequence) { + finish(result.getResult().toString()); + } else { + Convert cc = result.convert(); + if (cc == null || !(cc instanceof TextConvert)) cc = convert; + finish(cc, result.getResult()); + } + } + /** * 将CompletableFuture的结果对象以JSON格式输出 * @@ -521,18 +541,7 @@ public class HttpResponse extends Response { } else if (obj instanceof org.redkale.service.RetResult) { finishJson((org.redkale.service.RetResult) obj); } else if (obj instanceof HttpResult) { - HttpResult result = (HttpResult) obj; - if (result.getContentType() != null) setContentType(result.getContentType()); - addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus()); - if (result.getResult() == null) { - finish(""); - } else if (result.getResult() instanceof CharSequence) { - finish(result.getResult().toString()); - } else { - Convert cc = result.convert(); - if (cc == null || !(cc instanceof TextConvert)) cc = convert; - finish(cc, result.getResult()); - } + finish(convert, (HttpResult) obj); } else { if (hasRender) { if (onlyoneHttpRender != null) { diff --git a/src/org/redkale/net/sncp/SncpRequest.java b/src/org/redkale/net/sncp/SncpRequest.java index 35b26a63e..375a5fc96 100644 --- a/src/org/redkale/net/sncp/SncpRequest.java +++ b/src/org/redkale/net/sncp/SncpRequest.java @@ -43,7 +43,7 @@ public class SncpRequest extends Request { private byte[] body; - private byte[] bufferbytes = new byte[6]; + private byte[] addrbytes = new byte[6]; protected SncpRequest(SncpContext context, ObjectPool bufferPool) { super(context, bufferPool); @@ -70,7 +70,7 @@ public class SncpRequest extends Request { this.serviceid = DLong.read(buffer); this.serviceversion = buffer.getInt(); this.actionid = DLong.read(buffer); - buffer.get(bufferbytes); + buffer.get(addrbytes); //ipaddr this.bodylength = buffer.getInt(); if (buffer.getInt() != 0) { @@ -116,7 +116,7 @@ public class SncpRequest extends Request { this.bodyoffset = 0; this.body = null; this.ping = false; - this.bufferbytes[0] = 0; + this.addrbytes[0] = 0; super.recycle(); } @@ -145,9 +145,9 @@ public class SncpRequest extends Request { } public InetSocketAddress getRemoteAddress() { - if (bufferbytes[0] == 0) return null; - return new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]), - ((0xff00 & (bufferbytes[4] << 8)) | (0xff & bufferbytes[5]))); + if (addrbytes[0] == 0) return null; + return new InetSocketAddress((0xff & addrbytes[0]) + "." + (0xff & addrbytes[1]) + "." + (0xff & addrbytes[2]) + "." + (0xff & addrbytes[3]), + ((0xff00 & (addrbytes[4] << 8)) | (0xff & addrbytes[5]))); } } diff --git a/src/org/redkale/service/RetResult.java b/src/org/redkale/service/RetResult.java index 93fc9178e..dd54b8185 100644 --- a/src/org/redkale/service/RetResult.java +++ b/src/org/redkale/service/RetResult.java @@ -71,6 +71,12 @@ public class RetResult { return convert; } + public Convert clearConvert() { + Convert c = this.convert; + this.convert = null; + return c; + } + public void convert(Convert convert) { this.convert = convert; }