移除MessageMultiConsumer功能
This commit is contained in:
@@ -16,7 +16,6 @@ import java.util.logging.*;
|
|||||||
import org.redkale.annotation.Comment;
|
import org.redkale.annotation.Comment;
|
||||||
import org.redkale.convert.*;
|
import org.redkale.convert.*;
|
||||||
import org.redkale.convert.json.*;
|
import org.redkale.convert.json.*;
|
||||||
import org.redkale.mq.MessageMultiConsumer;
|
|
||||||
import org.redkale.net.http.*;
|
import org.redkale.net.http.*;
|
||||||
import org.redkale.persistence.*;
|
import org.redkale.persistence.*;
|
||||||
import org.redkale.service.RetResult;
|
import org.redkale.service.RetResult;
|
||||||
@@ -107,10 +106,6 @@ public final class ApiDocCommand {
|
|||||||
if (servlet instanceof WebSocketServlet) {
|
if (servlet instanceof WebSocketServlet) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (servlet.getClass().getAnnotation(MessageMultiConsumer.class) != null) {
|
|
||||||
node.logger.log(Level.INFO, servlet + " be skipped because has @MessageMultiConsumer");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
|
WebServlet ws = servlet.getClass().getAnnotation(WebServlet.class);
|
||||||
if (ws == null) {
|
if (ws == null) {
|
||||||
node.logger.log(Level.WARNING, servlet + " not found @WebServlet");
|
node.logger.log(Level.WARNING, servlet + " not found @WebServlet");
|
||||||
|
|||||||
@@ -47,9 +47,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
|
|||||||
//可能被sncp用到的服务 key: serviceName
|
//可能被sncp用到的服务 key: serviceName
|
||||||
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> sncpAddressMap = new ConcurrentHashMap<>();
|
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> sncpAddressMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
//可能被mqtp用到的服务 key: serviceName
|
|
||||||
protected final ConcurrentHashMap<String, Set<InetSocketAddress>> mqtpAddressMap = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(AnyValue config) {
|
public void init(AnyValue config) {
|
||||||
super.init(config);
|
super.init(config);
|
||||||
@@ -138,7 +135,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
|
|||||||
checkApplicationHealth();
|
checkApplicationHealth();
|
||||||
checkHttpAddressHealth();
|
checkHttpAddressHealth();
|
||||||
loadSncpAddressHealth();
|
loadSncpAddressHealth();
|
||||||
loadMqtpAddressHealth();
|
|
||||||
localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> {
|
localEntrys.values().stream().filter(e -> !e.canceled).forEach(entry -> {
|
||||||
checkLocalHealth(entry);
|
checkLocalHealth(entry);
|
||||||
});
|
});
|
||||||
@@ -162,17 +158,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void loadMqtpAddressHealth() {
|
|
||||||
List<String> keys = source.keysStartsWith("cluster.mqtp:");
|
|
||||||
keys.forEach(serviceName -> {
|
|
||||||
try {
|
|
||||||
this.mqtpAddressMap.put(serviceName, queryAddress(serviceName).get(3, TimeUnit.SECONDS));
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.log(Level.SEVERE, "loadMqtpAddressHealth check " + serviceName + " error", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void checkHttpAddressHealth() {
|
protected void checkHttpAddressHealth() {
|
||||||
try {
|
try {
|
||||||
this.httpAddressMap.keySet().stream().forEach(serviceName -> {
|
this.httpAddressMap.keySet().stream().forEach(serviceName -> {
|
||||||
@@ -209,15 +194,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override //获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段
|
|
||||||
public CompletableFuture<Map<String, Set<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname) {
|
|
||||||
final Map<String, Set<InetSocketAddress>> rsmap = new ConcurrentHashMap<>();
|
|
||||||
final String servicenamprefix = generateHttpServiceName(protocol, module, null) + ":";
|
|
||||||
mqtpAddressMap.keySet().stream().filter(k -> k.startsWith(servicenamprefix))
|
|
||||||
.forEach(sn -> rsmap.put(sn.substring(servicenamprefix.length()), mqtpAddressMap.get(sn)));
|
|
||||||
return CompletableFuture.completedFuture(rsmap);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override //获取HTTP远程服务的可用ip列表
|
@Override //获取HTTP远程服务的可用ip列表
|
||||||
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
|
public CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname) {
|
||||||
final String serviceName = generateHttpServiceName(protocol, module, resname);
|
final String serviceName = generateHttpServiceName(protocol, module, resname);
|
||||||
@@ -329,9 +305,6 @@ public class CacheClusterAgent extends ClusterAgent implements Resourcable {
|
|||||||
if (realcanceled && currEntry != null) {
|
if (realcanceled && currEntry != null) {
|
||||||
currEntry.canceled = true;
|
currEntry.canceled = true;
|
||||||
}
|
}
|
||||||
if (!"mqtp".equals(protocol) && currEntry != null && currEntry.submqtp) {
|
|
||||||
deregister(ns, "mqtp", service, realcanceled);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -11,14 +11,13 @@ import java.nio.charset.StandardCharsets;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import org.redkale.annotation.AutoLoad;
|
|
||||||
import org.redkale.annotation.*;
|
import org.redkale.annotation.*;
|
||||||
|
import org.redkale.annotation.AutoLoad;
|
||||||
import org.redkale.annotation.ResourceListener;
|
import org.redkale.annotation.ResourceListener;
|
||||||
import org.redkale.boot.*;
|
import org.redkale.boot.*;
|
||||||
import static org.redkale.boot.Application.*;
|
import static org.redkale.boot.Application.*;
|
||||||
import org.redkale.convert.ConvertDisabled;
|
import org.redkale.convert.ConvertDisabled;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.mq.MessageMultiConsumer;
|
|
||||||
import org.redkale.net.Server;
|
import org.redkale.net.Server;
|
||||||
import org.redkale.net.http.*;
|
import org.redkale.net.http.*;
|
||||||
import org.redkale.net.sncp.*;
|
import org.redkale.net.sncp.*;
|
||||||
@@ -148,14 +147,6 @@ public abstract class ClusterAgent {
|
|||||||
}
|
}
|
||||||
ClusterEntry htentry = register(ns, protocol, service);
|
ClusterEntry htentry = register(ns, protocol, service);
|
||||||
localEntrys.put(htentry.serviceid, htentry);
|
localEntrys.put(htentry.serviceid, htentry);
|
||||||
if (protocol.toLowerCase().startsWith("http")) {
|
|
||||||
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
|
|
||||||
if (mmc != null) {
|
|
||||||
ClusterEntry mqentry = register(ns, "mqtp", service);
|
|
||||||
localEntrys.put(mqentry.serviceid, mqentry);
|
|
||||||
htentry.submqtp = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
//远程模式加载IP列表, 只支持SNCP协议
|
//远程模式加载IP列表, 只支持SNCP协议
|
||||||
if (ns.isSNCP()) {
|
if (ns.isSNCP()) {
|
||||||
@@ -224,9 +215,6 @@ public abstract class ClusterAgent {
|
|||||||
return 10;
|
return 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取MQTP的HTTP远程服务的可用ip列表, key = serviceName的后半段
|
|
||||||
public abstract CompletableFuture<Map<String, Set<InetSocketAddress>>> queryMqtpAddress(String protocol, String module, String resname);
|
|
||||||
|
|
||||||
//获取HTTP远程服务的可用ip列表
|
//获取HTTP远程服务的可用ip列表
|
||||||
public abstract CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname);
|
public abstract CompletableFuture<Set<InetSocketAddress>> queryHttpAddress(String protocol, String module, String resname);
|
||||||
|
|
||||||
@@ -312,11 +300,6 @@ public abstract class ClusterAgent {
|
|||||||
String module = Rest.getRestModule(service).toLowerCase();
|
String module = Rest.getRestModule(service).toLowerCase();
|
||||||
return protocol.toLowerCase() + serviceSeparator() + module + (resname.isEmpty() ? "" : ("-" + resname));
|
return protocol.toLowerCase() + serviceSeparator() + module + (resname.isEmpty() ? "" : ("-" + resname));
|
||||||
}
|
}
|
||||||
if ("mqtp".equalsIgnoreCase(protocol)) {
|
|
||||||
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
|
|
||||||
String selfmodule = Rest.getRestModule(service).toLowerCase();
|
|
||||||
return protocol.toLowerCase() + serviceSeparator() + mmc.module() + serviceSeparator() + selfmodule;
|
|
||||||
}
|
|
||||||
if (!Sncp.isSncpDyn(service)) {
|
if (!Sncp.isSncpDyn(service)) {
|
||||||
return protocol.toLowerCase() + serviceSeparator() + service.getClass().getName();
|
return protocol.toLowerCase() + serviceSeparator() + service.getClass().getName();
|
||||||
}
|
}
|
||||||
@@ -395,7 +378,7 @@ public abstract class ClusterAgent {
|
|||||||
|
|
||||||
public String checkName;
|
public String checkName;
|
||||||
|
|
||||||
//http or sncp or mqtp
|
//http or sncp
|
||||||
public String protocol;
|
public String protocol;
|
||||||
|
|
||||||
//TCP or UDP
|
//TCP or UDP
|
||||||
@@ -408,8 +391,6 @@ public abstract class ClusterAgent {
|
|||||||
|
|
||||||
public boolean canceled;
|
public boolean canceled;
|
||||||
|
|
||||||
public boolean submqtp;
|
|
||||||
|
|
||||||
public ClusterEntry(NodeServer ns, String protocol, Service service) {
|
public ClusterEntry(NodeServer ns, String protocol, Service service) {
|
||||||
this.serviceid = generateServiceId(ns, protocol, service);
|
this.serviceid = generateServiceId(ns, protocol, service);
|
||||||
this.serviceName = generateServiceName(ns, protocol, service);
|
this.serviceName = generateServiceName(ns, protocol, service);
|
||||||
|
|||||||
@@ -78,26 +78,6 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
produceMessage(topic, userid, groupid, request, null);
|
produceMessage(topic, userid, groupid, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void broadcastMessage(HttpSimpleRequest request) {
|
|
||||||
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void broadcastMessage(Serializable userid, HttpSimpleRequest request) {
|
|
||||||
broadcastMessage(generateHttpReqTopic(request, null), userid, null, request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request) {
|
|
||||||
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void broadcastMessage(String topic, HttpSimpleRequest request) {
|
|
||||||
broadcastMessage(topic, 0, null, request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public final void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
|
|
||||||
broadcastMessage(topic, userid, groupid, request, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
|
public <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
|
||||||
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
|
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
|
||||||
if (httbs == null || httbs.getResult() == null) {
|
if (httbs == null || httbs.getResult() == null) {
|
||||||
@@ -152,12 +132,6 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
|
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
|
||||||
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
|
||||||
message.userid(userid).groupid(groupid);
|
|
||||||
sendMessage(message, false, counter);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
||||||
message.userid(userid).groupid(groupid);
|
message.userid(userid).groupid(groupid);
|
||||||
|
|||||||
@@ -37,12 +37,8 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
|
|||||||
|
|
||||||
protected final HttpServlet servlet;
|
protected final HttpServlet servlet;
|
||||||
|
|
||||||
protected final boolean multiConsumer;
|
|
||||||
|
|
||||||
protected final String restModule; // 前后有/, 例如: /user/
|
protected final String restModule; // 前后有/, 例如: /user/
|
||||||
|
|
||||||
protected final String multiModule; // 前后有/, 例如: /userstat/
|
|
||||||
|
|
||||||
protected ThreadLocal<ObjectPool<HttpMessageResponse>> respPoolThreadLocal;
|
protected ThreadLocal<ObjectPool<HttpMessageResponse>> respPoolThreadLocal;
|
||||||
|
|
||||||
protected final Supplier<HttpMessageResponse> respSupplier;
|
protected final Supplier<HttpMessageResponse> respSupplier;
|
||||||
@@ -66,10 +62,7 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
|
|||||||
this.server = server;
|
this.server = server;
|
||||||
this.service = service;
|
this.service = service;
|
||||||
this.servlet = servlet;
|
this.servlet = servlet;
|
||||||
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
|
|
||||||
this.multiConsumer = mmc != null;
|
|
||||||
this.restModule = "/" + Rest.getRestModule(service) + "/";
|
this.restModule = "/" + Rest.getRestModule(service) + "/";
|
||||||
this.multiModule = mmc != null ? ("/" + mmc.module() + "/") : null;
|
|
||||||
this.respSupplier = () -> respPoolThreadLocal.get().get();
|
this.respSupplier = () -> respPoolThreadLocal.get().get();
|
||||||
this.respConsumer = resp -> respPoolThreadLocal.get().accept(resp);
|
this.respConsumer = resp -> respPoolThreadLocal.get().accept(resp);
|
||||||
this.respPoolThreadLocal = Utility.withInitialThreadLocal(() -> ObjectPool.createUnsafePool(Utility.cpus(),
|
this.respPoolThreadLocal = Utility.withInitialThreadLocal(() -> ObjectPool.createUnsafePool(Utility.cpus(),
|
||||||
@@ -94,15 +87,9 @@ public class HttpMessageClientProcessor implements MessageClientProcessor {
|
|||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
long cha = now - message.createTime;
|
long cha = now - message.createTime;
|
||||||
long e = now - startTime;
|
long e = now - startTime;
|
||||||
if (multiConsumer) {
|
|
||||||
message.setRespTopic(null); //不容许有响应
|
|
||||||
}
|
|
||||||
HttpMessageResponse response = respSupplier.get();
|
HttpMessageResponse response = respSupplier.get();
|
||||||
request = response.request();
|
request = response.request();
|
||||||
response.prepare(message, callback, producer);
|
response.prepare(message, callback, producer);
|
||||||
if (multiConsumer) {
|
|
||||||
request.setRequestURI(request.getRequestURI().replaceFirst(this.multiModule, this.restModule));
|
|
||||||
}
|
|
||||||
|
|
||||||
server.getHttpServer().getContext().execute(servlet, request, response);
|
server.getHttpServer().getContext().execute(servlet, request, response);
|
||||||
long o = System.currentTimeMillis() - now;
|
long o = System.currentTimeMillis() - now;
|
||||||
|
|||||||
@@ -72,88 +72,6 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
|
||||||
mqtpAsync(userid, request);
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> mqtpAsync(Serializable userid, HttpSimpleRequest req) {
|
|
||||||
String module = req.getRequestURI();
|
|
||||||
module = module.substring(1); //去掉/
|
|
||||||
module = module.substring(0, module.indexOf('/'));
|
|
||||||
Map<String, String> headers = req.getHeaders();
|
|
||||||
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
|
|
||||||
final String localModule = module;
|
|
||||||
return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> {
|
|
||||||
if (addrmap == null || addrmap.isEmpty()) {
|
|
||||||
if (logger.isLoggable(Level.FINE)) {
|
|
||||||
logger.log(Level.FINE, "mqtpAsync.broadcastMessage: module=" + localModule + ", resname=" + resname + ", addrmap is empty");
|
|
||||||
}
|
|
||||||
return new HttpResult<byte[]>().status(404).toFuture();
|
|
||||||
}
|
|
||||||
final Map<String, String> clientHeaders = new LinkedHashMap<>();
|
|
||||||
byte[] clientBody = null;
|
|
||||||
if (req.isRpc()) {
|
|
||||||
clientHeaders.put(Rest.REST_HEADER_RPC, "true");
|
|
||||||
}
|
|
||||||
if (req.isFrombody()) {
|
|
||||||
clientHeaders.put(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
|
|
||||||
}
|
|
||||||
if (req.getReqConvertType() != null) {
|
|
||||||
clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
|
|
||||||
}
|
|
||||||
if (req.getRespConvertType() != null) {
|
|
||||||
clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
|
|
||||||
}
|
|
||||||
if (userid != null) {
|
|
||||||
clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid);
|
|
||||||
}
|
|
||||||
if (headers != null) {
|
|
||||||
headers.forEach((n, v) -> {
|
|
||||||
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) {
|
|
||||||
clientHeaders.put(n, v);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
clientHeaders.put("Content-Type", "x-www-form-urlencoded");
|
|
||||||
if (req.getBody() != null && req.getBody().length > 0) {
|
|
||||||
String paramstr = req.getParametersToString();
|
|
||||||
if (paramstr != null) {
|
|
||||||
if (req.getRequestURI().indexOf('?') > 0) {
|
|
||||||
req.setRequestURI(req.getRequestURI() + "&" + paramstr);
|
|
||||||
} else {
|
|
||||||
req.setRequestURI(req.getRequestURI() + "?" + paramstr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
clientBody = req.getBody();
|
|
||||||
} else {
|
|
||||||
String paramstr = req.getParametersToString();
|
|
||||||
if (paramstr != null) {
|
|
||||||
clientBody = paramstr.getBytes(StandardCharsets.UTF_8);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
List<CompletableFuture> futures = new ArrayList<>();
|
|
||||||
if (logger.isLoggable(Level.FINEST)) {
|
|
||||||
logger.log(Level.FINEST, "mqtpAsync: module=" + localModule + ", resname=" + resname + ", addrmap=" + addrmap);
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, Set<InetSocketAddress>> en : addrmap.entrySet()) {
|
|
||||||
String realmodule = en.getKey();
|
|
||||||
Collection<InetSocketAddress> addrs = en.getValue();
|
|
||||||
if (addrs == null || addrs.isEmpty()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String suburi = req.getRequestURI();
|
|
||||||
suburi = suburi.substring(1); //跳过 /
|
|
||||||
suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/'));
|
|
||||||
futures.add(forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, clientHeaders, clientBody, addrs.iterator()));
|
|
||||||
}
|
|
||||||
if (futures.isEmpty()) {
|
|
||||||
return CompletableFuture.completedFuture(null);
|
|
||||||
}
|
|
||||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) {
|
private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) {
|
||||||
String module = req.getRequestURI();
|
String module = req.getRequestURI();
|
||||||
module = module.substring(1); //去掉/
|
module = module.substring(1); //去掉/
|
||||||
@@ -254,53 +172,6 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// private CompletableFuture<HttpResult<byte[]>> mqtpAsync(Serializable userid, HttpSimpleRequest req) {
|
|
||||||
// final boolean finest = logger.isLoggable(Level.FINEST);
|
|
||||||
// String module = req.getRequestURI();
|
|
||||||
// module = module.substring(1); //去掉/
|
|
||||||
// module = module.substring(0, module.indexOf('/'));
|
|
||||||
// Map<String, String> headers = req.getHeaders();
|
|
||||||
// String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
|
|
||||||
// return clusterAgent.queryMqtpAddress("mqtp", module, resname).thenCompose(addrmap -> {
|
|
||||||
// if (addrmap == null || addrmap.isEmpty()) return new HttpResult().status(404).toAnyFuture();
|
|
||||||
// java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000));
|
|
||||||
// if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
|
|
||||||
// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
|
|
||||||
// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
|
|
||||||
// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
|
|
||||||
// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid);
|
|
||||||
// if (headers != null) headers.forEach((n, v) -> {
|
|
||||||
// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
|
|
||||||
// });
|
|
||||||
// builder.header("Content-Type", "x-www-form-urlencoded");
|
|
||||||
// if (req.getBody() != null && req.getBody().length > 0) {
|
|
||||||
// String paramstr = req.getParametersToString();
|
|
||||||
// if (paramstr != null) {
|
|
||||||
// if (req.getRequestURI().indexOf('?') > 0) {
|
|
||||||
// req.setRequestURI(req.getRequestURI() + "&" + paramstr);
|
|
||||||
// } else {
|
|
||||||
// req.setRequestURI(req.getRequestURI() + "?" + paramstr);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// builder.POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(req.getBody()));
|
|
||||||
// } else {
|
|
||||||
// String paramstr = req.getParametersToString();
|
|
||||||
// if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
|
|
||||||
// }
|
|
||||||
// List<CompletableFuture> futures = new ArrayList<>();
|
|
||||||
// for (Map.Entry<String, Collection<InetSocketAddress>> en : addrmap.entrySet()) {
|
|
||||||
// String realmodule = en.getKey();
|
|
||||||
// Collection<InetSocketAddress> addrs = en.getValue();
|
|
||||||
// if (addrs == null || addrs.isEmpty()) continue;
|
|
||||||
// String suburi = req.getRequestURI();
|
|
||||||
// suburi = suburi.substring(1); //跳过 /
|
|
||||||
// suburi = "/" + realmodule + suburi.substring(suburi.indexOf('/'));
|
|
||||||
// futures.add(forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + suburi, builder, addrs.iterator()));
|
|
||||||
// }
|
|
||||||
// if (futures.isEmpty()) return CompletableFuture.completedFuture(null);
|
|
||||||
// return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> null);
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
//
|
//
|
||||||
// private CompletableFuture<HttpResult<byte[]>> httpAsync(Serializable userid, HttpSimpleRequest req) {
|
// private CompletableFuture<HttpResult<byte[]>> httpAsync(Serializable userid, HttpSimpleRequest req) {
|
||||||
// final boolean finest = logger.isLoggable(Level.FINEST);
|
// final boolean finest = logger.isLoggable(Level.FINEST);
|
||||||
|
|||||||
@@ -164,20 +164,6 @@ public class HttpMessageLocalClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
|
||||||
HttpDispatcherServlet ps = dispatcherServlet();
|
|
||||||
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
|
|
||||||
HttpResponse resp = new HttpMessageLocalResponse(req, null);
|
|
||||||
ps.filterServletsByMmcTopic(topic).forEach(s -> {
|
|
||||||
try {
|
|
||||||
s.execute(req, resp);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.log(Level.SEVERE, request + " execute " + s + " error", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class HttpMessageLocalRequest extends HttpRequest {
|
public static class HttpMessageLocalRequest extends HttpRequest {
|
||||||
|
|
||||||
public HttpMessageLocalRequest(HttpContext context, HttpSimpleRequest req, Serializable userid) {
|
public HttpMessageLocalRequest(HttpContext context, HttpSimpleRequest req, Serializable userid) {
|
||||||
|
|||||||
@@ -488,10 +488,6 @@ public abstract class MessageAgent implements Resourcable {
|
|||||||
protected String[] generateHttpReqTopics(Service service) {
|
protected String[] generateHttpReqTopics(Service service) {
|
||||||
String resname = Sncp.getResourceName(service);
|
String resname = Sncp.getResourceName(service);
|
||||||
String module = Rest.getRestModule(service).toLowerCase();
|
String module = Rest.getRestModule(service).toLowerCase();
|
||||||
MessageMultiConsumer mmc = service.getClass().getAnnotation(MessageMultiConsumer.class);
|
|
||||||
if (mmc != null) {
|
|
||||||
return new String[]{generateHttpReqTopic(mmc.module()) + (resname.isEmpty() ? "" : ("-" + resname))};
|
|
||||||
}
|
|
||||||
return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))};
|
return new String[]{"http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname))};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,63 +1,63 @@
|
|||||||
/*
|
///*
|
||||||
* To change this license header, choose License Headers in Project Properties.
|
// * To change this license header, choose License Headers in Project Properties.
|
||||||
* To change this template file, choose Tools | Templates
|
// * To change this template file, choose Tools | Templates
|
||||||
* and open the template in the editor.
|
// * and open the template in the editor.
|
||||||
*/
|
// */
|
||||||
package org.redkale.mq;
|
//package org.redkale.mq;
|
||||||
|
//
|
||||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
//import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||||
import java.lang.annotation.*;
|
//import java.lang.annotation.*;
|
||||||
import static java.lang.annotation.ElementType.*;
|
//import static java.lang.annotation.ElementType.*;
|
||||||
|
//
|
||||||
/**
|
///**
|
||||||
* 多消费组,需要同 @RestService 一起使用
|
// * 多消费组,需要同 @RestService 一起使用
|
||||||
* <p>
|
// * <p>
|
||||||
* 通常一个topic只会被一个RestService消费, 当一个topic需要被其他RestService消费时,就需要使用@MessageMultiConsumer
|
// * 通常一个topic只会被一个RestService消费, 当一个topic需要被其他RestService消费时,就需要使用@MessageMultiConsumer
|
||||||
*
|
// *
|
||||||
* <blockquote><pre>
|
// * <blockquote><pre>
|
||||||
* @RestService(name = "user", comment = "用户服务")
|
// * @RestService(name = "user", comment = "用户服务")
|
||||||
* public class UserService implements Service{
|
// * public class UserService implements Service{
|
||||||
*
|
// *
|
||||||
* @RestMapping(comment = "用户登录")
|
// * @RestMapping(comment = "用户登录")
|
||||||
* public RetResult login(LoginBean bean){
|
// * public RetResult login(LoginBean bean){
|
||||||
* //do something
|
// * //do something
|
||||||
* }
|
// * }
|
||||||
* }
|
// * }
|
||||||
* </pre></blockquote>
|
// * </pre></blockquote>
|
||||||
*
|
// *
|
||||||
* 需求:统计用户登录次数, 可以创建一个MessageMultiConsumer 的 RestService:
|
// * 需求:统计用户登录次数, 可以创建一个MessageMultiConsumer 的 RestService:
|
||||||
* <blockquote><pre>
|
// * <blockquote><pre>
|
||||||
* <b>@MessageMultiConsumer(module = "user") </b>
|
// * <b>@MessageMultiConsumer(module = "user") </b>
|
||||||
* @RestService(name = "loginstat", comment = "用户统计服务")
|
// * @RestService(name = "loginstat", comment = "用户统计服务")
|
||||||
* public class LoginStatService implements Service{
|
// * public class LoginStatService implements Service{
|
||||||
*
|
// *
|
||||||
* private LongAdder counter = new LongAdder();
|
// * private LongAdder counter = new LongAdder();
|
||||||
*
|
// *
|
||||||
* @RestMapping(name = "login", comment = "用户登录统计")
|
// * @RestMapping(name = "login", comment = "用户登录统计")
|
||||||
* public void stat(LoginBean bean){ //参数必须和UserService.login方法一致
|
// * public void stat(LoginBean bean){ //参数必须和UserService.login方法一致
|
||||||
* counter.increment();
|
// * counter.increment();
|
||||||
* }
|
// * }
|
||||||
* }
|
// * }
|
||||||
* </pre></blockquote>
|
// * </pre></blockquote>
|
||||||
*
|
// *
|
||||||
* <p>
|
// * <p>
|
||||||
* 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型 <br>
|
// * 注: 标记 @MessageMultiConsumer 的Service的@RestMapping方法都只能是void返回类型 <br>
|
||||||
* 由 MessageConsumer 代替
|
// * 由 MessageConsumer 代替
|
||||||
* <p>
|
// * <p>
|
||||||
* 详情见: https://redkale.org
|
// * 详情见: https://redkale.org
|
||||||
*
|
// *
|
||||||
*
|
// *
|
||||||
* @author zhangjx
|
// * @author zhangjx
|
||||||
* @deprecated
|
// * @deprecated
|
||||||
*
|
// *
|
||||||
* @since 2.1.0
|
// * @since 2.1.0
|
||||||
*/
|
// */
|
||||||
@Inherited
|
//@Inherited
|
||||||
@Documented
|
//@Documented
|
||||||
@Target({TYPE})
|
//@Target({TYPE})
|
||||||
@Retention(RUNTIME)
|
//@Retention(RUNTIME)
|
||||||
@Deprecated(since = "2.8.0")
|
//@Deprecated(since = "2.8.0")
|
||||||
public @interface MessageMultiConsumer {
|
//public @interface MessageMultiConsumer {
|
||||||
|
//
|
||||||
String module();
|
// String module();
|
||||||
}
|
//}
|
||||||
|
|||||||
@@ -17,8 +17,8 @@ import org.redkale.net.*;
|
|||||||
import org.redkale.net.Filter;
|
import org.redkale.net.Filter;
|
||||||
import org.redkale.net.http.Rest.RestDynSourceType;
|
import org.redkale.net.http.Rest.RestDynSourceType;
|
||||||
import org.redkale.service.Service;
|
import org.redkale.service.Service;
|
||||||
import org.redkale.util.AnyValue.DefaultAnyValue;
|
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
|
import org.redkale.util.AnyValue.DefaultAnyValue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HTTP Servlet的总入口,请求在HttpDispatcherServlet中进行分流。 <br>
|
* HTTP Servlet的总入口,请求在HttpDispatcherServlet中进行分流。 <br>
|
||||||
@@ -523,10 +523,6 @@ public class HttpDispatcherServlet extends DispatcherServlet<String, HttpContext
|
|||||||
return filterServlets(x -> x._reqtopic != null && x._reqtopic.equals(topic)).findFirst().orElse(null);
|
return filterServlets(x -> x._reqtopic != null && x._reqtopic.equals(topic)).findFirst().orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Stream<HttpServlet> filterServletsByMmcTopic(String mmctopic) {
|
|
||||||
return filterServlets(x -> x._mmctopic != null && x._mmctopic.equals(mmctopic));
|
|
||||||
}
|
|
||||||
|
|
||||||
public Stream<HttpServlet> filterServlets(Predicate<HttpServlet> predicate) {
|
public Stream<HttpServlet> filterServlets(Predicate<HttpServlet> predicate) {
|
||||||
return predicate == null ? servletStream() : servletStream().filter(predicate);
|
return predicate == null ? servletStream() : servletStream().filter(predicate);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -322,10 +322,6 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
|
|||||||
servlet = Rest.createRestServlet(classLoader, userType, baseServletType, serviceType);
|
servlet = Rest.createRestServlet(classLoader, userType, baseServletType, serviceType);
|
||||||
if (servlet != null) {
|
if (servlet != null) {
|
||||||
servlet._reqtopic = MessageAgent.generateHttpReqTopic(Rest.getRestModule(service));
|
servlet._reqtopic = MessageAgent.generateHttpReqTopic(Rest.getRestModule(service));
|
||||||
if (serviceType.getAnnotation(MessageMultiConsumer.class) != null) {
|
|
||||||
MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class);
|
|
||||||
servlet._mmctopic = MessageAgent.generateHttpReqTopic(mmc.module(), resname);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (servlet == null) {
|
if (servlet == null) {
|
||||||
|
|||||||
@@ -13,8 +13,8 @@ import java.util.concurrent.*;
|
|||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import org.redkale.annotation.*;
|
import org.redkale.annotation.*;
|
||||||
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
|
|
||||||
import org.redkale.asm.*;
|
import org.redkale.asm.*;
|
||||||
|
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
|
||||||
import static org.redkale.asm.Opcodes.*;
|
import static org.redkale.asm.Opcodes.*;
|
||||||
import org.redkale.boot.*;
|
import org.redkale.boot.*;
|
||||||
import org.redkale.net.*;
|
import org.redkale.net.*;
|
||||||
@@ -42,8 +42,6 @@ public class HttpServlet extends Servlet<HttpContext, HttpRequest, HttpResponse>
|
|||||||
|
|
||||||
String _reqtopic; //根据RestService+MQ生成的值 @since 2.5.0
|
String _reqtopic; //根据RestService+MQ生成的值 @since 2.5.0
|
||||||
|
|
||||||
String _mmctopic; //根据RestService+@MessageMultiConsumer生成的值 @since 2.5.0
|
|
||||||
|
|
||||||
HashMap<String, ActionEntry> _actionmap; //Rest生成时赋值, 字段名Rest有用到
|
HashMap<String, ActionEntry> _actionmap; //Rest生成时赋值, 字段名Rest有用到
|
||||||
|
|
||||||
private Map.Entry<String, ActionEntry>[] mappings; //字段名Rest有用到
|
private Map.Entry<String, ActionEntry>[] mappings; //字段名Rest有用到
|
||||||
|
|||||||
@@ -6,18 +6,18 @@
|
|||||||
package org.redkale.net.http;
|
package org.redkale.net.http;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
import java.lang.annotation.*;
|
||||||
import static java.lang.annotation.ElementType.TYPE;
|
import static java.lang.annotation.ElementType.TYPE;
|
||||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||||
import java.lang.annotation.*;
|
|
||||||
import java.lang.reflect.*;
|
import java.lang.reflect.*;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.channels.CompletionHandler;
|
import java.nio.channels.CompletionHandler;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import org.redkale.annotation.Comment;
|
|
||||||
import org.redkale.annotation.*;
|
import org.redkale.annotation.*;
|
||||||
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
|
import org.redkale.annotation.Comment;
|
||||||
import org.redkale.asm.*;
|
import org.redkale.asm.*;
|
||||||
|
import static org.redkale.asm.ClassWriter.COMPUTE_FRAMES;
|
||||||
import static org.redkale.asm.Opcodes.*;
|
import static org.redkale.asm.Opcodes.*;
|
||||||
import org.redkale.asm.Type;
|
import org.redkale.asm.Type;
|
||||||
import org.redkale.convert.*;
|
import org.redkale.convert.*;
|
||||||
@@ -1627,16 +1627,6 @@ public final class Rest {
|
|||||||
boolean dynsimple = baseServletType == HttpServlet.class; //有自定义的BaseServlet会存在读取header的操作
|
boolean dynsimple = baseServletType == HttpServlet.class; //有自定义的BaseServlet会存在读取header的操作
|
||||||
//获取所有可以转换成HttpMapping的方法
|
//获取所有可以转换成HttpMapping的方法
|
||||||
int methodidex = 0;
|
int methodidex = 0;
|
||||||
final MessageMultiConsumer mmc = serviceType.getAnnotation(MessageMultiConsumer.class);
|
|
||||||
if (mmc != null && (mmc.module() == null || mmc.module().isEmpty())) {
|
|
||||||
throw new RestException("@" + MessageMultiConsumer.class.getSimpleName() + ".module can not empty in " + serviceType.getName());
|
|
||||||
}
|
|
||||||
if (mmc != null && !checkName2(mmc.module())) {
|
|
||||||
throw new RestException(serviceType.getName() + " have illegal " + MessageMultiConsumer.class.getSimpleName() + ".module, only 0-9 a-z A-Z _ - . cannot begin 0-9");
|
|
||||||
}
|
|
||||||
if (mmc != null) {
|
|
||||||
Asms.visitAnnotation(cw.visitAnnotation(Type.getDescriptor(mmc.annotationType()), true), mmc);
|
|
||||||
}
|
|
||||||
final Method[] allMethods = serviceType.getMethods();
|
final Method[] allMethods = serviceType.getMethods();
|
||||||
Arrays.sort(allMethods, (m1, m2) -> { //必须排序,否则paramTypes顺序容易乱
|
Arrays.sort(allMethods, (m1, m2) -> { //必须排序,否则paramTypes顺序容易乱
|
||||||
int s = m1.getName().compareTo(m2.getName());
|
int s = m1.getName().compareTo(m2.getName());
|
||||||
@@ -1691,9 +1681,6 @@ public final class Rest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (mmc != null && method.getReturnType() != void.class) {
|
|
||||||
throw new RestException("@" + RestMapping.class.getSimpleName() + " only for method(" + method + ") with return void by @" + MessageMultiConsumer.class.getSimpleName() + " Service");
|
|
||||||
}
|
|
||||||
paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
|
paramTypes.add(TypeToken.getGenericType(method.getGenericParameterTypes(), serviceType));
|
||||||
retvalTypes.add(formatRestReturnType(method, serviceType));
|
retvalTypes.add(formatRestReturnType(method, serviceType));
|
||||||
if (mappings.length == 0) { //没有Mapping,设置一个默认值
|
if (mappings.length == 0) { //没有Mapping,设置一个默认值
|
||||||
|
|||||||
Reference in New Issue
Block a user