This commit is contained in:
redkale
2023-10-13 21:37:41 +08:00
parent 5936ec1986
commit 98ff3cdfcf
40 changed files with 1092 additions and 1154 deletions

View File

@@ -1171,10 +1171,6 @@ public final class Application {
this.resourceFactory.inject(agent); this.resourceFactory.inject(agent);
agent.init(agent.getConfig()); agent.init(agent.getConfig());
this.resourceFactory.register(agent.getName(), MessageAgent.class, agent); this.resourceFactory.register(agent.getName(), MessageAgent.class, agent);
if (this.clusterAgent == null) {
this.resourceFactory.register(agent.getName(), HttpMessageClient.class, agent.getHttpMessageClient());
//this.resourceFactory.register(agent.getName(), SncpMessageClient.class, agent.getSncpMessageClient()); //不需要给开发者使用
}
} }
logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms"); logger.info("MessageAgent init in " + (System.currentTimeMillis() - s) + " ms");
} }
@@ -1207,29 +1203,39 @@ public final class Application {
return ResourceProducer.class; return ResourceProducer.class;
} }
}); });
//------------------------------------ 注册 HttpMessageClient ------------------------------------ //------------------------------------ 注册 HttpRpcClient ------------------------------------
resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> { resourceFactory.register((ResourceFactory rf, String srcResourceName, final Object srcObj, String resourceName, Field field, final Object attachment) -> {
try { try {
if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) { if (field.getAnnotation(Resource.class) == null && field.getAnnotation(javax.annotation.Resource.class) == null) {
return null; return null;
} }
if (clusterAgent == null) { if (this.messageAgents != null) {
HttpMessageClient messageClient = new HttpMessageLocalClient(application, resourceName); MessageAgent messageAgent = this.resourceFactory.find(resourceName, MessageAgent.class);
field.set(srcObj, messageClient); if (messageAgent != null) {
rf.inject(resourceName, messageClient, null); // 给其可能包含@Resource的字段赋值; HttpRpcClient rpcClient = messageAgent.getHttpRpcClient();
rf.register(resourceName, HttpMessageClient.class, messageClient); field.set(srcObj, rpcClient);
return messageClient; rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpRpcClient.class, rpcClient);
return rpcClient;
}
} }
HttpMessageClient messageClient = new HttpMessageClusterClient(application, resourceName, clusterAgent); if (clusterAgent == null) {
field.set(srcObj, messageClient); HttpRpcClient rpcClient = new HttpLocalRpcClient(application, resourceName);
rf.inject(resourceName, messageClient, null); // 给其可能包含@Resource的字段赋值; field.set(srcObj, rpcClient);
rf.register(resourceName, HttpMessageClient.class, messageClient); rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值;
return messageClient; rf.register(resourceName, HttpRpcClient.class, rpcClient);
return rpcClient;
}
HttpRpcClient rpcClient = new HttpClusterRpcClient(application, resourceName, clusterAgent);
field.set(srcObj, rpcClient);
rf.inject(resourceName, rpcClient, null); // 给其可能包含@Resource的字段赋值;
rf.register(resourceName, HttpRpcClient.class, rpcClient);
return rpcClient;
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "HttpMessageClient inject error", e); logger.log(Level.SEVERE, HttpRpcClient.class.getSimpleName() + " inject error", e);
return null; return null;
} }
}, HttpMessageClient.class); }, HttpRpcClient.class);
initResources(); initResources();
} }

View File

@@ -167,7 +167,8 @@ public abstract class NodeServer {
server.init(this.serverConf); server.init(this.serverConf);
if (this.sncpAddress != null) { //初始化SncpClient if (this.sncpAddress != null) { //初始化SncpClient
this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true); this.sncpAsyncGroup = new AsyncIOGroup("Redkale-SncpClient-IOThread-%s", application.getWorkExecutor(), server.getBufferCapacity(), server.getBufferPoolSize()).skipClose(true);
this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000); this.sncpClient = new SncpClient(server.getName(), this.sncpAsyncGroup, application.getNodeid(),
this.sncpAddress, new ClientAddress(this.sncpAddress), server.getNetprotocol(), Utility.cpus(), 1000);
} }
initResource(); //给DataSource、CacheSource注册依赖注入时的监听回调事件。 initResource(); //给DataSource、CacheSource注册依赖注入时的监听回调事件。

View File

@@ -0,0 +1,36 @@
/*
*
*/
package org.redkale.cluster;
import java.util.concurrent.CompletableFuture;
/**
* cluster模式下的rpc client
*
*
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public interface ClusterRpcClient<R, P> {
/**
* 发送消息,需要响应
*
* @param message 消息体
*
* @return 应答消息
*/
public CompletableFuture<P> sendMessage(final R message);
/**
* 发送消息,无需响应
*
* @param message 消息体
*/
public void produceMessage(R message);
}

View File

@@ -1,246 +1,246 @@
/* package org.redkale.cluster;
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates import java.io.Serializable;
* and open the template in the editor. import java.net.*;
*/ import java.nio.charset.StandardCharsets;
package org.redkale.mq; import java.time.Duration;
import java.util.*;
import java.io.Serializable; import java.util.concurrent.*;
import java.net.*; import java.util.logging.Level;
import java.nio.charset.StandardCharsets; import java.util.logging.Logger;
import java.time.Duration; import org.redkale.annotation.Resource;
import java.util.*; import org.redkale.boot.Application;
import java.util.concurrent.*; import org.redkale.net.http.*;
import java.util.concurrent.atomic.LongAdder; import org.redkale.util.Utility;
import java.util.logging.Level;
import org.redkale.annotation.Resource; /**
import org.redkale.boot.Application; * 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例
import org.redkale.cluster.ClusterAgent; *
import org.redkale.net.http.*; * <p>
import org.redkale.util.Utility; * 详情见: https://redkale.org
*
/** * @author zhangjx
* 没有配置MQ的情况下依赖ClusterAgent实现的默认HttpMessageClient实例 *
* * @since 2.1.0
* <p> */
* 详情见: https://redkale.org public class HttpClusterRpcClient extends HttpRpcClient {
*
* @author zhangjx //jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET
* private static final Set<String> DISALLOWED_HEADERS_SET = Utility.ofSet("connection", "content-length",
* @since 2.1.0 "date", "expect", "from", "host", "origin", "referer", "upgrade", "via", "warning");
*/
public class HttpMessageClusterClient extends HttpMessageClient { protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
//jdk.internal.net.http.common.Utils.DISALLOWED_HEADERS_SET protected final HttpLocalRpcClient localClient;
private static final Set<String> DISALLOWED_HEADERS_SET = Utility.ofSet("connection", "content-length",
"date", "expect", "from", "host", "origin", "referer", "upgrade", "via", "warning"); protected final ConcurrentHashMap<String, Boolean> topicServletMap = new ConcurrentHashMap<>();
protected final HttpMessageLocalClient localClient; protected ClusterAgent clusterAgent;
protected final ConcurrentHashMap<String, Boolean> topicServletMap = new ConcurrentHashMap<>(); @Resource(name = "cluster.httpClient", required = false)
protected java.net.http.HttpClient httpClient;
protected ClusterAgent clusterAgent;
@Resource(name = "cluster.httpClient", required = false)
@Resource(name = "cluster.httpClient", required = false) protected HttpSimpleClient httpSimpleClient;
protected java.net.http.HttpClient httpClient;
public HttpClusterRpcClient(Application application, String resourceName, ClusterAgent clusterAgent) {
@Resource(name = "cluster.httpClient", required = false) Objects.requireNonNull(clusterAgent);
protected HttpSimpleClient httpSimpleClient; this.localClient = new HttpLocalRpcClient(application, resourceName);
this.clusterAgent = clusterAgent;
public HttpMessageClusterClient(Application application, String resourceName, ClusterAgent clusterAgent) { }
super(null);
Objects.requireNonNull(clusterAgent); @Override
this.localClient = new HttpMessageLocalClient(application, resourceName); protected int getNodeid() {
this.clusterAgent = clusterAgent; return localClient.getNodeid();
} }
@Override @Override
protected CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
return localClient.sendMessage(topic, userid, groupid, request, counter); return localClient.sendMessage(topic, userid, groupid, request);
} else { } else {
return httpAsync(false, userid, request); return httpAsync(false, userid, request);
} }
} }
@Override @Override
protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) { if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
localClient.produceMessage(topic, userid, groupid, request, counter); localClient.produceMessage(topic, userid, groupid, request);
} else { } else {
httpAsync(true, userid, request); httpAsync(true, userid, request);
} }
} }
private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) { private CompletableFuture<HttpResult<byte[]>> httpAsync(boolean produce, Serializable userid, HttpSimpleRequest req) {
String module = req.getRequestURI(); String module = req.getRequestURI();
module = module.substring(1); //去掉/ module = module.substring(1); //去掉/
module = module.substring(0, module.indexOf('/')); module = module.substring(0, module.indexOf('/'));
Map<String, String> headers = req.getHeaders(); Map<String, String> headers = req.getHeaders();
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
final String localModule = module; final String localModule = module;
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname); logger.log(Level.FINEST, "httpAsync.queryHttpAddress: module=" + localModule + ", resname=" + resname);
} }
return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
if (addrs == null || addrs.isEmpty()) { if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINE)) { if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty"); logger.log(Level.FINE, "httpAsync." + (produce ? "produceMessage" : "sendMessage") + ": module=" + localModule + ", resname=" + resname + ", addrmap is empty");
} }
return new HttpResult<byte[]>().status(404).toFuture(); return new HttpResult<byte[]>().status(404).toFuture();
} }
final Map<String, String> clientHeaders = new LinkedHashMap<>(); final Map<String, String> clientHeaders = new LinkedHashMap<>();
byte[] clientBody = null; byte[] clientBody = null;
if (req.isRpc()) { if (req.isRpc()) {
clientHeaders.put(Rest.REST_HEADER_RPC, "true"); clientHeaders.put(Rest.REST_HEADER_RPC, "true");
} }
if (req.isFrombody()) { if (req.isFrombody()) {
clientHeaders.put(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); clientHeaders.put(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
} }
if (req.getReqConvertType() != null) { if (req.getReqConvertType() != null) {
clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); clientHeaders.put(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
} }
if (req.getRespConvertType() != null) { if (req.getRespConvertType() != null) {
clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); clientHeaders.put(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
} }
if (userid != null) { if (userid != null) {
clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid); clientHeaders.put(Rest.REST_HEADER_CURRUSERID, "" + userid);
} }
if (headers != null) { if (headers != null) {
boolean ws = headers.containsKey("Sec-WebSocket-Key"); boolean ws = headers.containsKey("Sec-WebSocket-Key");
headers.forEach((n, v) -> { headers.forEach((n, v) -> {
if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase()) if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())
&& (!ws || (!"Connection".equals(n) && !"Sec-WebSocket-Key".equals(n) && (!ws || (!"Connection".equals(n) && !"Sec-WebSocket-Key".equals(n)
&& !"Sec-WebSocket-Version".equals(n)))) { && !"Sec-WebSocket-Version".equals(n)))) {
clientHeaders.put(n, v); clientHeaders.put(n, v);
} }
}); });
} }
clientHeaders.put("Content-Type", "x-www-form-urlencoded"); clientHeaders.put("Content-Type", "x-www-form-urlencoded");
if (req.getBody() != null && req.getBody().length > 0) { if (req.getBody() != null && req.getBody().length > 0) {
String paramstr = req.getParametersToString(); String paramstr = req.getParametersToString();
if (paramstr != null) { if (paramstr != null) {
if (req.getRequestURI().indexOf('?') > 0) { if (req.getRequestURI().indexOf('?') > 0) {
req.setRequestURI(req.getRequestURI() + "&" + paramstr); req.setRequestURI(req.getRequestURI() + "&" + paramstr);
} else { } else {
req.setRequestURI(req.getRequestURI() + "?" + paramstr); req.setRequestURI(req.getRequestURI() + "?" + paramstr);
} }
} }
clientBody = req.getBody(); clientBody = req.getBody();
} else { } else {
String paramstr = req.getParametersToString(); String paramstr = req.getParametersToString();
if (paramstr != null) { if (paramstr != null) {
clientBody = paramstr.getBytes(StandardCharsets.UTF_8); clientBody = paramstr.getBytes(StandardCharsets.UTF_8);
} }
} }
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture"); logger.log(Level.FINEST, "httpAsync: module=" + localModule + ", resname=" + resname + ", enter forEachCollectionFuture");
} }
return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req, return forEachCollectionFuture(logger.isLoggable(Level.FINEST), userid, req,
(req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(),
clientHeaders, clientBody, addrs.iterator()); clientHeaders, clientBody, addrs.iterator());
}); });
} }
private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid, private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid,
HttpSimpleRequest req, String requesturi, final Map<String, String> clientHeaders, byte[] clientBody, Iterator<InetSocketAddress> it) { HttpSimpleRequest req, String requesturi, final Map<String, String> clientHeaders, byte[] clientBody, Iterator<InetSocketAddress> it) {
if (!it.hasNext()) { if (!it.hasNext()) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
InetSocketAddress addr = it.next(); InetSocketAddress addr = it.next();
String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
if (finest) { if (finest) {
logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders); logger.log(Level.FINEST, "forEachCollectionFuture: url=" + url + ", headers=" + clientHeaders);
} }
if (httpSimpleClient != null) { if (httpSimpleClient != null) {
return httpSimpleClient.postAsync(url, clientHeaders, clientBody); return httpSimpleClient.postAsync(url, clientHeaders, clientBody);
} }
java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().uri(URI.create(url)) java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().uri(URI.create(url))
.timeout(Duration.ofMillis(10_000)) .timeout(Duration.ofMillis(10_000))
//存在sendHeader后不发送body数据的问题 java.net.http.HttpRequest的bug? //存在sendHeader后不发送body数据的问题 java.net.http.HttpRequest的bug?
.method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody)); .method("POST", clientBody == null ? java.net.http.HttpRequest.BodyPublishers.noBody() : java.net.http.HttpRequest.BodyPublishers.ofByteArray(clientBody));
if (clientHeaders != null) { if (clientHeaders != null) {
clientHeaders.forEach((n, v) -> builder.header(n, v)); clientHeaders.forEach((n, v) -> builder.header(n, v));
} }
return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()) return httpClient.sendAsync(builder.build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray())
.thenApply((java.net.http.HttpResponse<byte[]> resp) -> { .thenApply((java.net.http.HttpResponse<byte[]> resp) -> {
final int rs = resp.statusCode(); final int rs = resp.statusCode();
if (rs != 200) { if (rs != 200) {
return new HttpResult<byte[]>().status(rs); return new HttpResult<byte[]>().status(rs);
} }
return new HttpResult<byte[]>(resp.body()); return new HttpResult<byte[]>(resp.body());
}); });
} }
// //
// private CompletableFuture<HttpResult<byte[]>> httpAsync(Serializable userid, HttpSimpleRequest req) { // private CompletableFuture<HttpResult<byte[]>> httpAsync(Serializable userid, HttpSimpleRequest req) {
// final boolean finest = logger.isLoggable(Level.FINEST); // final boolean finest = logger.isLoggable(Level.FINEST);
// String module = req.getRequestURI(); // String module = req.getRequestURI();
// module = module.substring(1); //去掉/ // module = module.substring(1); //去掉/
// module = module.substring(0, module.indexOf('/')); // module = module.substring(0, module.indexOf('/'));
// Map<String, String> headers = req.getHeaders(); // Map<String, String> headers = req.getHeaders();
// String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); // String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
// return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> { // return clusterAgent.queryHttpAddress("http", module, resname).thenCompose(addrs -> {
// if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture(); // if (addrs == null || addrs.isEmpty()) return new HttpResult().status(404).toAnyFuture();
// java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000)); // java.net.http.HttpRequest.Builder builder = java.net.http.HttpRequest.newBuilder().timeout(Duration.ofMillis(30000));
// if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true"); // if (req.isRpc()) builder.header(Rest.REST_HEADER_RPC_NAME, "true");
// if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true"); // if (req.isFrombody()) builder.header(Rest.REST_HEADER_PARAM_FROM_BODY, "true");
// if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString()); // if (req.getReqConvertType() != null) builder.header(Rest.REST_HEADER_REQ_CONVERT_TYPE, req.getReqConvertType().toString());
// if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString()); // if (req.getRespConvertType() != null) builder.header(Rest.REST_HEADER_RESP_CONVERT_TYPE, req.getRespConvertType().toString());
// if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid); // if (userid != 0) builder.header(Rest.REST_HEADER_CURRUSERID, "" + userid);
// if (headers != null) headers.forEach((n, v) -> { // if (headers != null) headers.forEach((n, v) -> {
// if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v); // if (!DISALLOWED_HEADERS_SET.contains(n.toLowerCase())) builder.header(n, v);
// }); // });
// builder.header("Content-Type", "x-www-form-urlencoded"); // builder.header("Content-Type", "x-www-form-urlencoded");
// if (req.getBody() != null && req.getBody().length > 0) { // if (req.getBody() != null && req.getBody().length > 0) {
// String paramstr = req.getParametersToString(); // String paramstr = req.getParametersToString();
// if (paramstr != null) { // if (paramstr != null) {
// if (req.getRequestURI().indexOf('?') > 0) { // if (req.getRequestURI().indexOf('?') > 0) {
// req.setRequestURI(req.getRequestURI() + "&" + paramstr); // req.setRequestURI(req.getRequestURI() + "&" + paramstr);
// } else { // } else {
// req.setRequestURI(req.getRequestURI() + "?" + paramstr); // req.setRequestURI(req.getRequestURI() + "?" + paramstr);
// } // }
// } // }
// builder.POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(req.getBody())); // builder.POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(req.getBody()));
// } else { // } else {
// String paramstr = req.getParametersToString(); // String paramstr = req.getParametersToString();
// if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr)); // if (paramstr != null) builder.POST(java.net.http.HttpRequest.BodyPublishers.ofString(paramstr));
// } // }
// return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator()); // return forEachCollectionFuture(finest, userid, req, (req.getPath() != null && !req.getPath().isEmpty() ? req.getPath() : "") + req.getRequestURI(), builder, addrs.iterator());
// }); // });
// } // }
// //
// private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) { // private CompletableFuture<HttpResult<byte[]>> forEachCollectionFuture(boolean finest, Serializable userid, HttpSimpleRequest req, String requesturi, java.net.http.HttpRequest.Builder builder, Iterator<InetSocketAddress> it) {
// if (!it.hasNext()) return CompletableFuture.completedFuture(null); // if (!it.hasNext()) return CompletableFuture.completedFuture(null);
// InetSocketAddress addr = it.next(); // InetSocketAddress addr = it.next();
// String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi; // String url = "http://" + addr.getHostString() + ":" + addr.getPort() + requesturi;
// return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> { // return httpClient.sendAsync(builder.copy().uri(URI.create(url)).build(), java.net.http.HttpResponse.BodyHandlers.ofByteArray()).thenCompose(resp -> {
// if (resp.statusCode() != 200) return forEachCollectionFuture(finest, userid, req, requesturi, builder, it); // if (resp.statusCode() != 200) return forEachCollectionFuture(finest, userid, req, requesturi, builder, it);
// HttpResult rs = new HttpResult(); // HttpResult rs = new HttpResult();
// java.net.http.HttpHeaders hs = resp.headers(); // java.net.http.HttpHeaders hs = resp.headers();
// if (hs != null) { // if (hs != null) {
// Map<String, List<String>> hm = hs.map(); // Map<String, List<String>> hm = hs.map();
// if (hm != null) { // if (hm != null) {
// for (Map.Entry<String, List<String>> en : hm.entrySet()) { // for (Map.Entry<String, List<String>> en : hm.entrySet()) {
// if ("date".equals(en.getKey()) || "content-type".equals(en.getKey()) // if ("date".equals(en.getKey()) || "content-type".equals(en.getKey())
// || "server".equals(en.getKey()) || "connection".equals(en.getKey())) continue; // || "server".equals(en.getKey()) || "connection".equals(en.getKey())) continue;
// List<String> val = en.getValue(); // List<String> val = en.getValue();
// if (val != null && val.size() == 1) { // if (val != null && val.size() == 1) {
// rs.header(en.getKey(), val.get(0)); // rs.header(en.getKey(), val.get(0));
// } // }
// } // }
// } // }
// } // }
// rs.setResult(resp.body()); // rs.setResult(resp.body());
// if (finest) { // if (finest) {
// StringBuilder sb = new StringBuilder(); // StringBuilder sb = new StringBuilder();
// Map<String, String> params = req.getParams(); // Map<String, String> params = req.getParams();
// if (params != null && !params.isEmpty()) { // if (params != null && !params.isEmpty()) {
// params.forEach((n, v) -> sb.append('&').append(n).append('=').append(v)); // params.forEach((n, v) -> sb.append('&').append(n).append('=').append(v));
// } // }
// logger.log(Level.FINEST, url + "?userid=" + userid + sb + ", result = " + new String(resp.body(), StandardCharsets.UTF_8)); // logger.log(Level.FINEST, url + "?userid=" + userid + sb + ", result = " + new String(resp.body(), StandardCharsets.UTF_8));
// } // }
// return CompletableFuture.completedFuture(rs); // return CompletableFuture.completedFuture(rs);
// }); // });
// } // }
} }

