This commit is contained in:
Redkale
2020-05-27 17:10:23 +08:00
parent fcd69474a2
commit 0f7520e67b
5 changed files with 88 additions and 7 deletions

View File

@@ -60,7 +60,7 @@
<!--
MQ管理接口配置
name: MQ名称长度不能超过11个字节. 默认为空字符串。 注意: name不能包含$符号。
value 实现类名必须是org.redkale.boot.MessageAgent的子类
value 实现类名必须是org.redkale.mq.MessageAgent的子类
MQ节点下的子节点配置没有固定格式, 根据MessageAgent实现方的定义来配置
-->
<!--

View File

@@ -5,6 +5,7 @@
*/
package org.redkale.boot;
import org.redkale.mq.MessageAgent;
import org.redkale.util.RedkaleClassLoader;
import java.io.*;
import java.lang.annotation.Annotation;
@@ -27,7 +28,6 @@ import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
/**
* Server节点的初始化配置类
@@ -572,7 +572,7 @@ public abstract class NodeServer {
}
cf = null;
for (AnyValue list : proplist) {
DefaultAnyValue prop = null;
AnyValue.DefaultAnyValue prop = null;
String sc = list.getValue("groups");
if (sc != null) {
sc = sc.trim();
@@ -586,8 +586,8 @@ public abstract class NodeServer {
ClassFilter filter = new ClassFilter(this.serverClassLoader, ref, inter, excludeSuperClasses, prop);
for (AnyValue av : list.getAnyValues(property)) { // <service>、<filter>、<servlet> 节点
final AnyValue[] items = av.getAnyValues("property");
if (av instanceof DefaultAnyValue && items.length > 0) { //存在 <property>节点
DefaultAnyValue dav = DefaultAnyValue.create();
if (av instanceof AnyValue.DefaultAnyValue && items.length > 0) { //存在 <property>节点
AnyValue.DefaultAnyValue dav = AnyValue.DefaultAnyValue.create();
final AnyValue.Entry<String>[] strings = av.getStringEntrys();
if (strings != null) { //将<service>、<filter>、<servlet>节点的属性值传给dav
for (AnyValue.Entry<String> en : strings) {
@@ -600,7 +600,7 @@ public abstract class NodeServer {
if (!"property".equals(en.name)) dav.addValue(en.name, en.getValue());
}
}
DefaultAnyValue ps = DefaultAnyValue.create();
AnyValue.DefaultAnyValue ps = AnyValue.DefaultAnyValue.create();
for (AnyValue item : items) {
ps.addValue(item.getValue("name"), item.getValue("value"));
}

View File

@@ -3,7 +3,7 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.boot;
package org.redkale.mq;
/**
* MQ管理
@@ -15,4 +15,9 @@ package org.redkale.boot;
*/
public abstract class MessageAgent {
//创建topic如果已存在则跳过
public abstract void createTopic(String... topics);
//删除topic如果不存在则跳过
public abstract void deleteTopic(String... topics);
}

View File

@@ -5,6 +5,9 @@
*/
package org.redkale.mq;
import java.nio.ByteBuffer;
import java.util.function.*;
import org.redkale.convert.Convert;
import org.redkale.net.Response;
import org.redkale.net.http.*;
import org.redkale.util.ObjectPool;
@@ -15,8 +18,53 @@ import org.redkale.util.ObjectPool;
*/
public class MessageHttpResponse extends HttpResponse {
protected MessageRecord message;
protected BiConsumer<MessageRecord, byte[]> resultConsumer;
public MessageHttpResponse(HttpContext context, MessageHttpRequest request,
ObjectPool<Response> responsePool, HttpResponseConfig config) {
super(context, request, responsePool, config);
}
public MessageHttpResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
this.message = message;
this.resultConsumer = resultConsumer;
return this;
}
@Override
public void finishJson(org.redkale.service.RetResult ret) {
}
@Override
public void finish(String obj) {
}
@Override
public void finish(final Convert convert, final Object obj) {
}
@Override
public void finish(final byte[] bs) {
}
@Override
public void finish(ByteBuffer buffer) {
}
@Override
public void finish(ByteBuffer... buffers) {
}
@Override
public void finish(int status, String message) {
}
}

View File

@@ -5,8 +5,12 @@
*/
package org.redkale.mq;
import java.nio.ByteBuffer;
import java.util.function.*;
import org.redkale.convert.bson.BsonWriter;
import org.redkale.net.Response;
import org.redkale.net.sncp.*;
import static org.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import org.redkale.util.ObjectPool;
/**
@@ -18,7 +22,31 @@ import org.redkale.util.ObjectPool;
*/
public class MessageSncpResponse extends SncpResponse {
protected MessageRecord message;
protected BiConsumer<MessageRecord, byte[]> resultConsumer;
public MessageSncpResponse(SncpContext context, MessageSncpRequest request, ObjectPool<Response> responsePool) {
super(context, request, responsePool);
}
public MessageSncpResponse resultConsumer(MessageRecord message, BiConsumer<MessageRecord, byte[]> resultConsumer) {
this.message = message;
this.resultConsumer = resultConsumer;
return this;
}
@Override
public void finish(final int retcode, final BsonWriter out) {
if (out == null) {
final byte[] result = new byte[SncpRequest.HEADER_SIZE];
fillHeader(ByteBuffer.wrap(result), 0, retcode);
resultConsumer.accept(message, result);
return;
}
final int respBodyLength = out.count(); //body总长度
final byte[] result = out.toArray();
fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode);
resultConsumer.accept(message, result);
}
}