net浼樺寲璁℃暟鍣?
This commit is contained in:
@@ -94,9 +94,9 @@ public class LoggingFileHandler extends LoggingBaseHandler {
|
|||||||
|
|
||||||
protected Pattern denyregx;
|
protected Pattern denyregx;
|
||||||
|
|
||||||
private final AtomicLong loglength = new AtomicLong();
|
private final AtomicLong logLength = new AtomicLong();
|
||||||
|
|
||||||
private final AtomicLong logunusuallength = new AtomicLong();
|
private final AtomicLong logUnusualLength = new AtomicLong();
|
||||||
|
|
||||||
private File logfile;
|
private File logfile;
|
||||||
|
|
||||||
@@ -139,7 +139,7 @@ public class LoggingFileHandler extends LoggingBaseHandler {
|
|||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
LogRecord log = logqueue.take();
|
LogRecord log = logqueue.take();
|
||||||
final boolean bigger = (limit > 0 && limit <= loglength.get());
|
final boolean bigger = (limit > 0 && limit <= logLength.get());
|
||||||
final boolean changeday = tomorrow <= log.getMillis();
|
final boolean changeday = tomorrow <= log.getMillis();
|
||||||
if (bigger || changeday) {
|
if (bigger || changeday) {
|
||||||
updateTomorrow();
|
updateTomorrow();
|
||||||
@@ -163,7 +163,7 @@ public class LoggingFileHandler extends LoggingBaseHandler {
|
|||||||
}
|
}
|
||||||
if (unusual != null && changeday && logunusualstream != null) {
|
if (unusual != null && changeday && logunusualstream != null) {
|
||||||
logunusualstream.close();
|
logunusualstream.close();
|
||||||
if (limit > 0 && limit <= logunusuallength.get()) {
|
if (limit > 0 && limit <= logUnusualLength.get()) {
|
||||||
for (int i = Math.min(count - 2, logunusualindex.get() - 1); i > 0; i--) {
|
for (int i = Math.min(count - 2, logunusualindex.get() - 1); i > 0; i--) {
|
||||||
File greater = new File(logunusualfile.getPath() + "." + i);
|
File greater = new File(logunusualfile.getPath() + "." + i);
|
||||||
if (greater.exists()) {
|
if (greater.exists()) {
|
||||||
@@ -182,14 +182,14 @@ public class LoggingFileHandler extends LoggingBaseHandler {
|
|||||||
logindex.incrementAndGet();
|
logindex.incrementAndGet();
|
||||||
logfile = new File(patternDateFormat == null ? pattern : Utility.formatTime(patternDateFormat, -1, System.currentTimeMillis()));
|
logfile = new File(patternDateFormat == null ? pattern : Utility.formatTime(patternDateFormat, -1, System.currentTimeMillis()));
|
||||||
logfile.getParentFile().mkdirs();
|
logfile.getParentFile().mkdirs();
|
||||||
loglength.set(logfile.length());
|
logLength.set(logfile.length());
|
||||||
logstream = new FileOutputStream(logfile, append);
|
logstream = new FileOutputStream(logfile, append);
|
||||||
}
|
}
|
||||||
if (unusual != null && logunusualstream == null) {
|
if (unusual != null && logunusualstream == null) {
|
||||||
logunusualindex.incrementAndGet();
|
logunusualindex.incrementAndGet();
|
||||||
logunusualfile = new File(unusualDateFormat == null ? unusual : Utility.formatTime(unusualDateFormat, -1, System.currentTimeMillis()));
|
logunusualfile = new File(unusualDateFormat == null ? unusual : Utility.formatTime(unusualDateFormat, -1, System.currentTimeMillis()));
|
||||||
logunusualfile.getParentFile().mkdirs();
|
logunusualfile.getParentFile().mkdirs();
|
||||||
logunusuallength.set(logunusualfile.length());
|
logUnusualLength.set(logunusualfile.length());
|
||||||
logunusualstream = new FileOutputStream(logunusualfile, append);
|
logunusualstream = new FileOutputStream(logunusualfile, append);
|
||||||
}
|
}
|
||||||
//----------------------写日志-------------------------
|
//----------------------写日志-------------------------
|
||||||
@@ -197,10 +197,10 @@ public class LoggingFileHandler extends LoggingBaseHandler {
|
|||||||
String encoding = getEncoding();
|
String encoding = getEncoding();
|
||||||
byte[] bytes = encoding == null ? message.getBytes() : message.getBytes(encoding);
|
byte[] bytes = encoding == null ? message.getBytes() : message.getBytes(encoding);
|
||||||
logstream.write(bytes);
|
logstream.write(bytes);
|
||||||
loglength.addAndGet(bytes.length);
|
logLength.addAndGet(bytes.length);
|
||||||
if (unusual != null && (log.getLevel() == Level.WARNING || log.getLevel() == Level.SEVERE)) {
|
if (unusual != null && (log.getLevel() == Level.WARNING || log.getLevel() == Level.SEVERE)) {
|
||||||
logunusualstream.write(bytes);
|
logunusualstream.write(bytes);
|
||||||
logunusuallength.addAndGet(bytes.length);
|
logUnusualLength.addAndGet(bytes.length);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ErrorManager err = getErrorManager();
|
ErrorManager err = getErrorManager();
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import java.io.Serializable;
|
|||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
|
import static org.redkale.mq.MessageRecord.CTYPE_HTTP_REQUEST;
|
||||||
@@ -62,7 +62,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
produceMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
produceMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void produceMessage(HttpSimpleRequest request, AtomicLong counter) {
|
public final void produceMessage(HttpSimpleRequest request, LongAdder counter) {
|
||||||
produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
|
produceMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,7 +74,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public final void produceMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
|
produceMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,7 +82,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
produceMessage(topic, 0, null, request, null);
|
produceMessage(topic, 0, null, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void produceMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
|
public final void produceMessage(String topic, HttpSimpleRequest request, LongAdder counter) {
|
||||||
produceMessage(topic, 0, null, request, counter);
|
produceMessage(topic, 0, null, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,7 +94,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void broadcastMessage(HttpSimpleRequest request, AtomicLong counter) {
|
public final void broadcastMessage(HttpSimpleRequest request, LongAdder counter) {
|
||||||
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
|
broadcastMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -106,7 +106,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public final void broadcastMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
|
broadcastMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -114,7 +114,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
broadcastMessage(topic, 0, null, request, null);
|
broadcastMessage(topic, 0, null, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void broadcastMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
|
public final void broadcastMessage(String topic, HttpSimpleRequest request, LongAdder counter) {
|
||||||
broadcastMessage(topic, 0, null, request, counter);
|
broadcastMessage(topic, 0, null, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,7 +153,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request, AtomicLong counter) {
|
public final CompletableFuture<HttpResult<byte[]>> sendMessage(HttpSimpleRequest request, LongAdder counter) {
|
||||||
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
|
return sendMessage(generateHttpReqTopic(request, null), 0, null, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -165,7 +165,7 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public final CompletableFuture<HttpResult<byte[]>> sendMessage(Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
|
return sendMessage(generateHttpReqTopic(request, null), userid, groupid, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -173,28 +173,28 @@ public class HttpMessageClient extends MessageClient {
|
|||||||
return sendMessage(topic, 0, null, request, null);
|
return sendMessage(topic, 0, null, request, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, AtomicLong counter) {
|
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, HttpSimpleRequest request, LongAdder counter) {
|
||||||
return sendMessage(topic, 0, null, request, counter);
|
return sendMessage(topic, 0, null, request, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
|
public final CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request) {
|
||||||
return sendMessage(topic, userid, null, request, (AtomicLong) null);
|
return sendMessage(topic, userid, null, request, (LongAdder) null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
||||||
message.userid(userid).groupid(groupid);
|
message.userid(userid).groupid(groupid);
|
||||||
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
|
//if (finest) logger.log(Level.FINEST, "HttpMessageClient.sendMessage: " + message);
|
||||||
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
|
return sendMessage(message, true, counter).thenApply(r -> r.decodeContent(HttpResultCoder.getInstance()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
||||||
message.userid(userid).groupid(groupid);
|
message.userid(userid).groupid(groupid);
|
||||||
sendMessage(message, false, counter);
|
sendMessage(message, false, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
MessageRecord message = createMessageRecord(CTYPE_HTTP_REQUEST, topic, null, request.getTraceid(), HttpSimpleRequestCoder.getInstance().encode(request));
|
||||||
message.userid(userid).groupid(groupid);
|
message.userid(userid).groupid(groupid);
|
||||||
sendMessage(message, false, counter);
|
sendMessage(message, false, counter);
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets;
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import org.redkale.annotation.Resource;
|
import org.redkale.annotation.Resource;
|
||||||
import org.redkale.boot.Application;
|
import org.redkale.boot.Application;
|
||||||
@@ -55,7 +55,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
|
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
|
||||||
return localClient.sendMessage(topic, userid, groupid, request, counter);
|
return localClient.sendMessage(topic, userid, groupid, request, counter);
|
||||||
} else {
|
} else {
|
||||||
@@ -64,7 +64,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
|
if (topicServletMap.computeIfAbsent(topic, t -> localClient.findHttpServlet(t) != null)) {
|
||||||
localClient.produceMessage(topic, userid, groupid, request, counter);
|
localClient.produceMessage(topic, userid, groupid, request, counter);
|
||||||
} else {
|
} else {
|
||||||
@@ -73,7 +73,7 @@ public class HttpMessageClusterClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
mqtpAsync(userid, request);
|
mqtpAsync(userid, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import java.lang.reflect.Type;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import org.redkale.boot.*;
|
import org.redkale.boot.*;
|
||||||
import org.redkale.convert.Convert;
|
import org.redkale.convert.Convert;
|
||||||
@@ -113,7 +113,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public CompletableFuture<HttpResult<byte[]>> sendMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
HttpServlet servlet = findHttpServlet(topic);
|
HttpServlet servlet = findHttpServlet(topic);
|
||||||
if (servlet == null) {
|
if (servlet == null) {
|
||||||
if (logger.isLoggable(Level.FINE)) {
|
if (logger.isLoggable(Level.FINE)) {
|
||||||
@@ -145,7 +145,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void produceMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
HttpDispatcherServlet ps = dispatcherServlet();
|
HttpDispatcherServlet ps = dispatcherServlet();
|
||||||
HttpServlet servlet = ps.findServletByTopic(topic);
|
HttpServlet servlet = ps.findServletByTopic(topic);
|
||||||
if (servlet == null) {
|
if (servlet == null) {
|
||||||
@@ -164,7 +164,7 @@ public class HttpMessageLocalClient extends HttpMessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, AtomicLong counter) {
|
public void broadcastMessage(String topic, Serializable userid, String groupid, HttpSimpleRequest request, LongAdder counter) {
|
||||||
HttpDispatcherServlet ps = dispatcherServlet();
|
HttpDispatcherServlet ps = dispatcherServlet();
|
||||||
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
|
HttpRequest req = new HttpMessageLocalRequest(context(), request, userid);
|
||||||
HttpResponse resp = new HttpMessageLocalResponse(req, null);
|
HttpResponse resp = new HttpMessageLocalResponse(req, null);
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ package org.redkale.mq;
|
|||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.*;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import org.redkale.convert.Convert;
|
import org.redkale.convert.Convert;
|
||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
@@ -53,7 +53,7 @@ public abstract class MessageClient {
|
|||||||
return this.respConsumer.shutdown();
|
return this.respConsumer.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp, AtomicLong counter) {
|
protected CompletableFuture<MessageRecord> sendMessage(final MessageRecord message, boolean needresp, LongAdder counter) {
|
||||||
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
|
CompletableFuture<MessageRecord> future = new CompletableFuture<>();
|
||||||
boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST);
|
boolean finest = messageAgent != null && messageAgent.logger.isLoggable(Level.FINEST);
|
||||||
try {
|
try {
|
||||||
@@ -73,9 +73,9 @@ public abstract class MessageClient {
|
|||||||
if (node.scheduledFuture != null) {
|
if (node.scheduledFuture != null) {
|
||||||
node.scheduledFuture.cancel(true);
|
node.scheduledFuture.cancel(true);
|
||||||
}
|
}
|
||||||
AtomicLong ncer = node.getCounter();
|
LongAdder ncer = node.getCounter();
|
||||||
if (ncer != null) {
|
if (ncer != null) {
|
||||||
ncer.decrementAndGet();
|
ncer.decrement();
|
||||||
}
|
}
|
||||||
final long cha = now - msg.createTime;
|
final long cha = now - msg.createTime;
|
||||||
if (finest) {
|
if (finest) {
|
||||||
@@ -106,7 +106,7 @@ public abstract class MessageClient {
|
|||||||
message.setRespTopic(respTopic);
|
message.setRespTopic(respTopic);
|
||||||
}
|
}
|
||||||
if (counter != null) {
|
if (counter != null) {
|
||||||
counter.incrementAndGet();
|
counter.increment();
|
||||||
}
|
}
|
||||||
getProducer().apply(message);
|
getProducer().apply(message);
|
||||||
if (needresp) {
|
if (needresp) {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
package org.redkale.mq;
|
package org.redkale.mq;
|
||||||
|
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -25,7 +25,7 @@ public class MessageRespFutureNode implements Runnable {
|
|||||||
|
|
||||||
protected final long createTime;
|
protected final long createTime;
|
||||||
|
|
||||||
protected final AtomicLong counter;
|
protected final LongAdder counter;
|
||||||
|
|
||||||
protected final CompletableFuture<MessageRecord> future;
|
protected final CompletableFuture<MessageRecord> future;
|
||||||
|
|
||||||
@@ -37,7 +37,7 @@ public class MessageRespFutureNode implements Runnable {
|
|||||||
|
|
||||||
protected ScheduledFuture<?> scheduledFuture;
|
protected ScheduledFuture<?> scheduledFuture;
|
||||||
|
|
||||||
public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap<Long, MessageRespFutureNode> respNodes, AtomicLong counter, CompletableFuture<MessageRecord> future) {
|
public MessageRespFutureNode(Logger logger, MessageRecord message, ConcurrentHashMap<Long, MessageRespFutureNode> respNodes, LongAdder counter, CompletableFuture<MessageRecord> future) {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.message = message;
|
this.message = message;
|
||||||
this.seqid = message.getSeqid();
|
this.seqid = message.getSeqid();
|
||||||
@@ -63,7 +63,7 @@ public class MessageRespFutureNode implements Runnable {
|
|||||||
return createTime;
|
return createTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AtomicLong getCounter() {
|
public LongAdder getCounter() {
|
||||||
return counter;
|
return counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
package org.redkale.mq;
|
package org.redkale.mq;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@@ -39,7 +39,7 @@ public class SncpMessageClient extends MessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//只发送消息,不需要响应
|
//只发送消息,不需要响应
|
||||||
public final void produceMessage(MessageRecord message, AtomicLong counter) {
|
public final void produceMessage(MessageRecord message, LongAdder counter) {
|
||||||
sendMessage(message, false, counter);
|
sendMessage(message, false, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,7 +49,7 @@ public class SncpMessageClient extends MessageClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//发送消息,需要响应
|
//发送消息,需要响应
|
||||||
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, AtomicLong counter) {
|
public final CompletableFuture<MessageRecord> sendMessage(MessageRecord message, LongAdder counter) {
|
||||||
return sendMessage(message, true, counter);
|
return sendMessage(message, true, counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -63,4 +63,5 @@ public abstract class AsyncGroup {
|
|||||||
public abstract AsyncGroup start();
|
public abstract AsyncGroup start();
|
||||||
|
|
||||||
public abstract AsyncGroup close();
|
public abstract AsyncGroup close();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import java.nio.ByteBuffer;
|
|||||||
import java.nio.channels.*;
|
import java.nio.channels.*;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
import java.util.logging.*;
|
import java.util.logging.*;
|
||||||
import org.redkale.util.*;
|
import org.redkale.util.*;
|
||||||
@@ -32,7 +32,7 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
final Selector selector;
|
final Selector selector;
|
||||||
|
|
||||||
//如果有read/write两IOThread,只记readThread
|
//如果有read/write两IOThread,只记readThread
|
||||||
final AtomicInteger connCounter = new AtomicInteger();
|
final LongAdder connCounter = new LongAdder();
|
||||||
|
|
||||||
private final Supplier<ByteBuffer> bufferSupplier;
|
private final Supplier<ByteBuffer> bufferSupplier;
|
||||||
|
|
||||||
@@ -142,7 +142,7 @@ public class AsyncIOThread extends WorkThread {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public int currConnections() {
|
public int currConnections() {
|
||||||
return connCounter.get();
|
return connCounter.intValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.remoteAddress = addr;
|
this.remoteAddress = addr;
|
||||||
ioReadThread.connCounter.incrementAndGet();
|
ioReadThread.connCounter.increment();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -269,9 +269,7 @@ class AsyncNioTcpConnection extends AsyncNioConnection {
|
|||||||
@Override
|
@Override
|
||||||
public final void close() throws IOException {
|
public final void close() throws IOException {
|
||||||
super.close();
|
super.close();
|
||||||
ioReadThread.connCounter.decrementAndGet();
|
ioReadThread.connCounter.decrement();
|
||||||
channel.shutdownInput();
|
|
||||||
channel.shutdownOutput();
|
|
||||||
channel.close();
|
channel.close();
|
||||||
if (this.connectKey != null) {
|
if (this.connectKey != null) {
|
||||||
this.connectKey.cancel();
|
this.connectKey.cancel();
|
||||||
|
|||||||
Reference in New Issue
Block a user