This commit is contained in:
地平线
2015-10-26 10:55:57 +08:00
parent 6f3acd72a1
commit 5697bfc0c1
6 changed files with 55 additions and 57 deletions

View File

@@ -23,6 +23,7 @@ import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import javax.xml.parsers.*;
import org.w3c.dom.*;
@@ -465,8 +466,9 @@ public final class Application {
public static <T extends Service> T singleton(Class<T> serviceClass, boolean remote) throws Exception {
final Application application = Application.create();
T service = remote ? Sncp.createRemoteService("", serviceClass, null, new LinkedHashSet<>(), null)
: Sncp.createLocalService("", serviceClass, null, new LinkedHashSet<>(), null, null);
Consumer<Runnable> executor = (x) -> Executors.newFixedThreadPool(8).submit(x);
T service = remote ? Sncp.createRemoteService("", executor, serviceClass, null, new LinkedHashSet<>(), null)
: Sncp.createLocalService("", executor, serviceClass, null, new LinkedHashSet<>(), null, null);
application.init();
application.factory.register(service);
application.servicecdl = new CountDownLatch(1);

View File

@@ -74,7 +74,7 @@ public final class NodeHttpServer extends NodeServer {
synchronized (regFactory) {
Service nodeService = (Service) rf.find(rcname, WebSocketNode.class);
if (nodeService == null) {
nodeService = Sncp.createLocalService(rcname, (Class<? extends Service>) WebSocketNodeService.class,
nodeService = Sncp.createLocalService(rcname, getExecutor(), (Class<? extends Service>) WebSocketNodeService.class,
getSncpAddress(), sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(rcname, WebSocketNode.class, nodeService);
factory.inject(nodeService);

View File

@@ -73,6 +73,14 @@ public abstract class NodeServer {
this.fine = logger.isLoggable(Level.FINE);
}
protected Consumer<Runnable> getExecutor() throws Exception {
if (server == null) return null;
Field field = Server.class.getDeclaredField("context");
field.setAccessible(true);
Context context = (Context) field.get(server);
return (x) -> context.submit(x);
}
public static <T extends NodeServer> NodeServer create(Class<T> clazz, Application application, AnyValue serconf) {
try {
return clazz.getConstructor(Application.class, AnyValue.class).newInstance(application, serconf);
@@ -134,7 +142,7 @@ public abstract class NodeServer {
application.sources.add(source);
regFactory.register(rs.name(), DataSource.class, source);
if (factory.find(rs.name(), DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(rs.name(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
Service cacheListenerService = Sncp.createLocalService(rs.name(), getExecutor(), DataCacheListenerService.class, this.sncpAddress, sncpDefaultGroups, sncpSameGroupTransports, sncpDiffGroupTransports);
regFactory.register(rs.name(), DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, rs.name(), sncpGroup, sncpDefaultGroups, null);
localServiceWrappers.add(wrapper);
@@ -233,7 +241,7 @@ public abstract class NodeServer {
tset.add(iaddr);
sameGroupTransports.add(loadTransport(this.sncpGroup, server.getProtocol(), tset));
}
Service service = Sncp.createLocalService(entry.getName(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports);
Service service = Sncp.createLocalService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, sameGroupTransports, diffGroupTransports);
wrapper = new ServiceWrapper(type, service, this.sncpGroup, entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load " + service);
} else {
@@ -245,7 +253,7 @@ public abstract class NodeServer {
sameGroupAddrs.addAll(v);
});
if (sameGroupAddrs.isEmpty()) throw new RuntimeException(type.getName() + " has no remote address on group (" + groups + ")");
Service service = Sncp.createRemoteService(entry.getName(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
Service service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, groups, loadTransport(g.toString(), server.getProtocol(), sameGroupAddrs));
wrapper = new ServiceWrapper(type, service, "", entry);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load " + service);
}

View File

@@ -24,4 +24,8 @@ public class WorkThread extends Thread {
public void submit(Runnable runner) {
executor.submit(runner);
}
public ExecutorService getExecutor() {
return executor;
}
}

View File

@@ -13,6 +13,7 @@ import com.wentch.redkale.util.*;
import java.lang.reflect.*;
import java.net.*;
import java.util.*;
import java.util.function.*;
import javax.annotation.*;
import jdk.internal.org.objectweb.asm.*;
import static jdk.internal.org.objectweb.asm.ClassWriter.COMPUTE_FRAMES;
@@ -612,6 +613,7 @@ public abstract class Sncp {
* 创建本地模式Service实例
* @param <T>
* @param name
* @param executor
* @param serviceClass
* @param clientAddress
* @param groups
@@ -620,7 +622,7 @@ public abstract class Sncp {
* @return
*/
@SuppressWarnings("unchecked")
public static <T extends Service> T createLocalService(final String name, final Class<T> serviceClass,
public static <T extends Service> T createLocalService(final String name, final Consumer<Runnable> executor, final Class<T> serviceClass,
final InetSocketAddress clientAddress, HashSet<String> groups, Collection<Transport> sameGroupTransports, Collection<Transport> diffGroupTransports) {
try {
final Class newClazz = createLocalServiceClass(name, serviceClass);
@@ -645,7 +647,7 @@ public abstract class Sncp {
if (!list.isEmpty()) remoteTransport = new Transport(list.get(0), clientAddress, list);
}
if (field.getType().isAssignableFrom(newClazz) && remoteTransport != null) {
field.set(rs, createRemoteService(name, serviceClass, clientAddress, groups, remoteTransport));
field.set(rs, createRemoteService(name, executor, serviceClass, clientAddress, groups, remoteTransport));
}
continue;
}
@@ -675,7 +677,7 @@ public abstract class Sncp {
try {
Field e = newClazz.getDeclaredField("_client");
e.setAccessible(true);
client = new SncpClient(name, hash(serviceClass), false, newClazz, true, clientAddress, groups);
client = new SncpClient(name, executor, hash(serviceClass), false, newClazz, true, clientAddress, groups);
e.set(rs, client);
} catch (NoSuchFieldException ne) {
}
@@ -769,6 +771,7 @@ public abstract class Sncp {
* <p>
* @param <T>
* @param name
* @param executor
* @param serviceClass
* @param clientAddress
* @param groups
@@ -776,7 +779,8 @@ public abstract class Sncp {
* @return
*/
@SuppressWarnings("unchecked")
public static <T extends Service> T createRemoteService(final String name, final Class<T> serviceClass, final InetSocketAddress clientAddress, HashSet<String> groups, final Transport transport) {
public static <T extends Service> T createRemoteService(final String name, final Consumer<Runnable> executor,
final Class<T> serviceClass, final InetSocketAddress clientAddress, HashSet<String> groups, final Transport transport) {
if (serviceClass == null) return null;
if (!Service.class.isAssignableFrom(serviceClass)) return null;
int mod = serviceClass.getModifiers();
@@ -791,7 +795,7 @@ public abstract class Sncp {
final String anyValueDesc = Type.getDescriptor(AnyValue.class);
ClassLoader loader = Sncp.class.getClassLoader();
String newDynName = supDynName.substring(0, supDynName.lastIndexOf('/') + 1) + REMOTEPREFIX + serviceClass.getSimpleName();
final SncpClient client = new SncpClient(name, hash(serviceClass), true, createLocalServiceClass(name, serviceClass), false, clientAddress, groups);
final SncpClient client = new SncpClient(name, executor, hash(serviceClass), true, createLocalServiceClass(name, serviceClass), false, clientAddress, groups);
try {
Class newClazz = Class.forName(newDynName.replace('/', '.'));
T rs = (T) newClazz.newInstance();

View File

@@ -15,6 +15,7 @@ import java.net.*;
import java.nio.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.logging.*;
/**
@@ -93,12 +94,13 @@ public final class SncpClient {
protected final SncpAction[] actions;
protected final BlockingQueue<Runnable> queue = new ArrayBlockingQueue(1024 * 64);
protected final Consumer<Runnable> executor;
public SncpClient(final String serviceName, final long serviceid0, boolean remote, final Class serviceClass,
public SncpClient(final String serviceName, final Consumer<Runnable> executor, final long serviceid0, boolean remote, final Class serviceClass,
boolean onlySncpDyn, final InetSocketAddress clientAddress, final HashSet<String> groups) {
if (serviceName.length() > 10) throw new RuntimeException(serviceClass + " @Resource name(" + serviceName + ") too long , must less 11");
this.remote = remote;
this.executor = executor;
this.serviceClass = serviceClass;
this.address = clientAddress;
this.groups = groups;
@@ -115,24 +117,6 @@ public final class SncpClient {
this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();
this.addrPort = clientAddress == null ? 0 : clientAddress.getPort();
new Thread() {
{
setName(SncpClient.class.getSimpleName() + serviceClass.getSimpleName() + "-" + serviceName + "-Thread");
setDaemon(true);
}
@Override
public void run() {
while (true) {
try {
Runnable runner = queue.take();
runner.run();
} catch (Exception e) {
logger.log(Level.SEVERE, SncpClient.class.getSimpleName() + " runnable occur error", e);
}
}
}
}.start();
}
public long getNameid() {
@@ -208,8 +192,8 @@ public final class SncpClient {
private <T> void remote(final boolean async, final BsonConvert convert, final Transport[] transports, final boolean run, final int index, final Object... params) {
if (!run) return;
if (async) {
submit(() -> {
if (async && executor != null) {
executor.accept(() -> {
for (Transport transport : transports) {
convert.convertFrom(actions[index].resultTypes, send(convert, transport, actions[index], params));
}
@@ -221,10 +205,6 @@ public final class SncpClient {
}
}
private void submit(Runnable runner) {
if (!queue.offer(runner)) runner.run();
}
private byte[] send(final BsonConvert convert, Transport transport, final SncpAction action, Object... params) {
int bodyLength = 2;
Type[] myparamtypes = action.paramTypes;
@@ -256,7 +236,7 @@ public final class SncpClient {
System.arraycopy(bs, 0, all, pos, bs.length);
pos += bs.length;
}
if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp body.length : " + all.length + ", but pos=" + pos);
if (pos != all.length) logger.warning(this.serviceid + "," + this.nameid + "," + action + " sncp(" + action.method + ") body.length : " + all.length + ", but pos=" + pos);
pos = 0;
for (int i = patch - 1; i >= 0; i--) {
fillHeader(buffer, seqid, actionid, patch, i, bodyLength);
@@ -286,23 +266,23 @@ public final class SncpClient {
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
long rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
long rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid);
if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive serviceid =" + rserviceid);
long rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive nameid =" + rnameid);
if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive nameid =" + rnameid);
long ractionid1 = buffer.getLong();
long ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt();
buffer.getInt();
final int frameCount = buffer.get();
if (frameCount < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount);
if (frameCount < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount);
int frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
if (frameIndex < 0 || frameIndex >= frameCount) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but frame.count =" + frameCount + " & frame.index =" + frameIndex);
final int retcode = buffer.getInt();
if (retcode != 0) throw new RuntimeException("remote service deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
if (retcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (retcode=" + retcode + ", retinfo=" + SncpResponse.getRetCodeInfo(retcode) + ")");
final int bodylen = buffer.getInt();
final byte[] body = new byte[bodylen];
if (frameCount == 1) {
@@ -318,33 +298,33 @@ public final class SncpClient {
conn.read(buffer).get(readto > 0 ? readto : 5, TimeUnit.SECONDS);
buffer.flip();
rseqid = buffer.getLong();
if (rseqid != seqid) throw new RuntimeException("sncp send seqid = " + seqid + ", but receive next.seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp buffer receive header.length not " + HEADER_SIZE);
if (rseqid != seqid) throw new RuntimeException("sncp(" + action.method + ") send seqid = " + seqid + ", but receive next.seqid =" + rseqid);
if (buffer.getChar() != HEADER_SIZE) throw new RuntimeException("sncp(" + action.method + ") buffer receive header.length not " + HEADER_SIZE);
rserviceid = buffer.getLong();
if (rserviceid != serviceid) throw new RuntimeException("sncp send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid);
if (rserviceid != serviceid) throw new RuntimeException("sncp(" + action.method + ") send serviceid = " + serviceid + ", but receive next.serviceid =" + rserviceid);
rnameid = buffer.getLong();
if (rnameid != nameid) throw new RuntimeException("sncp send nameid = " + nameid + ", but receive next.nameid =" + rnameid);
if (rnameid != nameid) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but receive next.nameid =" + rnameid);
ractionid1 = buffer.getLong();
ractionid2 = buffer.getLong();
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")");
if (!actionid.compare(ractionid1, ractionid2)) throw new RuntimeException("sncp(" + action.method + ") send actionid = " + actionid + ", but receive next.actionid =(" + ractionid1 + "_" + ractionid2 + ")");
buffer.getInt();
buffer.getInt();
if (buffer.get() < 1) throw new RuntimeException("sncp send nameid = " + nameid + ", but next.frame.count != " + frameCount);
if (buffer.get() < 1) throw new RuntimeException("sncp(" + action.method + ") send nameid = " + nameid + ", but next.frame.count != " + frameCount);
frameIndex = buffer.get();
if (frameIndex < 0 || frameIndex >= frameCount)
throw new RuntimeException("sncp receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex);
throw new RuntimeException("sncp(" + action.method + ") receive nameid = " + nameid + ", but frame.count =" + frameCount + " & next.frame.index =" + frameIndex);
int rretcode = buffer.getInt();
if (rretcode != 0) throw new RuntimeException("remote service deal error (receive retcode =" + rretcode + ")");
if (rretcode != 0) throw new RuntimeException("remote service(" + action.method + ") deal error (receive retcode =" + rretcode + ")");
int rbodylen = buffer.getInt();
if (rbodylen != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen);
if (rbodylen != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.bodylength =" + rbodylen);
}
if (received != bodylen) throw new RuntimeException("sncp receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received);
if (received != bodylen) throw new RuntimeException("sncp(" + action.method + ") receive bodylength = " + bodylen + ", but receive next.receivedlength =" + received);
return body;
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception e) {
throw new RuntimeException(conn.getRemoteAddress() + " connect failed.", e);
throw new RuntimeException("sncp(" + action.method + ") " + conn.getRemoteAddress() + " connect failed.", e);
} finally {
transport.offerBuffer(buffer);
transport.offerConnection(conn);