8 Commits
1.9.7 ... 1.9.8

Author SHA1 Message Date
Redkale
a5a926fd94 FilterNode、FilterJoinNode增加copy方法 2018-10-18 14:47:07 +08:00
Redkale
0cb9f2cad3 2018-10-17 08:18:23 +08:00
Redkale
98209cc82e 修改WebSocketNode.existsWebSocket的实现 2018-10-10 14:03:54 +08:00
Redkale
1879afa6a4 2018-10-09 16:34:35 +08:00
Redkale
4a6404dfec 2018-09-27 09:54:42 +08:00
Redkale
0fa1c4a08f 修复WebSocket中Connection重复的bug 2018-09-27 09:51:44 +08:00
Redkale
573d7c5776 修复Rest.createRestWebSocketServlet空指针bug 2018-09-26 17:23:19 +08:00
Redkale
c56c9bf260 Redkale 1.9.8 开始 2018-09-26 17:21:33 +08:00
9 changed files with 118 additions and 28 deletions

View File

@@ -24,5 +24,5 @@ done
export CLASSPATH=$CLASSPATH:$lib export CLASSPATH=$CLASSPATH:$lib
echo "$APP_HOME" echo "$APP_HOME"
nohup java -DAPP_HOME="$APP_HOME" org.redkale.boot.Application > "$APP_HOME"/log.out & nohup java -DAPP_HOME="$APP_HOME" org.redkale.boot.Application > "$APP_HOME"/logs.out &

View File

@@ -489,7 +489,7 @@ public abstract class NodeServer {
for (String s : wlist) { for (String s : wlist) {
sb.append(s); sb.append(s);
} }
sb.append(threadName).append(" All Services load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR); sb.append(threadName).append("All Services load cost " + (System.currentTimeMillis() - starts) + " ms" + LINE_SEPARATOR);
} }
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
} }

View File

