修改MessageRecord的seqid生成规则
This commit is contained in:
@@ -9,7 +9,7 @@ import java.lang.reflect.Type;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.logging.*;
|
||||
import org.redkale.convert.json.JsonConvert;
|
||||
import org.redkale.net.http.*;
|
||||
|
||||
@@ -165,19 +165,20 @@ public class HttpMessageClient extends MessageClient {
|
||||
}
|
||||
|
||||
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
||||
MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
|
||||
MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
|
||||
message.userid(userid).groupid(groupid);
|
||||
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
|
||||
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
|
||||
}
|
||||
|
||||
public void broadcastMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
||||
MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
|
||||
MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
|
||||
message.userid(userid).groupid(groupid);
|
||||
sendMessage(message, false, counter);
|
||||
}
|
||||
|
||||
public void produceMessage(String topic, int userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
||||
MessageRecord message = new MessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
|
||||
MessageRecord message = createMessageRecord(topic, null, HttpSimpleRequestCoder.getInstance().encode(request));
|
||||
message.userid(userid).groupid(groupid);
|
||||
sendMessage(message, false, counter);
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
|
||||
protected final Logger logger;
|
||||
|
||||
protected MessageClient messageClient;
|
||||
|
||||
protected final MessageProducers producer;
|
||||
|
||||
protected final NodeHttpServer server;
|
||||
@@ -55,11 +57,12 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
if (cdl != null) cdl.countDown();
|
||||
};
|
||||
|
||||
public HttpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) {
|
||||
public HttpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageClient messageClient, MessageProducers producer, NodeHttpServer server, Service service, HttpServlet servlet) {
|
||||
this.logger = logger;
|
||||
this.finest = logger.isLoggable(Level.FINEST);
|
||||
this.finer = logger.isLoggable(Level.FINER);
|
||||
this.fine = logger.isLoggable(Level.FINE);
|
||||
this.messageClient = messageClient;
|
||||
this.producer = producer;
|
||||
this.server = server;
|
||||
this.service = service;
|
||||
@@ -98,7 +101,7 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
if (multiconsumer) {
|
||||
request.setRequestURI(request.getRequestURI().replaceFirst(this.multimodule, this.restmodule));
|
||||
}
|
||||
HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, producer.getProducer(message));
|
||||
HttpMessageResponse response = new HttpMessageResponse(context, request, callback, null, null, messageClient, producer.getProducer(message));
|
||||
servlet.execute(request, response);
|
||||
long o = System.currentTimeMillis() - now;
|
||||
if ((cha > 1000 || e > 100 || o > 1000) && fine) {
|
||||
@@ -111,7 +114,7 @@ public class HttpMessageProcessor implements MessageProcessor {
|
||||
} catch (Throwable ex) {
|
||||
if (message.getResptopic() != null && !message.getResptopic().isEmpty()) {
|
||||
HttpMessageResponse.finishHttpResult(finest, request == null ? null : request.getRespConvert(),
|
||||
message, callback, producer.getProducer(message), message.getResptopic(), new HttpResult().status(500));
|
||||
message, callback, messageClient, producer.getProducer(message), message.getResptopic(), new HttpResult().status(500));
|
||||
}
|
||||
logger.log(Level.SEVERE, HttpMessageProcessor.class.getSimpleName() + " process error, message=" + message, ex instanceof CompletionException ? ((CompletionException) ex).getCause() : ex);
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ import org.redkale.util.ObjectPool;
|
||||
*/
|
||||
public class HttpMessageResponse extends HttpResponse {
|
||||
|
||||
protected MessageClient messageClient;
|
||||
|
||||
protected MessageRecord message;
|
||||
|
||||
protected MessageProducer producer;
|
||||
@@ -35,26 +37,28 @@ public class HttpMessageResponse extends HttpResponse {
|
||||
protected Runnable callback;
|
||||
|
||||
public HttpMessageResponse(HttpContext context, HttpMessageRequest request, Runnable callback,
|
||||
ObjectPool<Response> responsePool, HttpResponseConfig config, MessageProducer producer) {
|
||||
ObjectPool<Response> responsePool, HttpResponseConfig config, MessageClient messageClient, MessageProducer producer) {
|
||||
super(context, request, responsePool, config);
|
||||
this.message = request.message;
|
||||
this.callback = callback;
|
||||
this.messageClient = messageClient;
|
||||
this.producer = producer;
|
||||
this.finest = producer.logger.isLoggable(Level.FINEST);
|
||||
}
|
||||
|
||||
public HttpMessageResponse(HttpContext context, MessageRecord message, Runnable callback, HttpResponseConfig config, MessageProducer producer) {
|
||||
public HttpMessageResponse(HttpContext context, MessageRecord message, Runnable callback, HttpResponseConfig config, MessageClient messageClient, MessageProducer producer) {
|
||||
super(context, new HttpMessageRequest(context, message), null, config);
|
||||
this.message = message;
|
||||
this.callback = callback;
|
||||
this.messageClient = messageClient;
|
||||
this.producer = producer;
|
||||
}
|
||||
|
||||
public void finishHttpResult(HttpResult result) {
|
||||
finishHttpResult(this.finest, ((HttpMessageRequest) this.request).getRespConvert(), this.message, this.callback, this.producer, message.getResptopic(), result);
|
||||
finishHttpResult(this.finest, ((HttpMessageRequest) this.request).getRespConvert(), this.message, this.callback, this.messageClient, this.producer, message.getResptopic(), result);
|
||||
}
|
||||
|
||||
public static void finishHttpResult(boolean finest, Convert respConvert, MessageRecord msg, Runnable callback, MessageProducer producer, String resptopic, HttpResult result) {
|
||||
public static void finishHttpResult(boolean finest, Convert respConvert, MessageRecord msg, Runnable callback, MessageClient messageClient, MessageProducer producer, String resptopic, HttpResult result) {
|
||||
if (callback != null) callback.run();
|
||||
if (resptopic == null || resptopic.isEmpty()) return;
|
||||
if (result.getResult() instanceof RetResult) {
|
||||
@@ -66,10 +70,10 @@ public class HttpMessageResponse extends HttpResponse {
|
||||
if (finest) {
|
||||
Object innerrs = result.getResult();
|
||||
if (innerrs instanceof byte[]) innerrs = new String((byte[]) innerrs, StandardCharsets.UTF_8);
|
||||
producer.logger.log(Level.FINEST, "HttpMessageProcessor.process seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders());
|
||||
producer.logger.log(Level.FINEST, "HttpMessageResponse.finishHttpResult seqid=" + msg.getSeqid() + ", content: " + innerrs + ", status: " + result.getStatus() + ", headers: " + result.getHeaders());
|
||||
}
|
||||
byte[] content = HttpResultCoder.getInstance().encode(result);
|
||||
producer.apply(new MessageRecord(msg.getSeqid(), resptopic, null, content));
|
||||
producer.apply(messageClient.createMessageRecord(msg.getSeqid(), resptopic, null, content));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -224,7 +224,7 @@ public abstract class MessageAgent {
|
||||
String[] topics = generateHttpReqTopics(service);
|
||||
String consumerid = generateHttpConsumerid(topics, service);
|
||||
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
|
||||
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.workExecutor, getHttpProducer(), ns, service, servlet);
|
||||
HttpMessageProcessor processor = new HttpMessageProcessor(this.logger, this.workExecutor, httpMessageClient, getHttpProducer(), ns, service, servlet);
|
||||
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(topics, consumerid, processor)));
|
||||
}
|
||||
|
||||
@@ -232,7 +232,7 @@ public abstract class MessageAgent {
|
||||
String topic = generateSncpReqTopic(service);
|
||||
String consumerid = generateSncpConsumerid(topic, service);
|
||||
if (messageNodes.containsKey(consumerid)) throw new RuntimeException("consumerid(" + consumerid + ") is repeat");
|
||||
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this.workExecutor, getSncpProducer(), ns, service, servlet);
|
||||
SncpMessageProcessor processor = new SncpMessageProcessor(this.logger, this.workExecutor, sncpMessageClient, getSncpProducer(), ns, service, servlet);
|
||||
this.messageNodes.put(consumerid, new MessageConsumerNode(ns, service, servlet, processor, createConsumer(new String[]{topic}, consumerid, processor)));
|
||||
}
|
||||
|
||||
|
||||
@@ -5,9 +5,11 @@
|
||||
*/
|
||||
package org.redkale.mq;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Level;
|
||||
import org.redkale.convert.Convert;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -22,6 +24,8 @@ public abstract class MessageClient {
|
||||
|
||||
protected final ConcurrentHashMap<Long, MessageRespFutureNode> respNodes = new ConcurrentHashMap<>();
|
||||
|
||||
protected final AtomicLong msgSeqno = new AtomicLong(System.nanoTime());
|
||||
|
||||
protected final MessageAgent messageAgent;
|
||||
|
||||
protected MessageConsumer respConsumer;
|
||||
@@ -102,4 +106,40 @@ public abstract class MessageClient {
|
||||
}
|
||||
|
||||
protected abstract MessageProducers getProducer();
|
||||
|
||||
public MessageRecord createMessageRecord(String resptopic, String content) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(String topic, String resptopic, String content) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, String content) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(String topic, String resptopic, byte[] content) {
|
||||
return new MessageRecord(msgSeqno.incrementAndGet(), topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord createMessageRecord(long seqid, String topic, String resptopic, byte[] content) {
|
||||
return new MessageRecord(seqid, topic, resptopic, content);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,47 +65,15 @@ public class MessageRecord implements Serializable {
|
||||
public MessageRecord() {
|
||||
}
|
||||
|
||||
public MessageRecord(String resptopic, String content) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, null, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord(String topic, String resptopic, String content) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord(int userid, String topic, String resptopic, String content) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, content == null ? null : content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public MessageRecord(String topic, String resptopic, Convert convert, Object bean) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord(int userid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, null, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord(int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord(int flag, int userid, String groupid, String topic, String resptopic, Convert convert, Object bean) {
|
||||
this(System.nanoTime(), 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, convert.convertToBytes(bean));
|
||||
}
|
||||
|
||||
public MessageRecord(String topic, String resptopic, byte[] content) {
|
||||
this(System.nanoTime(), 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord(long seqid, String topic, String resptopic, byte[] content) {
|
||||
protected MessageRecord(long seqid, String topic, String resptopic, byte[] content) {
|
||||
this(seqid, 1, 0, System.currentTimeMillis(), 0, null, topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
protected MessageRecord(long seqid, int flag, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
this(seqid, 1, flag, System.currentTimeMillis(), userid, groupid, topic, resptopic, content);
|
||||
}
|
||||
|
||||
public MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
protected MessageRecord(long seqid, int version, int flag, long createtime, int userid, String groupid, String topic, String resptopic, byte[] content) {
|
||||
this.seqid = seqid;
|
||||
this.version = version;
|
||||
this.flag = flag;
|
||||
|
||||
@@ -31,6 +31,8 @@ public class SncpMessageProcessor implements MessageProcessor {
|
||||
|
||||
protected final Logger logger;
|
||||
|
||||
protected MessageClient messageClient;
|
||||
|
||||
protected final MessageProducers producer;
|
||||
|
||||
protected final NodeSncpServer server;
|
||||
@@ -49,11 +51,12 @@ public class SncpMessageProcessor implements MessageProcessor {
|
||||
if (cdl != null) cdl.countDown();
|
||||
};
|
||||
|
||||
public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
|
||||
public SncpMessageProcessor(Logger logger, ThreadHashExecutor workExecutor, MessageClient messageClient, MessageProducers producer, NodeSncpServer server, Service service, SncpServlet servlet) {
|
||||
this.logger = logger;
|
||||
this.finest = logger.isLoggable(Level.FINEST);
|
||||
this.finer = logger.isLoggable(Level.FINER);
|
||||
this.fine = logger.isLoggable(Level.FINE);
|
||||
this.messageClient = messageClient;
|
||||
this.producer = producer;
|
||||
this.server = server;
|
||||
this.service = service;
|
||||
@@ -84,7 +87,7 @@ public class SncpMessageProcessor implements MessageProcessor {
|
||||
long e = now - starttime;
|
||||
SncpContext context = server.getSncpServer().getContext();
|
||||
SncpMessageRequest request = new SncpMessageRequest(context, message);
|
||||
response = new SncpMessageResponse(context, request, callback, null, producer.getProducer(message));
|
||||
response = new SncpMessageResponse(context, request, callback, null, messageClient, producer.getProducer(message));
|
||||
servlet.execute(request, response);
|
||||
long o = System.currentTimeMillis() - now;
|
||||
if ((cha > 1000 || e > 100 || o > 1000) && fine) {
|
||||
|
||||
@@ -23,23 +23,27 @@ import org.redkale.util.ObjectPool;
|
||||
*/
|
||||
public class SncpMessageResponse extends SncpResponse {
|
||||
|
||||
protected MessageClient messageClient;
|
||||
|
||||
protected MessageRecord message;
|
||||
|
||||
protected MessageProducer producer;
|
||||
|
||||
protected Runnable callback;
|
||||
|
||||
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, ObjectPool<Response> responsePool, MessageProducer producer) {
|
||||
public SncpMessageResponse(SncpContext context, SncpMessageRequest request, Runnable callback, ObjectPool<Response> responsePool, MessageClient messageClient, MessageProducer producer) {
|
||||
super(context, request, responsePool);
|
||||
this.message = request.message;
|
||||
this.callback = callback;
|
||||
this.messageClient = messageClient;
|
||||
this.producer = producer;
|
||||
}
|
||||
|
||||
public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, ObjectPool<Response> responsePool, MessageProducer producer) {
|
||||
public SncpMessageResponse(SncpContext context, MessageRecord message, Runnable callback, ObjectPool<Response> responsePool, MessageClient messageClient, MessageProducer producer) {
|
||||
super(context, new SncpMessageRequest(context, message), responsePool);
|
||||
this.message = message;
|
||||
this.callback = callback;
|
||||
this.messageClient = messageClient;
|
||||
this.producer = producer;
|
||||
}
|
||||
|
||||
@@ -49,12 +53,12 @@ public class SncpMessageResponse extends SncpResponse {
|
||||
if (out == null) {
|
||||
final byte[] result = new byte[SncpRequest.HEADER_SIZE];
|
||||
fillHeader(ByteBuffer.wrap(result), 0, retcode);
|
||||
producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, (byte[]) null));
|
||||
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getResptopic(), null, (byte[]) null));
|
||||
return;
|
||||
}
|
||||
final int respBodyLength = out.count(); //body总长度
|
||||
final byte[] result = out.toArray();
|
||||
fillHeader(ByteBuffer.wrap(result), respBodyLength - HEADER_SIZE, retcode);
|
||||
producer.apply(new MessageRecord(message.getSeqid(), message.getResptopic(), null, result));
|
||||
producer.apply(messageClient.createMessageRecord(message.getSeqid(), message.getResptopic(), null, result));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,7 +287,7 @@ public final class SncpClient {
|
||||
fillHeader(ByteBuffer.wrap(reqbytes), seqid, actionid, reqBodyLength);
|
||||
String targetTopic = action.topicTargetParamIndex >= 0 ? (String) params[action.topicTargetParamIndex] : this.topic;
|
||||
if (targetTopic == null) targetTopic = this.topic;
|
||||
MessageRecord message = new MessageRecord(targetTopic, null, reqbytes);
|
||||
MessageRecord message = messageClient.createMessageRecord(targetTopic, null, reqbytes);
|
||||
return messageClient.sendMessage(message).thenApply(msg -> {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(msg.getContent());
|
||||
checkResult(seqid, action, buffer);
|
||||
|
||||
Reference in New Issue
Block a user