This commit is contained in:
Redkale
2020-06-01 11:04:03 +08:00
parent 2dee61223d
commit 6329722f17
7 changed files with 105 additions and 56 deletions

View File

@@ -18,8 +18,11 @@ import org.redkale.net.http.*;
*/
public class HttpMessageRequest extends HttpRequest {
public HttpMessageRequest(HttpContext context, HttpSimpleRequest req) {
super(context, req);
protected MessageRecord reqMessage;
public HttpMessageRequest(HttpContext context, MessageRecord reqMessage) {
super(context, reqMessage.decodeContent(HttpSimpleRequestCoder.getInstance()));
this.reqMessage = reqMessage;
}
}

View File

@@ -6,8 +6,7 @@
package org.redkale.mq;
import java.nio.ByteBuffer;
import java.util.function.*;
import org.redkale.convert.Convert;
import org.redkale.convert.*;
import org.redkale.net.Response;
import org.redkale.net.http.*;
import org.redkale.util.ObjectPool;
@@ -24,57 +23,89 @@ import org.redkale.util.ObjectPool;
*/
public class HttpMessageResponse extends HttpResponse {
protected MessageRecord message;
protected MessageRecord reqMessage;
protected BiConsumer<MessageRecord, byte[]> resultConsumer;
protected MessageProducer producer;
public HttpMessageResponse(HttpContext context, HttpMessageRequest request,
ObjectPool<Response> responsePool, HttpResponseConfig config) {
ObjectPool<Response> responsePool, HttpResponseConfig config, MessageProducer producer) {
super(context, request, responsePool, config);
this.reqMessage = request.reqMessage;
this.producer = producer;
}
public HttpMessageResponse(HttpContext context, HttpSimpleRequest req, HttpResponseConfig config) {
super(context, new HttpMessageRequest(context, req), null, config);
public HttpMessageResponse(HttpContext context, MessageRecord reqMessage, HttpResponseConfig config, MessageProducer producer) {
super(context, new HttpMessageRequest(context, reqMessage), null, config);
this.reqMessage = reqMessage;
this.producer = producer;
}
public HttpMessageResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
this.message = message;
this.resultConsumer = resultConsumer;
return this;
public void finishHttpResult(HttpResult result) {
ConvertType format = result.convert() == null ? null : result.convert().getFactory().getConvertType();
byte[] content = HttpResultCoder.getInstance().encode(result);
this.producer.apply(new MessageRecord(format, reqMessage.getResptopic(), null, content));
}
@Override
public void finishJson(org.redkale.service.RetResult ret) {
if (reqMessage.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(ret.clearConvert(), ret));
}
@Override
public void finish(String obj) {
}
@Override
public void finish(final Convert convert, final Object obj) {
}
@Override
public void finish(final byte[] bs) {
}
@Override
public void finish(ByteBuffer buffer) {
}
@Override
public void finish(ByteBuffer... buffers) {
if (reqMessage.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(obj == null ? "" : obj));
}
@Override
public void finish(int status, String message) {
if (reqMessage.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(message == null ? "" : message).status(status));
}
@Override
public void finish(final Convert convert, HttpResult result) {
if (reqMessage.isEmptyResptopic()) return;
if (convert != null) result.convert(convert);
finishHttpResult(result);
}
@Override
public void finish(final byte[] bs) {
if (reqMessage.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(bs));
}
@Override
public void finish(final String contentType, final byte[] bs) {
if (reqMessage.isEmptyResptopic()) return;
finishHttpResult(new HttpResult(bs).contentType(contentType));
}
@Override
public void finish(boolean kill, ByteBuffer buffer) {
if (reqMessage.isEmptyResptopic()) return;
byte[] bs = new byte[buffer.remaining()];
buffer.get(bs);
finishHttpResult(new HttpResult(bs));
}
@Override
public void finish(boolean kill, ByteBuffer... buffers) {
if (reqMessage.isEmptyResptopic()) return;
int size = 0;
for (ByteBuffer buf : buffers) {
size += buf.remaining();
}
byte[] bs = new byte[size];
int index = 0;
for (ByteBuffer buf : buffers) {
int r = buf.remaining();
buf.get(bs, index, r);
index += r;
}
finishHttpResult(new HttpResult(bs));
}
}

View File

@@ -144,7 +144,7 @@ public class MessageRecord implements Serializable {
return (T) convert.convertFrom(type, this.content);
}
public <T> T decodeContent(MessageCoder<T> coder, java.lang.reflect.Type type) {
public <T> T decodeContent(MessageCoder<T> coder) {
if (this.content == null || this.content.length == 0) return null;
return (T) coder.decode(this.content);
}

View File

@@ -27,7 +27,7 @@ public abstract class Response<C extends Context, R extends Request<C>> {
protected final C context;
protected final ObjectPool<Response> responsePool;
protected final ObjectPool<Response> responsePool; //虚拟构建的Response可能不存在responsePool
protected final R request;

View File

@@ -430,6 +430,26 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
finish(convert.convertTo(getBodyBufferSupplier(), ret));
}
/**
* 将HttpResult对象输出
*
* @param convert 指定的Convert
* @param result HttpResult输出对象
*/
public void finish(final Convert convert, HttpResult result) {
if (result.getContentType() != null) setContentType(result.getContentType());
addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus());
if (result.getResult() == null) {
finish("");
} else if (result.getResult() instanceof CharSequence) {
finish(result.getResult().toString());
} else {
Convert cc = result.convert();
if (cc == null || !(cc instanceof TextConvert)) cc = convert;
finish(cc, result.getResult());
}
}
/**
* 将CompletableFuture的结果对象以JSON格式输出
*
@@ -521,18 +541,7 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
} else if (obj instanceof org.redkale.service.RetResult) {
finishJson((org.redkale.service.RetResult) obj);
} else if (obj instanceof HttpResult) {
HttpResult result = (HttpResult) obj;
if (result.getContentType() != null) setContentType(result.getContentType());
addHeader(result.getHeaders()).addCookie(result.getCookies()).setStatus(result.getStatus() < 1 ? 200 : result.getStatus());
if (result.getResult() == null) {
finish("");
} else if (result.getResult() instanceof CharSequence) {
finish(result.getResult().toString());
} else {
Convert cc = result.convert();
if (cc == null || !(cc instanceof TextConvert)) cc = convert;
finish(cc, result.getResult());
}
finish(convert, (HttpResult) obj);
} else {
if (hasRender) {
if (onlyoneHttpRender != null) {

View File

@@ -43,7 +43,7 @@ public class SncpRequest extends Request<SncpContext> {
private byte[] body;
private byte[] bufferbytes = new byte[6];
private byte[] addrbytes = new byte[6];
protected SncpRequest(SncpContext context, ObjectPool<ByteBuffer> bufferPool) {
super(context, bufferPool);
@@ -70,7 +70,7 @@ public class SncpRequest extends Request<SncpContext> {
this.serviceid = DLong.read(buffer);
this.serviceversion = buffer.getInt();
this.actionid = DLong.read(buffer);
buffer.get(bufferbytes);
buffer.get(addrbytes); //ipaddr
this.bodylength = buffer.getInt();
if (buffer.getInt() != 0) {
@@ -116,7 +116,7 @@ public class SncpRequest extends Request<SncpContext> {
this.bodyoffset = 0;
this.body = null;
this.ping = false;
this.bufferbytes[0] = 0;
this.addrbytes[0] = 0;
super.recycle();
}
@@ -145,9 +145,9 @@ public class SncpRequest extends Request<SncpContext> {
}
public InetSocketAddress getRemoteAddress() {
if (bufferbytes[0] == 0) return null;
return new InetSocketAddress((0xff & bufferbytes[0]) + "." + (0xff & bufferbytes[1]) + "." + (0xff & bufferbytes[2]) + "." + (0xff & bufferbytes[3]),
((0xff00 & (bufferbytes[4] << 8)) | (0xff & bufferbytes[5])));
if (addrbytes[0] == 0) return null;
return new InetSocketAddress((0xff & addrbytes[0]) + "." + (0xff & addrbytes[1]) + "." + (0xff & addrbytes[2]) + "." + (0xff & addrbytes[3]),
((0xff00 & (addrbytes[4] << 8)) | (0xff & addrbytes[5])));
}
}

View File

@@ -71,6 +71,12 @@ public class RetResult<T> {
return convert;
}
public Convert clearConvert() {
Convert c = this.convert;
this.convert = null;
return c;
}
public void convert(Convert convert) {
this.convert = convert;
}