This commit is contained in:
RedKale
2016-02-22 13:50:58 +08:00
parent fad0d69637
commit a4ab83f482
7 changed files with 93 additions and 74 deletions

View File

@@ -434,7 +434,8 @@ public final class Application {
others.add(entry); others.add(entry);
} }
} }
if (!sncps.isEmpty() && globalNodes.isEmpty()) throw new RuntimeException("found SNCP Server node but not found <group> node info."); //单向SNCP服务不需要对等group
//if (!sncps.isEmpty() && globalNodes.isEmpty()) throw new RuntimeException("found SNCP Server node but not found <group> node info.");
runServers(timecd, sncps); //必须确保sncp都启动后再启动其他协议 runServers(timecd, sncps); //必须确保sncp都启动后再启动其他协议
runServers(timecd, others); runServers(timecd, others);

View File

@@ -119,7 +119,8 @@ public abstract class NodeServer {
String host = this.serverConf.getValue("host", "0.0.0.0").replace("0.0.0.0", ""); String host = this.serverConf.getValue("host", "0.0.0.0").replace("0.0.0.0", "");
this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port")); this.sncpAddress = new InetSocketAddress(host.isEmpty() ? application.localAddress.getHostAddress() : host, this.serverConf.getIntValue("port"));
this.sncpGroup = application.globalNodes.get(this.sncpAddress); this.sncpGroup = application.globalNodes.get(this.sncpAddress);
if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info"); //单向SNCP服务不需要对等group
//if (this.sncpGroup == null) throw new RuntimeException("Server (" + String.valueOf(config).replaceAll("\\s+", " ") + ") not found <group> info");
} }
if (this.sncpAddress != null) this.resourceFactory.register(RESNAME_SERVER_ADDR, this.sncpAddress); //单点服务不会有 sncpAddress、sncpGroup if (this.sncpAddress != null) this.resourceFactory.register(RESNAME_SERVER_ADDR, this.sncpAddress); //单点服务不会有 sncpAddress、sncpGroup
@@ -137,7 +138,7 @@ public abstract class NodeServer {
resourceFactory.register(Server.RESNAME_SERVER_ROOT, Path.class, myroot.toPath()); resourceFactory.register(Server.RESNAME_SERVER_ROOT, Path.class, myroot.toPath());
final String homepath = myroot.getCanonicalPath(); final String homepath = myroot.getCanonicalPath();
Server.loadLib(logger, config.getValue("lib", "") + ";" + homepath + "/lib/*;" + homepath + "/classes"); Server.loadLib(logger, config.getValue("lib", "").replace("${APP_HOME}", homepath) + ";" + homepath + "/lib/*;" + homepath + "/classes");
if (server != null) server.init(config); if (server != null) server.init(config);
} }
@@ -272,7 +273,7 @@ public abstract class NodeServer {
if (entry.getName().contains("$")) throw new RuntimeException("<name> value cannot contains '$' in " + entry.getProperty()); if (entry.getName().contains("$")) throw new RuntimeException("<name> value cannot contains '$' in " + entry.getProperty());
if (resourceFactory.find(entry.getName(), type) != null) continue; //Server加载Service时需要判断是否已经加载过了。 if (resourceFactory.find(entry.getName(), type) != null) continue; //Server加载Service时需要判断是否已经加载过了。
final HashSet<String> groups = entry.getGroups(); //groups.isEmpty()表示<services>没有配置groups属性。 final HashSet<String> groups = entry.getGroups(); //groups.isEmpty()表示<services>没有配置groups属性。
if (groups.isEmpty() && isSNCP()) groups.add(this.sncpGroup); if (groups.isEmpty() && isSNCP() && this.sncpGroup != null) groups.add(this.sncpGroup);
final boolean localed = (this.sncpAddress == null && !type.isInterface() && !Modifier.isAbstract(type.getModifiers())) //非SNCP的Server通常是单点服务 final boolean localed = (this.sncpAddress == null && !type.isInterface() && !Modifier.isAbstract(type.getModifiers())) //非SNCP的Server通常是单点服务
|| groups.contains(this.sncpGroup) //本地IP含在内的 || groups.contains(this.sncpGroup) //本地IP含在内的
@@ -286,16 +287,18 @@ public abstract class NodeServer {
service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups)); service = Sncp.createRemoteService(entry.getName(), getExecutor(), type, this.sncpAddress, loadTransport(groups));
} }
final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty()); final ServiceWrapper wrapper = new ServiceWrapper(type, service, entry.getName(), localed ? this.sncpGroup : null, groups, entry.getProperty());
if (resourceFactory.find(wrapper.getName(), wrapper.getType()) == null) { for (final Class restype : wrapper.getTypes()) {
regFactory.register(wrapper.getName(), wrapper.getService()); if (resourceFactory.find(wrapper.getName(), restype) == null) {
if (wrapper.isRemote()) { regFactory.register(wrapper.getName(), restype, wrapper.getService());
remoteServiceWrappers.add(wrapper); } else if (isSNCP() && !entry.isAutoload()) {
} else { throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat.");
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
} }
} else if (isSNCP() && !entry.isAutoload()) { }
throw new RuntimeException(ServiceWrapper.class.getSimpleName() + "(class:" + type.getName() + ", name:" + entry.getName() + ", group:" + groups + ") is repeat."); if (wrapper.isRemote()) {
remoteServiceWrappers.add(wrapper);
} else {
localServiceWrappers.add(wrapper);
if (consumer != null) consumer.accept(wrapper);
} }
} }
application.servicecdl.countDown(); application.servicecdl.countDown();