View File

@@ -3,16 +3,17 @@
* To change this template file, choose Tools | Templates * To change this template file, choose Tools | Templates
* and open the template in the editor. * and open the template in the editor.
*/ */
package org.redkale.mq; package org.redkale.cluster;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.boot.*; import org.redkale.boot.*;
import org.redkale.cluster.HttpRpcClient;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.*; import org.redkale.net.http.*;
@@ -28,22 +29,23 @@ import org.redkale.util.RedkaleException;
* *
* @since 2.4.0 * @since 2.4.0
*/ */
public class HttpMessageLocalClient extends HttpMessageClient { public class HttpLocalRpcClient extends HttpRpcClient {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final Application application; protected final Application application;
protected final String resourceName; protected final String resourceName;
protected HttpServer server; protected HttpServer currServer;
public HttpMessageLocalClient(Application application, String resourceName) { public HttpLocalRpcClient(Application application, String resourceName) {
super(null);
this.application = application; this.application = application;
this.resourceName = resourceName; this.resourceName = resourceName;
} }
private HttpServer httpServer() { private HttpServer httpServer() {
if (this.server == null) { if (this.currServer == null) {
NodeHttpServer nodeHttpServer = null; NodeHttpServer nodeHttpServer = null;
List<NodeServer> nodeServers = application.getNodeServers(); List<NodeServer> nodeServers = application.getNodeServers();
for (NodeServer n : nodeServers) { for (NodeServer n : nodeServers) {
@@ -60,9 +62,14 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
} }
} }
this.server = nodeHttpServer.getServer(); this.currServer = nodeHttpServer.getServer();
} }
return this.server; return this.currServer;
}
@Override
protected int getNodeid() {
return application.getNodeid();
} }
protected HttpContext context() { protected HttpContext context() {
@@ -73,11 +80,11 @@ public class HttpMessageLocalClient extends HttpMessageClient {
return (HttpDispatcherServlet) httpServer().getDispatcherServlet(); return (HttpDispatcherServlet) httpServer().getDispatcherServlet();
} }
protected HttpServlet findHttpServlet(String topic) { public HttpServlet findHttpServlet(String topic) {
return dispatcherServlet().findServletByTopic(topic); return dispatcherServlet().findServletByTopic(topic);
} }
protected HttpServlet findHttpServlet(HttpSimpleRequest request) { public HttpServlet findHttpServlet(HttpSimpleRequest request) {
return dispatcherServlet().findServletByTopic(generateHttpReqTopic(request, request.getPath())); return dispatcherServlet().findServletByTopic(generateHttpReqTopic(request, request.getPath()));
} }
@@ -114,7 +121,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
protected CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
HttpServlet servlet = findHttpServlet(topic); HttpServlet servlet = findHttpServlet(topic);
if (servlet == null) { if (servlet == null) {
if (logger.isLoggable(Level.FINE)) { if (logger.isLoggable(Level.FINE)) {
@@ -146,7 +153,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
} }
@Override @Override
protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) { public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
HttpDispatcherServlet ps = dispatcherServlet(); HttpDispatcherServlet ps = dispatcherServlet();
HttpServlet servlet = ps.findServletByTopic(topic); HttpServlet servlet = ps.findServletByTopic(topic);
if (servlet == null) { if (servlet == null) {

View File

@@ -3,16 +3,13 @@
* To change this template file, choose Tools | Templates * To change this template file, choose Tools | Templates
* and open the template in the editor. * and open the template in the editor.
*/ */
package org.redkale.mq; package org.redkale.cluster;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Logger;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
import org.redkale.net.http.*; import org.redkale.net.http.*;
/** /**
@@ -25,25 +22,77 @@ import org.redkale.net.http.*;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class HttpMessageClient extends MessageClient { public abstract class HttpRpcClient implements ClusterRpcClient<HttpSimpleRequest, HttpResult<byte[]>> {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); @Override
public final void produceMessage(HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), 0, null, request);
}
protected HttpMessageClient(MessageAgent messageAgent) { public final void produceMessage(Serializable userid, HttpSimpleRequest request) {
super(messageAgent); produceMessage(generateHttpReqTopic(request, null), userid, null, request);
if (messageAgent != null) { // //RPC方式下无messageAgent }
this.appRespTopic = messageAgent.generateAppHttpRespTopic();
} public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) {
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request);
}
public final void produceMessage(String topic, HttpSimpleRequest request) {
produceMessage(topic, 0, null, request);
}
@Override
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), 0, null, request);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), userid, null, request);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) {
return sendMessage(topic, 0, null, request);
}
public <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), 0, null, request).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) {
return null;
}
return JsonConvert.root().convertFrom(type, httbs.getResult());
});
}
public <T> CompletableFuture<T> sendMessage(Serializable userid, HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), userid, null, request).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) {
return null;
}
return JsonConvert.root().convertFrom(type, httbs.getResult());
});
}
public <T> CompletableFuture<T> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) {
return null;
}
return JsonConvert.root().convertFrom(type, httbs.getResult());
});
} }
//格式: http.req.user //格式: http.req.user
public String generateHttpReqTopic(String module) { public String generateHttpReqTopic(String module) {
return MessageAgent.generateHttpReqTopic(module); return Rest.generateHttpReqTopic(module, getNodeid());
} }
//格式: http.req.user-n10 //格式: http.req.user-n10
public String generateHttpReqTopic(String module, String resname) { public String generateHttpReqTopic(String module, String resname) {
return MessageAgent.generateHttpReqTopic(module, resname); return Rest.generateHttpReqTopic(module, resname, getNodeid());
} }
public String generateHttpReqTopic(HttpSimpleRequest request, String path) { public String generateHttpReqTopic(HttpSimpleRequest request, String path) {
@@ -55,91 +104,13 @@ public class HttpMessageClient extends MessageClient {
module = module.substring(0, module.indexOf('/')); module = module.substring(0, module.indexOf('/'));
Map<String, String> headers = request.getHeaders(); Map<String, String> headers = request.getHeaders();
String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, ""); String resname = headers == null ? "" : headers.getOrDefault(Rest.REST_HEADER_RESOURCE_NAME, "");
return MessageAgent.generateHttpReqTopic(module, resname); return Rest.generateHttpReqTopic(module, resname, getNodeid());
} }
public final void produceMessage(HttpSimpleRequest request) { public abstract CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request);
produceMessage(generateHttpReqTopic(request, null), 0, null, request, null);
}
public final void produceMessage(Serializable userid, HttpSimpleRequest request) { public abstract void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request);
produceMessage(generateHttpReqTopic(request, null), userid, null, request, null);
}
public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request) { protected abstract int getNodeid();
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
}
public final void produceMessage(String topic, HttpSimpleRequest request) {
produceMessage(topic, 0, null, request, null);
}
public final void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
produceMessage(topic, userid, groupid, request, null);
}
public <T> CompletableFuture<T> sendMessage(HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) {
return null;
}
return JsonConvert.root().convertFrom(type, httbs.getResult());
});
}
public <T> CompletableFuture<T> sendMessage(Serializable userid, HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) {
return null;
}
return JsonConvert.root().convertFrom(type, httbs.getResult());
});
}
public <T> CompletableFuture<T> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, Type type) {
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null).thenApply((HttpResult<byte[]> httbs) -> {
if (httbs == null || httbs.getResult() == null) {
return null;
}
return JsonConvert.root().convertFrom(type, httbs.getResult());
});
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), userid, null, request, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request) {
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request) {
return sendMessage(topic, 0, null, request, null);
}
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
return sendMessage(topic, userid, null, request, (LongAdder) null);
}
protected CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
protected void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
message.userid(userid).groupid(groupid);
sendMessage(message, false, counter);
}
@Override
protected MessageClientProducer getProducer() {
return messageAgent.getHttpMessageClientProducer();
}
} }

View File

@@ -1,140 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*;
import org.redkale.boot.NodeHttpServer;
import org.redkale.net.http.*;
import org.redkale.service.Service;
import org.redkale.util.*;
/**
* 一个Service对应一个MessageProcessor
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class HttpMessageClientProcessor implements MessageClientProcessor {
protected final Logger logger;
protected HttpMessageClient messageClient;
protected final MessageClientProducer producer;
protected final NodeHttpServer server;
protected final Service service;
protected final HttpServlet servlet;
protected final String restModule; // 前后有/, 例如: /user/
protected ThreadLocal<ObjectPool<HttpMessageResponse>> respPoolThreadLocal;
protected final Supplier<HttpMessageResponse> respSupplier;
protected final Consumer<HttpMessageResponse> respConsumer;
protected CountDownLatch cdl;
protected long startTime;
protected final Runnable innerCallback = () -> {
if (cdl != null) {
cdl.countDown();
}
};
public HttpMessageClientProcessor(Logger logger, HttpMessageClient messageClient, MessageClientProducer producer, NodeHttpServer server, Service service, HttpServlet servlet) {
this.logger = logger;
this.messageClient = messageClient;
this.producer = producer;
this.server = server;
this.service = service;
this.servlet = servlet;
this.restModule = "/" + Rest.getRestModule(service) + "/";
this.respSupplier = () -> respPoolThreadLocal.get().get();
this.respConsumer = resp -> respPoolThreadLocal.get().accept(resp);
this.respPoolThreadLocal = Utility.withInitialThreadLocal(() -> ObjectPool.createUnsafePool(Utility.cpus(),
ps -> new HttpMessageResponse(server.getHttpServer().getContext(), messageClient, respSupplier, respConsumer), HttpMessageResponse::prepare, HttpMessageResponse::recycle));
}
@Override
public void begin(final int size, long starttime) {
this.startTime = starttime;
this.cdl = size > 1 ? new CountDownLatch(size) : null;
}
@Override
public void process(final MessageRecord message, final Runnable callback) {
execute(message, innerCallback);
}
private void execute(final MessageRecord message, final Runnable callback) {
HttpMessageRequest request = null;
try {
Traces.computeIfAbsent(message.getTraceid());
long now = System.currentTimeMillis();
long cha = now - message.createTime;
long e = now - startTime;
HttpMessageResponse response = respSupplier.get();
request = response.request();
response.prepare(message, callback, producer);
server.getHttpServer().getContext().execute(servlet, request, response);
long o = System.currentTimeMillis() - now;
if ((cha > 1000 || e > 100 || o > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "HttpMessageProcessor.process (mqs.delays = " + cha + " ms, mqs.blocks = " + e + " ms, mqs.executes = " + o + " ms) message: " + message);
} else if ((cha > 50 || e > 10 || o > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "HttpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms, mq.executes = " + o + " ms) message: " + message);
} else if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "HttpMessageProcessor.process (mq.delay = " + cha + " ms, mq.block = " + e + " ms, mq.execute = " + o + " ms) message: " + message);
}
} catch (Throwable ex) {
if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) {
HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), request == null ? null : request.getRespConvert(),
null, message, callback, messageClient, producer, message.getRespTopic(), new HttpResult().status(500));
}
logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
}
}
@Override
public void commit() {
if (this.cdl != null) {
try {
this.cdl.await(30, TimeUnit.SECONDS);
} catch (Exception ex) {
logger.log(Level.SEVERE, HttpMessageClientProcessor.class.getSimpleName() + " commit error, restmodule=" + this.restModule, ex);
}
this.cdl = null;
}
}
public MessageClientProducer getProducer() {
return producer;
}
public NodeHttpServer getServer() {
return server;
}
public Service getService() {
return service;
}
public HttpServlet getServlet() {
return servlet;
}
}

View File

@@ -22,7 +22,14 @@ public class HttpMessageRequest extends HttpRequest {
protected MessageRecord message; protected MessageRecord message;
public HttpMessageRequest(HttpContext context) { public HttpMessageRequest(HttpContext context) {
this(context, (MessageRecord) null);
}
public HttpMessageRequest(HttpContext context, MessageRecord message) {
super(context, (HttpSimpleRequest) null); super(context, (HttpSimpleRequest) null);
if (message != null) {
prepare(message);
}
} }
protected HttpMessageRequest prepare(MessageRecord message) { protected HttpMessageRequest prepare(MessageRecord message) {

View File

@@ -29,26 +29,19 @@ import org.redkale.service.RetResult;
*/ */
public class HttpMessageResponse extends HttpResponse { public class HttpMessageResponse extends HttpResponse {
protected final HttpMessageClient messageClient; protected MessageClient messageClient;
protected MessageRecord message; protected MessageRecord message;
protected MessageClientProducer producer; public HttpMessageResponse(MessageClient messageClient, HttpContext context, HttpMessageRequest request) {
super(context, request, null);
protected Runnable callback;
public HttpMessageResponse(HttpContext context, HttpMessageClient messageClient, Supplier<HttpMessageResponse> respSupplier, Consumer<HttpMessageResponse> respConsumer) {
super(context, new HttpMessageRequest(context), null);
this.responseSupplier = (Supplier) respSupplier;
this.responseConsumer = (Consumer) respConsumer;
this.messageClient = messageClient; this.messageClient = messageClient;
this.message = request.message;
} }
public void prepare(MessageRecord message, Runnable callback, MessageClientProducer producer) { public void prepare(MessageRecord message) {
((HttpMessageRequest) request).prepare(message); ((HttpMessageRequest) request).prepare(message);
this.message = message; this.message = message;
this.callback = callback;
this.producer = producer;
} }
public HttpMessageRequest request() { public HttpMessageRequest request() {
@@ -56,22 +49,19 @@ public class HttpMessageResponse extends HttpResponse {
} }
public void finishHttpResult(Type type, HttpResult result) { public void finishHttpResult(Type type, HttpResult result) {
finishHttpResult(producer.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(), finishHttpResult(messageClient.logger.isLoggable(Level.FINEST), ((HttpMessageRequest) this.request).getRespConvert(),
type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); type, this.message, this.messageClient, message.getRespTopic(), result);
} }
public void finishHttpResult(Convert respConvert, Type type, HttpResult result) { public void finishHttpResult(Convert respConvert, Type type, HttpResult result) {
finishHttpResult(producer.logger.isLoggable(Level.FINEST), finishHttpResult(messageClient.logger.isLoggable(Level.FINEST),
respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert, respConvert == null ? ((HttpMessageRequest) this.request).getRespConvert() : respConvert,
type, this.message, this.callback, this.messageClient, this.producer, message.getRespTopic(), result); type, this.message, this.messageClient, message.getRespTopic(), result);
} }
public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg, public static void finishHttpResult(boolean finest, Convert respConvert, Type type, MessageRecord msg,
Runnable callback, MessageClient messageClient, MessageClientProducer producer, String resptopic, HttpResult result) { MessageClient messageClient, String respTopic, HttpResult result) {
if (callback != null) { if (respTopic == null || respTopic.isEmpty()) {
callback.run();
}
if (resptopic == null || resptopic.isEmpty()) {
return; return;
} }
if (result.getResult() instanceof RetResult) { if (result.getResult() instanceof RetResult) {
@@ -89,11 +79,11 @@ public class HttpMessageResponse extends HttpResponse {
if (innerrs instanceof byte[]) { if (innerrs instanceof byte[]) {
innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8); innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8);
} }
producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() messageClient.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid()
+ ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders()); + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders());
} }
byte[] content = HttpResultCoder.getInstance().encode(result); byte[] content = HttpResultCoder.getInstance().encode(result);
producer.apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, resptopic, null, content)); messageClient.getProducer().apply(messageClient.createMessageRecord(msg.getSeqid(), CTYPE_HTTP_RESULT, respTopic, null, content));
} }
@Override @Override
@@ -109,17 +99,12 @@ public class HttpMessageResponse extends HttpResponse {
this.responseSupplier = respSupplier; this.responseSupplier = respSupplier;
this.responseConsumer = respConsumer; this.responseConsumer = respConsumer;
this.message = null; this.message = null;
this.producer = null;
this.callback = null;
return rs; return rs;
} }
@Override @Override
public void finish(final Convert convert, Type type, RetResult ret) { public void finish(final Convert convert, Type type, RetResult ret) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
finishHttpResult(convert, type, new HttpResult(ret).convert(ret == null ? null : ret.convert())); finishHttpResult(convert, type, new HttpResult(ret).convert(ret == null ? null : ret.convert()));
@@ -128,9 +113,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finish(final Convert convert, Type type, HttpResult result) { public void finish(final Convert convert, Type type, HttpResult result) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
if (convert != null) { if (convert != null) {
@@ -152,9 +134,6 @@ public class HttpMessageResponse extends HttpResponse {
finish(convert, type, (RetResult) obj); finish(convert, type, (RetResult) obj);
} else { } else {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
finishHttpResult(convert, type, new HttpResult(obj)); finishHttpResult(convert, type, new HttpResult(obj));
@@ -164,9 +143,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finish(String obj) { public void finish(String obj) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
finishHttpResult(String.class, new HttpResult(obj == null ? "" : obj)); finishHttpResult(String.class, new HttpResult(obj == null ? "" : obj));
@@ -195,14 +171,11 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finish(int status, String msg) { public void finish(int status, String msg) {
if (status > 400) { if (status > 400) {
producer.logger.log(Level.WARNING, "HttpMessageResponse.finish status: " + status + ", uri: " + this.request.getRequestURI() + ", message: " + this.message); messageClient.logger.log(Level.WARNING, "HttpMessageResponse.finish status: " + status + ", uri: " + this.request.getRequestURI() + ", message: " + this.message);
} else if (producer.logger.isLoggable(Level.FINEST)) { } else if (messageClient.logger.isLoggable(Level.FINEST)) {
producer.logger.log(Level.FINEST, "HttpMessageResponse.finish status: " + status); messageClient.logger.log(Level.FINEST, "HttpMessageResponse.finish status: " + status);
} }
if (this.message.isEmptyRespTopic()) { if (this.message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
finishHttpResult(String.class, new HttpResult(msg == null ? "" : msg).status(status)); finishHttpResult(String.class, new HttpResult(msg == null ? "" : msg).status(status));
@@ -211,9 +184,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finish(boolean kill, final byte[] bs, int offset, int length) { public void finish(boolean kill, final byte[] bs, int offset, int length) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
if (offset == 0 && bs.length == length) { if (offset == 0 && bs.length == length) {
@@ -226,9 +196,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length) { public void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length);
@@ -238,9 +205,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
protected <A> void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length, Consumer<A> consumer, A attachment) { protected <A> void finish(boolean kill, final String contentType, final byte[] bs, int offset, int length, Consumer<A> consumer, A attachment) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length); byte[] rs = (offset == 0 && bs.length == length) ? bs : Arrays.copyOfRange(bs, offset, offset + length);
@@ -250,9 +214,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finishBuffer(boolean kill, ByteBuffer buffer) { public void finishBuffer(boolean kill, ByteBuffer buffer) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
byte[] bs = new byte[buffer.remaining()]; byte[] bs = new byte[buffer.remaining()];
@@ -263,9 +224,6 @@ public class HttpMessageResponse extends HttpResponse {
@Override @Override
public void finishBuffers(boolean kill, ByteBuffer... buffers) { public void finishBuffers(boolean kill, ByteBuffer... buffers) {
if (message.isEmptyRespTopic()) { if (message.isEmptyRespTopic()) {
if (callback != null) {
callback.run();
}
return; return;
} }
int size = 0; int size = 0;

View File

@@ -0,0 +1,52 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.logging.*;
import org.redkale.boot.NodeHttpServer;
import org.redkale.net.Context;
import org.redkale.net.Request;
import org.redkale.net.Response;
import org.redkale.net.http.*;
import org.redkale.service.Service;
/**
* 一个Service对应一个MessageProcessor
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class HttpMessageServlet extends MessageServlet {
public HttpMessageServlet(MessageClient messageClient, NodeHttpServer server,
Service service, HttpServlet servlet, String topic) {
super(messageClient, server, service, servlet, topic);
}
@Override
protected Request createRequest(Context context, MessageRecord message) {
return new HttpMessageRequest((HttpContext) context, message);
}
@Override
protected Response createResponse(Context context, Request request) {
return new HttpMessageResponse(messageClient, (HttpContext) context, (HttpMessageRequest) request);
}
@Override
protected void onError(Response response, MessageRecord message, Throwable t) {
if (message.getRespTopic() != null && !message.getRespTopic().isEmpty()) {
HttpMessageRequest request = ((HttpMessageResponse) response).request();
HttpMessageResponse.finishHttpResult(logger.isLoggable(Level.FINEST), response == null ? null : request.getRespConvert(),
null, message, messageClient, message.getRespTopic(), new HttpResult().status(500));
}
}
}

View File

@@ -0,0 +1,49 @@
/*
*
*/
package org.redkale.mq;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import org.redkale.cluster.HttpRpcClient;
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
import org.redkale.net.http.HttpResult;
import org.redkale.net.http.HttpSimpleRequest;
/**
*
* @author zhangjx
*/
final class HttpRpcMessageClient extends HttpRpcClient {
private final MessageCoder<HttpSimpleRequest> requestCoder = HttpSimpleRequestCoder.getInstance();
private final int nodeid;
private final MessageClient messageClient;
public HttpRpcMessageClient(MessageClient messageClient, final int nodeid) {
this.messageClient = messageClient;
this.nodeid = nodeid;
}
@Override
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
MessageRecord message = messageClient.createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), requestCoder.encode(request));
message.userid(userid).groupid(groupid);
return messageClient.sendMessage(message).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
}
@Override
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
MessageRecord message = messageClient.createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), requestCoder.encode(request));
message.userid(userid).groupid(groupid);
messageClient.produceMessage(message);
}
@Override
protected int getNodeid() {
return nodeid;
}
}

View File

@@ -14,13 +14,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.Collectors;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import org.redkale.annotation.AutoLoad; import org.redkale.annotation.AutoLoad;
import org.redkale.annotation.ResourceListener; import org.redkale.annotation.ResourceListener;
import org.redkale.boot.*; import org.redkale.boot.*;
import static org.redkale.boot.Application.RESNAME_APP_NAME; import static org.redkale.boot.Application.RESNAME_APP_NAME;
import static org.redkale.boot.Application.RESNAME_APP_NODEID; import static org.redkale.boot.Application.RESNAME_APP_NODEID;
import org.redkale.cluster.HttpRpcClient;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.ConvertFactory; import org.redkale.convert.ConvertFactory;
import org.redkale.convert.ConvertType; import org.redkale.convert.ConvertType;
@@ -61,6 +61,11 @@ public abstract class MessageAgent implements Resourcable {
private ExecutorService workExecutor; private ExecutorService workExecutor;
private int timeoutSeconds;
final AtomicLong msgSeqno = new AtomicLong(Math.abs(System.nanoTime()));
//-------------------------- MessageConsumer、MessageProducer --------------------------
protected final ReentrantLock messageProducerLock = new ReentrantLock(); protected final ReentrantLock messageProducerLock = new ReentrantLock();
protected MessageProducer messageBaseProducer; protected MessageProducer messageBaseProducer;
@@ -72,31 +77,31 @@ public abstract class MessageAgent implements Resourcable {
//key: group, sub-key: topic //key: group, sub-key: topic
protected final Map<String, Map<String, MessageConsumerWrapper>> messageConsumerMap = new HashMap<>(); protected final Map<String, Map<String, MessageConsumerWrapper>> messageConsumerMap = new HashMap<>();
protected MessageClientProducer httpClientProducer; //-------------------------- HttpRpcClient、SncpMessageClient --------------------------
private HttpRpcMessageClient httpRpcClient;
protected MessageClientProducer sncpClientProducer; private String httpAppRespTopic;
protected HttpMessageClient httpMessageClient; private String sncpAppRespTopic;
protected SncpMessageClient sncpMessageClient; protected MessageClient httpMessageClient;
protected MessageClient sncpMessageClient;
protected MessageClientProducer clientMessageProducer;
protected final ReentrantLock clientConsumerLock = new ReentrantLock(); protected final ReentrantLock clientConsumerLock = new ReentrantLock();
protected final ReentrantLock clientProducerLock = new ReentrantLock(); protected final ReentrantLock clientProducerLock = new ReentrantLock();
protected final ReentrantLock serviceLock = new ReentrantLock();
protected MessageCoder<MessageRecord> clientMessageCoder = MessageRecordSerializer.getInstance(); protected MessageCoder<MessageRecord> clientMessageCoder = MessageRecordSerializer.getInstance();
//本地Service消息接收处理器 key:consumerid
protected HashMap<String, MessageClientConsumerWrapper> clientConsumerNodes = new LinkedHashMap<>();
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
protected ScheduledThreadPoolExecutor timeoutExecutor; protected ScheduledThreadPoolExecutor timeoutExecutor;
public void init(AnyValue config) { public void init(AnyValue config) {
this.name = checkName(config.getValue("name", "")); this.name = checkName(config.getValue("name", ""));
this.httpAppRespTopic = generateHttpAppRespTopic();
this.sncpAppRespTopic = generateSncpAppRespTopic();
int threads = config.getIntValue("threads", -1); int threads = config.getIntValue("threads", -1);
if (threads == 0) { if (threads == 0) {
this.workExecutor = application.getWorkExecutor(); this.workExecutor = application.getWorkExecutor();
@@ -105,8 +110,9 @@ public abstract class MessageAgent implements Resourcable {
this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s") this.workExecutor = threads > 0 ? WorkThread.createExecutor(threads, "Redkale-MessageConsumerThread-[" + name + "]-%s")
: WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s"); : WorkThread.createWorkExecutor(Utility.cpus(), "Redkale-MessageConsumerThread-[" + name + "]-%s");
} }
this.httpMessageClient = new HttpMessageClient(this); this.httpMessageClient = new MessageClient(this, this.httpAppRespTopic, Rest.getHttpReqTopicPrefix());
this.sncpMessageClient = new SncpMessageClient(this); this.sncpMessageClient = new MessageClient(this, this.sncpAppRespTopic, Sncp.getSncpReqTopicPrefix());
String coderType = config.getValue("coder", ""); String coderType = config.getValue("coder", "");
if (!coderType.trim().isEmpty()) { if (!coderType.trim().isEmpty()) {
try { try {
@@ -132,6 +138,7 @@ public abstract class MessageAgent implements Resourcable {
t.setDaemon(true); t.setDaemon(true);
return t; return t;
}); });
this.timeoutSeconds = config.getIntValue("timeoutSeconds", 30);
this.timeoutExecutor.setRemoveOnCancelPolicy(true); this.timeoutExecutor.setRemoveOnCancelPolicy(true);
} }
@@ -145,20 +152,38 @@ public abstract class MessageAgent implements Resourcable {
if (loginfo.length() > 0) { if (loginfo.length() > 0) {
logger.log(Level.INFO, loginfo.toString()); logger.log(Level.INFO, loginfo.toString());
} }
this.clientConsumerNodes.values().forEach(node -> {
long s = System.currentTimeMillis(); this.clientMessageProducer = createMessageClientProducer("redkale-message-producer");
node.consumer.start(); if (this.httpRpcClient != null || !this.httpMessageClient.isEmpty()) {
long e = System.currentTimeMillis() - s; this.httpMessageClient.putMessageRespProcessor();
}); }
if (!this.sncpMessageClient.isEmpty()) {
this.sncpMessageClient.putMessageRespProcessor();
}
this.startMessageClientConsumers();
List<String> topics = new ArrayList<>();
if (this.httpMessageClient.isEmpty()) {
topics.addAll(this.httpMessageClient.getTopics());
}
if (!this.sncpMessageClient.isEmpty()) {
topics.addAll(this.sncpMessageClient.getTopics());
}
if (!topics.isEmpty()) {
Collections.sort(topics);
loginfo = new StringBuilder();
loginfo.append(MessageClientConsumer.class.getSimpleName() + " subscribe topics:\r\n");
for (String topic : topics) {
loginfo.append(" ").append(topic).append("\r\n");
}
logger.log(Level.INFO, loginfo.toString());
}
} }
//Application.stop 在执行server.shutdown之前执行 //Application.stop 在执行server.shutdown之前执行
public void stop() { public void stop() {
this.stopMessageConsumer(); this.stopMessageConsumer();
this.stopMessageProducer(); this.stopMessageProducer();
this.clientConsumerNodes.values().forEach(node -> { this.stopMessageClientConsumers();
node.consumer.stop();
});
} }
//Application.stop 在所有server.shutdown执行后执行 //Application.stop 在所有server.shutdown执行后执行
@@ -168,15 +193,11 @@ public abstract class MessageAgent implements Resourcable {
} }
this.messageConsumerList.clear(); this.messageConsumerList.clear();
this.messageConsumerMap.clear(); this.messageConsumerMap.clear();
//-------------- MessageClient --------------
this.httpMessageClient.close(); this.httpMessageClient.stop();
this.sncpMessageClient.close(); this.sncpMessageClient.stop();
if (this.clientMessageProducer != null) {
if (this.httpClientProducer != null) { this.clientMessageProducer.stop();
this.httpClientProducer.stop();
}
if (this.sncpClientProducer != null) {
this.sncpClientProducer.stop();
} }
if (this.clientMessageCoder instanceof Service) { if (this.clientMessageCoder instanceof Service) {
((Service) this.clientMessageCoder).destroy(config); ((Service) this.clientMessageCoder).destroy(config);
@@ -287,6 +308,10 @@ public abstract class MessageAgent implements Resourcable {
return logger; return logger;
} }
public int getTimeoutSeconds() {
return timeoutSeconds;
}
public String getName() { public String getName() {
return name; return name;
} }
@@ -303,11 +328,25 @@ public abstract class MessageAgent implements Resourcable {
return workExecutor; return workExecutor;
} }
public HttpMessageClient getHttpMessageClient() { public HttpRpcClient getHttpRpcClient() {
if (this.httpRpcClient == null) {
messageProducerLock.lock();
try {
if (this.httpRpcClient == null) {
this.httpRpcClient = new HttpRpcMessageClient(this.httpMessageClient, this.nodeid);
}
} finally {
messageProducerLock.unlock();
}
}
return httpRpcClient;
}
public MessageClient getHttpMessageClient() {
return httpMessageClient; return httpMessageClient;
} }
public SncpMessageClient getSncpMessageClient() { public MessageClient getSncpMessageClient() {
return sncpMessageClient; return sncpMessageClient;
} }
@@ -326,83 +365,19 @@ public abstract class MessageAgent implements Resourcable {
return name; return name;
} }
protected List<MessageClientConsumer> getMessageClientConsumers() {
List<MessageClientConsumer> consumers = new ArrayList<>();
MessageClientConsumer one = this.httpMessageClient == null ? null : this.httpMessageClient.respConsumer;
if (one != null) {
consumers.add(one);
}
one = this.sncpMessageClient == null ? null : this.sncpMessageClient.respConsumer;
if (one != null) {
consumers.add(one);
}
consumers.addAll(clientConsumerNodes.values().stream().map(mcn -> mcn.consumer).collect(Collectors.toList()));
return consumers;
}
protected List<MessageClientProducer> getMessageClientProducers() {
List<MessageClientProducer> producers = new ArrayList<>();
if (this.httpClientProducer != null) {
producers.add(this.httpClientProducer);
}
if (this.sncpClientProducer != null) {
producers.add(this.sncpClientProducer);
}
MessageClientProducer one = this.httpMessageClient == null ? null : this.httpMessageClient.getProducer();
if (one != null) {
producers.add(one);
}
one = this.sncpMessageClient == null ? null : this.sncpMessageClient.getProducer();
if (one != null) {
producers.add(one);
}
return producers;
}
public MessageCoder<MessageRecord> getClientMessageCoder() { public MessageCoder<MessageRecord> getClientMessageCoder() {
return this.clientMessageCoder; return this.clientMessageCoder;
} }
//获取指定topic的生产处理器 public MessageClientProducer getMessageClientProducer() {
public MessageClientProducer getSncpMessageClientProducer() { return this.clientMessageProducer;
if (this.sncpClientProducer == null) {
clientProducerLock.lock();
try {
if (this.sncpClientProducer == null) {
long s = System.currentTimeMillis();
this.sncpClientProducer = createMessageClientProducer("SncpProducer");
long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.SncpProducer startup all in " + e + "ms");
}
}
} finally {
clientProducerLock.unlock();
}
}
return this.sncpClientProducer;
} }
public MessageClientProducer getHttpMessageClientProducer() { //
if (this.httpClientProducer == null) { protected abstract void startMessageClientConsumers();
clientProducerLock.lock();
try { protected abstract void stopMessageClientConsumers();
if (this.httpClientProducer == null) {
long s = System.currentTimeMillis();
this.httpClientProducer = createMessageClientProducer("HttpProducer");
long e = System.currentTimeMillis() - s;
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MessageAgent.HttpProducer startup all in " + e + "ms");
}
}
} finally {
clientProducerLock.unlock();
}
}
return this.httpClientProducer;
}
//
protected abstract void startMessageConsumer(); protected abstract void startMessageConsumer();
protected abstract void stopMessageConsumer(); protected abstract void stopMessageConsumer();
@@ -429,9 +404,6 @@ public abstract class MessageAgent implements Resourcable {
//创建指定topic的生产处理器 //创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer(String producerName); protected abstract MessageClientProducer createMessageClientProducer(String producerName);
//创建指定topic的消费处理器
public abstract MessageClientConsumer createMessageClientConsumer(String topic, String group, MessageClientProcessor processor);
public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) { public final void putService(NodeHttpServer ns, Service service, HttpServlet servlet) {
AutoLoad al = service.getClass().getAnnotation(AutoLoad.class); AutoLoad al = service.getClass().getAnnotation(AutoLoad.class);
if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) { if (al != null && !al.value() && service.getClass().getAnnotation(Local.class) != null) {
@@ -447,18 +419,12 @@ public abstract class MessageAgent implements Resourcable {
return; return;
} }
} }
String topic = generateHttpReqTopic(service); if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && nodeid == 0) {
String consumerid = generateHttpConsumerid(topic, service); throw new RedkaleException("Application.node not config in WebSocket Cluster");
serviceLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
}
HttpMessageClientProcessor processor = new HttpMessageClientProcessor(this.logger, httpMessageClient, getHttpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
} finally {
serviceLock.unlock();
} }
String topic = Rest.generateHttpReqTopic(service, this.nodeid);
MessageServlet processor = new HttpMessageServlet(this.httpMessageClient, ns, service, servlet, topic);
this.httpMessageClient.putMessageServlet(processor);
} }
public final void putService(NodeSncpServer ns, Service service, SncpServlet servlet) { public final void putService(NodeSncpServer ns, Service service, SncpServlet servlet) {
@@ -470,71 +436,36 @@ public abstract class MessageAgent implements Resourcable {
if (al2 != null && !al2.value() && service.getClass().getAnnotation(Local.class) != null) { if (al2 != null && !al2.value() && service.getClass().getAnnotation(Local.class) != null) {
return; return;
} }
String topic = generateSncpReqTopic(service); if (WebSocketNode.class.isAssignableFrom(Sncp.getResourceType(service)) && nodeid == 0) {
String consumerid = generateSncpConsumerid(topic, service); throw new RedkaleException("Application.node not config in WebSocket Cluster");
serviceLock.lock();
try {
if (clientConsumerNodes.containsKey(consumerid)) {
throw new RedkaleException("consumerid(" + consumerid + ") is repeat");
}
SncpMessageClientProcessor processor = new SncpMessageClientProcessor(this.logger, sncpMessageClient, getSncpMessageClientProducer(), ns, service, servlet);
this.clientConsumerNodes.put(consumerid, new MessageClientConsumerWrapper(ns, service, servlet, processor, createMessageClientConsumer(topic, consumerid, processor)));
} finally {
serviceLock.unlock();
} }
} String topic = Sncp.generateSncpReqTopic(service, this.nodeid);
MessageServlet processor = new SncpMessageServlet(this.sncpMessageClient, ns, service, servlet, topic);
//格式: sncp.req.module.user this.sncpMessageClient.putMessageServlet(processor);
public final String generateSncpReqTopic(Service service) {
return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service));
}
//格式: sncp.req.module.user
public final String generateSncpReqTopic(String resourceName, Class resourceType) {
if (WebSocketNode.class.isAssignableFrom(resourceType)) {
return "sncp.req.module.ws" + (resourceName.isEmpty() ? "" : ("-" + resourceName)) + ".node" + nodeid;
}
return "sncp.req.module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName));
}
//格式: http.req.module.user
public static String generateHttpReqTopic(String module) {
return "http.req.module." + module.toLowerCase();
}
//格式: http.req.module.user
public static String generateHttpReqTopic(String module, String resname) {
return "http.req.module." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
} }
//格式: sncp.resp.app.node10 //格式: sncp.resp.app.node10
protected String generateAppSncpRespTopic() { //格式参考Rest.generateHttpReqTopic
return "sncp.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; private String generateSncpAppRespTopic() {
return Sncp.getSncpRespTopicPrefix() + "app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
} }
//格式: http.resp.app.node10 //格式: http.resp.app.node10
protected String generateAppHttpRespTopic() { //格式参考Rest.generateHttpReqTopic
return "http.resp.app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid; private String generateHttpAppRespTopic() {
return Rest.getHttpRespTopicPrefix() + "app." + (Utility.isEmpty(nodeName) ? "node" : nodeName) + "-" + nodeid;
} }
//格式: http.req.module.user public final String getHttpAppRespTopic() {
protected String generateHttpReqTopic(Service service) { return this.httpAppRespTopic;
String resname = Sncp.getResourceName(service);
String module = Rest.getRestModule(service).toLowerCase();
return "http.req.module." + module + (resname.isEmpty() ? "" : ("-" + resname));
} }
//格式: consumer-sncp.req.module.user 不提供外部使用 public final String getSncpAppRespTopic() {
protected final String generateSncpConsumerid(String topic, Service service) { return this.sncpAppRespTopic;
return "consumer-" + topic;
} }
//格式: consumer-http.req.module.user public final int getNodeid() {
protected String generateHttpConsumerid(String topic, Service service) { return this.nodeid;
String resname = Sncp.getResourceName(service);
String key = Rest.getRestModule(service).toLowerCase();
return "consumer-http.req.module." + key + (resname.isEmpty() ? "" : ("-" + resname));
} }
public static class MessageConsumerWrapper<T> { public static class MessageConsumerWrapper<T> {
@@ -647,11 +578,11 @@ public abstract class MessageAgent implements Resourcable {
public final Servlet servlet; public final Servlet servlet;
public final MessageClientProcessor processor; public final MessageServlet processor;
public final MessageClientConsumer consumer; public final MessageClientConsumer consumer;
public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageClientProcessor processor, MessageClientConsumer consumer) { public MessageClientConsumerWrapper(NodeServer server, Service service, Servlet servlet, MessageServlet processor, MessageClientConsumer consumer) {
this.server = server; this.server = server;
this.service = service; this.service = service;
this.servlet = servlet; this.servlet = servlet;

View File

@@ -6,15 +6,26 @@
package org.redkale.mq; package org.redkale.mq;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.*; import java.util.Collection;
import java.util.concurrent.atomic.*; import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level; import java.util.logging.Logger;
import org.redkale.cluster.ClusterRpcClient;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import static org.redkale.mq.MessageRecord.*; import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
import org.redkale.net.http.*; import static org.redkale.mq.MessageRecord.CTYPE_HTTP_RESULT;
import static org.redkale.mq.MessageRecord.CTYPE_STRING;
import org.redkale.net.http.HttpResult;
import org.redkale.net.http.HttpSimpleRequest;
import org.redkale.util.RedkaleException;
import org.redkale.util.Traces; import org.redkale.util.Traces;
import org.redkale.util.Utility;
/** /**
* *
@@ -25,108 +36,68 @@ import org.redkale.util.Traces;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class MessageClient { public class MessageClient implements ClusterRpcClient<MessageRecord, MessageRecord>, MessageProcessor {
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>(); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private final ReentrantLock lock = new ReentrantLock(); private final MessageAgent messageAgent;
protected final MessageAgent messageAgent; private final String appRespTopic;
private final String reqTopicPrefix;
protected final ReentrantLock processorLock = new ReentrantLock();
protected final AtomicLong msgSeqno; protected final AtomicLong msgSeqno;
protected MessageClientConsumer respConsumer; //key: reqTopic
private final HashMap<String, MessageProcessor> messageProcessors = new HashMap<>();
protected String appRespTopic; final ConcurrentHashMap<Long, MessageRespFuture> respQueue = new ConcurrentHashMap<>();
protected String appRespConsumerid; protected MessageClient(MessageAgent messageAgent, String appRespTopic, String reqTopicPrefix) {
private final String clazzName;
protected MessageClient(MessageAgent messageAgent) {
this.messageAgent = messageAgent; this.messageAgent = messageAgent;
this.msgSeqno = messageAgent == null ? new AtomicLong() : messageAgent.msgSeqno; this.appRespTopic = appRespTopic;
this.clazzName = getClass().getSimpleName(); this.reqTopicPrefix = reqTopicPrefix;
this.msgSeqno = messageAgent.msgSeqno;
} }
protected void close() { @Override
if (this.respConsumer != null) { public void process(final MessageRecord msg, long time) {
this.respConsumer.stop(); MessageProcessor processor = messageProcessors.get(msg.getTopic());
if (processor == null) {
throw new RedkaleException(msg.getTopic() + " not found MessageProcessor, record=" + msg);
} else {
processor.process(msg, time);
} }
} }
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp) { void putMessageRespProcessor() {
return sendMessage(message, needresp, null); this.messageProcessors.put(appRespTopic, new MessageRespProcessor(this));
} }
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) { public Collection<String> getTopics() {
return this.messageProcessors.keySet();
}
@Override
public void produceMessage(MessageRecord message) {
messageAgent.getMessageClientProducer().apply(message);
}
@Override
public CompletableFuture<MessageRecord> sendMessage(final MessageRecord message) {
CompletableFuture<MessageRecord> future = new CompletableFuture<>(); CompletableFuture<MessageRecord> future = new CompletableFuture<>();
boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST);
try { try {
if (this.respConsumer == null) { if (Utility.isEmpty(message.getRespTopic())) {
lock.lock();
try {
if (this.appRespConsumerid == null) {
this.appRespConsumerid = "consumer-" + this.appRespTopic;
}
if (this.respConsumer == null) {
MessageClientProcessor processor = (msg, callback) -> {
long now = System.currentTimeMillis();
MessageRespFutureNode node = respNodes.remove(msg.getSeqid());
if (node == null) {
messageAgent.logger.log(Level.WARNING, MessageClient.this.getClass().getSimpleName() + " process " + msg + " error not found mqresp.futurenode");
return;
}
if (node.scheduledFuture != null) {
node.scheduledFuture.cancel(true);
}
LongAdder ncer = node.getCounter();
if (ncer != null) {
ncer.decrement();
}
final long cha = now - msg.createTime;
if (finest) {
messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")");
}
node.future.complete(msg);
long cha2 = System.currentTimeMillis() - now;
if ((cha > 1000 || cha2 > 1000) && messageAgent != null && messageAgent.logger.isLoggable(Level.FINE)) {
messageAgent.logger.log(Level.FINE, clazzName + ".MessageRespFutureNode.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms, mqs.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
} else if ((cha > 50 || cha2 > 50) && messageAgent != null && messageAgent.logger.isLoggable(Level.FINER)) {
messageAgent.logger.log(Level.FINER, clazzName + ".MessageRespFutureNode.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms, mq.counters = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
} else if (finest) {
messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms, mq.counter = " + ncer + ") mqresp.msg: " + formatRespMessage(msg));
}
};
long ones = System.currentTimeMillis();
MessageClientConsumer one = messageAgent.createMessageClientConsumer(appRespTopic, appRespConsumerid, processor);
one.start();
long onee = System.currentTimeMillis() - ones;
if (finest) {
messageAgent.logger.log(Level.FINEST, clazzName + ".MessageRespFutureNode.startup " + onee + "ms ");
}
this.respConsumer = one;
}
} finally {
lock.unlock();
}
}
if (needresp && (message.getRespTopic() == null || message.getRespTopic().isEmpty())) {
message.setRespTopic(appRespTopic); message.setRespTopic(appRespTopic);
} }
if (counter != null) { messageAgent.getMessageClientProducer().apply(message);
counter.increment(); MessageRespFuture respNode = new MessageRespFuture(this, future, message);
} respQueue.put(message.getSeqid(), respNode);
getProducer().apply(message); ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor;
if (needresp) { if (executor != null && messageAgent.getTimeoutSeconds() > 0) {
MessageRespFutureNode node = new MessageRespFutureNode(messageAgent.logger, message, respNodes, counter, future); respNode.scheduledFuture = executor.schedule(respNode, messageAgent.getTimeoutSeconds(), TimeUnit.SECONDS);
respNodes.put(message.getSeqid(), node);
ScheduledThreadPoolExecutor executor = messageAgent.timeoutExecutor;
if (executor != null) {
node.scheduledFuture = executor.schedule(node, 30, TimeUnit.SECONDS);
}
} else {
future.complete(null);
} }
} catch (Throwable ex) { } catch (Throwable ex) {
future.completeExceptionally(ex); future.completeExceptionally(ex);
@@ -134,84 +105,96 @@ public abstract class MessageClient {
return future; return future;
} }
protected MessageRecord formatRespMessage(MessageRecord message) { //非线程安全
return message; public void putMessageServlet(MessageServlet servlet) {
String topic = servlet.getTopic();
processorLock.lock();
try {
if (messageProcessors.containsKey(topic)) {
throw new RedkaleException("req-topic(" + topic + ") is repeat");
}
messageProcessors.put(topic, servlet);
} finally {
processorLock.unlock();
}
} }
protected abstract MessageClientProducer getProducer(); public boolean isEmpty() {
return messageProcessors.size() < 1;
}
public MessageRecord createMessageRecord(String resptopic, String content) { public MessageRecord createMessageRecord(String respTopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, null, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); null, null, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String content) { public MessageRecord createMessageRecord(String topic, String respTopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); null, topic, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, String content) { public MessageRecord createMessageRecord(String topic, String respTopic, String traceid, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0, return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); null, topic, respTopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) { public MessageRecord createMessageRecord(int userid, String topic, String respTopic, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8)); null, topic, respTopic, Traces.currentTraceid(), content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String traceid, String content) { public MessageRecord createMessageRecord(int userid, String topic, String respTopic, String traceid, String content) {
return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid, return new MessageRecord(msgSeqno.incrementAndGet(), CTYPE_STRING, 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8)); null, topic, respTopic, traceid, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); null, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, String traceid, Convert convert, Object bean) { public MessageRecord createMessageRecord(String topic, String respTopic, String traceid, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0, return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), 0,
null, topic, resptopic, traceid, convert.convertToBytes(bean)); null, topic, respTopic, traceid, convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int userid, String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
null, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); null, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int userid, String groupid, String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid, return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, 0, System.currentTimeMillis(), userid,
groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); groupid, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) { public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String respTopic, Convert convert, Object bean) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid, return new MessageRecord(msgSeqno.incrementAndGet(), ctype(convert, bean), 1, flag, System.currentTimeMillis(), userid,
groupid, topic, resptopic, Traces.currentTraceid(), convert.convertToBytes(bean)); groupid, topic, respTopic, Traces.currentTraceid(), convert.convertToBytes(bean));
} }
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) { public MessageRecord createMessageRecord(String topic, String respTopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, resptopic, Traces.currentTraceid(), content); return new MessageRecord(msgSeqno.incrementAndGet(), (byte) 0, topic, respTopic, Traces.currentTraceid(), content);
} }
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) { public MessageRecord createMessageRecord(long seqid, String topic, String respTopic, byte[] content) {
return new MessageRecord(seqid, (byte) 0, topic, resptopic, Traces.currentTraceid(), content); return new MessageRecord(seqid, (byte) 0, topic, respTopic, Traces.currentTraceid(), content);
} }
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, byte[] content) { protected MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, Traces.currentTraceid(), content); return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, respTopic, Traces.currentTraceid(), content);
} }
protected MessageRecord createMessageRecord(byte ctype, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord createMessageRecord(byte ctype, String topic, String respTopic, String traceid, byte[] content) {
return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, resptopic, traceid, content); return new MessageRecord(msgSeqno.incrementAndGet(), ctype, topic, respTopic, traceid, content);
} }
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, byte[] content) { protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, Traces.currentTraceid(), content); return new MessageRecord(seqid, ctype, topic, respTopic, Traces.currentTraceid(), content);
} }
protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String resptopic, String traceid, byte[] content) { protected MessageRecord createMessageRecord(long seqid, byte ctype, String topic, String respTopic, String traceid, byte[] content) {
return new MessageRecord(seqid, ctype, topic, resptopic, traceid, content); return new MessageRecord(seqid, ctype, topic, respTopic, traceid, content);
} }
private byte ctype(Convert convert, Object bean) { private byte ctype(Convert convert, Object bean) {
@@ -225,4 +208,31 @@ public abstract class MessageClient {
} }
return ctype; return ctype;
} }
public void start() {
}
public void stop() {
}
public MessageAgent getMessageAgent() {
return messageAgent;
}
public MessageCoder<MessageRecord> getClientMessageCoder() {
return this.messageAgent.getClientMessageCoder();
}
public MessageClientProducer getProducer() {
return messageAgent.getMessageClientProducer();
}
public String getAppRespTopic() {
return appRespTopic;
}
public String getReqTopicPrefix() {
return reqTopicPrefix;
}
} }

View File

@@ -5,9 +5,7 @@
*/ */
package org.redkale.mq; package org.redkale.mq;
import java.util.Arrays; import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -21,45 +19,28 @@ import java.util.logging.Logger;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public abstract class MessageClientConsumer { public abstract class MessageClientConsumer implements MessageProcessor {
protected final List<String> topics;
protected final String consumerid;
protected MessageAgent messageAgent;
protected final MessageClientProcessor processor;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected volatile boolean closed; protected MessageClient messageClient;
protected MessageClientConsumer(MessageAgent messageAgent, String topic, final String consumerid, MessageClientProcessor processor) { protected MessageClientConsumer(MessageClient messageClient) {
Objects.requireNonNull(messageAgent); Objects.requireNonNull(messageClient);
Objects.requireNonNull(topic); this.messageClient = messageClient;
Objects.requireNonNull(consumerid);
Objects.requireNonNull(processor);
this.messageAgent = messageAgent;
this.topics = Collections.unmodifiableList(Arrays.asList(topic));
this.consumerid = consumerid;
this.processor = processor;
} }
public MessageClientProcessor getProcessor() { public Collection<String> getTopics() {
return processor; return messageClient.getTopics();
} }
public List<String> getTopics() { @Override
return topics; public void process(MessageRecord message, long time) {
messageClient.process(message, time);
} }
public abstract void start(); public abstract void start();
public abstract void stop(); public abstract void stop();
public boolean isClosed() {
return closed;
}
} }

View File

@@ -1,27 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
/**
* 一个Service对应一个MessageProcessor
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public interface MessageClientProcessor {
default void begin(int size, long starttime) {
}
public void process(MessageRecord message, Runnable callback);
default void commit() {
}
}

View File

@@ -25,7 +25,7 @@ public interface MessageConsumer<T> {
default void init(AnyValue config) { default void init(AnyValue config) {
} }
public void onMessage(MessageConext context, T messages); public void onMessage(MessageConext context, T message);
default void destroy(AnyValue config) { default void destroy(AnyValue config) {
} }

View File

@@ -0,0 +1,13 @@
/*
*
*/
package org.redkale.mq;
/**
*
* @author zhangjx
*/
public interface MessageProcessor {
public void process(final MessageRecord msg, long time);
}

View File

@@ -32,11 +32,14 @@ public class MessageRecord implements Serializable {
protected static final byte CTYPE_STRING = 1; protected static final byte CTYPE_STRING = 1;
protected static final byte CTYPE_HTTP_REQUEST = 2; //Bson bytes
protected static final byte CTYPE_BSON = 2;
protected static final byte CTYPE_HTTP_RESULT = 3; //HttpSimpleRequest
protected static final byte CTYPE_HTTP_REQUEST = 3;
protected static final byte CTYPE_BSON_RESULT = 4; //HttpResult<byte[]>
protected static final byte CTYPE_HTTP_RESULT = 4;
@ConvertColumn(index = 1) @ConvertColumn(index = 1)
@Comment("消息序列号") @Comment("消息序列号")
@@ -54,8 +57,9 @@ public class MessageRecord implements Serializable {
@Comment("创建时间") @Comment("创建时间")
protected long createTime; protected long createTime;
//@since 2.5.0 由int改成Serializable
@ConvertColumn(index = 5) @ConvertColumn(index = 5)
@Comment("用户ID无用户信息视为null或0, 具体数据类型只能是int、long、String") //@since 2.5.0 由int改成Serializable @Comment("用户ID无用户信息视为null或0, 具体数据类型只能是int、long、String")
protected Serializable userid; protected Serializable userid;
@ConvertColumn(index = 6) @ConvertColumn(index = 6)
@@ -330,7 +334,7 @@ public class MessageRecord implements Serializable {
sb.append(",\"respTopic\":\"").append(this.respTopic).append("\""); sb.append(",\"respTopic\":\"").append(this.respTopic).append("\"");
} }
if (this.content != null) { if (this.content != null) {
if (this.ctype == CTYPE_BSON_RESULT && this.content.length > SncpHeader.HEADER_SUBSIZE) { if (this.ctype == CTYPE_BSON && this.content.length > SncpHeader.HEADER_SUBSIZE) {
//int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符 //int offset = new ByteArray(this.content).getChar(0) + 1; //循环占位符
//Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset); //Object rs = BsonConvert.root().convertFrom(Object.class, this.content, offset, this.content.length - offset);
//sb.append(",\"content\":").append(rs); //sb.append(",\"content\":").append(rs);

View File

@@ -46,7 +46,7 @@ public class MessageRecordSerializer implements MessageCoder<MessageRecord> {
+ 1 //ctype + 1 //ctype
+ 4 //version + 4 //version
+ 4 //flag + 4 //flag
+ 8 //createtime + 8 //createTime
+ 2 + userid.length + 2 + userid.length
+ 2 + groupid.length + 2 + groupid.length
+ 2 + topic.length + 2 + topic.length
@@ -105,12 +105,12 @@ public class MessageRecordSerializer implements MessageCoder<MessageRecord> {
byte ctype = buffer.get(); byte ctype = buffer.get();
int version = buffer.getInt(); int version = buffer.getInt();
int flag = buffer.getInt(); int flag = buffer.getInt();
long createtime = buffer.getLong(); long createTime = buffer.getLong();
Serializable userid = MessageCoder.decodeUserid(buffer); Serializable userid = MessageCoder.decodeUserid(buffer);
String groupid = MessageCoder.getShortString(buffer); String groupid = MessageCoder.getShortString(buffer);
String topic = MessageCoder.getShortString(buffer); String topic = MessageCoder.getShortString(buffer);
String resptopic = MessageCoder.getShortString(buffer); String respTopic = MessageCoder.getShortString(buffer);
String traceid = MessageCoder.getShortString(buffer); String traceid = MessageCoder.getShortString(buffer);
byte[] content = null; byte[] content = null;
@@ -119,7 +119,7 @@ public class MessageRecordSerializer implements MessageCoder<MessageRecord> {
content = new byte[contentlen]; content = new byte[contentlen];
buffer.get(content); buffer.get(content);
} }
return new MessageRecord(seqid, ctype, version, flag, createtime, userid, groupid, topic, resptopic, traceid, content); return new MessageRecord(seqid, ctype, version, flag, createTime, userid, groupid, topic, respTopic, traceid, content);
} }
} }

View File

@@ -6,7 +6,6 @@
package org.redkale.mq; package org.redkale.mq;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.*; import java.util.logging.*;
/** /**
@@ -19,39 +18,33 @@ import java.util.logging.*;
* *
* @since 2.1.0 * @since 2.1.0
*/ */
public class MessageRespFutureNode implements Runnable { public class MessageRespFuture implements Runnable {
protected final long seqid; protected final long seqid;
protected final long createTime; protected final long createTime;
protected final LongAdder counter;
protected final CompletableFuture<MessageRecord> future; protected final CompletableFuture<MessageRecord> future;
protected final Logger logger;
protected final MessageRecord message; protected final MessageRecord message;
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes; protected final MessageClient messageClient;
protected ScheduledFuture<?> scheduledFuture; protected ScheduledFuture<?> scheduledFuture;
public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap<Long, MessageRespFutureNode> respNodes, LongAdder counter, CompletableFuture<MessageRecord> future) { public MessageRespFuture(MessageClient messageClient, CompletableFuture<MessageRecord> future, MessageRecord message) {
this.logger = logger; this.messageClient = messageClient;
this.message = message; this.message = message;
this.seqid = message.getSeqid(); this.seqid = message.getSeqid();
this.respNodes = respNodes;
this.counter = counter;
this.future = future; this.future = future;
this.createTime = System.currentTimeMillis(); this.createTime = System.currentTimeMillis();
} }
@Override //超时后被timeoutExecutor调用 @Override //超时后被timeoutExecutor调用
public void run() { //timeout public void run() { //timeout
respNodes.remove(this.seqid); messageClient.respQueue.remove(this.seqid);
future.completeExceptionally(new TimeoutException("message-record: "+message)); future.completeExceptionally(new TimeoutException("message-record: " + message));
logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms" messageClient.logger.log(Level.WARNING, getClass().getSimpleName() + " wait msg: " + message + " timeout " + (System.currentTimeMillis() - createTime) + "ms"
+ (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : "")); + (message.userid != null || (message.groupid != null && !message.groupid.isEmpty()) ? (message.userid != null ? (", userid:" + message.userid) : (", groupid:" + message.groupid)) : ""));
} }
@@ -63,10 +56,6 @@ public class MessageRespFutureNode implements Runnable {
return createTime; return createTime;
} }
public LongAdder getCounter() {
return counter;
}
public CompletableFuture<MessageRecord> getFuture() { public CompletableFuture<MessageRecord> getFuture() {
return future; return future;
} }

View File

@@ -0,0 +1,54 @@
/*
*
*/
package org.redkale.mq;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 响应结果
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.8.0
*/
public class MessageRespProcessor implements MessageProcessor {
private final MessageClient messageClient;
public MessageRespProcessor(MessageClient messageClient) {
this.messageClient = messageClient;
}
@Override
public void process(final MessageRecord msg, long time) {
long now = System.currentTimeMillis();
Logger logger = messageClient.logger;
final boolean finest = logger.isLoggable(Level.FINEST);
MessageRespFuture resp = messageClient.respQueue.remove(msg.getSeqid());
if (resp == null) {
logger.log(Level.WARNING, getClass().getSimpleName() + " process " + msg + " error not found MessageRespFuture");
return;
}
if (resp.scheduledFuture != null) {
resp.scheduledFuture.cancel(true);
}
final long cha = now - msg.createTime;
if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.receive (mq.delay = " + cha + "ms, mq.seqid = " + msg.getSeqid() + ")");
}
resp.future.complete(msg);
long cha2 = System.currentTimeMillis() - now;
if ((cha > 1000 || cha2 > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, getClass().getSimpleName() + ".MessageRespFuture.complete (mqs.delays = " + cha + "ms, mqs.completes = " + cha2 + "ms) mqresp.msg: " + msg);
} else if ((cha > 50 || cha2 > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delays = " + cha + "ms, mq.completes = " + cha2 + "ms) mqresp.msg: " + msg);
} else if (finest) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".MessageRespFuture.complete (mq.delay = " + cha + "ms, mq.complete = " + cha2 + "ms) mqresp.msg: " + msg);
}
}
}

