From da600ecf20bb6e6c49bafc0d4b6163be0b2df85f Mon Sep 17 00:00:00 2001 From: Redkale <22250530@qq.com> Date: Mon, 13 Nov 2017 16:22:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DWebSocketNode=E5=9C=A8?= =?UTF-8?q?=E5=8D=8A=E8=BF=9C=E7=A8=8B=E6=A8=A1=E5=BC=8F=E4=B8=8BSNCP?= =?UTF-8?q?=E5=87=BA=E5=BC=82=E5=B8=B8=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/Application.java | 25 +++++++++++++------ src/org/redkale/convert/MapDecoder.java | 6 +++++ src/org/redkale/net/TransportFactory.java | 7 +++++- src/org/redkale/net/http/WebSocketNode.java | 1 + src/org/redkale/net/sncp/Sncp.java | 5 ++-- src/org/redkale/net/sncp/SncpClient.java | 6 ++++- .../redkale/test/convert/BsonTestMain.java | 16 ++++++++++-- 7 files changed, 53 insertions(+), 13 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index aa76007ea..36acef26f 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -240,20 +240,23 @@ public final class Application { AsynchronousChannelGroup transportGroup = null; final AnyValue resources = config.getAnyValue("resources"); TransportStrategy strategy = null; + int bufferCapacity = 8 * 1024; + int bufferPoolSize = Runtime.getRuntime().availableProcessors() * 16; + AtomicLong createBufferCounter = new AtomicLong(); + AtomicLong cycleBufferCounter = new AtomicLong(); if (resources != null) { AnyValue transportConf = resources.getAnyValue("transport"); int groupsize = resources.getAnyValues("group").length; if (groupsize > 0 && transportConf == null) transportConf = new DefaultAnyValue(); if (transportConf != null) { //--------------transportBufferPool----------- - AtomicLong createBufferCounter = new AtomicLong(); - AtomicLong cycleBufferCounter = new AtomicLong(); - final int bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), 8 * 1024), 4 * 1024); - final int bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8); + bufferCapacity = Math.max(parseLenth(transportConf.getValue("bufferCapacity"), bufferCapacity), 4 * 1024); + bufferPoolSize = parseLenth(transportConf.getValue("bufferPoolSize"), groupsize * Runtime.getRuntime().availableProcessors() * 8); final int threads = parseLenth(transportConf.getValue("threads"), groupsize * Runtime.getRuntime().availableProcessors() * 8); + final int capacity = bufferCapacity; transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, - (Object... params) -> ByteBuffer.allocateDirect(bufferCapacity), null, (e) -> { - if (e == null || e.isReadOnly() || e.capacity() != bufferCapacity) return false; + (Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != capacity) return false; e.clear(); return true; }); @@ -291,6 +294,15 @@ public final class Application { throw new RuntimeException(e); } } + if (transportPool == null) { + final int capacity = bufferCapacity; + transportPool = new ObjectPool<>(createBufferCounter, cycleBufferCounter, bufferPoolSize, + (Object... params) -> ByteBuffer.allocateDirect(capacity), null, (e) -> { + if (e == null || e.isReadOnly() || e.capacity() != capacity) return false; + e.clear(); + return true; + }); + } this.sncpTransportFactory = TransportFactory.create(transportExec, transportPool, transportGroup, strategy); DefaultAnyValue tarnsportConf = DefaultAnyValue.create(TransportFactory.NAME_PINGINTERVAL, System.getProperty("net.transport.pinginterval", "30")); this.sncpTransportFactory.init(tarnsportConf, Sncp.PING_BUFFER, Sncp.PONG_BUFFER.remaining()); @@ -652,7 +664,6 @@ public final class Application { // } // } // } - @SuppressWarnings("unchecked") private void runServers(CountDownLatch timecd, final List serconfs) throws Exception { this.servicecdl = new CountDownLatch(serconfs.size()); diff --git a/src/org/redkale/convert/MapDecoder.java b/src/org/redkale/convert/MapDecoder.java index ebee7125d..b93f551a7 100644 --- a/src/org/redkale/convert/MapDecoder.java +++ b/src/org/redkale/convert/MapDecoder.java @@ -50,6 +50,12 @@ public final class MapDecoder implements Decodeable> { factory.register(type, this); this.keyDecoder = factory.loadDecoder(this.keyType); this.valueDecoder = factory.loadDecoder(this.valueType); + } else if (factory.isReversible()) { + this.keyType = Object.class; + this.valueType = Object.class; + this.creator = factory.loadCreator((Class) type); + this.keyDecoder = factory.loadDecoder(this.keyType); + this.valueDecoder = factory.loadDecoder(this.valueType); } else { throw new ConvertException("mapdecoder not support the type (" + type + ")"); } diff --git a/src/org/redkale/net/TransportFactory.java b/src/org/redkale/net/TransportFactory.java index e80706950..8561392ef 100644 --- a/src/org/redkale/net/TransportFactory.java +++ b/src/org/redkale/net/TransportFactory.java @@ -13,6 +13,7 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.Supplier; import java.util.logging.*; import java.util.stream.Collectors; import org.redkale.service.Service; @@ -223,7 +224,7 @@ public class TransportFactory { if (info == null) continue; addresses.addAll(info.addresses); } - if (info == null) return null; + if (info == null) info = new TransportGroupInfo("TCP"); if (sncpAddress != null) addresses.remove(sncpAddress); return new Transport(groups.stream().sorted().collect(Collectors.joining(";")), info.protocol, info.subprotocol, this, this.bufferPool, this.channelGroup, sncpAddress, addresses, this.strategy); } @@ -239,6 +240,10 @@ public class TransportFactory { return executor; } + public Supplier getBufferSupplier() { + return bufferPool; + } + public List getGroupInfos() { return new ArrayList<>(this.groupInfos.values()); } diff --git a/src/org/redkale/net/http/WebSocketNode.java b/src/org/redkale/net/http/WebSocketNode.java index def7350e7..34d52e076 100644 --- a/src/org/redkale/net/http/WebSocketNode.java +++ b/src/org/redkale/net/http/WebSocketNode.java @@ -414,6 +414,7 @@ public abstract class WebSocketNode { } private CompletableFuture sendOneMessage(final Object message, final boolean last, final Serializable userid) { + if (message instanceof CompletableFuture) return ((CompletableFuture) message).thenApply(msg -> sendOneMessage(msg, last, userid)); if (logger.isLoggable(Level.FINEST)) logger.finest("websocket want send message {userid:" + userid + ", content:'" + message + "'} from locale node to " + ((this.localEngine != null) ? "locale" : "remote") + " engine"); CompletableFuture localFuture = null; if (this.localEngine != null) localFuture = localEngine.sendMessage(message, last, userid); diff --git a/src/org/redkale/net/sncp/Sncp.java b/src/org/redkale/net/sncp/Sncp.java index edfdb77d3..3f049429f 100644 --- a/src/org/redkale/net/sncp/Sncp.java +++ b/src/org/redkale/net/sncp/Sncp.java @@ -855,7 +855,7 @@ public abstract class Sncp { * @param serviceTypeOrImplClass Service类 * @param transportFactory TransportFactory * @param clientAddress 本地IP地址 - * @param groups 所有的组节点,包含自身 + * @param groups0 所有的组节点,包含自身 * @param conf 启动配置项 * * @return Service的远程模式实例 @@ -868,10 +868,11 @@ public abstract class Sncp { final Class serviceTypeOrImplClass, final TransportFactory transportFactory, final InetSocketAddress clientAddress, - final Set groups, + final Set groups0, final AnyValue conf) { if (serviceTypeOrImplClass == null) return null; if (!Service.class.isAssignableFrom(serviceTypeOrImplClass)) return null; + Set groups = groups0 == null ? new HashSet<>() : groups0; ResourceFactory.checkResourceName(name); int mod = serviceTypeOrImplClass.getModifiers(); boolean realed = !(java.lang.reflect.Modifier.isAbstract(mod) || serviceTypeOrImplClass.isInterface()); diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index ed9783f3b..7114cb5f0 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -12,6 +12,7 @@ import java.nio.*; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; +import java.util.function.Supplier; import java.util.logging.*; import javax.annotation.Resource; import org.redkale.convert.bson.*; @@ -55,6 +56,8 @@ public final class SncpClient { protected final ExecutorService executor; + protected final Supplier bufferSupplier; + @Resource protected JsonConvert jsonConvert; @@ -83,6 +86,7 @@ public final class SncpClient { final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) { this.remote = remote; this.executor = factory.getExecutor(); + this.bufferSupplier = factory.getBufferSupplier(); this.serviceClass = serviceClass; this.serviceversion = 0; this.clientAddress = clientAddress; @@ -338,7 +342,7 @@ public final class SncpClient { final Type[] myparamtypes = action.paramTypes; final Class[] myparamclass = action.paramClass; if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; - final BsonWriter writer = bsonConvert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 + final BsonWriter writer = bsonConvert.pollBsonWriter(transport == null ? bufferSupplier : transport.getBufferSupplier()); // 将head写入 writer.writeTo(DEFAULT_HEADER); for (int i = 0; i < params.length; i++) { //params 可能包含: 3 个 boolean bsonConvert.convertTo(writer, AsyncHandler.class.isAssignableFrom(myparamclass[i]) ? AsyncHandler.class : myparamtypes[i], params[i]); diff --git a/test/org/redkale/test/convert/BsonTestMain.java b/test/org/redkale/test/convert/BsonTestMain.java index c94834392..ecf845c48 100644 --- a/test/org/redkale/test/convert/BsonTestMain.java +++ b/test/org/redkale/test/convert/BsonTestMain.java @@ -11,7 +11,7 @@ import org.redkale.convert.bson.BsonFactory; import org.redkale.util.Utility; import org.redkale.convert.bson.BsonConvert; import java.nio.*; -import java.util.Arrays; +import java.util.*; import org.redkale.convert.json.*; /** @@ -24,12 +24,13 @@ public class BsonTestMain { Serializable[] sers = new Serializable[]{"aaa", 4}; final BsonConvert convert = BsonFactory.root().getConvert(); byte[] bytes = convert.convertTo(sers); - Utility.println("---", bytes); + Utility.println("---", bytes); Serializable[] a = convert.convertFrom(Serializable[].class, bytes); System.out.println(Arrays.toString(a)); main2(args); main3(args); main4(args); + main5(args); } public static void main2(String[] args) throws Exception { @@ -86,4 +87,15 @@ public class BsonTestMain { System.out.println(rs.toString()); } + + public static void main5(String[] args) throws Exception { + final BsonConvert convert = BsonFactory.root().getConvert(); + + LinkedHashMap map = new LinkedHashMap(); + map.put("1", 1); + map.put("2", "a2"); + byte[] bs = convert.convertTo(Object.class, map); + Object mapobj = convert.convertFrom(Object.class, bs); + System.out.println(mapobj); + } }