RestWebSocket增加wsthreads属性配置

This commit is contained in:
Redkale
2018-06-02 15:49:10 +08:00
parent 7361823ece
commit 4c32422493
7 changed files with 78 additions and 10 deletions

View File

@@ -418,7 +418,7 @@ public final class Transport {
public TransportNode(int poolmaxconns, InetSocketAddress address, long disabletime) {
this.address = address;
this.disabletime = disabletime;
this.conns = new ArrayBlockingQueue<>(poolmaxconns);
this.conns = new LinkedBlockingQueue<>(poolmaxconns);
}
public int getPoolmaxconns() {

View File

@@ -81,6 +81,13 @@ public @interface RestWebSocket {
*/
int wsmaxconns() default 0;
/**
* 操作WebSocketNode对应CacheSource并发数, 为-1表示无限制为0表示系统默认值(CPU*8)
*
* @return 最大连接数
*/
int wsthreads() default 0;
/**
* 最大消息体长度, 小于1表示无限制
*

View File

@@ -70,6 +70,9 @@ public class WebSocketEngine {
@Comment("最大连接数, 为0表示无限制")
protected int wsmaxconns;
@Comment("操作WebSocketNode对应CacheSource并发数, 为-1表示无限制为0表示系统默认值(CPU*8)")
protected int wsthreads;
@Comment("最大消息体长度, 小于1表示无限制")
protected int wsmaxbody;
@@ -77,7 +80,7 @@ public class WebSocketEngine {
protected Cryptor cryptor;
protected WebSocketEngine(String engineid, boolean single, HttpContext context, int liveinterval,
int wsmaxconns, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) {
int wsmaxconns, int wsthreads, int wsmaxbody, Cryptor cryptor, WebSocketNode node, Convert sendConvert, Logger logger) {
this.engineid = engineid;
this.single = single;
this.context = context;
@@ -85,6 +88,7 @@ public class WebSocketEngine {
this.node = node;
this.liveinterval = liveinterval;
this.wsmaxconns = wsmaxconns;
this.wsthreads = wsthreads;
this.wsmaxbody = wsmaxbody;
this.cryptor = cryptor;
this.logger = logger;
@@ -97,6 +101,7 @@ public class WebSocketEngine {
this.liveinterval = props == null ? (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval) : props.getIntValue(WEBPARAM__LIVEINTERVAL, (liveinterval < 0 ? DEFAILT_LIVEINTERVAL : liveinterval));
if (liveinterval <= 0) return;
if (props != null) this.wsmaxconns = props.getIntValue(WEBPARAM__WSMAXCONNS, this.wsmaxconns);
if (props != null) this.wsthreads = props.getIntValue(WEBPARAM__WSTHREADS, this.wsthreads);
if (props != null) this.wsmaxbody = props.getIntValue(WEBPARAM__WSMAXBODY, this.wsmaxbody);
if (scheduler != null) return;
this.scheduler = new ScheduledThreadPoolExecutor(1, (Runnable r) -> {
@@ -139,7 +144,7 @@ public class WebSocketEngine {
}
@Comment("从WebSocketEngine删除指定WebSocket")
void remove(WebSocket socket) {
void removeThenClose(WebSocket socket) {
Serializable userid = socket._userid;
if (single) {
currconns.decrementAndGet();

View File

@@ -60,13 +60,25 @@ public abstract class WebSocketNode {
//当前节点的本地WebSocketEngine
protected WebSocketEngine localEngine;
protected Semaphore semaphore;
public void init(AnyValue conf) {
if (sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
if (localEngine != null) {
int wsthreads = localEngine.wsthreads;
if (wsthreads == 0) wsthreads = Runtime.getRuntime().availableProcessors() * 8;
if (wsthreads > 0) this.semaphore = new Semaphore(wsthreads);
}
}
public void destroy(AnyValue conf) {
}
@Local
public final Semaphore getSemaphore() {
return semaphore;
}
@Local
public final void postDestroy(AnyValue conf) {
if (this.localEngine == null) return;
@@ -136,7 +148,12 @@ public abstract class WebSocketNode {
* @return 地址列表
*/
public CompletableFuture<Collection<InetSocketAddress>> getRpcNodeAddresses(final Serializable userid) {
if (this.sncpNodeAddresses != null) return this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (this.sncpNodeAddresses != null) {
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> result = this.sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) result.whenComplete((r, e) -> releaseSemaphore());
return result;
}
List<InetSocketAddress> rs = new ArrayList<>();
rs.add(this.localSncpAddress);
return CompletableFuture.completedFuture(rs);
@@ -177,7 +194,10 @@ public abstract class WebSocketNode {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
return this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
tryAcquireSemaphore();
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
/**
@@ -190,7 +210,10 @@ public abstract class WebSocketNode {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
}
return this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue());
tryAcquireSemaphore();
CompletableFuture<Integer> rs = this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue());
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
/**
@@ -210,7 +233,9 @@ public abstract class WebSocketNode {
return localFuture;
}
//远程节点关闭
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
@@ -486,7 +511,9 @@ public abstract class WebSocketNode {
}
final Object remoteMessage = formatRemoteMessage(message);
CompletableFuture<Integer> localFuture = this.localEngine == null ? null : this.localEngine.broadcastMessage(wsrange, message, last);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_ADDRS_KEY);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket broadcast message (" + remoteMessage + ") on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(0);
@@ -515,7 +542,9 @@ public abstract class WebSocketNode {
}
//远程节点发送消息
final Object remoteMessage = formatRemoteMessage(message);
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Integer> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (addrs == null || addrs.isEmpty()) {
if (logger.isLoggable(Level.FINER)) logger.finer("websocket not found userid:" + userid + " on any node ");
@@ -541,4 +570,19 @@ public abstract class WebSocketNode {
if (sendConvert instanceof BinaryConvert) ((BinaryConvert) sendConvert).convertTo(message);
return JsonConvert.root().convertTo(message);
}
protected boolean tryAcquireSemaphore() {
if (this.semaphore == null) return true;
try {
System.out.println("---------this.semaphore.tryAcquire" );
return this.semaphore.tryAcquire(6, TimeUnit.SECONDS);
} catch (Exception e) {
return false;
}
}
protected void releaseSemaphore() {
if (this.semaphore != null) this.semaphore.release();
System.out.println("---------this.semaphore.release " + this.semaphore );
}
}

View File

@@ -79,7 +79,7 @@ class WebSocketRunner implements Runnable {
@Override
public void completed(Integer count, Void attachment1) {
if (count < 1) {
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
if (debug) context.getLogger().log(Level.FINEST, "WebSocketRunner(userid="+webSocket.getUserid()+") abort on read buffer count, force to close channel, live " + (System.currentTimeMillis() - webSocket.getCreatetime()) / 1000 + " seconds");
closeRunner(0, "read buffer count is " + count);
return;
}
@@ -329,7 +329,7 @@ class WebSocketRunner implements Runnable {
channel.dispose();
context.offerBuffer(readBuffer);
readBuffer = null;
engine.remove(webSocket);
engine.removeThenClose(webSocket);
webSocket.onClose(code, reason);
QueueEntry entry = writeQueue.poll();
while (entry != null) {

View File

@@ -50,6 +50,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Comment("WebScoket服务器最大连接数为0表示无限制")
public static final String WEBPARAM__WSMAXCONNS = "wsmaxconns";
@Comment("WebScoket服务器操作WebSocketNode对应CacheSource并发数, 为-1表示无限制为0表示系统默认值(CPU*8)")
public static final String WEBPARAM__WSTHREADS = "wsthreads";
@Comment("最大消息体长度, 小于1表示无限制")
public static final String WEBPARAM__WSMAXBODY = "wsmaxbody";
@@ -76,6 +79,9 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
//同RestWebSocket.wsmaxconns
protected int wsmaxconns = 0;
//同RestWebSocket.wsthreads
protected int wsthreads = 0;
//同RestWebSocket.wsmaxbody
protected int wsmaxbody = 32 * 1024;
@@ -147,7 +153,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}
//存在WebSocketServlet则此WebSocketNode必须是本地模式Service
this.node.localEngine = new WebSocketEngine("WebSocketEngine-" + addr.getHostString() + ":" + addr.getPort() + "-[" + resourceName() + "]",
this.single, context, liveinterval, wsmaxconns, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger);
this.single, context, liveinterval, wsmaxconns, wsthreads, wsmaxbody, this.cryptor, this.node, this.sendConvert, logger);
this.node.init(conf);
this.node.localEngine.init(conf);

View File

@@ -47,7 +47,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
executor = ((WorkThread) thread).getExecutor();
}
if (executor == null) executor = ForkJoinPool.commonPool();
return CompletableFuture.supplyAsync(() -> {
final List<String> rs = new ArrayList<>();
this.localEngine.getLocalWebSockets(groupid).forEach(x -> rs.add(x.getRemoteAddr()));
@@ -77,9 +77,11 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
*/
@Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.incr(SOURCE_SNCP_USERCOUNT_KEY));
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future;
}
@@ -94,8 +96,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
*/
@Override
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.decr(SOURCE_SNCP_USERCOUNT_KEY));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
return future;
}
@@ -111,8 +115,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
*/
@Override
public CompletableFuture<Void> changeUserid(Serializable olduserid, Serializable newuserid, InetSocketAddress sncpAddr) {
tryAcquireSemaphore();
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + newuserid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + olduserid, sncpAddr));
if (semaphore != null) future.whenComplete((r, e) -> releaseSemaphore());
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + olduserid + " changeUserid to " + newuserid + " from " + sncpAddr);
return future;
}