修复WebSocketNode在半远程模式下SNCP出异常的BUG

This commit is contained in:
Redkale
2017-11-13 16:22:48 +08:00
parent d46807a585
commit da600ecf20
7 changed files with 53 additions and 13 deletions

View File

@@ -240,20 +240,23 @@ public final class Application {
AsynchronousChannelGroup transportGroup = null; AsynchronousChannelGroup transportGroup = null;
final AnyValue resources = config.getAnyValue("resources"); final AnyValue resources = config.getAnyValue("resources");
TransportStrategy strategy = null; TransportStrategy strategy = null;
int bufferCapacity = 8 * 1024;
int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 16;
AtomicLong createBufferCounter = new AtomicLong();
AtomicLong cycleBufferCounter = new AtomicLong();
if (resources != null) { if (resources != null) {
AnyValue transportConf = resources.getAnyValue("transport"); AnyValue transportConf = resources.getAnyValue("transport");
int groupsize = resources.getAnyValues("group").length; int groupsize = resources.getAnyValues("group").length;
if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue(); if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue();
if (transportConf != null) { if (transportConf != null) {
//--------------transportBufferPool----------- //--------------transportBufferPool-----------
AtomicLong createBufferCounter = new AtomicLong(); bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 4 * 1024);
AtomicLong cycleBufferCounter = new AtomicLong(); bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
final int bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), 8 * 1024), 4 * 1024);
final int bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8); final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8);
final int capacity = bufferCapacity;
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { (Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; if (e == null || e.isReadOnly() || e.capacity() != capacity) return false;
e.clear(); e.clear();
return true; return true;
}); });
@@ -291,6 +294,15 @@ public final class Application {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
if (transportPool == null) {
final int capacity = bufferCapacity;
transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize,
(Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> {
if (e == null || e.isReadOnly() || e.capacity() != capacity) return false;
e.clear();
return true;
});
}
this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy); this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy);
DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")); DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30"));
this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining());
@@ -652,7 +664,6 @@ public final class Application {
// } // }
// } // }
// } // }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void runServers(CountDownLatch timecd, final List<AnyValue> serconfs) throws Exception { private void runServers(CountDownLatch timecd, final List<AnyValue> serconfs) throws Exception {
this.servicecdl = new CountDownLatch(serconfs.size()); this.servicecdl = new CountDownLatch(serconfs.size());

View File

@@ -50,6 +50,12 @@ public final class MapDecoder<K, V> implements Decodeable<Reader, Map<K, V>> {
factory.register(type, this); factory.register(type, this);
this.keyDecoder = factory.loadDecoder(this.keyType); this.keyDecoder = factory.loadDecoder(this.keyType);
this.valueDecoder = factory.loadDecoder(this.valueType); this.valueDecoder = factory.loadDecoder(this.valueType);
} else if (factory.isReversible()) {
this.keyType = Object.class;
this.valueType = Object.class;
this.creator = factory.loadCreator((Class) type);
this.keyDecoder = factory.loadDecoder(this.keyType);
this.valueDecoder = factory.loadDecoder(this.valueType);
} else { } else {
throw new ConvertException("mapdecoder not support the type (" + type + ")"); throw new ConvertException("mapdecoder not support the type (" + type + ")");
} }

View File

@@ -13,6 +13,7 @@ import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.*; import java.util.concurrent.atomic.*;
import java.util.function.Supplier;
import java.util.logging.*; import java.util.logging.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.redkale.service.Service; import org.redkale.service.Service;
@@ -223,7 +224,7 @@ public class TransportFactory {
if (info == null) continue; if (info == null) continue;
addresses.addAll(info.addresses); addresses.addAll(info.addresses);
} }
if (info == null) return null; if (info == null) info = new TransportGroupInfo("TCP");
if (sncpAddress != null) addresses.remove(sncpAddress); if (sncpAddress != null) addresses.remove(sncpAddress);
return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy); return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy);
} }
@@ -239,6 +240,10 @@ public class TransportFactory {
return executor; return executor;
} }
public Supplier<ByteBuffer> getBufferSupplier() {
return bufferPool;
}
public List<TransportGroupInfo> getGroupInfos() { public List<TransportGroupInfo> getGroupInfos() {
return new ArrayList<>(this.groupInfos.values()); return new ArrayList<>(this.groupInfos.values());
} }

View File

@@ -414,6 +414,7 @@ public abstract class WebSocketNode {
} }
private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) { private CompletableFuture<Integer> sendOneMessage(final Object message, final boolean last, final Serializable userid) {
if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid));
if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine");
CompletableFuture<Integer> localFuture = null; CompletableFuture<Integer> localFuture = null;
if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid); if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid);

