diff --git a/src/com/wentch/redkale/boot/Application.java b/src/com/wentch/redkale/boot/Application.java index 1677e0d50..de88c4b32 100644 --- a/src/com/wentch/redkale/boot/Application.java +++ b/src/com/wentch/redkale/boot/Application.java @@ -23,6 +23,7 @@ import java.nio.file.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.function.*; import java.util.logging.*; import javax.xml.parsers.*; import org.w3c.dom.*; @@ -465,8 +466,9 @@ public final class Application { public static T singleton(Class serviceClass, boolean remote) throws Exception { final Application application = Application.create(); - T service = remote ? Sncp.createRemoteService("", serviceClass, null, new LinkedHashSet<>(), null) - : Sncp.createLocalService("", serviceClass, null, new LinkedHashSet<>(), null, null); + Consumer executor = (x) -> Executors.newFixedThreadPool(8).submit(x); + T service = remote ? Sncp.createRemoteService("", executor, serviceClass, null, new LinkedHashSet<>(), null) + : Sncp.createLocalService("", executor, serviceClass, null, new LinkedHashSet<>(), null, null); application.init(); application.factory.register(service); application.servicecdl = new CountDownLatch(1); diff --git a/src/com/wentch/redkale/boot/NodeHttpServer.java b/src/com/wentch/redkale/boot/NodeHttpServer.java index 40a945e83..5f8992e36 100644 --- a/src/com/wentch/redkale/boot/NodeHttpServer.java +++ b/src/com/wentch/redkale/boot/NodeHttpServer.java @@ -74,7 +74,7 @@ public final class NodeHttpServer extends NodeServer { synchronized (regFactory) { Service nodeService = (Service) rf.find(rcname, WebSocketNode.class); if (nodeService == null) { - nodeService = Sncp.createLocalService(rcname, (Class) WebSocketNodeService.class, + nodeService = Sncp.createLocalService(rcname, getExecutor(), (Class) WebSocketNodeService.class, getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports); regFactory.register(rcname, WebSocketNode.class, nodeService); factory.inject(nodeService); diff --git a/src/com/wentch/redkale/boot/NodeServer.java b/src/com/wentch/redkale/boot/NodeServer.java index df9d31f63..fda7171d5 100644 --- a/src/com/wentch/redkale/boot/NodeServer.java +++ b/src/com/wentch/redkale/boot/NodeServer.java @@ -73,6 +73,14 @@ public abstract class NodeServer { this.fine = logger.isLoggable(Level.FINE); } + protected Consumer getExecutor() throws Exception { + if (server == null) return null; + Field field = Server.class.getDeclaredField("context"); + field.setAccessible(true); + Context context = (Context) field.get(server); + return (x) -> context.submit(x); + } + public static NodeServer create(Class clazz, Application application, AnyValue serconf) { try { return clazz.getConstructor(Application.class, AnyValue.class).newInstance(application, serconf); @@ -134,7 +142,7 @@ public abstract class NodeServer { application.sources.add(source); regFactory.register(rs.name(), DataSource.class, source); if (factory.find(rs.name(), DataCacheListener.class) == null) { - Service cacheListenerService = Sncp.createLocalService(rs.name(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports); + Service cacheListenerService = Sncp.createLocalService(rs.name(), getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports); regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService); ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, rs.name(), sncpGroup, sncpDefaultGroups, null); localServiceWrappers.add(wrapper); @@ -233,7 +241,7 @@ public abstract class NodeServer { tset.add(iaddr); sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset)); } - Service service = Sncp.createLocalService(entry.getName(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports); + Service service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports); wrapper = new ServiceWrapper(type, service, this.sncpGroup, entry); if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load " + service); } else { @@ -245,7 +253,7 @@ public abstract class NodeServer { sameGroupAddrs.addAll(v); }); if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")"); - Service service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs)); + Service service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs)); wrapper = new ServiceWrapper(type, service, "", entry); if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load " + service); } diff --git a/src/com/wentch/redkale/net/WorkThread.java b/src/com/wentch/redkale/net/WorkThread.java index 98b3c76ce..d88e15b10 100644 --- a/src/com/wentch/redkale/net/WorkThread.java +++ b/src/com/wentch/redkale/net/WorkThread.java @@ -24,4 +24,8 @@ public class WorkThread extends Thread { public void submit(Runnable runner) { executor.submit(runner); } + + public ExecutorService getExecutor() { + return executor; + } } diff --git a/src/com/wentch/redkale/net/sncp/Sncp.java b/src/com/wentch/redkale/net/sncp/Sncp.java index 074d7747d..d486a608f 100644 --- a/src/com/wentch/redkale/net/sncp/Sncp.java +++ b/src/com/wentch/redkale/net/sncp/Sncp.java @@ -13,6 +13,7 @@ import com.wentch.redkale.util.*; import java.lang.reflect.*; import java.net.*; import java.util.*; +import java.util.function.*; import javax.annotation.*; import jdk.internal.org.objectweb.asm.*; import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES; @@ -612,6 +613,7 @@ public abstract class Sncp { * 创建本地模式Service实例 * @param * @param name + * @param executor * @param serviceClass * @param clientAddress * @param groups @@ -620,7 +622,7 @@ public abstract class Sncp { * @return */ @SuppressWarnings("unchecked") - public static T createLocalService(final String name, final Class serviceClass, + public static T createLocalService(final String name, final Consumer executor, final Class serviceClass, final InetSocketAddress clientAddress, HashSet groups, Collection sameGroupTransports, Collection diffGroupTransports) { try { final Class newClazz = createLocalServiceClass(name, serviceClass); @@ -645,7 +647,7 @@ public abstract class Sncp { if (!list.isEmpty()) remoteTransport = new Transport(list.get(0), clientAddress, list); } if (field.getType().isAssignableFrom(newClazz) && remoteTransport != null) { - field.set(rs, createRemoteService(name, serviceClass, clientAddress, groups, remoteTransport)); + field.set(rs, createRemoteService(name, executor, serviceClass, clientAddress, groups, remoteTransport)); } continue; } @@ -675,7 +677,7 @@ public abstract class Sncp { try { Field e = newClazz.getDeclaredField("_client"); e.setAccessible(true); - client = new SncpClient(name, hash(serviceClass), false, newClazz, true, clientAddress, groups); + client = new SncpClient(name, executor, hash(serviceClass), false, newClazz, true, clientAddress, groups); e.set(rs, client); } catch (NoSuchFieldException ne) { } @@ -769,6 +771,7 @@ public abstract class Sncp { *

* @param * @param name + * @param executor * @param serviceClass * @param clientAddress * @param groups @@ -776,7 +779,8 @@ public abstract class Sncp { * @return */ @SuppressWarnings("unchecked") - public static T createRemoteService(final String name, final Class serviceClass, final InetSocketAddress clientAddress, HashSet groups, final Transport transport) { + public static T createRemoteService(final String name, final Consumer executor, + final Class serviceClass, final InetSocketAddress clientAddress, HashSet groups, final Transport transport) { if (serviceClass == null) return null; if (!Service.class.isAssignableFrom(serviceClass)) return null; int mod = serviceClass.getModifiers(); @@ -791,7 +795,7 @@ public abstract class Sncp { final String anyValueDesc = Type.getDescriptor(AnyValue.class); ClassLoader loader = Sncp.class.getClassLoader(); String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceClass.getSimpleName(); - final SncpClient client = new SncpClient(name, hash(serviceClass), true, createLocalServiceClass(name, serviceClass), false, clientAddress, groups); + final SncpClient client = new SncpClient(name, executor, hash(serviceClass), true, createLocalServiceClass(name, serviceClass), false, clientAddress, groups); try { Class newClazz = Class.forName(newDynName.replace('/', '.')); T rs = (T) newClazz.newInstance(); diff --git a/src/com/wentch/redkale/net/sncp/SncpClient.java b/src/com/wentch/redkale/net/sncp/SncpClient.java index e18132317..b89f463e9 100644 --- a/src/com/wentch/redkale/net/sncp/SncpClient.java +++ b/src/com/wentch/redkale/net/sncp/SncpClient.java @@ -15,6 +15,7 @@ import java.net.*; import java.nio.*; import java.util.*; import java.util.concurrent.*; +import java.util.function.*; import java.util.logging.*; /** @@ -93,12 +94,13 @@ public final class SncpClient { protected final SncpAction[] actions; - protected final BlockingQueue queue = new ArrayBlockingQueue(1024 * 64); + protected final Consumer executor; - public SncpClient(final String serviceName, final long serviceid0, boolean remote, final Class serviceClass, + public SncpClient(final String serviceName, final Consumer executor, final long serviceid0, boolean remote, final Class serviceClass, boolean onlySncpDyn, final InetSocketAddress clientAddress, final HashSet groups) { if (serviceName.length() > 10) throw new RuntimeException(serviceClass + " @Resource name(" + serviceName + ") too long , must less 11"); this.remote = remote; + this.executor = executor; this.serviceClass = serviceClass; this.address = clientAddress; this.groups = groups; @@ -115,24 +117,6 @@ public final class SncpClient { this.actions = methodens.toArray(new SncpAction[methodens.size()]); this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress(); this.addrPort = clientAddress == null ? 0 : clientAddress.getPort(); - new Thread() { - { - setName(SncpClient.class.getSimpleName() + serviceClass.getSimpleName() + "-" + serviceName + "-Thread"); - setDaemon(true); - } - - @Override - public void run() { - while (true) { - try { - Runnable runner = queue.take(); - runner.run(); - } catch (Exception e) { - logger.log(Level.SEVERE, SncpClient.class.getSimpleName() + " runnable occur error", e); - } - } - } - }.start(); } public long getNameid() { @@ -208,8 +192,8 @@ public final class SncpClient { private void remote(final boolean async, final BsonConvert convert, final Transport[] transports, final boolean run, final int index, final Object... params) { if (!run) return; - if (async) { - submit(() -> { + if (async && executor != null) { + executor.accept(() -> { for (Transport transport : transports) { convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params)); } @@ -221,10 +205,6 @@ public final class SncpClient { } } - private void submit(Runnable runner) { - if (!queue.offer(runner)) runner.run(); - } - private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) { int bodyLength = 2; Type[] myparamtypes = action.paramTypes; @@ -256,7 +236,7 @@ public final class SncpClient { System.arraycopy(bs, 0, all, pos, bs.length); pos += bs.length; } - if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos); + if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp(" + action.method + ") body.length : " + all.length + ", but pos=" + pos); pos = 0; for (int i = patch - 1; i >= 0; i--) { fillHeader(buffer, seqid, actionid, patch, i, bodyLength); @@ -286,23 +266,23 @@ public final class SncpClient { conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); buffer.flip(); long rseqid = buffer.getLong(); - if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid); - if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); + if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive seqid =" + rseqid); + if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); long rserviceid = buffer.getLong(); - if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid); + if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid); long rnameid = buffer.getLong(); - if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive nameid =" + rnameid); + if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive nameid =" + rnameid); long ractionid1 = buffer.getLong(); long ractionid2 = buffer.getLong(); - if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); + if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")"); buffer.getInt(); buffer.getInt(); final int frameCount = buffer.get(); - if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount); + if (frameCount < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount); int frameIndex = buffer.get(); - if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex); + if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex); final int retcode = buffer.getInt(); - if (retcode != 0) throw new RuntimeException("remote service deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); + if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")"); final int bodylen = buffer.getInt(); final byte[] body = new byte[bodylen]; if (frameCount == 1) { @@ -318,33 +298,33 @@ public final class SncpClient { conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS); buffer.flip(); rseqid = buffer.getLong(); - if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive next.seqid =" + rseqid); - if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE); + if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive next.seqid =" + rseqid); + if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE); rserviceid = buffer.getLong(); - if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid); + if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid); rnameid = buffer.getLong(); - if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive next.nameid =" + rnameid); + if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive next.nameid =" + rnameid); ractionid1 = buffer.getLong(); ractionid2 = buffer.getLong(); - if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")"); + if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")"); buffer.getInt(); buffer.getInt(); - if (buffer.get() < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but next.frame.count != " + frameCount); + if (buffer.get() < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but next.frame.count != " + frameCount); frameIndex = buffer.get(); if (frameIndex < 0 || frameIndex >= frameCount) - throw new RuntimeException("sncp receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex); + throw new RuntimeException("sncp(" + action.method + ") receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex); int rretcode = buffer.getInt(); - if (rretcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + rretcode + ")"); + if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")"); int rbodylen = buffer.getInt(); - if (rbodylen != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen); + if (rbodylen != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen); } - if (received != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received); + if (received != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received); return body; } } catch (RuntimeException ex) { throw ex; } catch (Exception e) { - throw new RuntimeException(conn.getRemoteAddress() + " connect failed.", e); + throw new RuntimeException("sncp(" + action.method + ") " + conn.getRemoteAddress() + " connect failed.", e); } finally { transport.offerBuffer(buffer); transport.offerConnection(conn);