View File

@@ -0,0 +1,102 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.redkale.boot.NodeServer;
import org.redkale.net.Context;
import org.redkale.net.Request;
import org.redkale.net.Response;
import org.redkale.net.Servlet;
import org.redkale.service.Service;
import org.redkale.util.Traces;
/**
* 一个Service对应一个MessageProcessor
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public abstract class MessageServlet implements MessageProcessor {
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final MessageClient messageClient;
protected final NodeServer server;
protected final Service service;
protected final Servlet servlet;
protected final String topic;
public MessageServlet(MessageClient messageClient, NodeServer server, Service service, Servlet servlet, String topic) {
this.messageClient = messageClient;
this.server = server;
this.service = service;
this.servlet = servlet;
this.topic = topic;
}
@Override
public void process(final MessageRecord message, long time) {
Response response = null;
try {
Traces.computeIfAbsent(message.getTraceid());
long now = System.currentTimeMillis();
long cha = now - message.createTime;
long e = now - time;
Context context = server.getServer().getContext();
Request request = createRequest(context, message);
response = createResponse(context, request);
//执行逻辑
context.execute(servlet, request, response);
long o = System.currentTimeMillis() - now;
if ((cha > 1000 || e > 100 || o > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, getClass().getSimpleName() + ".process (mqs.delays = " + cha + " ms, mqs.blocks = " + e + " ms, mqs.executes = " + o + " ms) message: " + message);
} else if ((cha > 50 || e > 10 || o > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, getClass().getSimpleName() + ".process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms, mq.executes = " + o + " ms) message: " + message);
} else if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, getClass().getSimpleName() + ".process (mq.delay = " + cha + " ms, mq.block = " + e + " ms, mq.execute = " + o + " ms) message: " + message);
}
} catch (Throwable ex) {
if (response != null) {
onError(response, message, ex);
}
logger.log(Level.SEVERE, getClass().getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
}
}
protected abstract Request createRequest(Context context, MessageRecord message);
protected abstract Response createResponse(Context context, Request request);
protected abstract void onError(Response response, MessageRecord message, Throwable t);
public NodeServer getServer() {
return server;
}
public Service getService() {
return service;
}
public Servlet getServlet() {
return servlet;
}
public String getTopic() {
return topic;
}
}

View File

@@ -1,52 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.CompletableFuture;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpMessageClient extends MessageClient {
protected SncpMessageClient(MessageAgent messageAgent) {
super(messageAgent);
this.appRespTopic = messageAgent.generateAppSncpRespTopic();
}
@Override
protected MessageClientProducer getProducer() {
return messageAgent.getSncpMessageClientProducer();
}
public String getAppRespTopic() {
return this.appRespTopic;
}
//只发送消息,不需要响应
public final void produceMessage(MessageRecord message) {
sendMessage(message, false);
}
//发送消息,需要响应
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message) {
return sendMessage(message, true);
}
@Override
protected MessageRecord formatRespMessage(MessageRecord message) {
if (message != null) {
message.ctype = MessageRecord.CTYPE_BSON_RESULT;
}
return message;
}
}

View File

@@ -1,124 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import java.util.concurrent.*;
import java.util.logging.*;
import org.redkale.boot.NodeSncpServer;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
import org.redkale.util.Traces;
/**
* 一个Service对应一个MessageProcessor
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpMessageClientProcessor implements MessageClientProcessor {
protected final Logger logger;
protected MessageClient messageClient;
protected final MessageClientProducer producer;
protected final NodeSncpServer server;
protected final Service service;
protected final SncpServlet servlet;
protected CountDownLatch cdl;
protected long starttime;
protected final Runnable innerCallback = () -> {
if (cdl != null) {
cdl.countDown();
}
};
public SncpMessageClientProcessor(Logger logger, SncpMessageClient messageClient, MessageClientProducer producer, NodeSncpServer server, Service service, SncpServlet servlet) {
this.logger = logger;
this.messageClient = messageClient;
this.producer = producer;
this.server = server;
this.service = service;
this.servlet = servlet;
}
@Override
public void begin(final int size, long starttime) {
this.starttime = starttime;
this.cdl = size > 1 ? new CountDownLatch(size) : null;
}
@Override
public void process(final MessageRecord message, final Runnable callback) {
execute(message, innerCallback);
}
private void execute(final MessageRecord message, final Runnable callback) {
SncpMessageResponse response = null;
try {
Traces.computeIfAbsent(message.getTraceid());
long now = System.currentTimeMillis();
long cha = now - message.createTime;
long e = now - starttime;
SncpContext context = server.getSncpServer().getContext();
SncpMessageRequest request = new SncpMessageRequest(context, message);
response = new SncpMessageResponse(context, request, callback, messageClient, producer);
context.execute(servlet, request, response);
long o = System.currentTimeMillis() - now;
if ((cha > 1000 || e > 100 || o > 1000) && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "SncpMessageProcessor.process (mqs.delays = " + cha + " ms, mqs.blocks = " + e + " ms, mqs.executes = " + o + " ms) message: " + message);
} else if ((cha > 50 || e > 10 || o > 50) && logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "SncpMessageProcessor.process (mq.delays = " + cha + " ms, mq.blocks = " + e + " ms, mq.executes = " + o + " ms) message: " + message);
} else if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "SncpMessageProcessor.process (mq.delay = " + cha + " ms, mq.block = " + e + " ms, mq.execute = " + o + " ms) message: " + message);
}
} catch (Throwable ex) {
if (response != null) {
response.finish(SncpResponse.RETCODE_ILLSERVICEID, null);
}
logger.log(Level.SEVERE, SncpMessageClientProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
}
}
@Override
public void commit() {
if (this.cdl != null) {
try {
this.cdl.await(30, TimeUnit.SECONDS);
} catch (Exception ex) {
}
this.cdl = null;
}
}
public MessageClientProducer getProducer() {
return producer;
}
public NodeSncpServer getServer() {
return server;
}
public Service getService() {
return service;
}
public SncpServlet getServlet() {
return servlet;
}
}

View File

@@ -24,32 +24,23 @@ public class SncpMessageResponse extends SncpResponse {
protected MessageRecord message; protected MessageRecord message;
protected MessageClientProducer producer; public SncpMessageResponse(MessageClient messageClient, SncpContext context, SncpMessageRequest request) {
protected Runnable callback;
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, MessageClient messageClient, MessageClientProducer producer) {
super(context, request); super(context, request);
this.message = request.message;
this.callback = callback;
this.messageClient = messageClient; this.messageClient = messageClient;
this.producer = producer; this.message = request.message;
} }
@Override @Override
public void finish(final int retcode, final BsonWriter out) { public void finish(final int retcode, final BsonWriter out) {
if (callback != null) {
callback.run();
}
int headerSize = SncpHeader.calcHeaderSize(request); int headerSize = SncpHeader.calcHeaderSize(request);
if (out == null) { if (out == null) {
final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize); final ByteArray result = new ByteArray(headerSize).putPlaceholder(headerSize);
writeHeader(result, 0, retcode); writeHeader(result, 0, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, (byte[]) null)); messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, (byte[]) null));
return; return;
} }
final ByteArray result = out.toByteArray(); final ByteArray result = out.toByteArray();
writeHeader(result, result.length() - headerSize, retcode); writeHeader(result, result.length() - headerSize, retcode);
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getRespTopic(), null, result.getBytes())); messageClient.getProducer().apply(messageClient.createMessageRecord(message.getSeqid(), MessageRecord.CTYPE_BSON, message.getRespTopic(), null, result.getBytes()));
} }
} }

View File

@@ -0,0 +1,49 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.mq;
import org.redkale.boot.NodeSncpServer;
import org.redkale.net.Context;
import org.redkale.net.Request;
import org.redkale.net.Response;
import org.redkale.net.sncp.*;
import org.redkale.service.Service;
/**
* 一个Service对应一个MessageProcessor
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*
* @since 2.1.0
*/
public class SncpMessageServlet extends MessageServlet {
public SncpMessageServlet(MessageClient messageClient, NodeSncpServer server,
Service service, SncpServlet servlet, String topic) {
super(messageClient, server, service, servlet, topic);
}
@Override
protected Request createRequest(Context context, MessageRecord message) {
return new SncpMessageRequest((SncpContext) context, message);
}
@Override
protected Response createResponse(Context context, Request request) {
return new SncpMessageResponse(messageClient, (SncpContext) context, (SncpMessageRequest) request);
}
@Override
protected void onError(Response response, MessageRecord message, Throwable t) {
if (response != null) {
((SncpMessageResponse) response).finish(SncpResponse.RETCODE_ILLSERVICEID, null);
}
}
}

