This commit is contained in:
Redkale
2017-05-21 21:21:56 +08:00
parent 45fe7cb3e9
commit 05925b4f78
5 changed files with 36 additions and 20 deletions

View File

@@ -36,8 +36,9 @@ import org.redkale.util.Comment;
* 详情见: https://redkale.org * 详情见: https://redkale.org
* *
* @author zhangjx * @author zhangjx
* @param <T> 泛型
*/ */
public abstract class WebSocket { public abstract class WebSocket<T> {
@Comment("消息不合法") @Comment("消息不合法")
public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2 public static final int RETCODE_SEND_ILLPACKET = 1 << 1; //2
@@ -79,6 +80,8 @@ public abstract class WebSocket {
JsonConvert _jsonConvert; //不可能为空 JsonConvert _jsonConvert; //不可能为空
java.lang.reflect.Type _messageTextType; //不可能为空
private final long createtime = System.currentTimeMillis(); private final long createtime = System.currentTimeMillis();
private Map<String, Object> attributes = new HashMap<>(); //非线程安全 private Map<String, Object> attributes = new HashMap<>(); //非线程安全
@@ -528,17 +531,13 @@ public abstract class WebSocket {
public void onPong(byte[] bytes) { public void onPong(byte[] bytes) {
} }
public java.lang.reflect.Type getTextMessageType() { public void onMessage(T message) {
return String.class;
}
public void onMessage(Object message) {
} }
public void onMessage(byte[] bytes) { public void onMessage(byte[] bytes) {
} }
public void onFragment(Object message, boolean last) { public void onFragment(T message, boolean last) {
} }
public void onFragment(byte[] bytes, boolean last) { public void onFragment(byte[] bytes, boolean last) {

View File

@@ -99,9 +99,9 @@ public final class WebSocketGroup {
CompletableFuture<Integer> future = null; CompletableFuture<Integer> future = null;
for (WebSocket s : list) { for (WebSocket s : list) {
if (future == null) { if (future == null) {
future = s.send(packet); future = s.sendPacket(packet);
} else { } else {
future.thenCombine(s.send(packet), (a, b) -> a | b); future.thenCombine(s.sendPacket(packet), (a, b) -> a | (Integer) b);
} }
} }
return future == null ? CompletableFuture.completedFuture(0) : future; return future == null ? CompletableFuture.completedFuture(0) : future;

View File

@@ -117,7 +117,7 @@ public class WebSocketRunner implements Runnable {
webSocket._group.setRecentWebSocket(webSocket); webSocket._group.setRecentWebSocket(webSocket);
try { try {
if (packet.type == FrameType.TEXT) { if (packet.type == FrameType.TEXT) {
Object message = convert.convertFrom(webSocket.getTextMessageType(), packet.receiveMasker, packet.receiveBuffers); Object message = convert.convertFrom(webSocket._messageTextType, packet.receiveMasker, packet.receiveBuffers);
if (readBuffer != null) { if (readBuffer != null) {
readBuffer.clear(); readBuffer.clear();
channel.read(readBuffer, null, this); channel.read(readBuffer, null, this);

View File

@@ -6,6 +6,7 @@
package org.redkale.net.http; package org.redkale.net.http;
import java.io.*; import java.io.*;
import java.lang.reflect.*;
import java.net.*; import java.net.*;
import java.nio.*; import java.nio.*;
import java.security.*; import java.security.*;
@@ -61,6 +62,27 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
@Resource(name = "$") @Resource(name = "$")
protected WebSocketNode node; protected WebSocketNode node;
protected final Type messageTextType;
protected WebSocketServlet() {
Type msgtype = String.class;
try {
for (Method method : this.getClass().getDeclaredMethods()) {
if (!method.getName().equals("createWebSocket")) continue;
if (method.getParameterCount() > 0) continue;
Type rt = method.getGenericReturnType();
if (rt instanceof ParameterizedType) {
msgtype = ((ParameterizedType) rt).getActualTypeArguments()[0];
}
if (msgtype == Object.class) msgtype = String.class;
break;
}
} catch (Exception e) {
logger.warning(this.getClass().getName() + " not designate text message type on createWebSocket Method");
}
this.messageTextType = msgtype;
}
@Override @Override
final void preInit(HttpContext context, AnyValue conf) { final void preInit(HttpContext context, AnyValue conf) {
InetSocketAddress addr = context.getServerAddress(); InetSocketAddress addr = context.getServerAddress();
@@ -103,6 +125,7 @@ public abstract class WebSocketServlet extends HttpServlet implements Resourcabl
} }
final WebSocket webSocket = this.createWebSocket(); final WebSocket webSocket = this.createWebSocket();
webSocket._engine = this.node.localEngine; webSocket._engine = this.node.localEngine;
webSocket._messageTextType = this.messageTextType;
webSocket._jsonConvert = jsonConvert; webSocket._jsonConvert = jsonConvert;
webSocket._remoteAddress = request.getRemoteAddress(); webSocket._remoteAddress = request.getRemoteAddress();
webSocket._remoteAddr = request.getRemoteAddr(); webSocket._remoteAddr = request.getRemoteAddr();

View File

@@ -9,7 +9,6 @@ import org.redkale.net.http.WebServlet;
import org.redkale.net.http.WebSocketServlet; import org.redkale.net.http.WebSocketServlet;
import org.redkale.net.http.WebSocket; import org.redkale.net.http.WebSocket;
import java.io.*; import java.io.*;
import java.lang.reflect.Type;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import org.redkale.convert.json.JsonConvert; import org.redkale.convert.json.JsonConvert;
import org.redkale.util.Utility; import org.redkale.util.Utility;
@@ -51,15 +50,14 @@ public class ChatWebSocketServlet extends WebSocketServlet {
} }
@Override @Override
protected WebSocket createWebSocket() { protected WebSocket<ChatMessage> createWebSocket() {
return new WebSocket() { return new WebSocket<ChatMessage>() {
@Override @Override
public void onMessage(Object text) { public void onMessage(ChatMessage message) {
icounter.incrementAndGet(); icounter.incrementAndGet();
counter.incrementAndGet(); counter.incrementAndGet();
ChatMessage message = (ChatMessage) text;//jsonConvert.convertFrom(ChatMessage.class, text.toString());
if (debug) System.out.println("收到消息: " + message); if (debug) System.out.println("收到消息: " + message);
super.getWebSocketGroup().getWebSockets().forEach(x -> x.send(message)); super.getWebSocketGroup().getWebSockets().forEach(x -> x.send(message));
} }
@@ -68,11 +66,7 @@ public class ChatWebSocketServlet extends WebSocketServlet {
protected Serializable createGroupid() { protected Serializable createGroupid() {
return ""; return "";
} }
@Override
public Type getTextMessageType(){
return ChatMessage.class;
}
}; };
} }