@@ -5,11 +5,14 @@
*/
package org.redkale.mq ;
import java.lang.reflect.ParameterizedType ;
import java.lang.reflect.Type ;
import java.nio.charset.StandardCharsets ;
import java.util.* ;
import java.util.concurrent.* ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.IntFunction ;
import java.util.logging.* ;
import java.util.stream.Collectors ;
import org.redkale.annotation.* ;
@@ -22,6 +25,7 @@ import org.redkale.convert.Convert;
import org.redkale.convert.ConvertFactory ;
import org.redkale.convert.ConvertType ;
import org.redkale.net.Servlet ;
import org.redkale.net.WorkThread ;
import org.redkale.net.http.* ;
import org.redkale.net.sncp.* ;
import org.redkale.service.* ;
@@ -54,44 +58,52 @@ public abstract class MessageAgent implements Resourcable {
protected AnyValue config ;
private ExecutorService workExecutor ;
protected final ReentrantLock messageProducerLock = new ReentrantLock ( ) ;
protected MessageProducer baseM essageProducer;
protected MessageProducer m essageBase Producer;
protected Map < ConvertType , ConvertMessageProducer > messageProducers = new ConcurrentHashMap < > ( ) ;
protected Map < ConvertType , ConvertMessageProducer > messageProducerMap = new ConcurrentHashMap < > ( ) ;
protected final CopyOnWriteArrayList < MessageConsumer > messageConsumerList = new CopyOnWriteArrayList < > ( ) ;
//key: group, sub-key: topic
protected final ConcurrentHash Map< String , ConcurrentHash Map< String , MessageConsumer > > c onsumerMap = new Concurrent HashMap< > ( ) ;
protected final Map < String , Map < String , MessageConsumerWrapper > > messageC onsumerMap = new HashMap < > ( ) ;
protected final CopyOnWriteArrayList < MessageConsumer > consumerList = new CopyOnWriteArrayList < > ( ) ;
protected MessageClientProducer httpClientProducer ;
protected MessageClientProducer http Producer;
protected MessageClientProducer sncpProducer ;
protected MessageClientProducer sncpClient Producer;
protected HttpMessageClient httpMessageClient ;
protected SncpMessageClient sncpMessageClient ;
protected final ReentrantLock consumerLock = new ReentrantLock ( ) ;
protected final ReentrantLock clientC onsumerLock = new ReentrantLock ( ) ;
protected final ReentrantLock p roducerLock = new ReentrantLock ( ) ;
protected final ReentrantLock clientP roducerLock = new ReentrantLock ( ) ;
protected final ReentrantLock serviceLock = new ReentrantLock ( ) ;
protected final List < MessageConsumer > consumerListen ers = new CopyOnWriteArrayList < > ( ) ;
protected MessageCoder < MessageRecord > clientMessageCod er = MessageRecordCoder . getInstance ( ) ;
//本地Service消息接收处理器, key:consumerid
protected HashMap < String , MessageClientConsumerNode > clientConsumerNodes = new LinkedHashMap < > ( ) ;
protected final AtomicLong msgSeqno = new AtomicLong ( System . nanoTime ( ) ) ;
protected ScheduledThreadPoolExecutor timeoutExecutor ;
protected MessageCoder < MessageRecord > messageCoder = MessageRecordCoder . getInstance ( ) ;
//本地Service消息接收处理器, key:consumerid
protected HashMap < String , MessageClientConsumerNode > clientConsumerNodes = new LinkedHashMap < > ( ) ;
public void init ( AnyValue config ) {
this . name = checkName ( config . getValue ( " name " , " " ) ) ;
int threads = config . getIntValue ( " threads " , - 1 ) ;
if ( threads = = 0 ) {
this . workExecutor = application . getWorkExecutor ( ) ;
}
if ( this . workExecutor = = null ) {
this . workExecutor = threads > 0 ? WorkThread . createExecutor ( threads , " Redkale-MessageConsumerThread-[ " + name + " ]-%s " )
: WorkThread . createWorkExecutor ( Utility . cpus ( ) , " Redkale-MessageConsumerThread-[ " + name + " ]-%s " ) ;
}
this . httpMessageClient = new HttpMessageClient ( this ) ;
this . sncpMessageClient = new SncpMessageClient ( this ) ;
String coderType = config . getValue ( " coder " , " " ) ;
@@ -106,7 +118,7 @@ public abstract class MessageAgent implements Resourcable {
if ( coder instanceof Service ) {
( ( Service ) coder ) . init ( config ) ;
}
this . m essageCoder = coder ;
this . clientM essageCoder = coder ;
} catch ( RuntimeException ex ) {
throw ex ;
} catch ( Exception e ) {
@@ -122,13 +134,17 @@ public abstract class MessageAgent implements Resourcable {
this . timeoutExecutor . setRemoveOnCancelPolicy ( true ) ;
}
public Future submit ( Runnable event ) {
return workExecutor . submit ( event ) ;
}
public Map < String , Long > start ( List < MessageConsumer > consumers ) {
this . consumerListeners . addAll ( consumers ) ;
initMessageConsumer ( consumers ) ;
startMessageConsumer ( ) ;
final LinkedHashMap < String , Long > map = new LinkedHashMap < > ( ) ;
final List < CompletableFuture > futures = new ArrayList < > ( ) ;
this . clientConsumerNodes . values ( ) . forEach ( node - > {
long s = System . currentTimeMillis ( ) ;
node . consumer . startup ( ) ;
node . consumer . start ( ) ;
long e = System . currentTimeMillis ( ) - s ;
map . put ( node . consumer . consumerid , e ) ;
} ) ;
@@ -137,30 +153,37 @@ public abstract class MessageAgent implements Resourcable {
//Application.shutdown 在执行server.shutdown之前执行
public void stop ( ) {
this . stopMessageConsumer ( ) ;
this . stopMessageProducer ( ) ;
this . clientConsumerNodes . values ( ) . forEach ( node - > {
node . consumer . shutdown ( ) ;
node . consumer . stop ( ) ;
} ) ;
}
//Application.shutdown 在所有server.shutdown执行后执行
public void destroy ( AnyValue config ) {
this . httpMessageClient . close ( ) ;
this . sncpMessageClient . close ( ) ;
for ( MessageConsumer consumer : consumerListeners ) {
for ( MessageConsumer consumer : messageConsumerList ) {
consumer . destroy ( config ) ;
}
this . c onsumerListeners . clear ( ) ;
this . messageC onsumerList. clear ( ) ;
this . messageConsumerMap . clear ( ) ;
this . httpMessageClient . close ( ) ;
this . sncpMessageClient . close ( ) ;
if ( this . sncpClientProducer ! = null ) {
this . sncpClientProducer . shutdown ( ) ;
}
if ( this . httpClientProducer ! = null ) {
this . httpClientProducer . shutdown ( ) ;
}
if ( this . clientMessageCoder instanceof Service ) {
( ( Service ) this . clientMessageCoder ) . destroy ( config ) ;
}
if ( this . timeoutExecutor ! = null ) {
this . timeoutExecutor . shutdown ( ) ;
this . timeoutExecutor . shutdownNow ( ) ;
}
if ( this . sncpProduce r ! = null ) {
this . sncpProduce r. shutdown( ) . join ( ) ;
}
if ( this . httpProducer ! = null ) {
this . httpProducer . shutdown ( ) . join ( ) ;
}
if ( this . messageCoder instanceof Service ) {
( ( Service ) this . messageCoder ) . destroy ( config ) ;
if ( this . workExecuto r ! = null & & this . workExecutor ! = application . getWorkExecutor ( ) ) {
this . workExecuto r. shutdownNow ( ) ;
}
}
@@ -169,25 +192,55 @@ public abstract class MessageAgent implements Resourcable {
}
public MessageProducer loadMessageProducer ( ResourceProducer ann ) {
MessageProducer baseProducer = this . baseM essageProducer;
if ( this . baseM essageProducer = = null ) {
MessageProducer baseProducer = this . m essageBase Producer;
if ( this . m essageBase Producer = = null ) {
messageProducerLock . lock ( ) ;
try {
if ( this . baseM essageProducer = = null ) {
this . baseMessageProducer = create MessageProducer( ) ;
if ( this . m essageBase Producer = = null ) {
start MessageProducer( ) ;
}
} finally {
messageProducerLock . unlock ( ) ;
}
baseProducer = this . baseM essageProducer;
baseProducer = this . m essageBase Producer;
}
MessageProducer producer = baseProducer ;
Objects . requireNonNull ( producer ) ;
return messageProducers . computeIfAbsent ( ann . convertType ( ) , t - > new ConvertMessageProducer ( producer , ConvertFactory . findConvert ( t ) ) ) ;
return messageProducerMap . computeIfAbsent ( ann . convertType ( ) , t - > new ConvertMessageProducer ( producer , ConvertFactory . findConvert ( t ) ) ) ;
}
public void load MessageConsumer( MessageConsumer consumer ) {
protected void init MessageConsumer( List < MessageConsumer> consumers ) {
clientConsumerLock . lock ( ) ;
try {
Map < String , Map < String , MessageConsumerWrapper > > maps = new HashMap < > ( ) ;
for ( MessageConsumer consumer : consumers ) {
ResourceConsumer res = consumer . getClass ( ) . getAnnotation ( ResourceConsumer . class ) ;
String group = application . getPropertyValue ( res . group ( ) ) ;
group = consumer . getClass ( ) . getName ( ) + ( group . isEmpty ( ) ? " " : ( " - " + group ) ) ;
Map < String , MessageConsumerWrapper > map = maps . computeIfAbsent ( group , g - > new HashMap < > ( ) ) ;
for ( String t : res . topics ( ) ) {
String topic = application . getPropertyValue ( t ) ;
if ( ! topic . trim ( ) . isEmpty ( ) ) {
if ( map . containsKey ( topic . trim ( ) ) ) {
throw new RedkaleException ( MessageConsumer . class . getSimpleName ( )
+ " consume topic ( " + topic + " ) repeat with " + map . get ( topic ) . getClass ( ) . getName ( ) + " and " + consumer . getClass ( ) . getName ( ) ) ;
}
for ( MessageConsumerWrapper wrapper : map . values ( ) ) {
if ( ! Objects . equals ( res . convertType ( ) , wrapper . convertType ) ) {
throw new RedkaleException ( MessageConsumer . class . getSimpleName ( )
+ " " + consumer . getClass ( ) . getName ( ) + " convertType( " + res . convertType ( ) + " ) differ in "
+ wrapper . consumer . getClass ( ) . getName ( ) + " convertType( " + wrapper . convertType + " ) " ) ;
}
}
map . put ( topic . trim ( ) , new MessageConsumerWrapper ( this , consumer , res . convertType ( ) ) ) ;
}
}
}
messageConsumerList . addAll ( consumers ) ;
messageConsumerMap . putAll ( maps ) ;
} finally {
clientConsumerLock . unlock ( ) ;
}
}
@Override
@@ -211,6 +264,10 @@ public abstract class MessageAgent implements Resourcable {
this . config = config ;
}
public ExecutorService getWorkExecutor ( ) {
return workExecutor ;
}
public HttpMessageClient getHttpMessageClient ( ) {
return httpMessageClient ;
}
@@ -234,7 +291,6 @@ public abstract class MessageAgent implements Resourcable {
return name ;
}
@Deprecated
protected List < MessageClientConsumer > getMessageClientConsumers ( ) {
List < MessageClientConsumer > consumers = new ArrayList < > ( ) ;
MessageClientConsumer one = this . httpMessageClient = = null ? null : this . httpMessageClient . respConsumer ;
@@ -249,14 +305,13 @@ public abstract class MessageAgent implements Resourcable {
return consumers ;
}
@Deprecated
protected List < MessageClientProducer > getMessageClientProducers ( ) {
List < MessageClientProducer > producers = new ArrayList < > ( ) ;
if ( this . httpProducer ! = null ) {
producers . add ( this . httpProducer ) ;
if ( this . httpClient Producer ! = null ) {
producers . add ( this . httpClient Producer ) ;
}
if ( this . sncpProducer ! = null ) {
producers . add ( this . sncpProducer ) ;
if ( this . sncpClient Producer ! = null ) {
producers . add ( this . sncpClient Producer ) ;
}
MessageClientProducer one = this . httpMessageClient = = null ? null : this . httpMessageClient . getProducer ( ) ;
if ( one ! = null ) {
@@ -269,56 +324,57 @@ public abstract class MessageAgent implements Resourcable {
return producers ;
}
@Deprecated
public MessageCoder < MessageRecord > ge tMessageCoder ( ) {
return this . messageCoder ;
public MessageCoder < MessageRecord > getClientMessageCoder ( ) {
return this . clien tMessageCoder ;
}
@Deprecated
//获取指定topic的生产处理器
public MessageClientProducer getSncpMessageClientProducer ( ) {
if ( this . sncpProducer = = null ) {
p roducerLock. lock ( ) ;
if ( this . sncpClient Producer = = null ) {
clientP roducerLock. lock ( ) ;
try {
if ( this . sncpProducer = = null ) {
if ( this . sncpClient Producer = = null ) {
long s = System . currentTimeMillis ( ) ;
this . sncpProducer = createMessageClientProducer ( " SncpProducer " ) ;
this . sncpClient Producer = createMessageClientProducer ( " SncpProducer " ) ;
long e = System . currentTimeMillis ( ) - s ;
if ( logger . isLoggable ( Level . FINEST ) ) {
logger . log ( Level . FINEST , " MessageAgent.SncpProducer startup all in " + e + " ms " ) ;
}
}
} finally {
p roducerLock. unlock ( ) ;
clientP roducerLock. unlock ( ) ;
}
}
return this . sncpProducer ;
return this . sncpClient Producer ;
}
@Deprecated
public MessageClientProducer getHttpMessageClientProducer ( ) {
if ( this . httpProducer = = null ) {
p roducerLock. lock ( ) ;
if ( this . httpClient Producer = = null ) {
clientP roducerLock. lock ( ) ;
try {
if ( this . httpProducer = = null ) {
if ( this . httpClient Producer = = null ) {
long s = System . currentTimeMillis ( ) ;
this . httpProducer = createMessageClientProducer ( " HttpProducer " ) ;
this . httpClient Producer = createMessageClientProducer ( " HttpProducer " ) ;
long e = System . currentTimeMillis ( ) - s ;
if ( logger . isLoggable ( Level . FINEST ) ) {
logger . log ( Level . FINEST , " MessageAgent.HttpProducer startup all in " + e + " ms " ) ;
}
}
} finally {
p roducerLock. unlock ( ) ;
clientP roducerLock. unlock ( ) ;
}
}
return this . httpProducer ;
return this . httpClient Producer ;
}
//
protected abstract MessageProducer create MessageProduc er( ) ;
protected abstract void start MessageConsum er( ) ;
protected abstract void close MessageProduc er( MessageProducer messageProducer ) throws Exception ;
protected abstract void stop MessageConsum er( ) ;
protected abstract void startMessageProducer ( ) ;
protected abstract void stopMessageProducer ( ) ;
@ResourceListener
public abstract void onResourceChange ( ResourceEvent [ ] events ) ;
@@ -335,33 +391,12 @@ public abstract class MessageAgent implements Resourcable {
//ServiceLoader时判断配置是否符合当前实现类
public abstract boolean acceptsConf ( AnyValue config ) ;
@Deprecated
//创建指定topic的生产处理器
protected abstract MessageClientProducer createMessageClientProducer ( String producerName ) ;
//创建指定topic的消费处理器
public abstract MessageClientConsumer createMessageClientConsumer ( String [ ] topics , String group , MessageClientProcessor processor ) ;
public void addMessageConsumer ( ResourceConsumer res , MessageConsumer consumer ) {
consumerLock . lock ( ) ;
try {
ConcurrentHashMap < String , MessageConsumer > map = consumerMap . computeIfAbsent ( res . group ( ) , g - > new ConcurrentHashMap < > ( ) ) ;
for ( String topic : res . topics ( ) ) {
if ( ! topic . trim ( ) . isEmpty ( ) ) {
if ( map . containsKey ( topic . trim ( ) ) ) {
throw new RedkaleException ( MessageConsumer . class . getSimpleName ( )
+ " consume topic ( " + topic + " ) repeat with "
+ map . get ( topic ) . getClass ( ) . getName ( ) + " and " + consumer . getClass ( ) . getName ( ) ) ;
}
map . put ( topic . trim ( ) , consumer ) ;
}
}
consumerList . add ( consumer ) ;
} finally {
consumerLock . unlock ( ) ;
}
}
public final void putService ( NodeHttpServer ns , Service service , HttpServlet servlet ) {
AutoLoad al = service . getClass ( ) . getAnnotation ( AutoLoad . class ) ;
if ( al ! = null & & ! al . value ( ) & & service . getClass ( ) . getAnnotation ( Local . class ) ! = null ) {
@@ -471,21 +506,68 @@ public abstract class MessageAgent implements Resourcable {
}
protected static class MessageConsumerWrapper {
public static class MessageConsumerWrapper < T > {
private final MessageAgent messageAgent ;
private final MessageConsumer consumer ;
public MessageConsumerWrapper ( MessageConsumer consumer ) {
private final ConvertType convertType ;
private final Convert convert ;
private final Type messageType ;
private final IntFunction < T [ ] > arrayCreator ;
public MessageConsumerWrapper ( MessageAgent messageAgent , MessageConsumer < T > consumer , ConvertType convertType ) {
Objects . requireNonNull ( messageAgent ) ;
Objects . requireNonNull ( consumer ) ;
Objects . requireNonNull ( convertType ) ;
this . messageAgent = messageAgent ;
this . convertType = convertType ;
this . consumer = consumer ;
this . convert = ConvertFactory . findConvert ( convertType ) ;
this . messageType = parseMessageType ( consumer . getClass ( ) ) ;
this . arrayCreator = Creator . funcArray ( TypeToken . typeToClass ( messageType ) ) ;
}
private static Type parseMessageType ( Class < ? extends MessageConsumer > clazz ) {
if ( Objects . equals ( Object . class , clazz ) ) {
throw new RedkaleException ( clazz . getName ( ) + " not implements " + MessageConsumer . class . getName ( ) ) ;
}
Type messageType = null ;
Class [ ] clzs = clazz . getInterfaces ( ) ;
for ( int i = 0 ; i < clzs . length ; i + + ) {
if ( MessageConsumer . class . isAssignableFrom ( clzs [ i ] ) ) {
messageType = clazz . getGenericInterfaces ( ) [ i ] ;
break ;
}
}
if ( messageType = = null ) {
return parseMessageType ( ( Class ) clazz . getSuperclass ( ) ) ;
}
return TypeToken . getGenericType ( ( ( ParameterizedType ) messageType ) . getActualTypeArguments ( ) [ 0 ] , clazz ) ;
}
public void init ( AnyValue config ) {
consumer . init ( config ) ;
}
public void onMessage ( MessageConext context , Object [ ] messages ) {
consumer . onMessage ( context , messages ) ;
public Future onMessage ( MessageConext context , List < byte [ ] > messages ) {
return messageAgent . submit ( ( ) - > {
try {
T [ ] msgs = this . arrayCreator . apply ( messages . size ( ) ) ;
int index = - 1 ;
for ( byte [ ] bs : messages ) {
msgs [ + + index ] = ( T ) convert . convertFrom ( messageType , bs ) ;
}
consumer . onMessage ( context , msgs ) ;
} catch ( Throwable t ) {
messageAgent . getLogger ( ) . log ( Level . SEVERE , MessageConsumer . class . getSimpleName ( ) + " execute error, topic: " + context . getTopic ( )
+ " , messages: " + messages . stream ( ) . map ( v - > new String ( v , StandardCharsets . UTF_8 ) ) . collect ( Collectors . toList ( ) ) ) ;
}
} ) ;
}
public void destroy ( AnyValue config ) {