View File

@@ -27,6 +27,9 @@ public class Context {
//服务启动时间 //服务启动时间
protected final long serverStartTime; protected final long serverStartTime;
//Application节点id
protected final int nodeid;
//Server的线程池 //Server的线程池
protected final ExecutorService workExecutor; protected final ExecutorService workExecutor;
@@ -76,15 +79,16 @@ public class Context {
protected Charset charset; protected Charset charset;
public Context(ContextConfig config) { public Context(ContextConfig config) {
this(config.serverStartTime, config.logger, config.workExecutor, config.sslBuilder, config.sslContext, this(config.serverStartTime, config.nodeid, config.logger, config.workExecutor, config.sslBuilder, config.sslContext,
config.bufferCapacity, config.maxConns, config.maxBody, config.charset, config.serverAddress, config.resourceFactory, config.bufferCapacity, config.maxConns, config.maxBody, config.charset, config.serverAddress, config.resourceFactory,
config.dispatcher, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds); config.dispatcher, config.aliveTimeoutSeconds, config.readTimeoutSeconds, config.writeTimeoutSeconds);
} }
public Context(long serverStartTime, Logger logger, ExecutorService workExecutor, SSLBuilder sslBuilder, SSLContext sslContext, public Context(long serverStartTime, int nodeid, Logger logger, ExecutorService workExecutor, SSLBuilder sslBuilder, SSLContext sslContext,
int bufferCapacity, final int maxConns, final int maxBody, Charset charset, InetSocketAddress address, int bufferCapacity, final int maxConns, final int maxBody, Charset charset, InetSocketAddress address,
ResourceFactory resourceFactory, DispatcherServlet dispatcher, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) { ResourceFactory resourceFactory, DispatcherServlet dispatcher, int aliveTimeoutSeconds, int readTimeoutSeconds, int writeTimeoutSeconds) {
this.serverStartTime = serverStartTime; this.serverStartTime = serverStartTime;
this.nodeid = nodeid;
this.logger = logger; this.logger = logger;
this.workExecutor = workExecutor; this.workExecutor = workExecutor;
this.sslBuilder = sslBuilder; this.sslBuilder = sslBuilder;
@@ -172,6 +176,10 @@ public class Context {
return serverStartTime; return serverStartTime;
} }
public int getNodeid() {
return nodeid;
}
public Charset getCharset() { public Charset getCharset() {
return charset; return charset;
} }
@@ -209,6 +217,9 @@ public class Context {
//服务启动时间 //服务启动时间
public long serverStartTime; public long serverStartTime;
//Application节点id
public int nodeid;
//Server的线程池 //Server的线程池
public ExecutorService workExecutor; public ExecutorService workExecutor;

View File

@@ -427,6 +427,7 @@ public abstract class Server<K extends Serializable, C extends Context, R extend
protected void initContextConfig(Context.ContextConfig contextConfig) { protected void initContextConfig(Context.ContextConfig contextConfig) {
if (application != null) { if (application != null) {
contextConfig.nodeid = application.getNodeid();
contextConfig.workExecutor = application.getWorkExecutor(); contextConfig.workExecutor = application.getWorkExecutor();
} }
contextConfig.serverStartTime = this.serverStartTime; contextConfig.serverStartTime = this.serverStartTime;

View File

@@ -321,7 +321,7 @@ public class HttpServer extends Server<String, HttpContext, HttpRequest, HttpRes
if (servlet == null) { if (servlet == null) {
servlet = Rest.createRestServlet(classLoader, userType, baseServletType, serviceType, resname); servlet = Rest.createRestServlet(classLoader, userType, baseServletType, serviceType, resname);
if (servlet != null) { if (servlet != null) {
servlet._reqtopic = MessageAgent.generateHttpReqTopic(Rest.getRestModule(service)); servlet._reqtopic = Rest.generateHttpReqTopic(Rest.getRestModule(service), application.getNodeid());
} }
} }
if (servlet == null) { if (servlet == null) {

View File

@@ -325,6 +325,30 @@ public final class Rest {
return serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase(); return serviceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase();
} }
//格式: http.req.module.user
public static String generateHttpReqTopic(String module, int nodeid) {
return getHttpReqTopicPrefix() + "module." + module.toLowerCase();
}
//格式: http.req.module.user
public static String generateHttpReqTopic(String module, String resname, int nodeid) {
return getHttpReqTopicPrefix() + "module." + module.toLowerCase() + (resname == null || resname.isEmpty() ? "" : ("-" + resname));
}
public static String generateHttpReqTopic(Service service, int nodeid) {
String resname = Sncp.getResourceName(service);
String module = getRestModule(service).toLowerCase();
return getHttpReqTopicPrefix() + "module." + module + (resname.isEmpty() ? "" : ("-" + resname));
}
public static String getHttpReqTopicPrefix() {
return "http.req.";
}
public static String getHttpRespTopicPrefix() {
return "http.resp.";
}
//仅供Rest动态构建里 currentUserid() 使用 //仅供Rest动态构建里 currentUserid() 使用
@AsmDepends @AsmDepends
public static <T> T orElse(T t, T defValue) { public static <T> T orElse(T t, T defValue) {

View File

@@ -14,10 +14,12 @@ import java.util.stream.*;
import org.redkale.annotation.*; import org.redkale.annotation.*;
import org.redkale.annotation.Comment; import org.redkale.annotation.Comment;
import org.redkale.boot.Application; import org.redkale.boot.Application;
import static org.redkale.boot.Application.RESNAME_APP_NODEID;
import org.redkale.convert.*; import org.redkale.convert.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.mq.MessageAgent; import org.redkale.mq.MessageAgent;
import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY; import static org.redkale.net.http.WebSocket.RETCODE_GROUP_EMPTY;
import org.redkale.net.sncp.Sncp;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.source.CacheSource; import org.redkale.source.CacheSource;
import org.redkale.util.*; import org.redkale.util.*;
@@ -40,6 +42,9 @@ public abstract class WebSocketNode implements Service {
protected final Logger logger = Logger.getLogger(WebSocketNode.class.getSimpleName()); protected final Logger logger = Logger.getLogger(WebSocketNode.class.getSimpleName());
@Resource(name = RESNAME_APP_NODEID)
protected int nodeid;
//"SNCP_ADDR" 如果不是分布式(没有SNCP) 值为null //"SNCP_ADDR" 如果不是分布式(没有SNCP) 值为null
@Resource(name = Application.RESNAME_SNCP_ADDRESS, required = false) @Resource(name = Application.RESNAME_SNCP_ADDRESS, required = false)
protected InetSocketAddress localSncpAddress; //为SncpServer的服务address protected InetSocketAddress localSncpAddress; //为SncpServer的服务address
@@ -83,7 +88,7 @@ public abstract class WebSocketNode implements Service {
this.semaphore = new Semaphore(wsthreads); this.semaphore = new Semaphore(wsthreads);
} }
} }
String mqtopic = this.messageAgent == null ? null : this.messageAgent.generateSncpReqTopic((Service) this); String mqtopic = this.messageAgent == null ? null : Sncp.generateSncpReqTopic((Service) this, nodeid);
if (mqtopic != null || this.localSncpAddress != null) { if (mqtopic != null || this.localSncpAddress != null) {
this.wsNodeAddress = new WebSocketAddress(mqtopic, localSncpAddress); this.wsNodeAddress = new WebSocketAddress(mqtopic, localSncpAddress);
} }
@@ -717,7 +722,7 @@ public abstract class WebSocketNode implements Service {
return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid)); return ((CompletableFuture) message).thenApply(msg -> sendOneUserMessage(msg, last, userid));
} }
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.finest("websocket want send message {userid:" + userid + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString(): (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); logger.finest("websocket want send message {userid:" + userid + ", content:" + (message instanceof WebSocketPacket ? ((WebSocketPacket) message).toSimpleString() : (message instanceof CharSequence ? message : JsonConvert.root().convertTo(message))) + "} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
} }
CompletableFuture<Integer> localFuture = null; CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) { if (this.localEngine != null) {

View File

@@ -20,6 +20,7 @@ import org.redkale.asm.Type;
import org.redkale.convert.Convert; import org.redkale.convert.Convert;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
import org.redkale.mq.MessageAgent; import org.redkale.mq.MessageAgent;
import org.redkale.net.http.WebSocketNode;
import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction; import org.redkale.net.sncp.SncpRemoteInfo.SncpRemoteAction;
import org.redkale.service.*; import org.redkale.service.*;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
@@ -254,6 +255,27 @@ public abstract class Sncp {
return dyn != null ? dyn.type() : serviceImplClass; return dyn != null ? dyn.type() : serviceImplClass;
} }
//格式: sncp.req.module.user
public static String generateSncpReqTopic(Service service, int nodeid) {
return generateSncpReqTopic(Sncp.getResourceName(service), Sncp.getResourceType(service), nodeid);
}
//格式: sncp.req.module.user
public static String generateSncpReqTopic(String resourceName, Class resourceType, int nodeid) {
if (WebSocketNode.class.isAssignableFrom(resourceType)) {
return getSncpReqTopicPrefix() + "module.wsnode" + nodeid + (resourceName.isEmpty() ? "" : ("-" + resourceName));
}
return getSncpReqTopicPrefix() + "module." + resourceType.getSimpleName().replaceAll("Service.*$", "").toLowerCase() + (resourceName.isEmpty() ? "" : ("-" + resourceName));
}
public static String getSncpReqTopicPrefix() {
return "sncp.req.";
}
public static String getSncpRespTopicPrefix() {
return "sncp.resp.";
}
public static AnyValue getResourceConf(Service service) { public static AnyValue getResourceConf(Service service) {
if (service == null || !isSncpDyn(service)) { if (service == null || !isSncpDyn(service)) {
return null; return null;

View File

@@ -23,11 +23,14 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
private final AtomicLong seqno = new AtomicLong(); private final AtomicLong seqno = new AtomicLong();
final int nodeid;
final InetSocketAddress clientSncpAddress; final InetSocketAddress clientSncpAddress;
public SncpClient(String name, AsyncGroup group, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) { public SncpClient(String name, AsyncGroup group, int nodeid, InetSocketAddress clientSncpAddress, ClientAddress address, String netprotocol, int maxConns, int maxPipelines) {
super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns super(name, group, "TCP".equalsIgnoreCase(netprotocol), address, maxConns, maxPipelines, null, null, null); //maxConns
this.clientSncpAddress = clientSncpAddress; this.clientSncpAddress = clientSncpAddress;
this.nodeid = nodeid;
this.readTimeoutSeconds = 15; this.readTimeoutSeconds = 15;
this.writeTimeoutSeconds = 15; this.writeTimeoutSeconds = 15;
} }
@@ -41,6 +44,10 @@ public class SncpClient extends Client<SncpClientConnection, SncpClientRequest,
return clientSncpAddress; return clientSncpAddress;
} }
public int getNodeid() {
return nodeid;
}
protected long nextSeqno() { protected long nextSeqno() {
//System.nanoTime()值并发下会出现重复windows11 jdk17出现过 //System.nanoTime()值并发下会出现重复windows11 jdk17出现过
return seqno.incrementAndGet(); return seqno.incrementAndGet();

View File

@@ -69,7 +69,7 @@ public class SncpRemoteInfo<T extends Service> {
protected final MessageAgent messageAgent; protected final MessageAgent messageAgent;
//MQ模式下此字段才有值 //MQ模式下此字段才有值
protected final SncpMessageClient messageClient; protected final MessageClient messageClient;
SncpRemoteInfo(String resourceName, Class<T> resourceType, Class<T> serviceImplClass, Convert convert, SncpRemoteInfo(String resourceName, Class<T> resourceType, Class<T> serviceImplClass, Convert convert,
SncpRpcGroups sncpRpcGroups, SncpClient sncpClient, MessageAgent messageAgent, String remoteGroup) { SncpRpcGroups sncpRpcGroups, SncpClient sncpClient, MessageAgent messageAgent, String remoteGroup) {
@@ -85,7 +85,7 @@ public class SncpRemoteInfo<T extends Service> {
this.messageAgent = messageAgent; this.messageAgent = messageAgent;
this.remoteGroup = remoteGroup; this.remoteGroup = remoteGroup;
this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient(); this.messageClient = messageAgent == null ? null : messageAgent.getSncpMessageClient();
this.topic = messageAgent == null ? null : messageAgent.generateSncpReqTopic(resourceName, resourceType); this.topic = messageAgent == null ? null : Sncp.generateSncpReqTopic(resourceName, resourceType, messageAgent.getNodeid());
for (Map.Entry<Uint128, Method> en : loadMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) { for (Map.Entry<Uint128, Method> en : loadMethodActions(Sncp.getServiceType(serviceImplClass)).entrySet()) {
this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient)); this.actions.put(en.getKey().toString(), new SncpRemoteAction(serviceImplClass, resourceType, en.getValue(), serviceid, en.getKey(), sncpClient));
@@ -163,7 +163,7 @@ public class SncpRemoteInfo<T extends Service> {
} }
ByteArray array = new ByteArray(); ByteArray array = new ByteArray();
request.writeTo(null, array); request.writeTo(null, array);
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, array.getBytes()); MessageRecord message = messageAgent.getSncpMessageClient().createMessageRecord(targetTopic, null, array.getBytes());
final String tt = targetTopic; final String tt = targetTopic;
message.localActionName(action.actionName()); message.localActionName(action.actionName());
message.localParams(params); message.localParams(params);

View File

@@ -22,8 +22,8 @@ import org.redkale.net.client.ClientAddress;
import org.redkale.net.http.*; import org.redkale.net.http.*;
import org.redkale.net.sncp.*; import org.redkale.net.sncp.*;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue.DefaultAnyValue;
import org.redkale.util.*; import org.redkale.util.*;
import org.redkale.util.AnyValue.DefaultAnyValue;
/** /**
* *
@@ -43,7 +43,7 @@ public class ABMainService implements Service {
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
asyncGroup.start(); asyncGroup.start();
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", abport); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", abport);
final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final ResourceFactory resFactory = ResourceFactory.create(); final ResourceFactory resFactory = ResourceFactory.create();
resFactory.register(JsonConvert.root()); resFactory.register(JsonConvert.root());
resFactory.register(BsonConvert.root()); resFactory.register(BsonConvert.root());

View File

@@ -32,7 +32,7 @@ public class SncpClientCodecTest {
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection());
SncpClientCodec codec = new SncpClientCodec(conn); SncpClientCodec codec = new SncpClientCodec(conn);
List respResults = new ArrayList(); List respResults = new ArrayList();

View File

@@ -32,7 +32,7 @@ public class SncpRequestParseTest {
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 3389);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344); InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 3344);
final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16); final AsyncIOGroup asyncGroup = new AsyncIOGroup(8192, 16);
SncpClient client = new SncpClient("test", asyncGroup, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16); SncpClient client = new SncpClient("test", asyncGroup, 0, sncpAddress, new ClientAddress(remoteAddress), "TCP", Utility.cpus(), 16);
SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection()); SncpClientConnection conn = client.createClientConnection(1, asyncGroup.newTCPClientConnection());
SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig(); SncpContext.SncpContextConfig config = new SncpContext.SncpContextConfig();

View File

@@ -46,7 +46,7 @@ public class SncpSleepTest {
int port = server.getSocketAddress().getPort(); int port = server.getSocketAddress().getPort();
System.out.println("SNCP服务器启动端口: " + port); System.out.println("SNCP服务器启动端口: " + port);
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", port); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", port);
final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
final SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); final SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
rpcGroups.computeIfAbsent("cs", "TCP").putAddress(sncpAddress); rpcGroups.computeIfAbsent("cs", "TCP").putAddress(sncpAddress);
SncpSleepService remoteCService = Sncp.createSimpleRemoteService(SncpSleepService.class, resFactory, rpcGroups, client, "cs"); SncpSleepService remoteCService = Sncp.createSimpleRemoteService(SncpSleepService.class, resFactory, rpcGroups, client, "cs");

View File

@@ -80,7 +80,7 @@ public class SncpTest {
asyncGroup.start(); asyncGroup.start();
InetSocketAddress sncpAddress = addr; InetSocketAddress sncpAddress = addr;
final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100); final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), protocol.endsWith(".UDP") ? "UDP" : "TCP", 16, 100);
final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "client");//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client"); final SncpTestIService service = Sncp.createSimpleRemoteService(SncpTestIService.class, factory, rpcGroups, client, "client");//Sncp.createSimpleRemoteService(SncpTestIService.class, null, transFactory, addr, "client");
factory.inject(service); factory.inject(service);

View File

@@ -93,7 +93,7 @@ public class SncpTestServiceImpl implements SncpTestIService {
final SncpRpcGroups rpcGroups = application.getSncpRpcGroups(); final SncpRpcGroups rpcGroups = application.getSncpRpcGroups();
InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 7070); InetSocketAddress sncpAddress = new InetSocketAddress("127.0.0.1", 7070);
rpcGroups.computeIfAbsent("g70", "TCP").putAddress(sncpAddress); rpcGroups.computeIfAbsent("g70", "TCP").putAddress(sncpAddress);
final SncpClient client = new SncpClient("", asyncGroup, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100); final SncpClient client = new SncpClient("", asyncGroup, 0, sncpAddress, new ClientAddress(sncpAddress), "TCP", 16, 100);
Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory); Service service = Sncp.createSimpleLocalService(SncpTestServiceImpl.class, factory);
for (Method method : service.getClass().getDeclaredMethods()) { for (Method method : service.getClass().getDeclaredMethods()) {