From 7ea12638c5a4e5570ba7ec9f600d056894d13750 Mon Sep 17 00:00:00 2001 From: wentch <22250530@qq.com> Date: Tue, 12 Jan 2016 18:17:58 +0800 Subject: [PATCH] =?UTF-8?q?Transport=20=E6=94=B9=E7=89=88=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/Application.java | 20 +++++++ src/org/redkale/boot/NodeServer.java | 61 +++++++++++++------- src/org/redkale/boot/NodeSncpServer.java | 9 ++- src/org/redkale/net/sncp/ServiceWrapper.java | 31 +++++++++- src/org/redkale/net/sncp/SncpClient.java | 19 +++--- src/org/redkale/net/sncp/SncpDynServlet.java | 28 ++++++++- src/org/redkale/net/sncp/SncpServer.java | 10 +++- src/org/redkale/net/sncp/SncpServlet.java | 7 ++- 8 files changed, 148 insertions(+), 37 deletions(-) diff --git a/src/org/redkale/boot/Application.java b/src/org/redkale/boot/Application.java index 90029b947..deef6da25 100644 --- a/src/org/redkale/boot/Application.java +++ b/src/org/redkale/boot/Application.java @@ -562,6 +562,26 @@ public final class Application { System.exit(0); } + Set findSncpGroups(Transport sameGroupTransport, Collection diffGroupTransports) { + Set gs = new HashSet<>(); + if (sameGroupTransport != null) gs.add(sameGroupTransport.getName()); + if (diffGroupTransports != null) { + for (Transport t : diffGroupTransports) { + gs.add(t.getName()); + } + } + return gs; + } + + NodeSncpServer findNodeSncpServer(final InetSocketAddress sncpAddr) { + for (NodeServer node : servers) { + if (node.isSNCP() && sncpAddr.equals(node.getSncpAddress())) { + return (NodeSncpServer) node; + } + } + return null; + } + String findGroupProtocol(String group) { if (group == null) return null; return globalGroupProtocols.get(group); diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 047c6f6e6..fa2a72dbf 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -23,6 +23,7 @@ import java.util.logging.*; import javax.annotation.*; import javax.persistence.*; import org.redkale.net.*; +import org.redkale.net.sncp.*; import org.redkale.service.*; import org.redkale.source.*; import org.redkale.util.*; @@ -157,6 +158,8 @@ public abstract class NodeServer { DataSource source = new DataDefaultSource(resourceName); application.dataSources.add(source); regFactory.register(resourceName, DataSource.class, source); + + SncpClient client = null; Transport sameGroupTransport = null; List diffGroupTransports = null; try { @@ -167,16 +170,24 @@ public abstract class NodeServer { ts = src.getClass().getDeclaredField("_diffGroupTransports"); ts.setAccessible(true); diffGroupTransports = Arrays.asList((Transport[]) ts.get(src)); + + ts = src.getClass().getDeclaredField("_client"); + ts.setAccessible(true); + client = (SncpClient) ts.get(src); } catch (Exception e) { //src 不含 MultiRun 方法 } - if (factory.find(resourceName, DataCacheListener.class) == null) { - Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports); + final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); + if (sncpAddr != null && factory.find(resourceName, DataCacheListener.class) == null) { + Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports); regFactory.register(resourceName, DataCacheListener.class, cacheListenerService); - ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, null, null); + final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); + Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); + ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpServer.getSncpGroup(), gs, null); localServiceWrappers.add(wrapper); - if (consumer != null) consumer.accept(wrapper); + sncpServer.consumerAccept(wrapper); rf.inject(cacheListenerService, self); + if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService()); } field.set(src, source); rf.inject(source, self); // 给 "datasource.nodeid" 赋值 @@ -187,7 +198,9 @@ public abstract class NodeServer { factory.add(CacheSource.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> { try { if (field.getAnnotation(Resource.class) == null) return; - if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource + if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource + + SncpClient client = null; Transport sameGroupTransport = null; List diffGroupTransports = null; try { @@ -198,10 +211,15 @@ public abstract class NodeServer { ts = src.getClass().getDeclaredField("_diffGroupTransports"); ts.setAccessible(true); diffGroupTransports = Arrays.asList((Transport[]) ts.get(src)); + + ts = src.getClass().getDeclaredField("_client"); + ts.setAccessible(true); + client = (SncpClient) ts.get(src); } catch (Exception e) { //src 不含 MultiRun 方法 } - CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports); + final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); + CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports); Type genericType = field.getGenericType(); ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null; Type valType = pt == null ? null : pt.getActualTypeArguments()[1]; @@ -212,15 +230,13 @@ public abstract class NodeServer { field.set(src, source); rf.inject(source, self); // ((Service) source).init(null); - if (getSncpAddress() != null) { - NodeSncpServer sncpServer = null; - for (NodeServer node : application.servers) { - if (node.isSNCP() && getSncpAddress().equals(node.getSncpAddress())) { - sncpServer = (NodeSncpServer) node; - } - } - ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), null, null); + + if (sncpAddr != null) { + NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); + Set gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports); + ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, sncpServer.getSncpGroup(), gs, null); sncpServer.getSncpServer().addService(wrapper); + if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService()); } logger.fine("[" + Thread.currentThread().getName() + "] Load Source " + source); } catch (Exception e) { @@ -257,7 +273,6 @@ public abstract class NodeServer { service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups)); } final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty()); - if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService()); if (factory.find(wrapper.getName(), wrapper.getType()) == null) { regFactory.register(wrapper.getName(), wrapper.getService()); if (wrapper.isRemote()) { @@ -275,22 +290,28 @@ public abstract class NodeServer { final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null; //---------------- inject ---------------- - new HashSet<>(localServiceWrappers).forEach(y -> { + new ArrayList<>(localServiceWrappers).forEach(y -> { factory.inject(y.getService(), NodeServer.this); }); remoteServiceWrappers.forEach(y -> { factory.inject(y.getService(), NodeServer.this); if (sb != null) { - sb.append(threadName).append("RemoteService(").append(y.getType()).append(':').append(y.getName()).append(") injected").append(LINE_SEPARATOR); + sb.append(threadName).append(y.toSimpleString()).append(" loaded and injected").append(LINE_SEPARATOR); } }); //----------------- init ----------------- + List swlist = new ArrayList<>(localServiceWrappers); + Collections.sort(swlist); + localServiceWrappers.clear(); + localServiceWrappers.addAll(swlist); localServiceWrappers.parallelStream().forEach(y -> { long s = System.currentTimeMillis(); y.getService().init(y.getConf()); long e = System.currentTimeMillis() - s; - if (e > 2 && sb != null) { - sb.append(threadName).append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") init ").append(e).append("ms").append(LINE_SEPARATOR); + if (sb != null) { + synchronized (sb) { //parallelStream 必须要锁 + sb.append(threadName).append(y.toSimpleString()).append(" loaded and init ").append(e).append(" ms").append(LINE_SEPARATOR); + } } }); if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); @@ -456,7 +477,7 @@ public abstract class NodeServer { y.getService().destroy(y.getConf()); long e = System.currentTimeMillis() - s; if (e > 2 && sb != null) { - sb.append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") destroy ").append(e).append("ms").append(LINE_SEPARATOR); + sb.append(y.toSimpleString()).append(" destroy ").append(e).append("ms").append(LINE_SEPARATOR); } }); if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString()); diff --git a/src/org/redkale/boot/NodeSncpServer.java b/src/org/redkale/boot/NodeSncpServer.java index efae0982e..538f3394e 100644 --- a/src/org/redkale/boot/NodeSncpServer.java +++ b/src/org/redkale/boot/NodeSncpServer.java @@ -6,6 +6,7 @@ package org.redkale.boot; import java.net.*; +import java.util.*; import java.util.logging.*; import org.redkale.net.*; import org.redkale.net.sncp.*; @@ -38,6 +39,10 @@ public final class NodeSncpServer extends NodeServer { return sncpServer == null ? null : sncpServer.getSocketAddress(); } + public void consumerAccept(ServiceWrapper wrapper) { + if (this.consumer != null) this.consumer.accept(wrapper); + } + @Override public void init(AnyValue config) throws Exception { super.init(config); @@ -45,7 +50,9 @@ public final class NodeSncpServer extends NodeServer { if (sncpServer == null) return; //调试时server才可能为null final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null; final String threadName = "[" + Thread.currentThread().getName() + "] "; - for (SncpServlet en : sncpServer.getSncpServlets()) { + List servlets = sncpServer.getSncpServlets(); + Collections.sort(servlets); + for (SncpServlet en : servlets) { if (sb != null) sb.append(threadName).append(" Loaded ").append(en).append(LINE_SEPARATOR); } if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString()); diff --git a/src/org/redkale/net/sncp/ServiceWrapper.java b/src/org/redkale/net/sncp/ServiceWrapper.java index 8d50dcc21..a5f07b165 100644 --- a/src/org/redkale/net/sncp/ServiceWrapper.java +++ b/src/org/redkale/net/sncp/ServiceWrapper.java @@ -19,7 +19,11 @@ import org.redkale.util.*; * @author zhangjx * @param Service的子类 */ -public final class ServiceWrapper { +public final class ServiceWrapper implements Comparable { + + private static volatile int maxClassNameLength = 0; + + private static volatile int maxNameLength = 0; private final Class type; @@ -47,6 +51,24 @@ public final class ServiceWrapper { this.remote = Sncp.isRemote(service); ResourceType rty = service.getClass().getAnnotation(ResourceType.class); this.resTypes = rty == null ? new Class[]{this.type} : rty.value(); + + maxNameLength = Math.max(maxNameLength, name.length() + 1); + maxClassNameLength = Math.max(maxClassNameLength, type.getName().length()); + } + + public String toSimpleString() { + StringBuilder sb = new StringBuilder(); + sb.append(remote ? "RemoteService" : "LocalService ").append("(type=").append(type.getName()); + int len = maxClassNameLength - type.getName().length(); + for (int i = 0; i < len; i++) { + sb.append(' '); + } + sb.append(", name='").append(name).append("'"); + for (int i = 0; i < maxNameLength - name.length(); i++) { + sb.append(' '); + } + sb.append(")"); + return sb.toString(); } @Override @@ -68,6 +90,13 @@ public final class ServiceWrapper { return hash; } + @Override + public int compareTo(ServiceWrapper o) { + int rs = this.type.getName().compareTo(o.type.getName()); + if (rs == 0) rs = this.name.compareTo(o.name); + return rs; + } + public Class getType() { return type; } diff --git a/src/org/redkale/net/sncp/SncpClient.java b/src/org/redkale/net/sncp/SncpClient.java index 15c27f7bb..fa6907beb 100644 --- a/src/org/redkale/net/sncp/SncpClient.java +++ b/src/org/redkale/net/sncp/SncpClient.java @@ -115,9 +115,9 @@ public final class SncpClient { } } - private static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName()); + protected static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName()); - private final boolean finest = logger.isLoggable(Level.FINEST); + protected final boolean finest = logger.isLoggable(Level.FINEST); protected final JsonConvert jsonConvert = JsonFactory.root().getConvert(); @@ -127,7 +127,7 @@ public final class SncpClient { private final Class serviceClass; - protected final InetSocketAddress address; + protected final InetSocketAddress clientAddress; private final byte[] addrBytes; @@ -141,13 +141,12 @@ public final class SncpClient { protected final Consumer executor; - public SncpClient(final String serviceName, final Consumer executor, final DLong serviceid, boolean remote, final Class serviceClass, - final InetSocketAddress clientAddress) { + public SncpClient(final String serviceName, final Consumer executor, final DLong serviceid, boolean remote, + final Class serviceClass, final InetSocketAddress clientAddress) { this.remote = remote; this.executor = executor; this.serviceClass = serviceClass; - this.address = clientAddress; - //if (subLocalClass != null && !serviceClass.isAssignableFrom(subLocalClass)) throw new RuntimeException(subLocalClass + " is not " + serviceClass + " sub class "); + this.clientAddress = clientAddress; this.name = serviceName; this.nameid = Sncp.hash(serviceName); this.serviceid = serviceid; @@ -163,7 +162,7 @@ public final class SncpClient { } public InetSocketAddress getClientAddress() { - return address; + return clientAddress; } public DLong getNameid() { @@ -183,7 +182,7 @@ public final class SncpClient { String service = serviceClass.getName(); if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX); return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", nameid = " + nameid - + ", name = '" + name + "', address = " + (address == null ? "" : (address.getHostString() + ":" + address.getPort())) + + ", name = '" + name + "', address = " + (clientAddress == null ? "" : (clientAddress.getHostString() + ":" + clientAddress.getPort())) + ", actions.size = " + actions.length + ")"; } @@ -299,7 +298,7 @@ public final class SncpClient { private Future remote0(final CompletionHandler handler, final BsonConvert convert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) { Type[] myparamtypes = action.paramTypes; - if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.address; + if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress; final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入 writer.writeTo(DEFAULT_HEADER); for (int i = 0; i < params.length; i++) { diff --git a/src/org/redkale/net/sncp/SncpDynServlet.java b/src/org/redkale/net/sncp/SncpDynServlet.java index 046320432..e1c4daf69 100644 --- a/src/org/redkale/net/sncp/SncpDynServlet.java +++ b/src/org/redkale/net/sncp/SncpDynServlet.java @@ -31,6 +31,10 @@ import org.redkale.service.DynCall; */ public final class SncpDynServlet extends SncpServlet { + private static volatile int maxClassNameLength = 0; + + private static volatile int maxNameLength = 0; + private static final Logger logger = Logger.getLogger(SncpDynServlet.class.getSimpleName()); private final boolean finest = logger.isLoggable(Level.FINEST); @@ -71,11 +75,24 @@ public final class SncpDynServlet extends SncpServlet { actions.put(actionid, action); actionids.add(actionid); } + maxNameLength = Math.max(maxNameLength, serviceName.length() + 1); + maxClassNameLength = Math.max(maxClassNameLength, type.getName().length()); } @Override public String toString() { - return this.getClass().getSimpleName() + "(type=" + type.getName() + ", serviceid=" + serviceid + ", name='" + serviceName + "', nameid=" + nameid + ", actions.size=" + actions.size() + ")"; + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getSimpleName()).append("(type=").append(type.getName()); + int len = maxClassNameLength - type.getName().length(); + for (int i = 0; i < len; i++) { + sb.append(' '); + } + sb.append(", serviceid=").append(serviceid).append(", name='").append(serviceName).append("'"); + for (int i = 0; i < maxNameLength - serviceName.length(); i++) { + sb.append(' '); + } + sb.append(", nameid=").append(nameid).append(", actions.size=").append(actions.size() > 9 ? "" : " ").append(actions.size()).append(")"); + return sb.toString(); } @Override @@ -88,6 +105,15 @@ public final class SncpDynServlet extends SncpServlet { return serviceid; } + @Override + public int compareTo(SncpServlet o0) { + if (!(o0 instanceof SncpDynServlet)) return 1; + SncpDynServlet o = (SncpDynServlet) o0; + int rs = this.type.getName().compareTo(o.type.getName()); + if (rs == 0) rs = this.serviceName.compareTo(o.serviceName); + return rs; + } + @Override public void execute(SncpRequest request, SncpResponse response) throws IOException { if (bufferSupplier == null) { diff --git a/src/org/redkale/net/sncp/SncpServer.java b/src/org/redkale/net/sncp/SncpServer.java index 603863f37..36f7f383d 100644 --- a/src/org/redkale/net/sncp/SncpServer.java +++ b/src/org/redkale/net/sncp/SncpServer.java @@ -16,7 +16,9 @@ import org.redkale.watch.*; /** * Service Node Communicate Protocol * - *

详情见: http://www.redkale.org + *

+ * 详情见: http://www.redkale.org + * * @author zhangjx */ public final class SncpServer extends Server { @@ -34,8 +36,10 @@ public final class SncpServer extends Server { super.init(config); } - public void addService(ServiceWrapper entry) { - ((SncpPrepareServlet) this.prepare).addSncpServlet(new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), entry.getType(), entry.getService(), entry.getConf())); + public SncpDynServlet addService(ServiceWrapper entry) { + SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), entry.getType(), entry.getService(), entry.getConf()); + ((SncpPrepareServlet) this.prepare).addSncpServlet(sds); + return sds; } public List getSncpServlets() { diff --git a/src/org/redkale/net/sncp/SncpServlet.java b/src/org/redkale/net/sncp/SncpServlet.java index 3d6522382..02ac6574e 100644 --- a/src/org/redkale/net/sncp/SncpServlet.java +++ b/src/org/redkale/net/sncp/SncpServlet.java @@ -13,7 +13,7 @@ import org.redkale.util.*; *

详情见: http://www.redkale.org * @author zhangjx */ -public abstract class SncpServlet extends Servlet { +public abstract class SncpServlet extends Servlet implements Comparable { AnyValue conf; @@ -30,4 +30,9 @@ public abstract class SncpServlet extends Servlet { public final int hashCode() { return this.getClass().hashCode(); } + + @Override + public int compareTo(SncpServlet o) { + return 0; + } }