This commit is contained in:
地平线
2015-08-10 14:06:15 +08:00
parent 51895d2aa1
commit 5b85343b4e
14 changed files with 653 additions and 116 deletions

View File

@@ -16,6 +16,7 @@ import com.wentch.redkale.util.*;
import com.wentch.redkale.util.AnyValue.DefaultAnyValue;
import com.wentch.redkale.watch.*;
import java.io.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
@@ -50,6 +51,18 @@ public final class Application {
//application.xml 文件中resources节点的内容 类型: AnyValue
public static final String RESNAME_GRES = "APP_GRES";
//当前SNCP Server的IP地址+端口 类型: SocketAddress、InetSocketAddress
public static final String RESNAME_SNCP_ADDRESS = "SNCP_ADDRESS";
//当前SNCP Server的IP地址+端口集合 类型: Map<InetSocketAddress, String>、HashMap<InetSocketAddress, String>
public static final String RESNAME_SNCP_NODES = "SNCP_NODES";
private static final Type NODES1TYPE = new TypeToken<Map<InetSocketAddress, String>>() {
}.getType();
private static final Type NODES2TYPE = new TypeToken<HashMap<InetSocketAddress, String>>() {
}.getType();
protected final ResourceFactory factory = ResourceFactory.root();
protected final WatchFactory watch = WatchFactory.root();
@@ -348,6 +361,8 @@ public final class Application {
if (oldgroup != null && !((sncpconf.getValue("group", "") + ";").contains(oldgroup + ";"))) throw new RuntimeException(addr + " has one more group " + (addrGroups.get(addr)));
if (oldgroup == null) addrGroups.put(addr, "");
}
factory.register(RESNAME_SNCP_NODES, NODES1TYPE, new HashMap<>(addrGroups));
factory.register(RESNAME_SNCP_NODES, NODES2TYPE, new HashMap<>(addrGroups));
runServers(timecd, sncps); //确保sncp都启动后再启动其他协议
runServers(timecd, others);
timecd.await();
@@ -427,7 +442,7 @@ public final class Application {
}
}
}
//------------------------------------------------------------------------
//------------------------------------------------------------------------
AnyValue websocketnodeConf = resources.getAnyValue("websocketnode");
if (websocketnodeConf != null) {
String val = websocketnodeConf.getValue("service", "");

View File

@@ -5,11 +5,12 @@
*/
package com.wentch.redkale.boot;
import static com.wentch.redkale.boot.Application.RESNAME_SNCP_ADDRESS;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.util.AnyValue;
import com.wentch.redkale.service.Service;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.*;
import java.util.concurrent.CountDownLatch;
import java.util.logging.*;
@@ -30,6 +31,8 @@ public final class NodeSncpServer extends NodeServer {
this.servaddr = addr;
this.nodeGroup = application.addrGroups.getOrDefault(addr, "");
this.consumer = server == null ? null : x -> server.addService(x);
this.factory.register(RESNAME_SNCP_ADDRESS, SocketAddress.class, this.servaddr);
this.factory.register(RESNAME_SNCP_ADDRESS, InetSocketAddress.class, this.servaddr);
}
@Override
@@ -46,7 +49,7 @@ public final class NodeSncpServer extends NodeServer {
logger.info(this.getClass().getSimpleName() + " load filter class in " + e + " ms");
loadService(config.getAnyValue("services"), serviceFilter); //必须在servlet之前
//-------------------------------------------------------------------
if(server == null) return; //调试时server才可能为null
if (server == null) return; //调试时server才可能为null
final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
for (SncpServlet en : server.getSncpServlets()) {

View File

@@ -17,7 +17,7 @@ import java.util.concurrent.*;
*/
public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCloseable {
protected AsyncPooledConnection pooledConnection;
public abstract boolean isTCP();
public abstract SocketAddress getRemoteAddress();
@@ -60,7 +60,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
} catch (Exception e) {
throw new IOException("AsyncConnection connect " + address, e);
}
return create(channel, readTimeoutSecond0, writeTimeoutSecond0);
return create(channel, address, readTimeoutSecond0, writeTimeoutSecond0);
} else if ("UDP".equalsIgnoreCase(protocol)) {
AsyncDatagramChannel channel = AsyncDatagramChannel.open(null);
channel.connect(address);
@@ -152,11 +152,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
@Override
public final void close() throws IOException {
if (client) {
if (pooledConnection == null) {
channel.close();
} else {
pooledConnection.fireConnectionClosed();
}
channel.close();
}
}
@@ -173,14 +169,18 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return false;
}
};
}
public static AsyncConnection create(final AsynchronousSocketChannel ch) {
return create(ch, 0, 0);
return create(ch, null, 0, 0);
}
public static AsyncConnection create(final AsynchronousSocketChannel ch, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
public static AsyncConnection create(final AsynchronousSocketChannel ch, final SocketAddress addr0, final int readTimeoutSecond0, final int writeTimeoutSecond0) {
return new AsyncConnection() {
private int readTimeoutSecond;
@@ -194,11 +194,13 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
this.channel = ch;
this.readTimeoutSecond = readTimeoutSecond0;
this.writeTimeoutSecond = writeTimeoutSecond0;
SocketAddress addr = null;
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
SocketAddress addr = addr0;
if (addr == null) {
try {
addr = ch.getRemoteAddress();
} catch (Exception e) {
//do nothing
}
}
this.remoteAddress = addr;
}
@@ -276,11 +278,7 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
@Override
public final void close() throws IOException {
if (pooledConnection == null) {
channel.close();
} else {
pooledConnection.fireConnectionClosed();
}
channel.close();
}
@Override
@@ -288,6 +286,11 @@ public abstract class AsyncConnection implements AsynchronousByteChannel, AutoCl
return channel.isOpen();
}
@Override
public final boolean isTCP() {
return true;
}
@Override
public void dispose() {
try {

View File

@@ -1,50 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* @author zhangjx
*/
public class AsyncPooledConnection implements AutoCloseable {
private final Queue<AsyncPooledConnection> queue;
private final AtomicLong usingCounter;
private final AsyncConnection conn;
public AsyncPooledConnection(Queue<AsyncPooledConnection> queue, AtomicLong usingCounter, AsyncConnection conn) {
this.conn = conn;
this.usingCounter = usingCounter;
this.queue = queue;
}
public AsyncConnection getAsyncConnection() {
return conn;
}
public void fireConnectionClosed() {
this.queue.add(this);
}
@Override
public void close() throws IOException {
usingCounter.decrementAndGet();
conn.close();
}
public void dispose() {
try {
this.close();
} catch (IOException io) {
}
}
}

View File

@@ -134,7 +134,7 @@ public abstract class ProtocolServer {
@Override
public void completed(final AsynchronousSocketChannel channel, Void attachment) {
serchannel.accept(null, this);
context.submit(new PrepareRunner(context, AsyncConnection.create(channel, context.readTimeoutSecond, context.writeTimeoutSecond), null));
context.submit(new PrepareRunner(context, AsyncConnection.create(channel, null, context.readTimeoutSecond, context.writeTimeoutSecond), null));
}
@Override

View File

@@ -0,0 +1,160 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net;
import java.nio.*;
import java.security.*;
import javax.net.ssl.*;
/**
*
* @author zhangjx
*/
public class SSLBuilder {
private static SSLContext sslContext;
static {
try {
char[] keypasswd = new char[32];
final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, keypasswd);
final String algorithm = System.getProperty("ssl.algorithm", KeyManagerFactory.getDefaultAlgorithm());
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
kmf.init(keyStore, keypasswd);
SSLContext sslContext0 = SSLContext.getInstance("TLS");
sslContext0.init(kmf.getKeyManagers(), null, new SecureRandom());
sslContext = sslContext0;
} catch (Exception e) {
throw new Error(e);
}
}
private final SSLEngine sslEngine;
private int appBufferSize;
private int netBufferSize;
public SSLBuilder() {
sslEngine = sslContext.createSSLEngine();
//sslEngine.setEnabledCipherSuites(null);
//sslEngine.setEnabledProtocols(null);
sslEngine.setUseClientMode(false);
sslEngine.setWantClientAuth(false);
sslEngine.setNeedClientAuth(false);
//---------------------------
updateBufferSizes();
}
private void updateBufferSizes() {
final SSLSession session = sslEngine.getSession();
appBufferSize = session.getApplicationBufferSize();
netBufferSize = session.getPacketBufferSize();
}
public static void main(String[] args) throws Exception {
}
private static int getSSLPacketSize(final ByteBuffer buf) throws SSLException {
/*
* SSLv2 length field is in bytes 0/1
* SSLv3/TLS length field is in bytes 3/4
*/
if (buf.remaining() < 5) return -1;
final byte byte0;
final byte byte1;
final byte byte2;
final byte byte3;
final byte byte4;
if (buf.hasArray()) {
final byte[] array = buf.array();
int pos = buf.arrayOffset() + buf.position();
byte0 = array[pos++];
byte1 = array[pos++];
byte2 = array[pos++];
byte3 = array[pos++];
byte4 = array[pos];
} else {
int pos = buf.position();
byte0 = buf.get(pos++);
byte1 = buf.get(pos++);
byte2 = buf.get(pos++);
byte3 = buf.get(pos++);
byte4 = buf.get(pos);
}
int len;
/*
* If we have already verified previous packets, we can
* ignore the verifications steps, and jump right to the
* determination. Otherwise, try one last hueristic to
* see if it's SSL/TLS.
*/
if (byte0 >= 20 && byte0 <= 23) {
/*
* Last sanity check that it's not a wild record
*/
final byte major = byte1;
final byte minor = byte2;
final int v = (major << 8) | minor & 0xff;
// Check if too old (currently not possible)
// or if the major version does not match.
// The actual version negotiation is in the handshaker classes
if ((v < 0x0300) || (major > 0x03)) {
throw new SSLException("Unsupported record version major=" + major + " minor=" + minor);
}
/*
* One of the SSLv3/TLS message types.
*/
len = ((byte3 & 0xff) << 8) + (byte4 & 0xff) + 5; // SSLv3 record header
} else {
/*
* Must be SSLv2 or something unknown.
* Check if it's short (2 bytes) or
* long (3) header.
*
* Internals can warn about unsupported SSLv2
*/
boolean isShort = ((byte0 & 0x80) != 0);
if (isShort && ((byte2 == 1) || byte2 == 4)) {
final byte major = byte3;
final byte minor = byte4;
final int v = (major << 8) | minor & 0xff;
// Check if too old (currently not possible)
// or if the major version does not match.
// The actual version negotiation is in the handshaker classes
if ((v < 0x0300) || (major > 0x03)) {
// if it's not SSLv2, we're out of here.
if (v != 0x0002) throw new SSLException("Unsupported record version major=" + major + " minor=" + minor);
}
/*
* Client or Server Hello
*/
int mask = 0x7f;
len = ((byte0 & mask) << 8) + (byte1 & 0xff) + (2);
} else {
// Gobblygook!
throw new SSLException("Unrecognized SSL message, plaintext connection?");
}
}
return len;
}
}

View File

@@ -7,7 +7,6 @@ package com.wentch.redkale.net;
import com.wentch.redkale.util.*;
import com.wentch.redkale.watch.*;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
@@ -22,6 +21,8 @@ import java.util.concurrent.atomic.*;
*/
public final class Transport {
protected static final int MAX_POOL_LIMIT = 16;
protected final String name;
protected final String protocol;
@@ -34,6 +35,8 @@ public final class Transport {
protected final AtomicInteger index = new AtomicInteger();
protected final ConcurrentHashMap<SocketAddress, BlockingQueue<AsyncConnection>> connPool = new ConcurrentHashMap<>();
public Transport(String name, String protocol, WatchFactory watch, int bufferPoolSize, Collection<InetSocketAddress> addresses) {
this.name = name;
this.protocol = protocol;
@@ -63,6 +66,10 @@ public final class Transport {
this.remoteAddres = addresses.toArray(new InetSocketAddress[addresses.size()]);
}
public void close() {
connPool.forEach((k, v) -> v.forEach(c -> c.dispose()));
}
public boolean match(Collection<InetSocketAddress> addrs) {
if (addrs == null) return false;
if (addrs.size() != this.remoteAddres.length) return false;
@@ -89,14 +96,39 @@ public final class Transport {
for (ByteBuffer buffer : buffers) offerBuffer(buffer);
}
public AsyncConnection pollConnection() {
int i = index.get();
SocketAddress addr = remoteAddres[i];
public AsyncConnection pollConnection(SocketAddress addr) {
final boolean rand = addr == null;
try {
if ("TCP".equalsIgnoreCase(protocol)) {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(2, TimeUnit.SECONDS);
return AsyncConnection.create(channel, 0, 0);
AsynchronousSocketChannel channel = null;
if (rand) {
int p = 0;
for (int i = index.get(); i < remoteAddres.length; i++) {
p = i;
addr = remoteAddres[i];
BlockingQueue<AsyncConnection> queue = connPool.get(addr);
if (queue != null && queue.isEmpty()) {
AsyncConnection conn = queue.poll();
if (conn.isOpen()) return conn;
}
if (channel == null) channel = AsynchronousSocketChannel.open(group);
try {
channel.connect(addr).get(1, TimeUnit.SECONDS);
break;
} catch (Exception iex) {
if (i == remoteAddres.length - 1) {
p = 0;
channel = null;
}
}
}
index.set(p);
} else {
channel = AsynchronousSocketChannel.open(group);
channel.connect(addr).get(2, TimeUnit.SECONDS);
}
if (channel == null) return null;
return AsyncConnection.create(channel, addr, 0, 0);
} else {
AsyncDatagramChannel channel = AsyncDatagramChannel.open(group);
channel.connect(addr);
@@ -108,14 +140,22 @@ public final class Transport {
}
public void offerConnection(AsyncConnection conn) {
try {
conn.close();
} catch (IOException io) {
if (conn.isTCP()) {
if (conn.isOpen()) {
BlockingQueue<AsyncConnection> queue = connPool.get(conn.getRemoteAddress());
if (queue == null) {
queue = new ArrayBlockingQueue<>(MAX_POOL_LIMIT);
connPool.put(conn.getRemoteAddress(), queue);
}
if (!queue.offer(conn)) conn.dispose();
}
} else {
conn.dispose();
}
}
public <A> void async(final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) {
final AsyncConnection conn = pollConnection();
public <A> void async(SocketAddress addr, final ByteBuffer buffer, A att, final CompletionHandler<Integer, A> handler) {
final AsyncConnection conn = pollConnection(addr);
conn.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
@@ -147,8 +187,8 @@ public final class Transport {
});
}
public ByteBuffer send(ByteBuffer buffer) {
AsyncConnection conn = pollConnection();
public ByteBuffer send(SocketAddress addr, ByteBuffer buffer) {
AsyncConnection conn = pollConnection(addr);
final int readto = conn.getReadTimeoutSecond();
final int writeto = conn.getWriteTimeoutSecond();
try {

View File

@@ -5,6 +5,7 @@
*/
package com.wentch.redkale.net.http;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.net.*;
@@ -44,12 +45,21 @@ public abstract class WebSocketNode {
public void init(AnyValue conf) {
if (remoteNode != null) {
try {
Map<Serializable, Set<InetSocketAddress>> map = remoteNode.getDataNodes();
if (map != null) dataNodes.putAll(map);
} catch (Exception e) {
logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + this.localSncpAddress + ") not load data nodes ", e);
}
new Thread() {
{
setDaemon(true);
}
@Override
public void run() {
try {
Map<Serializable, Set<InetSocketAddress>> map = remoteNode.getDataNodes();
if (map != null) dataNodes.putAll(map);
} catch (Exception e) {
logger.log(Level.INFO, WebSocketNode.class.getSimpleName() + "(" + localSncpAddress + ") not load data nodes ", e);
}
}
}.start();
}
}
@@ -66,7 +76,7 @@ public abstract class WebSocketNode {
return dataNodes;
}
protected abstract int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last);
protected abstract int sendMessage(@SncpParameter InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last);
protected abstract void connect(Serializable groupid, InetSocketAddress addr);
@@ -74,7 +84,7 @@ public abstract class WebSocketNode {
//--------------------------------------------------------------------------------
public final void connect(Serializable groupid, String engineid) {
if (finest) logger.finest(localSncpAddress +" receive websocket connect event (" + groupid + " on " + engineid + ").");
if (finest) logger.finest(localSncpAddress + " receive websocket connect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localNodes.get(groupid);
if (engineids == null) {
engineids = new CopyOnWriteArraySet<>();
@@ -85,7 +95,7 @@ public abstract class WebSocketNode {
}
public final void disconnect(Serializable groupid, String engineid) {
if (finest) logger.finest(localSncpAddress +" receive websocket disconnect event (" + groupid + " on " + engineid + ").");
if (finest) logger.finest(localSncpAddress + " receive websocket disconnect event (" + groupid + " on " + engineid + ").");
Set<String> engineids = localNodes.get(groupid);
if (engineids == null || engineids.isEmpty()) return;
engineids.remove(engineid);
@@ -107,6 +117,37 @@ public abstract class WebSocketNode {
engines.put(engine.getEngineid(), engine);
}
public final int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last) {
final Set<String> engineids = localNodes.get(groupid);
int rscode = 0;
if (engineids != null && !engineids.isEmpty()) {
for (String engineid : engineids) {
final WebSocketEngine engine = engines.get(engineid);
if (engine != null) { //在本地
final WebSocketGroup group = engine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + message + "'} but result is " + RETCODE_GROUP_EMPTY);
rscode = RETCODE_GROUP_EMPTY;
break;
}
group.send(recent, message, last);
}
}
}
if ((recent && rscode == 0) || remoteNode == null) return rscode;
Set<InetSocketAddress> addrs = dataNodes.get(groupid);
if (addrs != null && !addrs.isEmpty()) { //对方连接在远程节点
for (InetSocketAddress addr : addrs) {
if (!addr.equals(localSncpAddress)) {
remoteNode.sendMessage(addr, groupid, recent, message, last);
}
}
} else {
rscode = RETCODE_GROUP_EMPTY;
}
return rscode;
}
//--------------------------------------------------------------------------------
public final int sendMessage(Serializable groupid, String text) {
return sendMessage(groupid, false, text);

View File

@@ -52,7 +52,7 @@ public class WebSocketRunner implements Runnable {
this.wsbinary = wsbinary;
webSocket.runner = this;
this.coder.logger = context.getLogger();
this.coder.debugable = context.getLogger().isLoggable(Level.FINEST);
this.coder.debugable = false;//context.getLogger().isLoggable(Level.FINEST);
this.readBuffer = context.pollBuffer();
this.writeBuffer = context.pollBuffer();
}

View File

@@ -23,7 +23,7 @@ import jdk.internal.org.objectweb.asm.Type;
*/
public abstract class Sncp {
public static final String DEFAULT_PROTOCOL = "UDP";
public static final String DEFAULT_PROTOCOL = "TCP";
static final String LOCALPREFIX = "_DynLocal";

View File

@@ -9,6 +9,7 @@ import com.wentch.redkale.convert.bson.*;
import com.wentch.redkale.net.*;
import static com.wentch.redkale.net.sncp.SncpRequest.HEADER_SIZE;
import com.wentch.redkale.util.*;
import java.lang.annotation.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.*;
@@ -36,7 +37,7 @@ public final class SncpClient {
protected final Type[] paramTypes;
protected final boolean async;
protected final int addressParamIndex;
public SncpAction(Method method, DLong actionid) {
this.actionid = actionid;
@@ -48,7 +49,22 @@ public final class SncpClient {
this.resultTypes = rt == void.class ? null : rt;
this.paramTypes = method.getGenericParameterTypes();
this.method = method;
this.async = false;// method.getReturnType() == void.class && method.getAnnotation(Async.class) != null;
Annotation[][] anns = method.getParameterAnnotations();
int addrIndex = -1;
if (anns.length > 0) {
Class<?>[] params = method.getParameterTypes();
for (int i = 0; i < anns.length; i++) {
if (anns[i].length > 0) {
for (Annotation ann : anns[i]) {
if (ann.annotationType() == SncpParameter.class && SocketAddress.class.isAssignableFrom(params[i])) {
addrIndex = i;
break;
}
}
}
}
}
this.addressParamIndex = addrIndex;
}
@Override
@@ -73,6 +89,8 @@ public final class SncpClient {
protected final SncpAction[] actions;
protected final BlockingQueue<Runnable> queue = new ArrayBlockingQueue(1024 * 64);
public SncpClient(final String serviceName, final long serviceid0, boolean remote, final Class serviceClass, boolean onlySncpDyn, final InetSocketAddress clientAddress) {
if (serviceName.length() > 10) throw new RuntimeException(serviceClass + " @Resource name(" + serviceName + ") too long , must less 11");
this.remote = remote;
@@ -91,6 +109,24 @@ public final class SncpClient {
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
logger.fine("[" + Thread.currentThread().getName() + "] Load " + this);
new Thread() {
{
setName(SncpClient.class.getSimpleName() + serviceClass.getSimpleName() + "-" + serviceName + "-Thread");
setDaemon(true);
}
@Override
public void run() {
while (true) {
try {
Runnable runner = queue.take();
runner.run();
} catch (Exception e) {
logger.log(Level.SEVERE, SncpClient.class.getSimpleName() + " runnable occur error", e);
}
}
}
}.start();
}
@Override
@@ -165,14 +201,7 @@ public final class SncpClient {
}
private void submit(Runnable runner) {
Thread thread = Thread.currentThread();
if (false && thread instanceof WorkThread) { //有待验证为什么WorkThread 不工作
((WorkThread) thread).submit(runner);
return;
}
Thread t = new Thread(runner);
t.setPriority(Thread.MAX_PRIORITY);
t.start();
if (!queue.offer(runner)) runner.run();
}
private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) {
@@ -185,7 +214,7 @@ public final class SncpClient {
}
final long seqid = System.nanoTime();
final DLong actionid = action.actionid;
final AsyncConnection conn = transport.pollConnection();
final AsyncConnection conn = transport.pollConnection(action.addressParamIndex >= 0 ? (SocketAddress) params[action.addressParamIndex] : null);
if (conn == null || !conn.isOpen()) return null;
final ByteBuffer buffer = transport.pollBuffer();
final int readto = conn.getReadTimeoutSecond();
@@ -291,7 +320,7 @@ public final class SncpClient {
} catch (RuntimeException ex) {
throw ex;
} catch (Exception e) {
throw new RuntimeException(e);
throw new RuntimeException(conn.getRemoteAddress() + " connect failed.", e);
} finally {
transport.offerBuffer(buffer);
transport.offerConnection(conn);

View File

@@ -0,0 +1,23 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.net.sncp;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
*
*
* @author zhangjx
*/
@Inherited
@Documented
@Target({PARAMETER})
@Retention(RUNTIME)
public @interface SncpParameter {
}

View File

@@ -6,6 +6,7 @@
package com.wentch.redkale.service;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.net.sncp.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.net.*;
@@ -30,9 +31,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
}
@Override
public int sendMessage(Serializable groupid, boolean recent, Serializable message, boolean last) {
public int sendMessage(@SncpParameter InetSocketAddress addr, Serializable groupid, boolean recent, Serializable message, boolean last) {
final Set<String> engineids = localNodes.get(groupid);
if (engineids == null || engineids.isEmpty()) return RETCODE_GROUP_EMPTY;
int code = RETCODE_GROUP_EMPTY;
for (String engineid : engineids) {
final WebSocketEngine engine = engines.get(engineid);
if (engine != null) { //在本地
@@ -42,11 +44,10 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
return RETCODE_GROUP_EMPTY;
}
group.send(recent, message, last);
} else { //对方连接在远程节点
return RETCODE_WSOFFLINE;
}
code = 0;
}
return 0;
return code;
}
@Override
@@ -58,7 +59,7 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
dataNodes.put(groupid, addrs);
}
addrs.add(addr);
if(finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid +" connect from " + addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " connect from " + addr);
}
@Override
@@ -68,6 +69,6 @@ public class WebSocketNodeService extends WebSocketNode implements Service {
if (addrs == null) return;
addrs.remove(addr);
if (addrs.isEmpty()) dataNodes.remove(groupid);
if(finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid +" disconnect from " + addr);
if (finest) logger.finest(WebSocketNodeService.class.getSimpleName() + ".event: " + groupid + " disconnect from " + addr);
}
}

View File

@@ -0,0 +1,272 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.wentch.redkale.service;
import com.wentch.redkale.net.http.*;
import com.wentch.redkale.util.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.*;
/**
*
* @author zhangjx
*/
@AutoLoad(false)
public class WebSocketNodeService2 implements Service {
public static final int RETCODE_ENGINE_NULL = 5001;
public static final int RETCODE_NODESERVICE_NULL = 5002;
public static final int RETCODE_GROUP_EMPTY = 5005;
public static final int RETCODE_WSOFFLINE = 5011;
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
protected final boolean finest = logger.isLoggable(Level.FINEST);
@Resource(name = "APP_NODE")
protected String localNodeName = "";
@Resource
protected HashMap<String, WebSocketNodeService2> nodemaps;
//用户分布在节点上的队列信息,只保存远程节点的用户分布信息
protected final ConcurrentHashMap<Serializable, Set<String>> usernodes = new ConcurrentHashMap();
protected final ConcurrentHashMap<String, WebSocketEngine> engines = new ConcurrentHashMap();
public void initUserNodes() {
if (this.nodemaps == null || this.nodemaps.isEmpty()) return;
new Thread() {
{
setDaemon(true);
}
@Override
public void run() {
usernodes.putAll(queryNodes());
}
}.start();
}
public final void addWebSocketEngine(WebSocketEngine engine) {
engines.put(engine.getEngineid(), engine);
}
////@RemoteOn
public Map<Serializable, Set<String>> queryNodes() {
Map<Serializable, Set<String>> rs = new HashMap<>();
this.nodemaps.forEach((x, y) -> {
if (!rs.isEmpty()) return;
try {
rs.putAll(y.queryNodes());
} catch (Exception e) {
logger.log(Level.WARNING, this.getClass().getSimpleName() + " query error (" + x + ")", e);
}
});
return rs;
}
public final Map<Serializable, Set<String>> onQueryNodes() {
Map<Serializable, Set<String>> rs = new HashMap<>();
rs.putAll(this.usernodes);
return rs;
}
public void connectSelf(Serializable userid) {
connect(this.localNodeName, userid);
}
public void disconnectSelf(Serializable userid) {
disconnect(this.localNodeName, userid);
}
////@RemoteOn
public void connect(String nodeid, Serializable userid) {
onConnect(nodeid, userid);
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")");
y.connect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket connect event (" + userid + " on " + nodeid + ")", e);
}
});
}
public final void onConnect(String nodeid, Serializable userid) {
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket connect event (" + userid + " on " + nodeid + ").");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) {
userNodelist = new CopyOnWriteArraySet<>();
usernodes.put(userid, userNodelist);
}
userNodelist.add(nodeid);
}
////@RemoteOn
public void disconnect(String nodeid, Serializable userid) {
onDisconnect(nodeid, userid);
if (this.nodemaps == null) return;
this.nodemaps.forEach((x, y) -> {
try {
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")");
y.disconnect(nodeid, userid);
} catch (Exception e) {
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket disconnect event (" + userid + " on " + nodeid + ")", e);
}
});
}
public final void onDisconnect(String nodeid, Serializable userid) {
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket disconnect event (" + userid + " on " + nodeid + ").");
Set<String> userNodelist = usernodes.get(userid);
if (userNodelist == null) return;
userNodelist.remove(nodeid);
if (userNodelist.isEmpty()) usernodes.remove(userid);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, String text) {
return send(engineid, groupid, text, true);
}
public final int onSend(String engineid, Serializable groupid, String text) {
return onSend(engineid, groupid, text, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, String text, boolean last) {
return send0(engineid, groupid, false, text, last);
}
public final int onSend(String engineid, Serializable groupid, String text, boolean last) {
return onSend0(engineid, groupid, false, text, last);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, String text) {
return send0(engineid, groupid, recent, text, true);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, String text) {
return onSend0(engineid, groupid, recent, text, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, String text, boolean last) {
return send0(engineid, groupid, recent, text, last);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, String text, boolean last) {
return onSend0(engineid, groupid, recent, text, last);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, byte[] data) {
return send(engineid, groupid, data, true);
}
public final int onSend(String engineid, Serializable groupid, byte[] data) {
return onSend(engineid, groupid, data, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, byte[] data, boolean last) {
return send0(engineid, groupid, false, data, last);
}
public final int onSend(String engineid, Serializable groupid, byte[] data, boolean last) {
return onSend0(engineid, groupid, false, data, last);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, byte[] data) {
return send0(engineid, groupid, recent, data, true);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, byte[] data) {
return onSend0(engineid, groupid, recent, data, true);
}
//@RemoteOn
public int send(String engineid, Serializable groupid, boolean recent, byte[] data, boolean last) {
return send0(engineid, groupid, recent, data, last);
}
public final int onSend(String engineid, Serializable groupid, boolean recent, byte[] data, boolean last) {
return onSend0(engineid, groupid, recent, data, last);
}
private int send0(String engineid, Serializable groupid, boolean recent, Serializable text, boolean last) {
final Set<String> nodes = usernodes.get(groupid);
if (nodes == null) return RETCODE_WSOFFLINE; //未登录
int rs = 0;
if (nodes.contains(this.localNodeName)) rs = onSend0(engineid, groupid, recent, text, last);
if (nodemaps == null) return rs;
this.nodemaps.forEach((x, y) -> {
if (nodes.contains(x)) {
int irs = -1;
try {
if (text != null && text.getClass() == byte[].class) {
irs = y.send(engineid, groupid, (byte[]) text, last);
} else {
irs = y.send(engineid, groupid, (String) text, last);
}
if (finest) logger.finest("Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} finish and result is " + irs);
} catch (Exception e) {
onDisconnect(x, groupid);
logger.log(Level.WARNING, "Node(" + localNodeName + "->" + x + ") send websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} failed and result is " + irs, e);
}
}
});
return rs;
}
/**
* 消息接受者存在WebSocket并发送成功返回true 否则返回false
*
* @param engineid
* @param groupid 接收方
* @param recent 是否只发送最近的WebSocket端
* @param text
* @return
*/
private int onSend0(String engineid, Serializable groupid, boolean recent, Serializable text, boolean last) {
WebSocketEngine webSocketEngine = engines.get(engineid);
if (webSocketEngine == null) {
if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_ENGINE_NULL);
return RETCODE_ENGINE_NULL;
}
WebSocketGroup group = webSocketEngine.getWebSocketGroup(groupid);
if (group == null || group.isEmpty()) {
if (finest) logger.finest("Node(" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'} but result is " + RETCODE_GROUP_EMPTY);
return RETCODE_GROUP_EMPTY;
}
if (finest) logger.finest("Node (" + localNodeName + ") receive websocket message {engineid:'" + engineid + "', groupid:" + groupid + ", content:'" + text + "'}.");
if (text != null && text.getClass() == byte[].class) {
if (recent) {
group.getRecentWebSocket().send((byte[]) text, last);
} else {
group.getWebSockets().forEach(x -> x.send((byte[]) text, last));
}
} else {
if (recent) {
group.getRecentWebSocket().send(text.toString(), last);
} else {
group.getWebSockets().forEach(x -> x.send(text.toString(), last));
}
}
return 0;
}
}