@@ -894,16 +894,21 @@ public class HttpResponse extends Response<HttpContext, HttpRequest> {
buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes()); buffer.put(("HTTP/1.1 " + this.status + " " + httpCodes.get(this.status) + "\r\n").getBytes());
} }
if (this.contentLength >= 0) buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes()); if (this.contentLength >= 0) buffer.put(("Content-Length: " + this.contentLength + "\r\n").getBytes());
if (this.contentType == this.jsonContentType) { if (!this.request.isWebSocket()) {
buffer.put(this.jsonContentTypeBytes); if (this.contentType == this.jsonContentType) {
} else if (this.contentType == null || this.contentType == this.plainContentType) { buffer.put(this.jsonContentTypeBytes);
buffer.put(this.plainContentTypeBytes); } else if (this.contentType == null || this.contentType == this.plainContentType) {
} else { buffer.put(this.plainContentTypeBytes);
buffer.put(("Content-Type: " + (this.contentType == null ? this.plainContentType : this.contentType) + "\r\n").getBytes()); } else {
buffer.put(("Content-Type: " + (this.contentType == null ? this.plainContentType : this.contentType) + "\r\n").getBytes());
}
} }
buffer.put(serverNameBytes); buffer.put(serverNameBytes);
if (dateSupplier != null) buffer.put(dateSupplier.get()); if (dateSupplier != null) buffer.put(dateSupplier.get());
buffer.put(this.request.isKeepAlive() ? connectAliveBytes : connectCloseBytes);
if (this.header.getValue("Connection") == null) {
buffer.put(this.request.isKeepAlive() ? connectAliveBytes : connectCloseBytes);
}
if (this.defaultAddHeaders != null) { if (this.defaultAddHeaders != null) {
for (String[] headers : this.defaultAddHeaders) { for (String[] headers : this.defaultAddHeaders) {

View File

@@ -428,7 +428,7 @@ public final class Rest {
cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC); cw2.visitInnerClass(newDynMessageFullName + endfix, newDynName, newDynMessageSimpleName + endfix, ACC_PUBLIC + ACC_STATIC);
Set<String> paramnames = new HashSet<>(); Set<String> paramnames = new HashSet<>();
String methodesc = method.getName() + ":" + Type.getMethodDescriptor(method); String methodesc = method.getName() + ":" + Type.getMethodDescriptor(method);
List<String> names = asmParamMap.get(methodesc); List<String> names = asmParamMap == null ? null : asmParamMap.get(methodesc);
if (names != null) while (names.remove(" ")); //删掉空元素 if (names != null) while (names.remove(" ")); //删掉空元素
Parameter[] params = method.getParameters(); Parameter[] params = method.getParameters();
final LinkedHashMap<String, Parameter> paramap = new LinkedHashMap(); //必须使用LinkedHashMap确保顺序 final LinkedHashMap<String, Parameter> paramap = new LinkedHashMap(); //必须使用LinkedHashMap确保顺序

View File

@@ -104,6 +104,8 @@ public abstract class WebSocketNode {
protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr); protected abstract CompletableFuture<Void> changeUserid(Serializable fromuserid, Serializable touserid, InetSocketAddress sncpAddr);
protected abstract CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress); protected abstract CompletableFuture<Integer> forceCloseWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress);
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@@ -186,23 +188,6 @@ public abstract class WebSocketNode {
}); });
} }
/**
* 判断指定用户是否WebSocket在线
*
* @param userid Serializable
*
* @return boolean
*/
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
tryAcquireSemaphore();
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
/** /**
* 获取在线用户总数 * 获取在线用户总数
* *
@@ -219,6 +204,59 @@ public abstract class WebSocketNode {
return rs; return rs;
} }
/**
* @deprecated
*
* 判断指定用户是否WebSocket在线
*
* @param userid Serializable
*
* @return boolean
*/
private CompletableFuture<Boolean> existsWebSocket2(final Serializable userid) {
if (this.localEngine != null && this.sncpNodeAddresses == null) {
return CompletableFuture.completedFuture(this.localEngine.existsLocalWebSocket(userid));
}
tryAcquireSemaphore();
CompletableFuture<Boolean> rs = this.sncpNodeAddresses.existsAsync(SOURCE_SNCP_USERID_PREFIX + userid);
if (semaphore != null) rs.whenComplete((r, e) -> releaseSemaphore());
return rs;
}
/**
* 判断指定用户是否WebSocket在线
*
* @param userid Serializable
*
* @return boolean
*/
@Local
public CompletableFuture<Boolean> existsWebSocket(final Serializable userid) {
CompletableFuture<Boolean> localFuture = null;
if (this.localEngine != null) localFuture = CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
if (this.sncpNodeAddresses == null || this.remoteNode == null) {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket remote node is null");
//没有CacheSource就不会有分布式节点
return localFuture;
}
//远程节点关闭
tryAcquireSemaphore();
CompletableFuture<Collection<InetSocketAddress>> addrsFuture = sncpNodeAddresses.getCollectionAsync(SOURCE_SNCP_USERID_PREFIX + userid, InetSocketAddress.class);
if (semaphore != null) addrsFuture.whenComplete((r, e) -> releaseSemaphore());
CompletableFuture<Boolean> remoteFuture = addrsFuture.thenCompose((Collection<InetSocketAddress> addrs) -> {
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket found userid:" + userid + " on " + addrs);
if (addrs == null || addrs.isEmpty()) return CompletableFuture.completedFuture(false);
CompletableFuture<Boolean> future = null;
for (InetSocketAddress addr : addrs) {
if (addr == null || addr.equals(localSncpAddress)) continue;
future = future == null ? remoteNode.existsWebSocket(userid, addr)
: future.thenCombine(remoteNode.existsWebSocket(userid, addr), (a, b) -> a | b);
}
return future == null ? CompletableFuture.completedFuture(false) : future;
});
return localFuture == null ? remoteFuture : localFuture.thenCombine(remoteFuture, (a, b) -> a | b);
}
/** /**
* 强制关闭用户WebSocket * 强制关闭用户WebSocket
* *

View File

@@ -133,6 +133,21 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return future; return future;
} }
/**
* 判断用户是否有WebSocket
*
* @param userid Serializable
* @param targetAddress InetSocketAddress
*
* @return 无返回值
*/
@Override
public CompletableFuture<Boolean> existsWebSocket(Serializable userid, @RpcTargetAddress InetSocketAddress targetAddress) {
if (logger.isLoggable(Level.FINEST)) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + userid + " existsWebSocket from " + targetAddress);
if (localEngine == null) return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(localEngine.existsLocalWebSocket(userid));
}
/** /**
* 强制关闭用户的WebSocket * 强制关闭用户的WebSocket
* *

View File

@@ -82,6 +82,18 @@ public class FilterJoinNode extends FilterNode {
return new FilterJoinNode(joinClass, joinColumns, column, express, itemand, value); return new FilterJoinNode(joinClass, joinColumns, column, express, itemand, value);
} }
@Override
public FilterJoinNode copy() {
FilterJoinNode node = (FilterJoinNode) copy(new FilterJoinNode());
node.joinClass = this.joinClass;
node.joinEntity = this.joinEntity;
if (this.joinColumns != null) {
node.joinColumns = new String[this.joinColumns.length];
System.arraycopy(this.joinColumns, 0, node.joinColumns, 0, this.joinColumns.length);
}
return node;
}
@Override @Override
protected FilterNode any(final FilterNode node0, boolean signor) { protected FilterNode any(final FilterNode node0, boolean signor) {
Objects.requireNonNull(node0); Objects.requireNonNull(node0);

View File

@@ -82,6 +82,26 @@ public class FilterNode { //FilterNode 不能实现Serializable接口 否则
this.value = val; this.value = val;
} }
public FilterNode copy() {
return copy(new FilterNode());
}
protected FilterNode copy(FilterNode node) {
node.readOnly = this.readOnly;
node.column = this.column;
node.express = this.express;
node.value = this.value;
node.itemand = this.itemand;
node.or = this.or;
if (this.nodes != null) {
node.nodes = new FilterNode[this.nodes.length];
for (int i = 0; i < node.nodes.length; i++) {
node.nodes[i] = this.nodes[i] == null ? null : this.nodes[i].copy();
}
}
return node;
}
public FilterNode asReadOnly() { public FilterNode asReadOnly() {
this.readOnly = true; this.readOnly = true;
return this; return this;

View File

@@ -17,7 +17,7 @@ public final class Redkale {
} }
public static String getDotedVersion() { public static String getDotedVersion() {
return "1.9.7"; return "1.9.8";
} }
public static int getMajorVersion() { public static int getMajorVersion() {