This commit is contained in:
Redkale
2017-11-15 11:00:08 +08:00
parent 7c5ac7970e
commit 945f9f9ef5
6 changed files with 22 additions and 9 deletions

View File

@@ -32,6 +32,9 @@ public abstract class WebSocketNode {
@Comment("存储用户ID的key前缀") @Comment("存储用户ID的key前缀")
public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:"; public static final String SOURCE_SNCP_USERID_PREFIX = "sncpws_uid:";
@Comment("存储用户数的key")
public static final String SOURCE_SNCP_USERCOUNT_KEY = "sncpws_usercount";
@Comment("存储当前SNCP节点列表的key") @Comment("存储当前SNCP节点列表的key")
public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs"; public static final String SOURCE_SNCP_ADDRS_KEY = "sncpws_addrs";
@@ -58,6 +61,7 @@ public abstract class WebSocketNode {
protected WebSocketEngine localEngine; protected WebSocketEngine localEngine;
public void init(AnyValue conf) { public void init(AnyValue conf) {
if(sncpNodeAddresses != null) sncpNodeAddresses.initValueType(InetSocketAddress.class);
} }
public void destroy(AnyValue conf) { public void destroy(AnyValue conf) {
@@ -178,9 +182,7 @@ public abstract class WebSocketNode {
if (this.localEngine != null && this.sncpNodeAddresses == null) { if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize()); return CompletableFuture.completedFuture(this.localEngine.getLocalUserSize());
} }
return this.sncpNodeAddresses.getKeySizeAsync().thenCompose(count -> { return this.sncpNodeAddresses.getLongAsync(SOURCE_SNCP_USERCOUNT_KEY, 0L).thenApply(v -> v.intValue());
return sncpNodeAddresses.existsAsync(SOURCE_SNCP_ADDRS_KEY).thenApply(exists -> exists ? (count - 1) : count);
});
} }
/** /**

View File

@@ -69,6 +69,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override @Override
public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) { public CompletableFuture<Void> connect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); CompletableFuture<Void> future = sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.incr(SOURCE_SNCP_USERCOUNT_KEY));
future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr)); future = future.thenAccept((a) -> sncpNodeAddresses.appendSetItemAsync(SOURCE_SNCP_ADDRS_KEY, sncpAddr));
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " connect from " + sncpAddr);
return future; return future;
@@ -85,6 +86,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
@Override @Override
public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) { public CompletableFuture<Void> disconnect(Serializable userid, InetSocketAddress sncpAddr) {
CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr); CompletableFuture<Void> future = sncpNodeAddresses.removeSetItemAsync(SOURCE_SNCP_USERID_PREFIX + userid, sncpAddr);
future = future.thenAccept((a) -> sncpNodeAddresses.decr(SOURCE_SNCP_USERCOUNT_KEY));
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr); if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " disconnect from " + sncpAddr);
return future; return future;
} }

View File

@@ -385,7 +385,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
@RpcMultiRun //@RpcMultiRun
public long getLongAndRefresh(String key, final int expireSeconds, long defValue) { public long getLongAndRefresh(String key, final int expireSeconds, long defValue) {
if (key == null) return defValue; if (key == null) return defValue;
CacheEntry entry = container.get(key); CacheEntry entry = container.get(key);
@@ -551,7 +551,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
@RpcMultiRun //@RpcMultiRun
public long incr(final String key) { public long incr(final String key) {
return incr(key, 1); return incr(key, 1);
} }
@@ -563,7 +563,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
@RpcMultiRun //@RpcMultiRun
public long incr(final String key, long num) { public long incr(final String key, long num) {
CacheEntry entry = container.get(key); CacheEntry entry = container.get(key);
if (entry == null) { if (entry == null) {
@@ -585,7 +585,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
@RpcMultiRun //@RpcMultiRun
public long decr(final String key) { public long decr(final String key) {
return incr(key, -1); return incr(key, -1);
} }
@@ -597,7 +597,7 @@ public class CacheMemorySource<V extends Object> extends AbstractService impleme
} }
@Override @Override
@RpcMultiRun //@RpcMultiRun
public long decr(final String key, long num) { public long decr(final String key, long num) {
return incr(key, -num); return incr(key, -num);
} }

View File

@@ -94,6 +94,7 @@ public class SncpTest {
SncpTestBean callbean = new SncpTestBean(); SncpTestBean callbean = new SncpTestBean();
callbean.setId(1); callbean.setId(1);
callbean.setContent("数据X"); callbean.setContent("数据X");
service.queryLongResult("f", 3,33L);
service.insert(callbean); service.insert(callbean);
System.out.println("bean.id应该会被修改(id不会是1) " + callbean); System.out.println("bean.id应该会被修改(id不会是1) " + callbean);

View File

@@ -17,8 +17,10 @@ public interface SncpTestIService extends Service {
public String queryResult(SncpTestBean bean); public String queryResult(SncpTestBean bean);
public long queryLongResult(String a, int b, long value);
public CompletableFuture<String> queryResultAsync(SncpTestBean bean); public CompletableFuture<String> queryResultAsync(SncpTestBean bean);
public void insert(@RpcCall(DataCallArrayAttribute.class) SncpTestBean... beans); public void insert(@RpcCall(DataCallArrayAttribute.class) SncpTestBean... beans);
public String updateBean(@RpcCall(SncpTestServiceImpl.CallAttribute.class) SncpTestBean bean); public String updateBean(@RpcCall(SncpTestServiceImpl.CallAttribute.class) SncpTestBean bean);

View File

@@ -41,6 +41,12 @@ public class SncpTestServiceImpl implements SncpTestIService {
} }
@Override
@RpcMultiRun
public long queryLongResult(String a, int b, long value) {
return value + 1;
}
public static class CallAttribute implements Attribute<SncpTestBean, Long> { public static class CallAttribute implements Attribute<SncpTestBean, Long> {
@Override @Override