Transport 改版 2

This commit is contained in:
wentch
2016-01-12 18:17:58 +08:00
parent 5c6ebf9fb6
commit 7ea12638c5
8 changed files with 148 additions and 37 deletions

View File

@@ -562,6 +562,26 @@ public final class Application {
System.exit(0);
}
Set<String> findSncpGroups(Transport sameGroupTransport, Collection<Transport> diffGroupTransports) {
Set<String> gs = new HashSet<>();
if (sameGroupTransport != null) gs.add(sameGroupTransport.getName());
if (diffGroupTransports != null) {
for (Transport t : diffGroupTransports) {
gs.add(t.getName());
}
}
return gs;
}
NodeSncpServer findNodeSncpServer(final InetSocketAddress sncpAddr) {
for (NodeServer node : servers) {
if (node.isSNCP() && sncpAddr.equals(node.getSncpAddress())) {
return (NodeSncpServer) node;
}
}
return null;
}
String findGroupProtocol(String group) {
if (group == null) return null;
return globalGroupProtocols.get(group);

View File

@@ -23,6 +23,7 @@ import java.util.logging.*;
import javax.annotation.*;
import javax.persistence.*;
import org.redkale.net.*;
import org.redkale.net.sncp.*;
import org.redkale.service.*;
import org.redkale.source.*;
import org.redkale.util.*;
@@ -157,6 +158,8 @@ public abstract class NodeServer {
DataSource source = new DataDefaultSource(resourceName);
application.dataSources.add(source);
regFactory.register(resourceName, DataSource.class, source);
SncpClient client = null;
Transport sameGroupTransport = null;
List<Transport> diffGroupTransports = null;
try {
@@ -167,16 +170,24 @@ public abstract class NodeServer {
ts = src.getClass().getDeclaredField("_diffGroupTransports");
ts.setAccessible(true);
diffGroupTransports = Arrays.asList((Transport[]) ts.get(src));
ts = src.getClass().getDeclaredField("_client");
ts.setAccessible(true);
client = (SncpClient) ts.get(src);
} catch (Exception e) {
//src 不含 MultiRun 方法
}
if (factory.find(resourceName, DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
if (sncpAddr != null && factory.find(resourceName, DataCacheListener.class) == null) {
Service cacheListenerService = Sncp.createLocalService(resourceName, getExecutor(), DataCacheListenerService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
regFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpGroup, null, null);
final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
ServiceWrapper wrapper = new ServiceWrapper(DataCacheListenerService.class, cacheListenerService, resourceName, sncpServer.getSncpGroup(), gs, null);
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
sncpServer.consumerAccept(wrapper);
rf.inject(cacheListenerService, self);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService());
}
field.set(src, source);
rf.inject(source, self); // 给 "datasource.nodeid" 赋值
@@ -187,7 +198,9 @@ public abstract class NodeServer {
factory.add(CacheSource.class, (ResourceFactory rf, final Object src, final String resourceName, Field field, final Object attachment) -> {
try {
if (field.getAnnotation(Resource.class) == null) return;
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource
if ((src instanceof Service) && Sncp.isRemote((Service) src)) return; //远程模式不得注入 CacheSource
SncpClient client = null;
Transport sameGroupTransport = null;
List<Transport> diffGroupTransports = null;
try {
@@ -198,10 +211,15 @@ public abstract class NodeServer {
ts = src.getClass().getDeclaredField("_diffGroupTransports");
ts.setAccessible(true);
diffGroupTransports = Arrays.asList((Transport[]) ts.get(src));
ts = src.getClass().getDeclaredField("_client");
ts.setAccessible(true);
client = (SncpClient) ts.get(src);
} catch (Exception e) {
//src 不含 MultiRun 方法
}
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, this.sncpAddress, sameGroupTransport, diffGroupTransports);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
CacheSourceService source = Sncp.createLocalService(resourceName, getExecutor(), CacheSourceService.class, sncpAddr, sameGroupTransport, diffGroupTransports);
Type genericType = field.getGenericType();
ParameterizedType pt = (genericType instanceof ParameterizedType) ? (ParameterizedType) genericType : null;
Type valType = pt == null ? null : pt.getActualTypeArguments()[1];
@@ -212,15 +230,13 @@ public abstract class NodeServer {
field.set(src, source);
rf.inject(source, self); //
((Service) source).init(null);
if (getSncpAddress() != null) {
NodeSncpServer sncpServer = null;
for (NodeServer node : application.servers) {
if (node.isSNCP() && getSncpAddress().equals(node.getSncpAddress())) {
sncpServer = (NodeSncpServer) node;
}
}
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, getSncpGroup(), null, null);
if (sncpAddr != null) {
NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
Set<String> gs = application.findSncpGroups(sameGroupTransport, diffGroupTransports);
ServiceWrapper wrapper = new ServiceWrapper(CacheSourceService.class, (Service) source, resourceName, sncpServer.getSncpGroup(), gs, null);
sncpServer.getSncpServer().addService(wrapper);
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService());
}
logger.fine("[" + Thread.currentThread().getName() + "] Load Source " + source);
} catch (Exception e) {
@@ -257,7 +273,6 @@ public abstract class NodeServer {
service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups));
}
final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty());
if (fine) logger.fine("[" + Thread.currentThread().getName() + "] Load Service " + wrapper.getService());
if (factory.find(wrapper.getName(), wrapper.getType()) == null) {
regFactory.register(wrapper.getName(), wrapper.getService());
if (wrapper.isRemote()) {
@@ -275,22 +290,28 @@ public abstract class NodeServer {
final StringBuilder sb = logger.isLoggable(Level.INFO) ? new StringBuilder() : null;
//---------------- inject ----------------
new HashSet<>(localServiceWrappers).forEach(y -> {
new ArrayList<>(localServiceWrappers).forEach(y -> {
factory.inject(y.getService(), NodeServer.this);
});
remoteServiceWrappers.forEach(y -> {
factory.inject(y.getService(), NodeServer.this);
if (sb != null) {
sb.append(threadName).append("RemoteService(").append(y.getType()).append(':').append(y.getName()).append(") injected").append(LINE_SEPARATOR);
sb.append(threadName).append(y.toSimpleString()).append(" loaded and injected").append(LINE_SEPARATOR);
}
});
//----------------- init -----------------
List<ServiceWrapper> swlist = new ArrayList<>(localServiceWrappers);
Collections.sort(swlist);
localServiceWrappers.clear();
localServiceWrappers.addAll(swlist);
localServiceWrappers.parallelStream().forEach(y -> {
long s = System.currentTimeMillis();
y.getService().init(y.getConf());
long e = System.currentTimeMillis() - s;
if (e > 2 && sb != null) {
sb.append(threadName).append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") init ").append(e).append("ms").append(LINE_SEPARATOR);
if (sb != null) {
synchronized (sb) { //parallelStream 必须要锁
sb.append(threadName).append(y.toSimpleString()).append(" loaded and init ").append(e).append(" ms").append(LINE_SEPARATOR);
}
}
});
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());
@@ -456,7 +477,7 @@ public abstract class NodeServer {
y.getService().destroy(y.getConf());
long e = System.currentTimeMillis() - s;
if (e > 2 && sb != null) {
sb.append("LocalService(").append(y.getType()).append(':').append(y.getName()).append(") destroy ").append(e).append("ms").append(LINE_SEPARATOR);
sb.append(y.toSimpleString()).append(" destroy ").append(e).append("ms").append(LINE_SEPARATOR);
}
});
if (sb != null && sb.length() > 0) logger.log(Level.INFO, sb.toString());

View File

@@ -6,6 +6,7 @@
package org.redkale.boot;
import java.net.*;
import java.util.*;
import java.util.logging.*;
import org.redkale.net.*;
import org.redkale.net.sncp.*;
@@ -38,6 +39,10 @@ public final class NodeSncpServer extends NodeServer {
return sncpServer == null ? null : sncpServer.getSocketAddress();
}
public void consumerAccept(ServiceWrapper wrapper) {
if (this.consumer != null) this.consumer.accept(wrapper);
}
@Override
public void init(AnyValue config) throws Exception {
super.init(config);
@@ -45,7 +50,9 @@ public final class NodeSncpServer extends NodeServer {
if (sncpServer == null) return; //调试时server才可能为null
final StringBuilder sb = logger.isLoggable(Level.FINE) ? new StringBuilder() : null;
final String threadName = "[" + Thread.currentThread().getName() + "] ";
for (SncpServlet en : sncpServer.getSncpServlets()) {
List<SncpServlet> servlets = sncpServer.getSncpServlets();
Collections.sort(servlets);
for (SncpServlet en : servlets) {
if (sb != null) sb.append(threadName).append(" Loaded ").append(en).append(LINE_SEPARATOR);
}
if (sb != null && sb.length() > 0) logger.log(Level.FINE, sb.toString());

View File

@@ -19,7 +19,11 @@ import org.redkale.util.*;
* @author zhangjx
* @param <T> Service的子类
*/
public final class ServiceWrapper<T extends Service> {
public final class ServiceWrapper<T extends Service> implements Comparable<ServiceWrapper> {
private static volatile int maxClassNameLength = 0;
private static volatile int maxNameLength = 0;
private final Class<T> type;
@@ -47,6 +51,24 @@ public final class ServiceWrapper<T extends Service> {
this.remote = Sncp.isRemote(service);
ResourceType rty = service.getClass().getAnnotation(ResourceType.class);
this.resTypes = rty == null ? new Class[]{this.type} : rty.value();
maxNameLength = Math.max(maxNameLength, name.length() + 1);
maxClassNameLength = Math.max(maxClassNameLength, type.getName().length());
}
public String toSimpleString() {
StringBuilder sb = new StringBuilder();
sb.append(remote ? "RemoteService" : "LocalService ").append("(type=").append(type.getName());
int len = maxClassNameLength - type.getName().length();
for (int i = 0; i < len; i++) {
sb.append(' ');
}
sb.append(", name='").append(name).append("'");
for (int i = 0; i < maxNameLength - name.length(); i++) {
sb.append(' ');
}
sb.append(")");
return sb.toString();
}
@Override
@@ -68,6 +90,13 @@ public final class ServiceWrapper<T extends Service> {
return hash;
}
@Override
public int compareTo(ServiceWrapper o) {
int rs = this.type.getName().compareTo(o.type.getName());
if (rs == 0) rs = this.name.compareTo(o.name);
return rs;
}
public Class<? extends Service> getType() {
return type;
}

View File

@@ -115,9 +115,9 @@ public final class SncpClient {
}
}
private static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName());
protected static final Logger logger = Logger.getLogger(SncpClient.class.getSimpleName());
private final boolean finest = logger.isLoggable(Level.FINEST);
protected final boolean finest = logger.isLoggable(Level.FINEST);
protected final JsonConvert jsonConvert = JsonFactory.root().getConvert();
@@ -127,7 +127,7 @@ public final class SncpClient {
private final Class serviceClass;
protected final InetSocketAddress address;
protected final InetSocketAddress clientAddress;
private final byte[] addrBytes;
@@ -141,13 +141,12 @@ public final class SncpClient {
protected final Consumer<Runnable> executor;
public SncpClient(final String serviceName, final Consumer<Runnable> executor, final DLong serviceid, boolean remote, final Class serviceClass,
final InetSocketAddress clientAddress) {
public SncpClient(final String serviceName, final Consumer<Runnable> executor, final DLong serviceid, boolean remote,
final Class serviceClass, final InetSocketAddress clientAddress) {
this.remote = remote;
this.executor = executor;
this.serviceClass = serviceClass;
this.address = clientAddress;
//if (subLocalClass != null && !serviceClass.isAssignableFrom(subLocalClass)) throw new RuntimeException(subLocalClass + " is not " + serviceClass + " sub class ");
this.clientAddress = clientAddress;
this.name = serviceName;
this.nameid = Sncp.hash(serviceName);
this.serviceid = serviceid;
@@ -163,7 +162,7 @@ public final class SncpClient {
}
public InetSocketAddress getClientAddress() {
return address;
return clientAddress;
}
public DLong getNameid() {
@@ -183,7 +182,7 @@ public final class SncpClient {
String service = serviceClass.getName();
if (remote) service = service.replace(Sncp.LOCALPREFIX, Sncp.REMOTEPREFIX);
return this.getClass().getSimpleName() + "(service = " + service + ", serviceid = " + serviceid + ", nameid = " + nameid
+ ", name = '" + name + "', address = " + (address == null ? "" : (address.getHostString() + ":" + address.getPort()))
+ ", name = '" + name + "', address = " + (clientAddress == null ? "" : (clientAddress.getHostString() + ":" + clientAddress.getPort()))
+ ", actions.size = " + actions.length + ")";
}
@@ -299,7 +298,7 @@ public final class SncpClient {
private Future<byte[]> remote0(final CompletionHandler handler, final BsonConvert convert, final Transport transport, final SocketAddress addr0, final SncpAction action, final Object... params) {
Type[] myparamtypes = action.paramTypes;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.address;
if (action.addressSourceParamIndex >= 0) params[action.addressSourceParamIndex] = this.clientAddress;
final BsonWriter writer = convert.pollBsonWriter(transport.getBufferSupplier()); // 将head写入
writer.writeTo(DEFAULT_HEADER);
for (int i = 0; i < params.length; i++) {

View File

@@ -31,6 +31,10 @@ import org.redkale.service.DynCall;
*/
public final class SncpDynServlet extends SncpServlet {
private static volatile int maxClassNameLength = 0;
private static volatile int maxNameLength = 0;
private static final Logger logger = Logger.getLogger(SncpDynServlet.class.getSimpleName());
private final boolean finest = logger.isLoggable(Level.FINEST);
@@ -71,11 +75,24 @@ public final class SncpDynServlet extends SncpServlet {
actions.put(actionid, action);
actionids.add(actionid);
}
maxNameLength = Math.max(maxNameLength, serviceName.length() + 1);
maxClassNameLength = Math.max(maxClassNameLength, type.getName().length());
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "(type=" + type.getName() + ", serviceid=" + serviceid + ", name='" + serviceName + "', nameid=" + nameid + ", actions.size=" + actions.size() + ")";
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName()).append("(type=").append(type.getName());
int len = maxClassNameLength - type.getName().length();
for (int i = 0; i < len; i++) {
sb.append(' ');
}
sb.append(", serviceid=").append(serviceid).append(", name='").append(serviceName).append("'");
for (int i = 0; i < maxNameLength - serviceName.length(); i++) {
sb.append(' ');
}
sb.append(", nameid=").append(nameid).append(", actions.size=").append(actions.size() > 9 ? "" : " ").append(actions.size()).append(")");
return sb.toString();
}
@Override
@@ -88,6 +105,15 @@ public final class SncpDynServlet extends SncpServlet {
return serviceid;
}
@Override
public int compareTo(SncpServlet o0) {
if (!(o0 instanceof SncpDynServlet)) return 1;
SncpDynServlet o = (SncpDynServlet) o0;
int rs = this.type.getName().compareTo(o.type.getName());
if (rs == 0) rs = this.serviceName.compareTo(o.serviceName);
return rs;
}
@Override
public void execute(SncpRequest request, SncpResponse response) throws IOException {
if (bufferSupplier == null) {

View File

@@ -16,7 +16,9 @@ import org.redkale.watch.*;
/**
* Service Node Communicate Protocol
*
* <p> 详情见: http://www.redkale.org
* <p>
* 详情见: http://www.redkale.org
*
* @author zhangjx
*/
public final class SncpServer extends Server {
@@ -34,8 +36,10 @@ public final class SncpServer extends Server {
super.init(config);
}
public void addService(ServiceWrapper entry) {
((SncpPrepareServlet) this.prepare).addSncpServlet(new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), entry.getType(), entry.getService(), entry.getConf()));
public SncpDynServlet addService(ServiceWrapper entry) {
SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), entry.getType(), entry.getService(), entry.getConf());
((SncpPrepareServlet) this.prepare).addSncpServlet(sds);
return sds;
}
public List<SncpServlet> getSncpServlets() {

View File

@@ -13,7 +13,7 @@ import org.redkale.util.*;
* <p> 详情见: http://www.redkale.org
* @author zhangjx
*/
public abstract class SncpServlet extends Servlet<SncpRequest, SncpResponse> {
public abstract class SncpServlet extends Servlet<SncpRequest, SncpResponse> implements Comparable<SncpServlet> {
AnyValue conf;
@@ -30,4 +30,9 @@ public abstract class SncpServlet extends Servlet<SncpRequest, SncpResponse> {
public final int hashCode() {
return this.getClass().hashCode();
}
@Override
public int compareTo(SncpServlet o) {
return 0;
}
}