This commit is contained in:
Redkale
2017-05-22 22:18:03 +08:00
parent 95c53f99e0
commit a957a18e32
3 changed files with 32 additions and 18 deletions

View File

@@ -71,7 +71,7 @@ public abstract class WebSocket<G extends Serializable, T> {
WebSocketGroup _group; //不可能为空 WebSocketGroup _group; //不可能为空
Serializable _sessionid; //不可能为空 String _sessionid; //不可能为空
G _groupid; //不可能为空 G _groupid; //不可能为空
@@ -83,11 +83,11 @@ public abstract class WebSocket<G extends Serializable, T> {
java.lang.reflect.Type _messageTextType; //不可能为空 java.lang.reflect.Type _messageTextType; //不可能为空
private final long createtime = System.currentTimeMillis(); private long createtime = System.currentTimeMillis();
private Map<String, Object> attributes = new HashMap<>(); //非线程安全 private Map<String, Object> attributes = new HashMap<>(); //非线程安全
protected final long websocketid = Math.abs(System.nanoTime()); //唯一ID protected long websocketid = Math.abs(System.nanoTime()); //唯一ID
protected WebSocket() { protected WebSocket() {
} }
@@ -308,7 +308,7 @@ public abstract class WebSocket<G extends Serializable, T> {
* *
* @return sessionid * @return sessionid
*/ */
public final Serializable getSessionid() { public final String getSessionid() {
return _sessionid; return _sessionid;
} }
@@ -368,7 +368,7 @@ public abstract class WebSocket<G extends Serializable, T> {
* *
* @return sessionid * @return sessionid
*/ */
protected CompletableFuture<Serializable> onOpen(final HttpRequest request) { protected CompletableFuture<String> onOpen(final HttpRequest request) {
return CompletableFuture.completedFuture(request.getSessionid(true)); return CompletableFuture.completedFuture(request.getSessionid(true));
} }

View File

@@ -14,6 +14,7 @@ import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.logging.*; import java.util.logging.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
@@ -25,7 +26,7 @@ import org.redkale.convert.json.JsonConvert;
* *
* @author zhangjx * @author zhangjx
*/ */
public class WebSocketRunner implements Runnable { class WebSocketRunner implements Runnable {
private final WebSocketEngine engine; private final WebSocketEngine engine;
@@ -47,14 +48,17 @@ public class WebSocketRunner implements Runnable {
private final boolean wsbinary; private final boolean wsbinary;
private final BiConsumer<WebSocket, Object> restMessageConsumer; //主要供RestWebSocket使用
protected long lastSendTime; protected long lastSendTime;
protected final JsonConvert convert; protected final JsonConvert convert;
public WebSocketRunner(Context context, WebSocket webSocket, AsyncConnection channel, final boolean wsbinary) { WebSocketRunner(Context context, WebSocket webSocket, BiConsumer<WebSocket, Object> messageConsumer, AsyncConnection channel, final boolean wsbinary) {
this.context = context; this.context = context;
this.engine = webSocket._engine; this.engine = webSocket._engine;
this.webSocket = webSocket; this.webSocket = webSocket;
this.restMessageConsumer = messageConsumer;
this.channel = channel; this.channel = channel;
this.wsbinary = wsbinary; this.wsbinary = wsbinary;
this.readBuffer = context.pollBuffer(); this.readBuffer = context.pollBuffer();
@@ -123,7 +127,11 @@ public class WebSocketRunner implements Runnable {
channel.read(readBuffer, null, this); channel.read(readBuffer, null, this);
} }
try { try {
webSocket.onMessage(message, packet.last); if (restMessageConsumer != null) { //主要供RestWebSocket使用
restMessageConsumer.accept(webSocket, message);
} else {
webSocket.onMessage(message, packet.last);
}
} catch (Exception e) { } catch (Exception e) {
context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e); context.getLogger().log(Level.SEVERE, "WebSocket onTextMessage error (" + packet + ")", e);
} }

View File

@@ -12,6 +12,7 @@ import java.nio.*;
import java.security.*; import java.security.*;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.*; import javax.annotation.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
@@ -57,14 +58,16 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Comment("是否用于二进制流传输") @Comment("是否用于二进制流传输")
protected final boolean wsbinary = getClass().getAnnotation(WebSocketBinary.class) != null; protected final boolean wsbinary = getClass().getAnnotation(WebSocketBinary.class) != null;
private final BiConsumer<WebSocket, Object> restMessageConsumer = createRestOnMessageConsumer();
Type messageTextType; //RestWebSocket时会被修改
@Resource @Resource
protected JsonConvert jsonConvert; protected JsonConvert jsonConvert;
@Resource(name = "$") @Resource(name = "$")
protected WebSocketNode node; protected WebSocketNode node;
protected final Type messageTextType;
protected WebSocketServlet() { protected WebSocketServlet() {
Type msgtype = String.class; Type msgtype = String.class;
try { try {
@@ -73,7 +76,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
if (method.getParameterCount() > 0) continue; if (method.getParameterCount() > 0) continue;
Type rt = method.getGenericReturnType(); Type rt = method.getGenericReturnType();
if (rt instanceof ParameterizedType) { if (rt instanceof ParameterizedType) {
msgtype = ((ParameterizedType) rt).getActualTypeArguments()[0]; msgtype = ((ParameterizedType) rt).getActualTypeArguments()[1];
} }
if (msgtype == Object.class) msgtype = String.class; if (msgtype == Object.class) msgtype = String.class;
break; break;
@@ -130,8 +133,8 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
webSocket._jsonConvert = jsonConvert; webSocket._jsonConvert = jsonConvert;
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();
initWebSocket(webSocket); initRestWebSocket(webSocket);
CompletableFuture<Serializable> sessionFuture = webSocket.onOpen(request); CompletableFuture<String> sessionFuture = webSocket.onOpen(request);
if (sessionFuture == null) { if (sessionFuture == null) {
if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request); if (debug) logger.finest("WebSocket connect abort, Not found sessionid. request=" + request);
response.finish(true); response.finish(true);
@@ -172,7 +175,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
webSocket._groupid = groupid; webSocket._groupid = groupid;
WebSocketServlet.this.node.localEngine.add(webSocket); WebSocketServlet.this.node.localEngine.add(webSocket);
WebSocketRunner runner = new WebSocketRunner(context, webSocket, response.removeChannel(), wsbinary); WebSocketRunner runner = new WebSocketRunner(context, webSocket, restMessageConsumer, response.removeChannel(), wsbinary);
webSocket._runner = runner; webSocket._runner = runner;
context.runAsync(runner); context.runAsync(runner);
response.finish(true); response.finish(true);
@@ -188,15 +191,18 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
}); });
} }
protected void initWebSocket(WebSocket websocket) { protected abstract <G extends Serializable, T> WebSocket<G, T> createWebSocket();
}
protected WebSocketNode createWebSocketNode() { protected WebSocketNode createWebSocketNode() {
return null; return null;
} }
protected abstract <G extends Serializable, T> WebSocket<G, T> createWebSocket(); protected void initRestWebSocket(WebSocket websocket) { //RestWebSocket设置@Resource资源
}
protected BiConsumer<WebSocket, Object> createRestOnMessageConsumer() {
return null;
}
private static MessageDigest getMessageDigest() { private static MessageDigest getMessageDigest() {
try { try {