8 Commits
1.9.7 ... 1.9.8

Author SHA1 Message Date
Redkale
a5a926fd94 FilterNode、FilterJoinNode增加copy方法 2018-10-18 14:47:07 +08:00
Redkale
0cb9f2cad3 2018-10-17 08:18:23 +08:00
Redkale
98209cc82e 修改WebSocketNode.existsWebSocket的实现 2018-10-10 14:03:54 +08:00
Redkale
1879afa6a4 2018-10-09 16:34:35 +08:00
Redkale
4a6404dfec 2018-09-27 09:54:42 +08:00
Redkale
0fa1c4a08f 修复WebSocket中Connection重复的bug 2018-09-27 09:51:44 +08:00
Redkale
573d7c5776 修复Rest.createRestWebSocketServlet空指针bug 2018-09-26 17:23:19 +08:00
Redkale
c56c9bf260 Redkale 1.9.8 开始 2018-09-26 17:21:33 +08:00
9 changed files with 118 additions and 28 deletions

View File

@@ -24,5 +24,5 @@ done
export CLASSPATH=$CLASSPATH:$lib
echo "$APP_HOME"
nohup java -DAPP_HOME="$APP_HOME" org.redkale.boot.Application > "$APP_HOME"/log.out &
nohup java -DAPP_HOME="$APP_HOME" org.redkale.boot.Application > "$APP_HOME"/logs.out &

View File

@@ -489,7 +489,7 @@ public abstract class NodeServer {
for (String s : wlist) {
sb.append(s);
}
sb.append(threadName).append(" All Services load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR);
sb.append(threadName).append("All Services load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR);
}
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
}

View File

@@ -894,16 +894,21 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes());
}
if (this.contentLength >= 0) buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes());
if (this.contentType == this.jsonContentType) {
buffer.put(this.jsonContentTypeBytes);
} else if (this.contentType == null || this.contentType == this.plainContentType) {
buffer.put(this.plainContentTypeBytes);
} else {
buffer.put(("Content-Type: " + (this.contentType == null ? this.plainContentType : this.contentType) + "\r\n").getBytes());
if (!this.request.isWebSocket()) {
if (this.contentType == this.jsonContentType) {
buffer.put(this.jsonContentTypeBytes);
} else if (this.contentType == null || this.contentType == this.plainContentType) {
buffer.put(this.plainContentTypeBytes);
} else {
buffer.put(("Content-Type: " + (this.contentType == null ? this.plainContentType : this.contentType) + "\r\n").getBytes());
}
}
buffer.put(serverNameBytes);
if (dateSupplier != null) buffer.put(dateSupplier.get());
buffer.put(this.request.isKeepAlive() ? connectAliveBytes : connectCloseBytes);
if (this.header.getValue("Connection") == null) {
buffer.put(this.request.isKeepAlive() ? connectAliveBytes : connectCloseBytes);
}
if (this.defaultAddHeaders != null) {
for (String[] headers : this.defaultAddHeaders) {

View File

@@ -428,7 +428,7 @@ public final class Rest {
cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
Set<String> paramnames = new HashSet<>();
String methodesc = method.getName() + ":" + Type.getMethodDescriptor(method);
List<String> names = asmParamMap.get(methodesc);
List<String> names = asmParamMap == null ? null : asmParamMap.get(methodesc);
if (names != null) while (names.remove(" ")); //删掉空元素
Parameter[] params = method.getParameters();
final LinkedHashMap<String, Parameter> paramap = new LinkedHashMap(); //必须使用LinkedHashMap确保顺序

View File

@@ -104,6 +104,8 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr);
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
//--------------------------------------------------------------------------------
@@ -186,23 +188,6 @@ public abstract class WebSocketNode {
});
}
/**
* 判断指定用户是否WebSocket在线
*
* @param userid Serializable
*
* @return boolean
*/
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
tryAcquireSemaphore();
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
/**
* 获取在线用户总数
*
@@ -219,6 +204,59 @@ public abstract class WebSocketNode {
return rs;
}
/**
* @deprecated
*
* 判断指定用户是否WebSocket在线
*
* @param userid Serializable
*
* @return boolean
*/
private CompletableFuture<Boolean> existsWebSocket2(final Serializable userid) {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
tryAcquireSemaphore();
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
/**
* 判断指定用户是否WebSocket在线
*
* @param userid Serializable
*
* @return boolean
*/
@Local
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
CompletableFuture<Boolean> localFuture = null;
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture;
}
//远程节点关闭
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
CompletableFuture<Boolean> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.existsWebSocket(userid, addr)
: future.thenCombine(remoteNode.existsWebSocket(userid, addr), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(false) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
/**
* 强制关闭用户WebSocket
*

View File

@@ -133,6 +133,21 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return future;
}
/**
* 判断用户是否有WebSocket
*
* @param userid Serializable
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) {
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
}
/**
* 强制关闭用户的WebSocket
*

View File

@@ -82,6 +82,18 @@ public class FilterJoinNode extends FilterNode {
return new FilterJoinNode(joinClass, joinColumns, column, express, itemand, value);
}
@Override
public FilterJoinNode copy() {
FilterJoinNode node = (FilterJoinNode) copy(new FilterJoinNode());
node.joinClass = this.joinClass;
node.joinEntity = this.joinEntity;
if (this.joinColumns != null) {
node.joinColumns = new String[this.joinColumns.length];
System.arraycopy(this.joinColumns, 0, node.joinColumns, 0, this.joinColumns.length);
}
return node;
}
@Override
protected FilterNode any(final FilterNode node0, boolean signor) {
Objects.requireNonNull(node0);

View File

@@ -82,6 +82,26 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
this.value = val;
}
public FilterNode copy() {
return copy(new FilterNode());
}
protected FilterNode copy(FilterNode node) {
node.readOnly = this.readOnly;
node.column = this.column;
node.express = this.express;
node.value = this.value;
node.itemand = this.itemand;
node.or = this.or;
if (this.nodes != null) {
node.nodes = new FilterNode[this.nodes.length];
for (int i = 0; i < node.nodes.length; i++) {
node.nodes[i] = this.nodes[i] == null ? null : this.nodes[i].copy();
}
}
return node;
}
public FilterNode asReadOnly() {
this.readOnly = true;
return this;

View File

@@ -17,7 +17,7 @@ public final class Redkale {
}
public static String getDotedVersion() {
return "1.9.7";
return "1.9.8";
}
public static int getMajorVersion() {