View File

@@ -8,6 +8,7 @@ package org.redkale.net.sncp;
import org.redkale.service.Service; import org.redkale.service.Service;
import org.redkale.util.AnyValue; import org.redkale.util.AnyValue;
import java.util.*; import java.util.*;
import java.util.stream.*;
import org.redkale.util.*; import org.redkale.util.*;
/** /**
@@ -25,8 +26,6 @@ public final class ServiceWrapper<T extends Service> implements Comparable<Servi
private static volatile int maxNameLength = 0; private static volatile int maxNameLength = 0;
private final Class<T> type;
private final T service; private final T service;
private final AnyValue conf; private final AnyValue conf;
@@ -39,10 +38,9 @@ public final class ServiceWrapper<T extends Service> implements Comparable<Servi
private final boolean remote; private final boolean remote;
private final Class[] resTypes; private final Class[] types;
public ServiceWrapper(Class<T> type, T service, String name, String sncpGroup, Set<String> groups, AnyValue conf) { public ServiceWrapper(Class<T> type, T service, String name, String sncpGroup, Set<String> groups, AnyValue conf) {
this.type = type == null ? (Class<T>) service.getClass() : type;
this.service = service; this.service = service;
this.conf = conf; this.conf = conf;
this.sncpGroup = sncpGroup; this.sncpGroup = sncpGroup;
@@ -50,16 +48,36 @@ public final class ServiceWrapper<T extends Service> implements Comparable<Servi
this.name = name; this.name = name;
this.remote = Sncp.isRemote(service); this.remote = Sncp.isRemote(service);
ResourceType rty = service.getClass().getAnnotation(ResourceType.class); ResourceType rty = service.getClass().getAnnotation(ResourceType.class);
this.resTypes = rty == null ? new Class[]{this.type} : rty.value(); this.types = rty == null ? new Class[]{type == null ? (Class<T>) service.getClass() : type} : rty.value();
maxNameLength = Math.max(maxNameLength, name.length()); maxNameLength = Math.max(maxNameLength, name.length());
maxClassNameLength = Math.max(maxClassNameLength, type.getName().length() + 1); StringBuilder s = new StringBuilder();
if (this.types.length == 1) {
s.append(types[0].getName());
} else {
s.append('[');
s.append(Arrays.asList(this.types).stream().map((Class t) -> t.getName()).collect(Collectors.joining(",")));
s.append(']');
}
maxClassNameLength = Math.max(maxClassNameLength, s.length() + 1);
} }
public String toSimpleString() { public String toSimpleString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(remote ? "RemoteService" : "LocalService ").append("(type=").append(type.getName()); sb.append(remote ? "RemoteService" : "LocalService ");
int len = maxClassNameLength - type.getName().length(); int len;
if (types.length == 1) {
sb.append("(type= ").append(types[0].getName());
len = maxClassNameLength - types[0].getName().length();
} else {
StringBuilder s = new StringBuilder();
s.append('[');
s.append(Arrays.asList(this.types).stream().map((Class t) -> t.getName()).collect(Collectors.joining(",")));
s.append(']');
sb.append("(types=").append(s);
len = maxClassNameLength - s.length();
}
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
sb.append(' '); sb.append(' ');
} }
@@ -77,13 +95,13 @@ public final class ServiceWrapper<T extends Service> implements Comparable<Servi
if (obj == null) return false; if (obj == null) return false;
if (!(obj instanceof ServiceWrapper)) return false; if (!(obj instanceof ServiceWrapper)) return false;
ServiceWrapper other = (ServiceWrapper) obj; ServiceWrapper other = (ServiceWrapper) obj;
return (this.type.equals(other.type) && this.remote == other.remote && this.name.equals(other.name) && Objects.equals(this.sncpGroup, other.sncpGroup)); return (this.types[0].equals(other.types[0]) && this.remote == other.remote && this.name.equals(other.name) && Objects.equals(this.sncpGroup, other.sncpGroup));
} }
@Override @Override
public int hashCode() { public int hashCode() {
int hash = 3; int hash = 3;
hash = 67 * hash + Objects.hashCode(this.type); hash = 67 * hash + Objects.hashCode(this.types[0]);
hash = 67 * hash + Objects.hashCode(this.sncpGroup); hash = 67 * hash + Objects.hashCode(this.sncpGroup);
hash = 67 * hash + Objects.hashCode(this.name); hash = 67 * hash + Objects.hashCode(this.name);
hash = 67 * hash + (this.remote ? 1 : 0); hash = 67 * hash + (this.remote ? 1 : 0);
@@ -92,17 +110,13 @@ public final class ServiceWrapper<T extends Service> implements Comparable<Servi
@Override @Override
public int compareTo(ServiceWrapper o) { public int compareTo(ServiceWrapper o) {
int rs = this.type.getName().compareTo(o.type.getName()); int rs = this.types[0].getName().compareTo(o.types[0].getName());
if (rs == 0) rs = this.name.compareTo(o.name); if (rs == 0) rs = this.name.compareTo(o.name);
return rs; return rs;
} }
public Class<? extends Service> getType() { public Class[] getTypes() {
return type; return types;
}
public Class[] getResTypes() {
return resTypes;
} }
public Service getService() { public Service getService() {

View File

@@ -57,19 +57,20 @@ public abstract class Sncp {
return ((0L + ip.getPort()) << 32) | ((0xffffffff & bytes[0]) << 24) | ((0xffffff & bytes[1]) << 16) | ((0xffff & bytes[2]) << 8) | (0xff & bytes[3]); return ((0L + ip.getPort()) << 32) | ((0xffffffff & bytes[0]) << 24) | ((0xffffff & bytes[1]) << 16) | ((0xffff & bytes[2]) << 8) | (0xff & bytes[3]);
} }
public static DLong hash(final Class clazz) {
if (clazz == null) return DLong.ZERO;
return hash(clazz.getName());
}
public static DLong hashClass(final String clazzName) {
if (clazzName == null || clazzName.isEmpty()) return DLong.ZERO;
return hash(clazzName);
}
public static DLong hash(final java.lang.reflect.Method method) { public static DLong hash(final java.lang.reflect.Method method) {
if (method == null) return DLong.ZERO; if (method == null) return DLong.ZERO;
return hash(method.toString()); StringBuilder sb = new StringBuilder(); //不能使用method.toString() 因为包含declaringClass信息导致接口与实现类的方法hash不一致
sb.append(method.getReturnType().getName()).append(' ');
sb.append(method.getName());
sb.append('(');
boolean first = true;
for (Class pt : method.getParameterTypes()) {
if (!first) sb.append(',');
sb.append(pt.getName());
first = false;
}
sb.append(')');
return hash(sb.toString());
} }
/** /**

View File

@@ -139,8 +139,8 @@ public final class SncpClient {
protected final Consumer<Runnable> executor; protected final Consumer<Runnable> executor;
public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceType, final Consumer<Runnable> executor, boolean remote, public <T extends Service> SncpClient(final String serviceName, final Class<T> serviceType, final Consumer<Runnable> executor,
final Class serviceClass, final InetSocketAddress clientAddress) { final boolean remote, final Class serviceClass, final InetSocketAddress clientAddress) {
this.remote = remote; this.remote = remote;
this.executor = executor; this.executor = executor;
this.serviceClass = serviceClass; this.serviceClass = serviceClass;
@@ -150,8 +150,7 @@ public final class SncpClient {
final List<SncpAction> methodens = new ArrayList<>(); final List<SncpAction> methodens = new ArrayList<>();
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
for (java.lang.reflect.Method method : parseMethod(serviceClass)) { for (java.lang.reflect.Method method : parseMethod(serviceClass)) {
SncpAction en = new SncpAction(method, Sncp.hash(method)); methodens.add(new SncpAction(method, Sncp.hash(method)));
methodens.add(en);
} }
this.actions = methodens.toArray(new SncpAction[methodens.size()]); this.actions = methodens.toArray(new SncpAction[methodens.size()]);
this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress(); this.addrBytes = clientAddress == null ? new byte[4] : clientAddress.getAddress().getAddress();

View File

@@ -36,10 +36,11 @@ public final class SncpServer extends Server<DLong, SncpContext, SncpRequest, Sn
super.init(config); super.init(config);
} }
public SncpDynServlet addService(ServiceWrapper entry) { public void addService(ServiceWrapper entry) {
SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), entry.getType(), entry.getService()); for (Class type : entry.getTypes()) {
this.prepare.addServlet(sds, null, entry.getConf()); SncpDynServlet sds = new SncpDynServlet(BsonFactory.root().getConvert(), entry.getName(), type, entry.getService());
return sds; this.prepare.addServlet(sds, null, entry.getConf());
}
} }
public List<SncpServlet> getSncpServlets() { public List<SncpServlet> getSncpServlets() {

View File

@@ -12,53 +12,53 @@ import java.util.*;
* 16bytes数据结构 * 16bytes数据结构
* 注意: 为了提高性能, DLong中的bytes是直接返回 不得对bytes的内容进行修改。 * 注意: 为了提高性能, DLong中的bytes是直接返回 不得对bytes的内容进行修改。
* *
* <p> 详情见: http://www.redkale.org * <p>
* 详情见: http://www.redkale.org
*
* @author zhangjx * @author zhangjx
*/ */
public final class DLong extends Number implements Comparable<DLong> { public final class DLong extends Number implements Comparable<DLong> {
public static final DLong ZERO = new DLong(new byte[16]); public static final DLong ZERO = new DLong(new byte[16]);
protected final byte[] bytes; protected final byte[] value;
protected DLong(long v1, long v2) { //暂时不用 protected DLong(long v1, long v2) { //暂时不用
this.bytes = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32), this.value = new byte[]{(byte) (v1 >> 56), (byte) (v1 >> 48), (byte) (v1 >> 40), (byte) (v1 >> 32),
(byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32), (byte) (v1 >> 24), (byte) (v1 >> 16), (byte) (v1 >> 8), (byte) v1, (byte) (v2 >> 56), (byte) (v2 >> 48), (byte) (v2 >> 40), (byte) (v2 >> 32),
(byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2}; (byte) (v2 >> 24), (byte) (v2 >> 16), (byte) (v2 >> 8), (byte) v2};
} }
protected DLong(byte[] bytes) { protected DLong(byte[] bytes) {
if (bytes == null || bytes.length != 16) throw new NumberFormatException("Not 16 length bytes"); if (bytes == null || bytes.length != 16) throw new NumberFormatException("Not 16 length bytes");
this.bytes = bytes; this.value = bytes;
} }
public byte[] getBytes() { public byte[] getBytes() {
return Arrays.copyOf(bytes, bytes.length); return Arrays.copyOf(value, value.length);
} }
public byte[] directBytes() { public byte[] directBytes() {
return bytes; return value;
} }
public static DLong create(byte[] bytes) { public static DLong create(byte[] bytes) {
if (ZERO.equals(bytes)) return ZERO;
return new DLong(bytes); return new DLong(bytes);
} }
public static DLong read(ByteBuffer buffer) { public static DLong read(ByteBuffer buffer) {
byte[] bs = new byte[16]; byte[] bs = new byte[16];
buffer.get(bs); buffer.get(bs);
if (ZERO.equals(bs)) return ZERO;
return new DLong(bs); return new DLong(bs);
} }
public static ByteBuffer write(ByteBuffer buffer, DLong value) { public static ByteBuffer write(ByteBuffer buffer, DLong dlong) {
buffer.put(value.bytes); buffer.put(dlong.value);
return buffer; return buffer;
} }
public boolean equals(byte[] bytes) { public boolean equals(byte[] bytes) {
return Arrays.equals(this.bytes, bytes); return Arrays.equals(this.value, bytes);
} }
@Override @Override
@@ -66,35 +66,35 @@ public final class DLong extends Number implements Comparable<DLong> {
if (obj == null) return false; if (obj == null) return false;
if (getClass() != obj.getClass()) return false; if (getClass() != obj.getClass()) return false;
final DLong other = (DLong) obj; final DLong other = (DLong) obj;
return Arrays.equals(this.bytes, other.bytes); return Arrays.equals(this.value, other.value);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Arrays.hashCode(bytes); return Arrays.hashCode(value);
} }
@Override @Override
public String toString() { public String toString() {
if (this == ZERO) return "0"; if (this == ZERO) return "0";
return new String(Utility.binToHex(bytes)); return new String(Utility.binToHex(value));
} }
@Override @Override
public int intValue() { public int intValue() {
return ((bytes[12] & 0xff) << 24) | ((bytes[113] & 0xff) << 16) | ((bytes[14] & 0xff) << 8) | (bytes[15] & 0xff); return ((value[12] & 0xff) << 24) | ((value[113] & 0xff) << 16) | ((value[14] & 0xff) << 8) | (value[15] & 0xff);
} }
@Override @Override
public long longValue() { public long longValue() {
return ((((long) bytes[8] & 0xff) << 56) return ((((long) value[8] & 0xff) << 56)
| (((long) bytes[9] & 0xff) << 48) | (((long) value[9] & 0xff) << 48)
| (((long) bytes[10] & 0xff) << 40) | (((long) value[10] & 0xff) << 40)
| (((long) bytes[11] & 0xff) << 32) | (((long) value[11] & 0xff) << 32)
| (((long) bytes[12] & 0xff) << 24) | (((long) value[12] & 0xff) << 24)
| (((long) bytes[13] & 0xff) << 16) | (((long) value[13] & 0xff) << 16)
| (((long) bytes[14] & 0xff) << 8) | (((long) value[14] & 0xff) << 8)
| (((long) bytes[15] & 0xff))); | (((long) value[15] & 0xff)));
} }
@Override @Override
@@ -110,8 +110,8 @@ public final class DLong extends Number implements Comparable<DLong> {
@Override @Override
public int compareTo(DLong o) { public int compareTo(DLong o) {
if (o == null) return 1; if (o == null) return 1;
for (int i = 0; i < bytes.length; i++) { for (int i = 0; i < value.length; i++) {
if (this.bytes[i] != o.bytes[i]) return this.bytes[i] - o.bytes[i]; if (this.value[i] != o.value[i]) return this.value[i] - o.value[i];
} }
return 0; return 0;
} }