@@ -45,9 +45,9 @@ public abstract class NodeServer {
private final Server server ;
private InetSocketAddress node Address; //HttpServer中的node Address 为所属group对应的SncpServer, 为null表示没有分布式结构
private InetSocketAddress sncp Address; //HttpServer中的sncp Address 为所属group对应的SncpServer, 为null表示只是单节点, 没有分布式结构
private String node Group = " " ; //当前Server的SNCP协议的组
private String sncp Group = " " ; //当前Server的SNCP协议的组
private AnyValue nodeConf ;
@@ -77,31 +77,34 @@ public abstract class NodeServer {
this . nodeConf = config = = null ? AnyValue . create ( ) : config ;
if ( isSNCP ( ) ) { // SNCP协议
String host = this . nodeConf . getValue ( " host " , " 0.0.0.0 " ) . replace ( " 0.0.0.0 " , " " ) ;
this . node Address = new InetSocketAddress ( host . isEmpty ( ) ? application . localAddress . getHostAddress ( ) : host , this . nodeConf . getIntValue ( " port " ) ) ;
this . node Group = application . globalNodes . getOrDefault ( this . node Address, " " ) ;
this . sncp Address = new InetSocketAddress ( host . isEmpty ( ) ? application . localAddress . getHostAddress ( ) : host , this . nodeConf . getIntValue ( " port " ) ) ;
this . sncp Group = application . globalNodes . getOrDefault ( this . sncp Address, " " ) ;
if ( server ! = null ) this . nodeProtocol = server . getProtocol ( ) ;
} else { // HTTP协议
this . sncpAddress = null ;
this . sncpGroup = " " ;
this . nodeProtocol = Sncp . DEFAULT_PROTOCOL ;
String mbgroup = this . nodeConf . getValue ( " group " , " " ) ;
NodeServer sncpServer = null ; //有匹配的就取匹配的, 没有且SNCP只有一个, 则取此SNCP。
for ( NodeServer ns : application . servers ) {
if ( ! ns . isSNCP ( ) ) continue ;
if ( sncpServer = = null ) sncpServer = ns ;
if ( ns . getNode Group ( ) . equals ( mbgroup ) ) {
if ( ns . getSncp Group ( ) . equals ( mbgroup ) ) {
sncpServer = ns ;
break ;
}
}
if ( sncpServer ! = null ) {
this . node Address = sncpServer . getNode Address ( ) ;
this . node Group = sncpServer . getNode Group ( ) ;
this . sncp Address = sncpServer . getSncp Address ( ) ;
this . sncp Group = sncpServer . getSncp Group ( ) ;
this . nodeProtocol = sncpServer . getNodeProtocol ( ) ;
}
}
if ( this . node Address ! = null ) { // 无分布式结构下 HTTP协议的node Address 为 null
this . factory . register ( RESNAME_SNCP_NODE , SocketAddress . class , this . node Address) ;
this . factory . register ( RESNAME_SNCP_NODE , InetSocketAddress . class , this . node Address) ;
this . factory . register ( RESNAME_SNCP_NODE , String . class , this . node Address. getAddress ( ) . getHostAddress ( ) ) ;
this . factory . register ( RESNAME_SNCP_GROUP , this . node Group) ;
if ( this . sncp Address ! = null ) { // 无分布式结构下 HTTP协议的sncp Address 为 null
this . factory . register ( RESNAME_SNCP_NODE , SocketAddress . class , this . sncp Address) ;
this . factory . register ( RESNAME_SNCP_NODE , InetSocketAddress . class , this . sncp Address) ;
this . factory . register ( RESNAME_SNCP_NODE , String . class , this . sncp Address. getAddress ( ) . getHostAddress ( ) ) ;
this . factory . register ( RESNAME_SNCP_GROUP , this . sncp Group) ;
}
{
//设置root文件夹
@@ -135,9 +138,9 @@ public abstract class NodeServer {
regFactory . register ( rs . name ( ) , DataSource . class , source ) ;
Class < ? extends Service > sc = ( Class < ? extends Service > ) application . dataCacheListenerClass ;
if ( sc ! = null ) {
Service cacheListenerService = Sncp . createLocalService ( rs . name ( ) , sc , this . node Address, nodeSameGroupTransports , nodeDiffGroupTransports ) ;
Service cacheListenerService = Sncp . createLocalService ( rs . name ( ) , sc , this . sncp Address, nodeSameGroupTransports , nodeDiffGroupTransports ) ;
regFactory . register ( rs . name ( ) , DataCacheListener . class , cacheListenerService ) ;
ServiceWrapper wrapper = new ServiceWrapper ( sc , cacheListenerService , node Group, rs . name ( ) , null ) ;
ServiceWrapper wrapper = new ServiceWrapper ( sc , cacheListenerService , sncp Group, rs . name ( ) , null ) ;
localServices . add ( wrapper ) ;
if ( consumer ! = null ) consumer . accept ( wrapper ) ;
rf . inject ( cacheListenerService ) ;
@@ -151,18 +154,18 @@ public abstract class NodeServer {
}
protected List < Transport > [ ] parseTransport ( final String [ ] groups ) {
final Set < InetSocketAddress > sameGroupAddrs = application . findGlobalGroup ( this . node Group) ;
final Set < InetSocketAddress > sameGroupAddrs = application . findGlobalGroup ( this . sncp Group) ;
final Map < String , Set < InetSocketAddress > > diffGroupAddrs = new HashMap < > ( ) ;
for ( String groupitem : groups ) {
final Set < InetSocketAddress > addrs = application . findGlobalGroup ( groupitem ) ;
if ( addrs = = null | | groupitem . equals ( this . node Group) ) continue ;
if ( addrs = = null | | groupitem . equals ( this . sncp Group) ) continue ;
diffGroupAddrs . put ( groupitem , addrs ) ;
}
final List < Transport > sameGroupTransports0 = new ArrayList < > ( ) ;
if ( sameGroupAddrs ! = null ) {
sameGroupAddrs . remove ( this . node Address) ;
sameGroupAddrs . remove ( this . sncp Address) ;
for ( InetSocketAddress iaddr : sameGroupAddrs ) {
sameGroupTransports0 . add ( loadTransport ( this . node Group, getNodeProtocol ( ) , iaddr ) ) ;
sameGroupTransports0 . add ( loadTransport ( this . sncp Group, getNodeProtocol ( ) , iaddr ) ) ;
}
}
final List < Transport > diffGroupTransports0 = new ArrayList < > ( ) ;
@@ -174,12 +177,12 @@ public abstract class NodeServer {
public abstract boolean isSNCP ( ) ;
public InetSocketAddress getNode Address ( ) {
return node Address;
public InetSocketAddress getSncp Address ( ) {
return sncp Address;
}
public String getNode Group ( ) {
return node Group;
public String getSncp Group ( ) {
return sncp Group;
}
public String getNodeProtocol ( ) {
@@ -235,7 +238,7 @@ public abstract class NodeServer {
if ( serviceFilter = = null ) return ;
final String threadName = " [ " + Thread . currentThread ( ) . getName ( ) + " ] " ;
final Set < FilterEntry < Service > > entrys = serviceFilter . getFilterEntrys ( ) ;
final String defgroup = nodeConf = = null ? " " : nodeConf . getValue ( " group " , " " ) ; //Server节点获取group信息
final String defgroups = nodeConf = = null ? " " : nodeConf . getValue ( " group " , " " ) ; //Server节点获取group信息
ResourceFactory regFactory = isSNCP ( ) ? application . factory : factory ;
for ( FilterEntry < Service > entry : entrys ) { //service实现类
final Class < ? extends Service > type = entry . getType ( ) ;
@@ -245,50 +248,44 @@ public abstract class NodeServer {
if ( Modifier . isAbstract ( type . getModifiers ( ) ) ) continue ;
if ( type . getAnnotation ( Ignore . class ) ! = null ) continue ;
if ( ! isSNCP ( ) & & factory . find ( entry . getName ( ) , type ) ! = null ) continue ;
String group = entry . getGroup ( ) ;
if ( group = = null | | group . isEmpty ( ) ) group = defgroup ;
String groups = entry . getGroup ( ) ;
if ( groups = = null | | groups . isEmpty ( ) ) groups = defgroups ;
final Set < InetSocketAddress > sameGroupAddrs = new LinkedHashSet < > ( ) ;
final Map < String , Set < InetSocketAddress > > diffGroupAddrs = new HashMap < > ( ) ;
for ( String str : group . split ( " ; " ) ) {
application . globalNodes . forEach ( ( k , v ) - > {
if ( v . equals ( str ) ) {
if ( v . equals ( this . nodeGroup ) ) {
sameGroupAddrs . add ( k ) ;
} else {
Set < InetSocketAddress > set = diffGroupAddrs . get ( v ) ;
if ( set = = null ) {
set = new LinkedHashSet < > ( ) ;
diffGroupAddrs . put ( v , set ) ;
}
set . add ( k ) ;
}
}
} ) ;
for ( String g : groups . split ( " ; " ) ) {
if ( g . isEmpty ( ) ) continue ;
Set < InetSocketAddress > set = application . findGlobalGroup ( g ) ;
if ( set = = null ) throw new RuntimeException ( type . getName ( ) + " has illegal group ( " + groups + " ) " ) ;
if ( g . equals ( this . sncpGroup ) ) {
sameGroupAddrs . addAll ( set ) ;
} el se {
diffGroupAddrs . put ( g , set ) ;
}
}
final boolean localable = sameGroupAddrs . contains ( this . node Address) ;
final boolean localable = this . sncpAddress = = null | | sameGroupAddrs . contains ( this . sncp Address) ;
Service service ;
List < Transport > diffGroupTransports = new ArrayList < > ( ) ;
diffGroupAddrs . forEach ( ( k , v ) - > diffGroupTransports . add ( loadTransport ( k , server . getProtocol ( ) , v ) ) ) ;
if ( localable | | ( sameGroupAddrs . isEmpty ( ) & & diffGroupTransports . isEmpty ( ) ) ) {
sameGroupAddrs . remove ( this . node Address) ;
sameGroupAddrs . remove ( this . sncp Address) ;
List < Transport > sameGroupTransports = new ArrayList < > ( ) ;
for ( InetSocketAddress iaddr : sameGroupAddrs ) {
Set < InetSocketAddress > tset = new HashSet < > ( ) ;
tset . add ( iaddr ) ;
sameGroupTransports . add ( loadTransport ( this . node Group, server . getProtocol ( ) , tset ) ) ;
sameGroupTransports . add ( loadTransport ( this . sncp Group, server . getProtocol ( ) , tset ) ) ;
}
service = Sncp . createLocalService ( entry . getName ( ) , type , this . node Address, sameGroupTransports , diffGroupTransports ) ;
service = Sncp . createLocalService ( entry . getName ( ) , type , this . sncp Address, sameGroupTransports , diffGroupTransports ) ;
} else {
StringBuilder g = new StringBuilder ( this . node Group) ;
StringBuilder g = new StringBuilder ( this . sncp Group) ;
diffGroupAddrs . forEach ( ( k , v ) - > {
if ( g . length ( ) > 0 ) g . append ( ';' ) ;
g . append ( k ) ;
sameGroupAddrs . addAll ( v ) ;
} ) ;
if ( sameGroupAddrs . isEmpty ( ) ) throw new RuntimeException ( type + " : " + group ) ;
service = Sncp . createRemoteService ( entry . getName ( ) , type , this . node Address, loadTransport ( g . toString ( ) , server . getProtocol ( ) , sameGroupAddrs ) ) ;
if ( sameGroupAddrs . isEmpty ( ) ) throw new RuntimeException ( type . getName ( ) + " has no remote address on group ( " + groups + " ) " ) ;
service = Sncp . createRemoteService ( entry . getName ( ) , type , this . sncp Address, loadTransport ( g . toString ( ) , server . getProtocol ( ) , sameGroupAddrs ) ) ;
}
ServiceWrapper wrapper = new ServiceWrapper ( type , service , entry ) ;
if ( factory . find ( wrapper . getName ( ) , wrapper . getType ( ) ) = = null ) {
@@ -300,7 +297,7 @@ public abstract class NodeServer {
if ( consumer ! = null ) consumer . accept ( wrapper ) ;
}
} else if ( isSNCP ( ) ) {
throw new RuntimeException ( ServiceWrapper . class . getSimpleName ( ) + " (class: " + type . getName ( ) + " , name: " + entry . getName ( ) + " , group: " + group + " ) is repeat. " ) ;
throw new RuntimeException ( ServiceWrapper . class . getSimpleName ( ) + " (class: " + type . getName ( ) + " , name: " + entry . getName ( ) + " , group: " + groups + " ) is repeat. " ) ;
}
}
servicecdl . countDown ( ) ;
@@ -327,7 +324,7 @@ public abstract class NodeServer {
}
protected ClassFilter < Service > createServiceClassFilter ( final AnyValue config ) {
return createClassFilter ( this . node Group, config , null , Service . class , Annotation . class , " services " , " service " ) ;
return createClassFilter ( this . sncp Group, config , null , Service . class , Annotation . class , " services " , " service " ) ;
}
protected static ClassFilter createClassFilter ( final String localGroup , final AnyValue config , Class < ? extends Annotation > ref ,