View File

@@ -855,7 +855,7 @@ public abstract class Sncp {
* @param serviceTypeOrImplClass Service类 * @param serviceTypeOrImplClass Service类
* @param transportFactory TransportFactory * @param transportFactory TransportFactory
* @param clientAddress 本地IP地址 * @param clientAddress 本地IP地址
* @param groups 所有的组节点,包含自身 * @param groups0 所有的组节点,包含自身
* @param conf 启动配置项 * @param conf 启动配置项
* *
* @return Service的远程模式实例 * @return Service的远程模式实例
@@ -868,10 +868,11 @@ public abstract class Sncp {
final Class<T> serviceTypeOrImplClass, final Class<T> serviceTypeOrImplClass,
final TransportFactory transportFactory, final TransportFactory transportFactory,
final InetSocketAddress clientAddress, final InetSocketAddress clientAddress,
final Set<String> groups, final Set<String> groups0,
final AnyValue conf) { final AnyValue conf) {
if (serviceTypeOrImplClass == null) return null; if (serviceTypeOrImplClass == null) return null;
if (!Service.class.isAssignableFrom(serviceTypeOrImplClass)) return null; if (!Service.class.isAssignableFrom(serviceTypeOrImplClass)) return null;
Set<String> groups = groups0 == null ? new HashSet<>() : groups0;
ResourceFactory.checkResourceName(name); ResourceFactory.checkResourceName(name);
int mod = serviceTypeOrImplClass.getModifiers(); int mod = serviceTypeOrImplClass.getModifiers();
boolean realed = !(java.lang.reflect.Modifier.isAbstract(mod) || serviceTypeOrImplClass.isInterface()); boolean realed = !(java.lang.reflect.Modifier.isAbstract(mod) || serviceTypeOrImplClass.isInterface());

View File

@@ -12,6 +12,7 @@ import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.logging.*; import java.util.logging.*;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.redkale.convert.bson.*; import org.redkale.convert.bson.*;
@@ -55,6 +56,8 @@ public final class SncpClient {
protected final ExecutorService executor; protected final ExecutorService executor;
protected final Supplier<ByteBuffer> bufferSupplier;
@Resource @Resource
protected JsonConvert jsonConvert; protected JsonConvert jsonConvert;
@@ -83,6 +86,7 @@ public final class SncpClient {
final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) { final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) {
this.remote = remote; this.remote = remote;
this.executor = factory.getExecutor(); this.executor = factory.getExecutor();
this.bufferSupplier = factory.getBufferSupplier();
this.serviceClass = serviceClass; this.serviceClass = serviceClass;
this.serviceversion = 0; this.serviceversion = 0;
this.clientAddress = clientAddress; this.clientAddress = clientAddress;
@@ -338,7 +342,7 @@ public final class SncpClient {
final Type[] myparamtypes = action.paramTypes; final Type[] myparamtypes = action.paramTypes;
final Class[] myparamclass = action.paramClass; final Class[] myparamclass = action.paramClass;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 final BsonWriter writer = bsonConvert.pollBsonWriter(transport == null ? bufferSupplier : transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER); writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean
bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]); bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]);

View File

@@ -11,7 +11,7 @@ import org.redkale.convert.bson.BsonFactory;
import org.redkale.util.Utility; import org.redkale.util.Utility;
import org.redkale.convert.bson.BsonConvert; import org.redkale.convert.bson.BsonConvert;
import java.nio.*; import java.nio.*;
import java.util.Arrays; import java.util.*;
import org.redkale.convert.json.*; import org.redkale.convert.json.*;
/** /**
@@ -30,6 +30,7 @@ public class BsonTestMain {
main2(args); main2(args);
main3(args); main3(args);
main4(args); main4(args);
main5(args);
} }
public static void main2(String[] args) throws Exception { public static void main2(String[] args) throws Exception {
@@ -86,4 +87,15 @@ public class BsonTestMain {
System.out.println(rs.toString()); System.out.println(rs.toString());
} }
public static void main5(String[] args) throws Exception {
final BsonConvert convert = BsonFactory.root().getConvert();
LinkedHashMap map = new LinkedHashMap();
map.put("1", 1);
map.put("2", "a2");
byte[] bs = convert.convertTo(Object.class, map);
Object mapobj = convert.convertFrom(Object.class, bs);
System.out.println(mapobj);
}
} }