From 5b85343b4eb869ae911aae57a4a2abfc74155ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=B0=E5=B9=B3=E7=BA=BF?= <22250530@qq.com> Date: Mon, 10 Aug 2015 14:06:15 +0800 Subject: [PATCH] --- src/com/wentch/redkale/boot/Application.java | 17 +- .../wentch/redkale/boot/NodeSncpServer.java | 7 +- .../wentch/redkale/net/AsyncConnection.java | 41 +-- .../redkale/net/AsyncPooledConnection.java | 50 ---- .../wentch/redkale/net/ProtocolServer.java | 2 +- src/com/wentch/redkale/net/SSLBuilder.java | 160 +++++++++++ src/com/wentch/redkale/net/Transport.java | 68 ++++- .../redkale/net/http/WebSocketNode.java | 59 +++- .../redkale/net/http/WebSocketRunner.java | 2 +- src/com/wentch/redkale/net/sncp/Sncp.java | 2 +- .../wentch/redkale/net/sncp/SncpClient.java | 53 +++- .../redkale/net/sncp/SncpParameter.java | 23 ++ .../redkale/service/WebSocketNodeService.java | 13 +- .../service/WebSocketNodeService2.java | 272 ++++++++++++++++++ 14 files changed, 653 insertions(+), 116 deletions(-) delete mode 100644 src/com/wentch/redkale/net/AsyncPooledConnection.java create mode 100644 src/com/wentch/redkale/net/SSLBuilder.java create mode 100644 src/com/wentch/redkale/net/sncp/SncpParameter.java create mode 100644 src/com/wentch/redkale/service/WebSocketNodeService2.java diff --git a/src/com/wentch/redkale/boot/Application.java b/src/com/wentch/redkale/boot/Application.java index cfbf954f0..5c9efdfeb 100644 --- a/src/com/wentch/redkale/boot/Application.java +++ b/src/com/wentch/redkale/boot/Application.java @@ -16,6 +16,7 @@ import com.wentch.redkale.util.*; import com.wentch.redkale.util.AnyValue.DefaultAnyValue; import com.wentch.redkale.watch.*; import java.io.*; +import java.lang.reflect.*; import java.net.*; import java.nio.*; import java.nio.channels.*; @@ -50,6 +51,18 @@ public final class Application { //application.xml 文件中resources节点的内容, 类型: AnyValue public static final String RESNAME_GRES = "APP_GRES"; + //当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress + public static final String RESNAME_SNCP_ADDRESS = "SNCP_ADDRESS"; + + //当前SNCP Server的IP地址+端口集合 类型: Map、HashMap + public static final String RESNAME_SNCP_NODES = "SNCP_NODES"; + + private static final Type NODES1TYPE = new TypeToken>() { + }.getType(); + + private static final Type NODES2TYPE = new TypeToken>() { + }.getType(); + protected final ResourceFactory factory = ResourceFactory.root(); protected final WatchFactory watch = WatchFactory.root(); @@ -348,6 +361,8 @@ public final class Application { if (oldgroup != null && !((sncpconf.getValue("group", "") + ";").contains(oldgroup + ";"))) throw new RuntimeException(addr + " has one more group " + (addrGroups.get(addr))); if (oldgroup == null) addrGroups.put(addr, ""); } + factory.register(RESNAME_SNCP_NODES, NODES1TYPE, new HashMap<>(addrGroups)); + factory.register(RESNAME_SNCP_NODES, NODES2TYPE, new HashMap<>(addrGroups)); runServers(timecd, sncps); //确保sncp都启动后再启动其他协议 runServers(timecd, others); timecd.await(); @@ -427,7 +442,7 @@ public final class Application { } } } -//------------------------------------------------------------------------ + //------------------------------------------------------------------------ AnyValue websocketnodeConf = resources.getAnyValue("websocketnode"); if (websocketnodeConf != null) { String val = websocketnodeConf.getValue("service", ""); diff --git a/src/com/wentch/redkale/boot/NodeSncpServer.java b/src/com/wentch/redkale/boot/NodeSncpServer.java index b4fbbed0b..bfb5cbf6d 100644 --- a/src/com/wentch/redkale/boot/NodeSncpServer.java +++ b/src/com/wentch/redkale/boot/NodeSncpServer.java @@ -5,11 +5,12 @@ */ package com.wentch.redkale.boot; +import static com.wentch.redkale.boot.Application.RESNAME_SNCP_ADDRESS; import com.wentch.redkale.net.sncp.*; import com.wentch.redkale.util.AnyValue; import com.wentch.redkale.service.Service; import java.io.*; -import java.net.InetSocketAddress; +import java.net.*; import java.util.concurrent.CountDownLatch; import java.util.logging.*; @@ -30,6 +31,8 @@ public final class NodeSncpServer extends NodeServer { this.servaddr = addr; this.nodeGroup = application.addrGroups.getOrDefault(addr, ""); this.consumer = server == null ? null : x -> server.addService(x); + this.factory.register(RESNAME_SNCP_ADDRESS, SocketAddress.class, this.servaddr); + this.factory.register(RESNAME_SNCP_ADDRESS, InetSocketAddress.class, this.servaddr); } @Override @@ -46,7 +49,7 @@ public final class NodeSncpServer extends NodeServer { logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms"); loadService(config.getAnyValue("services"), serviceFilter); //必须在servlet之前 //------------------------------------------------------------------- - if(server == null) return; //调试时server才可能为null + if (server == null) return; //调试时server才可能为null final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null; final String threadName = "[" + Thread.currentThread().getName() + "] "; for (SncpServlet en : server.getSncpServlets()) { diff --git a/src/com/wentch/redkale/net/AsyncConnection.java b/src/com/wentch/redkale/net/AsyncConnection.java index f93f027a3..16fc4e9bd 100644 --- a/src/com/wentch/redkale/net/AsyncConnection.java +++ b/src/com/wentch/redkale/net/AsyncConnection.java @@ -17,7 +17,7 @@ import java.util.concurrent.*; */ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCloseable { - protected AsyncPooledConnection pooledConnection; + public abstract boolean isTCP(); public abstract SocketAddress getRemoteAddress(); @@ -60,7 +60,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl } catch (Exception e) { throw new IOException("AsyncConnection connect " + address, e); } - return create(channel, readTimeoutSecond0, writeTimeoutSecond0); + return create(channel, address, readTimeoutSecond0, writeTimeoutSecond0); } else if ("UDP".equalsIgnoreCase(protocol)) { AsyncDatagramChannel channel = AsyncDatagramChannel.open(null); channel.connect(address); @@ -152,11 +152,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl @Override public final void close() throws IOException { if (client) { - if (pooledConnection == null) { - channel.close(); - } else { - pooledConnection.fireConnectionClosed(); - } + channel.close(); } } @@ -173,14 +169,18 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return channel.isOpen(); } + @Override + public final boolean isTCP() { + return false; + } }; } public static AsyncConnection create(final AsynchronousSocketChannel ch) { - return create(ch, 0, 0); + return create(ch, null, 0, 0); } - public static AsyncConnection create(final AsynchronousSocketChannel ch, final int readTimeoutSecond0, final int writeTimeoutSecond0) { + public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) { return new AsyncConnection() { private int readTimeoutSecond; @@ -194,11 +194,13 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl this.channel = ch; this.readTimeoutSecond = readTimeoutSecond0; this.writeTimeoutSecond = writeTimeoutSecond0; - SocketAddress addr = null; - try { - addr = ch.getRemoteAddress(); - } catch (Exception e) { - //do nothing + SocketAddress addr = addr0; + if (addr == null) { + try { + addr = ch.getRemoteAddress(); + } catch (Exception e) { + //do nothing + } } this.remoteAddress = addr; } @@ -276,11 +278,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl @Override public final void close() throws IOException { - if (pooledConnection == null) { - channel.close(); - } else { - pooledConnection.fireConnectionClosed(); - } + channel.close(); } @Override @@ -288,6 +286,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl return channel.isOpen(); } + @Override + public final boolean isTCP() { + return true; + } + @Override public void dispose() { try { diff --git a/src/com/wentch/redkale/net/AsyncPooledConnection.java b/src/com/wentch/redkale/net/AsyncPooledConnection.java deleted file mode 100644 index 3daeedb30..000000000 --- a/src/com/wentch/redkale/net/AsyncPooledConnection.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package com.wentch.redkale.net; - -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; - -/** - * - * @author zhangjx - */ -public class AsyncPooledConnection implements AutoCloseable { - - private final Queue queue; - - private final AtomicLong usingCounter; - - private final AsyncConnection conn; - - public AsyncPooledConnection(Queue queue, AtomicLong usingCounter, AsyncConnection conn) { - this.conn = conn; - this.usingCounter = usingCounter; - this.queue = queue; - } - - public AsyncConnection getAsyncConnection() { - return conn; - } - - public void fireConnectionClosed() { - this.queue.add(this); - } - - @Override - public void close() throws IOException { - usingCounter.decrementAndGet(); - conn.close(); - } - - public void dispose() { - try { - this.close(); - } catch (IOException io) { - } - } -} diff --git a/src/com/wentch/redkale/net/ProtocolServer.java b/src/com/wentch/redkale/net/ProtocolServer.java index 51d6e9eca..e6aaa001d 100644 --- a/src/com/wentch/redkale/net/ProtocolServer.java +++ b/src/com/wentch/redkale/net/ProtocolServer.java @@ -134,7 +134,7 @@ public abstract class ProtocolServer { @Override public void completed(final AsynchronousSocketChannel channel, Void attachment) { serchannel.accept(null, this); - context.submit(new PrepareRunner(context, AsyncConnection.create(channel, context.readTimeoutSecond, context.writeTimeoutSecond), null)); + context.submit(new PrepareRunner(context, AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond), null)); } @Override diff --git a/src/com/wentch/redkale/net/SSLBuilder.java b/src/com/wentch/redkale/net/SSLBuilder.java new file mode 100644 index 000000000..a1cf9ee69 --- /dev/null +++ b/src/com/wentch/redkale/net/SSLBuilder.java @@ -0,0 +1,160 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net; + +import java.nio.*; +import java.security.*; +import javax.net.ssl.*; + +/** + * + * @author zhangjx + */ +public class SSLBuilder { + + private static SSLContext sslContext; + + static { + try { + char[] keypasswd = new char[32]; + final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + keyStore.load(null, keypasswd); + final String algorithm = System.getProperty("ssl.algorithm", KeyManagerFactory.getDefaultAlgorithm()); + final KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm); + kmf.init(keyStore, keypasswd); + SSLContext sslContext0 = SSLContext.getInstance("TLS"); + sslContext0.init(kmf.getKeyManagers(), null, new SecureRandom()); + sslContext = sslContext0; + } catch (Exception e) { + throw new Error(e); + } + } + + private final SSLEngine sslEngine; + + private int appBufferSize; + + private int netBufferSize; + + public SSLBuilder() { + sslEngine = sslContext.createSSLEngine(); + //sslEngine.setEnabledCipherSuites(null); + //sslEngine.setEnabledProtocols(null); + + sslEngine.setUseClientMode(false); + sslEngine.setWantClientAuth(false); + sslEngine.setNeedClientAuth(false); + //--------------------------- + updateBufferSizes(); + } + + private void updateBufferSizes() { + final SSLSession session = sslEngine.getSession(); + appBufferSize = session.getApplicationBufferSize(); + netBufferSize = session.getPacketBufferSize(); + } + + public static void main(String[] args) throws Exception { + + } + + private static int getSSLPacketSize(final ByteBuffer buf) throws SSLException { + + /* + * SSLv2 length field is in bytes 0/1 + * SSLv3/TLS length field is in bytes 3/4 + */ + if (buf.remaining() < 5) return -1; + + final byte byte0; + final byte byte1; + final byte byte2; + final byte byte3; + final byte byte4; + + if (buf.hasArray()) { + final byte[] array = buf.array(); + int pos = buf.arrayOffset() + buf.position(); + byte0 = array[pos++]; + byte1 = array[pos++]; + byte2 = array[pos++]; + byte3 = array[pos++]; + byte4 = array[pos]; + } else { + int pos = buf.position(); + byte0 = buf.get(pos++); + byte1 = buf.get(pos++); + byte2 = buf.get(pos++); + byte3 = buf.get(pos++); + byte4 = buf.get(pos); + } + + int len; + + /* + * If we have already verified previous packets, we can + * ignore the verifications steps, and jump right to the + * determination. Otherwise, try one last hueristic to + * see if it's SSL/TLS. + */ + if (byte0 >= 20 && byte0 <= 23) { + /* + * Last sanity check that it's not a wild record + */ + final byte major = byte1; + final byte minor = byte2; + final int v = (major << 8) | minor & 0xff; + + // Check if too old (currently not possible) + // or if the major version does not match. + // The actual version negotiation is in the handshaker classes + if ((v < 0x0300) || (major > 0x03)) { + throw new SSLException("Unsupported record version major=" + major + " minor=" + minor); + } + + /* + * One of the SSLv3/TLS message types. + */ + len = ((byte3 & 0xff) << 8) + (byte4 & 0xff) + 5; // SSLv3 record header + + } else { + /* + * Must be SSLv2 or something unknown. + * Check if it's short (2 bytes) or + * long (3) header. + * + * Internals can warn about unsupported SSLv2 + */ + boolean isShort = ((byte0 & 0x80) != 0); + + if (isShort && ((byte2 == 1) || byte2 == 4)) { + + final byte major = byte3; + final byte minor = byte4; + final int v = (major << 8) | minor & 0xff; + + // Check if too old (currently not possible) + // or if the major version does not match. + // The actual version negotiation is in the handshaker classes + if ((v < 0x0300) || (major > 0x03)) { + // if it's not SSLv2, we're out of here. + if (v != 0x0002) throw new SSLException("Unsupported record version major=" + major + " minor=" + minor); + } + + /* + * Client or Server Hello + */ + int mask = 0x7f; + len = ((byte0 & mask) << 8) + (byte1 & 0xff) + (2); + } else { + // Gobblygook! + throw new SSLException("Unrecognized SSL message, plaintext connection?"); + } + } + + return len; + } +} diff --git a/src/com/wentch/redkale/net/Transport.java b/src/com/wentch/redkale/net/Transport.java index 51dbc9df1..17fbd2e86 100644 --- a/src/com/wentch/redkale/net/Transport.java +++ b/src/com/wentch/redkale/net/Transport.java @@ -7,7 +7,6 @@ package com.wentch.redkale.net; import com.wentch.redkale.util.*; import com.wentch.redkale.watch.*; -import java.io.*; import java.net.*; import java.nio.*; import java.nio.channels.*; @@ -22,6 +21,8 @@ import java.util.concurrent.atomic.*; */ public final class Transport { + protected static final int MAX_POOL_LIMIT = 16; + protected final String name; protected final String protocol; @@ -34,6 +35,8 @@ public final class Transport { protected final AtomicInteger index = new AtomicInteger(); + protected final ConcurrentHashMap> connPool = new ConcurrentHashMap<>(); + public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection addresses) { this.name = name; this.protocol = protocol; @@ -63,6 +66,10 @@ public final class Transport { this.remoteAddres = addresses.toArray(new InetSocketAddress[addresses.size()]); } + public void close() { + connPool.forEach((k, v) -> v.forEach(c -> c.dispose())); + } + public boolean match(Collection addrs) { if (addrs == null) return false; if (addrs.size() != this.remoteAddres.length) return false; @@ -89,14 +96,39 @@ public final class Transport { for (ByteBuffer buffer : buffers) offerBuffer(buffer); } - public AsyncConnection pollConnection() { - int i = index.get(); - SocketAddress addr = remoteAddres[i]; + public AsyncConnection pollConnection(SocketAddress addr) { + final boolean rand = addr == null; try { if ("TCP".equalsIgnoreCase(protocol)) { - AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); - channel.connect(addr).get(2, TimeUnit.SECONDS); - return AsyncConnection.create(channel, 0, 0); + AsynchronousSocketChannel channel = null; + if (rand) { + int p = 0; + for (int i = index.get(); i < remoteAddres.length; i++) { + p = i; + addr = remoteAddres[i]; + BlockingQueue queue = connPool.get(addr); + if (queue != null && queue.isEmpty()) { + AsyncConnection conn = queue.poll(); + if (conn.isOpen()) return conn; + } + if (channel == null) channel = AsynchronousSocketChannel.open(group); + try { + channel.connect(addr).get(1, TimeUnit.SECONDS); + break; + } catch (Exception iex) { + if (i == remoteAddres.length - 1) { + p = 0; + channel = null; + } + } + } + index.set(p); + } else { + channel = AsynchronousSocketChannel.open(group); + channel.connect(addr).get(2, TimeUnit.SECONDS); + } + if (channel == null) return null; + return AsyncConnection.create(channel, addr, 0, 0); } else { AsyncDatagramChannel channel = AsyncDatagramChannel.open(group); channel.connect(addr); @@ -108,14 +140,22 @@ public final class Transport { } public void offerConnection(AsyncConnection conn) { - try { - conn.close(); - } catch (IOException io) { + if (conn.isTCP()) { + if (conn.isOpen()) { + BlockingQueue queue = connPool.get(conn.getRemoteAddress()); + if (queue == null) { + queue = new ArrayBlockingQueue<>(MAX_POOL_LIMIT); + connPool.put(conn.getRemoteAddress(), queue); + } + if (!queue.offer(conn)) conn.dispose(); + } + } else { + conn.dispose(); } } - public void async(final ByteBuffer buffer, A att, final CompletionHandler handler) { - final AsyncConnection conn = pollConnection(); + public void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler handler) { + final AsyncConnection conn = pollConnection(addr); conn.write(buffer, buffer, new CompletionHandler() { @Override @@ -147,8 +187,8 @@ public final class Transport { }); } - public ByteBuffer send(ByteBuffer buffer) { - AsyncConnection conn = pollConnection(); + public ByteBuffer send(SocketAddress addr, ByteBuffer buffer) { + AsyncConnection conn = pollConnection(addr); final int readto = conn.getReadTimeoutSecond(); final int writeto = conn.getWriteTimeoutSecond(); try { diff --git a/src/com/wentch/redkale/net/http/WebSocketNode.java b/src/com/wentch/redkale/net/http/WebSocketNode.java index 52e3a3f6d..7c931b365 100644 --- a/src/com/wentch/redkale/net/http/WebSocketNode.java +++ b/src/com/wentch/redkale/net/http/WebSocketNode.java @@ -5,6 +5,7 @@ */ package com.wentch.redkale.net.http; +import com.wentch.redkale.net.sncp.*; import com.wentch.redkale.util.*; import java.io.*; import java.net.*; @@ -44,12 +45,21 @@ public abstract class WebSocketNode { public void init(AnyValue conf) { if (remoteNode != null) { - try { - Map> map = remoteNode.getDataNodes(); - if (map != null) dataNodes.putAll(map); - } catch (Exception e) { - logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + this.localSncpAddress + ") not load data nodes ", e); - } + new Thread() { + { + setDaemon(true); + } + + @Override + public void run() { + try { + Map> map = remoteNode.getDataNodes(); + if (map != null) dataNodes.putAll(map); + } catch (Exception e) { + logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + localSncpAddress + ") not load data nodes ", e); + } + } + }.start(); } } @@ -66,7 +76,7 @@ public abstract class WebSocketNode { return dataNodes; } - protected abstract int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last); + protected abstract int sendMessage(@SncpParameter InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last); protected abstract void connect(Serializable groupid, InetSocketAddress addr); @@ -74,7 +84,7 @@ public abstract class WebSocketNode { //-------------------------------------------------------------------------------- public final void connect(Serializable groupid, String engineid) { - if (finest) logger.finest(localSncpAddress +" receive websocket connect event (" + groupid + " on " + engineid + ")."); + if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ")."); Set engineids = localNodes.get(groupid); if (engineids == null) { engineids = new CopyOnWriteArraySet<>(); @@ -85,7 +95,7 @@ public abstract class WebSocketNode { } public final void disconnect(Serializable groupid, String engineid) { - if (finest) logger.finest(localSncpAddress +" receive websocket disconnect event (" + groupid + " on " + engineid + ")."); + if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ")."); Set engineids = localNodes.get(groupid); if (engineids == null || engineids.isEmpty()) return; engineids.remove(engineid); @@ -107,6 +117,37 @@ public abstract class WebSocketNode { engines.put(engine.getEngineid(), engine); } + public final int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last) { + final Set engineids = localNodes.get(groupid); + int rscode = 0; + if (engineids != null && !engineids.isEmpty()) { + for (String engineid : engineids) { + final WebSocketEngine engine = engines.get(engineid); + if (engine != null) { //在本地 + final WebSocketGroup group = engine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) { + if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but result is " + RETCODE_GROUP_EMPTY); + rscode = RETCODE_GROUP_EMPTY; + break; + } + group.send(recent, message, last); + } + } + } + if ((recent && rscode == 0) || remoteNode == null) return rscode; + Set addrs = dataNodes.get(groupid); + if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点 + for (InetSocketAddress addr : addrs) { + if (!addr.equals(localSncpAddress)) { + remoteNode.sendMessage(addr, groupid, recent, message, last); + } + } + } else { + rscode = RETCODE_GROUP_EMPTY; + } + return rscode; + } + //-------------------------------------------------------------------------------- public final int sendMessage(Serializable groupid, String text) { return sendMessage(groupid, false, text); diff --git a/src/com/wentch/redkale/net/http/WebSocketRunner.java b/src/com/wentch/redkale/net/http/WebSocketRunner.java index 2ac86f7ad..f5f66b62d 100644 --- a/src/com/wentch/redkale/net/http/WebSocketRunner.java +++ b/src/com/wentch/redkale/net/http/WebSocketRunner.java @@ -52,7 +52,7 @@ public class WebSocketRunner implements Runnable { this.wsbinary = wsbinary; webSocket.runner = this; this.coder.logger = context.getLogger(); - this.coder.debugable = context.getLogger().isLoggable(Level.FINEST); + this.coder.debugable = false;//context.getLogger().isLoggable(Level.FINEST); this.readBuffer = context.pollBuffer(); this.writeBuffer = context.pollBuffer(); } diff --git a/src/com/wentch/redkale/net/sncp/Sncp.java b/src/com/wentch/redkale/net/sncp/Sncp.java index 44ee6f4dd..c49099bf3 100644 --- a/src/com/wentch/redkale/net/sncp/Sncp.java +++ b/src/com/wentch/redkale/net/sncp/Sncp.java @@ -23,7 +23,7 @@ import jdk.internal.org.objectweb.asm.Type; */ public abstract class Sncp { - public static final String DEFAULT_PROTOCOL = "UDP"; + public static final String DEFAULT_PROTOCOL = "TCP"; static final String LOCALPREFIX = "_DynLocal"; diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index 432d2ca8c..42a406477 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -9,6 +9,7 @@ import com.wentch.redkale.convert.bson.*; import com.wentch.redkale.net.*; import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE; import com.wentch.redkale.util.*; +import java.lang.annotation.*; import java.lang.reflect.*; import java.net.*; import java.nio.*; @@ -36,7 +37,7 @@ public final class SncpClient { protected final Type[] paramTypes; - protected final boolean async; + protected final int addressParamIndex; public SncpAction(Method method, DLong actionid) { this.actionid = actionid; @@ -48,7 +49,22 @@ public final class SncpClient { this.resultTypes = rt == void.class ? null : rt; this.paramTypes = method.getGenericParameterTypes(); this.method = method; - this.async = false;// method.getReturnType() == void.class && method.getAnnotation(Async.class) != null; + Annotation[][] anns = method.getParameterAnnotations(); + int addrIndex = -1; + if (anns.length > 0) { + Class[] params = method.getParameterTypes(); + for (int i = 0; i < anns.length; i++) { + if (anns[i].length > 0) { + for (Annotation ann : anns[i]) { + if (ann.annotationType() == SncpParameter.class && SocketAddress.class.isAssignableFrom(params[i])) { + addrIndex = i; + break; + } + } + } + } + } + this.addressParamIndex = addrIndex; } @Override @@ -73,6 +89,8 @@ public final class SncpClient { protected final SncpAction[] actions; + protected final BlockingQueue queue = new ArrayBlockingQueue(1024 * 64); + public SncpClient(final String serviceName, final long serviceid0, boolean remote, final Class serviceClass, boolean onlySncpDyn, final InetSocketAddress clientAddress) { if (serviceName.length() > 10) throw new RuntimeException(serviceClass + " @Resource name(" + serviceName + ") too long , must less 11"); this.remote = remote; @@ -91,6 +109,24 @@ public final class SncpClient { this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress(); this.addrPort = clientAddress == null ? 0 : clientAddress.getPort(); logger.fine("[" + Thread.currentThread().getName() + "] Load " + this); + new Thread() { + { + setName(SncpClient.class.getSimpleName() + serviceClass.getSimpleName() + "-" + serviceName + "-Thread"); + setDaemon(true); + } + + @Override + public void run() { + while (true) { + try { + Runnable runner = queue.take(); + runner.run(); + } catch (Exception e) { + logger.log(Level.SEVERE, SncpClient.class.getSimpleName() + " runnable occur error", e); + } + } + } + }.start(); } @Override @@ -165,14 +201,7 @@ public final class SncpClient { } private void submit(Runnable runner) { - Thread thread = Thread.currentThread(); - if (false && thread instanceof WorkThread) { //有待验证为什么WorkThread 不工作 - ((WorkThread) thread).submit(runner); - return; - } - Thread t = new Thread(runner); - t.setPriority(Thread.MAX_PRIORITY); - t.start(); + if (!queue.offer(runner)) runner.run(); } private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) { @@ -185,7 +214,7 @@ public final class SncpClient { } final long seqid = System.nanoTime(); final DLong actionid = action.actionid; - final AsyncConnection conn = transport.pollConnection(); + final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null); if (conn == null || !conn.isOpen()) return null; final ByteBuffer buffer = transport.pollBuffer(); final int readto = conn.getReadTimeoutSecond(); @@ -291,7 +320,7 @@ public final class SncpClient { } catch (RuntimeException ex) { throw ex; } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(conn.getRemoteAddress() + " connect failed.", e); } finally { transport.offerBuffer(buffer); transport.offerConnection(conn); diff --git a/src/com/wentch/redkale/net/sncp/SncpParameter.java b/src/com/wentch/redkale/net/sncp/SncpParameter.java new file mode 100644 index 000000000..5c7224f16 --- /dev/null +++ b/src/com/wentch/redkale/net/sncp/SncpParameter.java @@ -0,0 +1,23 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.net.sncp; + +import java.lang.annotation.*; +import static java.lang.annotation.ElementType.*; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * + * + * @author zhangjx + */ +@Inherited +@Documented +@Target({PARAMETER}) +@Retention(RUNTIME) +public @interface SncpParameter { + +} diff --git a/src/com/wentch/redkale/service/WebSocketNodeService.java b/src/com/wentch/redkale/service/WebSocketNodeService.java index 982f72eb9..8f4a67687 100644 --- a/src/com/wentch/redkale/service/WebSocketNodeService.java +++ b/src/com/wentch/redkale/service/WebSocketNodeService.java @@ -6,6 +6,7 @@ package com.wentch.redkale.service; import com.wentch.redkale.net.http.*; +import com.wentch.redkale.net.sncp.*; import com.wentch.redkale.util.*; import java.io.*; import java.net.*; @@ -30,9 +31,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { } @Override - public int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last) { + public int sendMessage(@SncpParameter InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last) { final Set engineids = localNodes.get(groupid); if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY; + int code = RETCODE_GROUP_EMPTY; for (String engineid : engineids) { final WebSocketEngine engine = engines.get(engineid); if (engine != null) { //在本地 @@ -42,11 +44,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service { return RETCODE_GROUP_EMPTY; } group.send(recent, message, last); - } else { //对方连接在远程节点 - return RETCODE_WSOFFLINE; } + code = 0; } - return 0; + return code; } @Override @@ -58,7 +59,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service { dataNodes.put(groupid, addrs); } addrs.add(addr); - if(finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid +" connect from " + addr); + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr); } @Override @@ -68,6 +69,6 @@ public class WebSocketNodeService extends WebSocketNode implements Service { if (addrs == null) return; addrs.remove(addr); if (addrs.isEmpty()) dataNodes.remove(groupid); - if(finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid +" disconnect from " + addr); + if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr); } } diff --git a/src/com/wentch/redkale/service/WebSocketNodeService2.java b/src/com/wentch/redkale/service/WebSocketNodeService2.java new file mode 100644 index 000000000..b5b105822 --- /dev/null +++ b/src/com/wentch/redkale/service/WebSocketNodeService2.java @@ -0,0 +1,272 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package com.wentch.redkale.service; + +import com.wentch.redkale.net.http.*; +import com.wentch.redkale.util.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.*; +import javax.annotation.*; + +/** + * + * @author zhangjx + */ +@AutoLoad(false) +public class WebSocketNodeService2 implements Service { + + public static final int RETCODE_ENGINE_NULL = 5001; + + public static final int RETCODE_NODESERVICE_NULL = 5002; + + public static final int RETCODE_GROUP_EMPTY = 5005; + + public static final int RETCODE_WSOFFLINE = 5011; + + protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName()); + + protected final boolean finest = logger.isLoggable(Level.FINEST); + + @Resource(name = "APP_NODE") + protected String localNodeName = ""; + + @Resource + protected HashMap nodemaps; + + //用户分布在节点上的队列信息,只保存远程节点的用户分布信息 + protected final ConcurrentHashMap> usernodes = new ConcurrentHashMap(); + + protected final ConcurrentHashMap engines = new ConcurrentHashMap(); + + public void initUserNodes() { + if (this.nodemaps == null || this.nodemaps.isEmpty()) return; + new Thread() { + { + setDaemon(true); + } + + @Override + public void run() { + usernodes.putAll(queryNodes()); + } + }.start(); + } + + public final void addWebSocketEngine(WebSocketEngine engine) { + engines.put(engine.getEngineid(), engine); + } + + ////@RemoteOn + public Map> queryNodes() { + Map> rs = new HashMap<>(); + this.nodemaps.forEach((x, y) -> { + if (!rs.isEmpty()) return; + try { + rs.putAll(y.queryNodes()); + } catch (Exception e) { + logger.log(Level.WARNING, this.getClass().getSimpleName() + " query error (" + x + ")", e); + } + }); + return rs; + } + + public final Map> onQueryNodes() { + Map> rs = new HashMap<>(); + rs.putAll(this.usernodes); + return rs; + } + + public void connectSelf(Serializable userid) { + connect(this.localNodeName, userid); + } + + public void disconnectSelf(Serializable userid) { + disconnect(this.localNodeName, userid); + } + + ////@RemoteOn + public void connect(String nodeid, Serializable userid) { + onConnect(nodeid, userid); + if (this.nodemaps == null) return; + this.nodemaps.forEach((x, y) -> { + try { + if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")"); + y.connect(nodeid, userid); + } catch (Exception e) { + logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")", e); + } + }); + } + + public final void onConnect(String nodeid, Serializable userid) { + if (finest) logger.finest("Node (" + localNodeName + ") receive websocket connect event (" + userid + " on " + nodeid + ")."); + Set userNodelist = usernodes.get(userid); + if (userNodelist == null) { + userNodelist = new CopyOnWriteArraySet<>(); + usernodes.put(userid, userNodelist); + } + userNodelist.add(nodeid); + } + + ////@RemoteOn + public void disconnect(String nodeid, Serializable userid) { + onDisconnect(nodeid, userid); + if (this.nodemaps == null) return; + this.nodemaps.forEach((x, y) -> { + try { + if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")"); + y.disconnect(nodeid, userid); + } catch (Exception e) { + logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")", e); + } + }); + } + + public final void onDisconnect(String nodeid, Serializable userid) { + if (finest) logger.finest("Node (" + localNodeName + ") receive websocket disconnect event (" + userid + " on " + nodeid + ")."); + Set userNodelist = usernodes.get(userid); + if (userNodelist == null) return; + userNodelist.remove(nodeid); + if (userNodelist.isEmpty()) usernodes.remove(userid); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, String text) { + return send(engineid, groupid, text, true); + } + + public final int onSend(String engineid, Serializable groupid, String text) { + return onSend(engineid, groupid, text, true); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, String text, boolean last) { + return send0(engineid, groupid, false, text, last); + } + + public final int onSend(String engineid, Serializable groupid, String text, boolean last) { + return onSend0(engineid, groupid, false, text, last); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, boolean recent, String text) { + return send0(engineid, groupid, recent, text, true); + } + + public final int onSend(String engineid, Serializable groupid, boolean recent, String text) { + return onSend0(engineid, groupid, recent, text, true); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, boolean recent, String text, boolean last) { + return send0(engineid, groupid, recent, text, last); + } + + public final int onSend(String engineid, Serializable groupid, boolean recent, String text, boolean last) { + return onSend0(engineid, groupid, recent, text, last); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, byte[] data) { + return send(engineid, groupid, data, true); + } + + public final int onSend(String engineid, Serializable groupid, byte[] data) { + return onSend(engineid, groupid, data, true); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, byte[] data, boolean last) { + return send0(engineid, groupid, false, data, last); + } + + public final int onSend(String engineid, Serializable groupid, byte[] data, boolean last) { + return onSend0(engineid, groupid, false, data, last); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, boolean recent, byte[] data) { + return send0(engineid, groupid, recent, data, true); + } + + public final int onSend(String engineid, Serializable groupid, boolean recent, byte[] data) { + return onSend0(engineid, groupid, recent, data, true); + } + + //@RemoteOn + public int send(String engineid, Serializable groupid, boolean recent, byte[] data, boolean last) { + return send0(engineid, groupid, recent, data, last); + } + + public final int onSend(String engineid, Serializable groupid, boolean recent, byte[] data, boolean last) { + return onSend0(engineid, groupid, recent, data, last); + } + + private int send0(String engineid, Serializable groupid, boolean recent, Serializable text, boolean last) { + final Set nodes = usernodes.get(groupid); + if (nodes == null) return RETCODE_WSOFFLINE; //未登录 + int rs = 0; + if (nodes.contains(this.localNodeName)) rs = onSend0(engineid, groupid, recent, text, last); + if (nodemaps == null) return rs; + this.nodemaps.forEach((x, y) -> { + if (nodes.contains(x)) { + int irs = -1; + try { + if (text != null && text.getClass() == byte[].class) { + irs = y.send(engineid, groupid, (byte[]) text, last); + } else { + irs = y.send(engineid, groupid, (String) text, last); + } + if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} finish and result is " + irs); + } catch (Exception e) { + onDisconnect(x, groupid); + logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} failed and result is " + irs, e); + } + } + }); + return rs; + } + + /** + * 消息接受者存在WebSocket并发送成功返回true, 否则返回false + * + * @param engineid + * @param groupid 接收方 + * @param recent 是否只发送最近的WebSocket端 + * @param text + * @return + */ + private int onSend0(String engineid, Serializable groupid, boolean recent, Serializable text, boolean last) { + WebSocketEngine webSocketEngine = engines.get(engineid); + if (webSocketEngine == null) { + if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_ENGINE_NULL); + return RETCODE_ENGINE_NULL; + } + WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid); + if (group == null || group.isEmpty()) { + if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_GROUP_EMPTY); + return RETCODE_GROUP_EMPTY; + } + if (finest) logger.finest("Node (" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'}."); + if (text != null && text.getClass() == byte[].class) { + if (recent) { + group.getRecentWebSocket().send((byte[]) text, last); + } else { + group.getWebSockets().forEach(x -> x.send((byte[]) text, last)); + } + } else { + if (recent) { + group.getRecentWebSocket().send(text.toString(), last); + } else { + group.getWebSockets().forEach(x -> x.send(text.toString(), last)); + } + } + return 0; + } + +}