新增:总线消息插件模式注入
This commit is contained in:
parent
c4639fcabf
commit
57c216c791
@ -4,8 +4,14 @@
|
|||||||
|
|
||||||
<resources>
|
<resources>
|
||||||
<properties load="config.properties"></properties>
|
<properties load="config.properties"></properties>
|
||||||
|
<listener value="com.zdemo.ZhubListener"/>
|
||||||
</resources>
|
</resources>
|
||||||
|
|
||||||
|
<zhubs>
|
||||||
|
<zhub name="zhub" addr="47.111.150.118" port="6066" groupid="platf-zhub"/>
|
||||||
|
<zhub name="zhub2" addr="47.111.150.118" port="6066" groupid="platf-chat"/>
|
||||||
|
</zhubs>
|
||||||
|
|
||||||
<server protocol="HTTP" port="80">
|
<server protocol="HTTP" port="80">
|
||||||
<request>
|
<request>
|
||||||
<remoteaddr value="request.headers.X-Real-IP"/>
|
<remoteaddr value="request.headers.X-Real-IP"/>
|
||||||
|
51
src/com/zdemo/ZhubListener.java
Normal file
51
src/com/zdemo/ZhubListener.java
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package com.zdemo;
|
||||||
|
|
||||||
|
import org.redkale.boot.Application;
|
||||||
|
import org.redkale.boot.ApplicationListener;
|
||||||
|
import org.redkale.service.Service;
|
||||||
|
import org.redkale.util.AnyValue;
|
||||||
|
import org.redkale.util.RedkaleClassLoader;
|
||||||
|
import org.redkale.util.ResourceFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 服务监听
|
||||||
|
*
|
||||||
|
* @author: liangxy.
|
||||||
|
*/
|
||||||
|
public class ZhubListener implements ApplicationListener {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preStart(Application application) {
|
||||||
|
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
ResourceFactory resourceFactory = application.getResourceFactory();
|
||||||
|
RedkaleClassLoader classLoader = application.getClassLoader();
|
||||||
|
|
||||||
|
AnyValue appConfig = application.getAppConfig();
|
||||||
|
AnyValue zhubs = appConfig.getAnyValue("zhubs");
|
||||||
|
AnyValue[] values = zhubs.getAnyValues("zhub");
|
||||||
|
for (AnyValue zhub : values) {
|
||||||
|
String clazz = zhub.getValue("value", "com.zdemo.zhub.ZHubClient");
|
||||||
|
try {
|
||||||
|
Class<?> aClass = classLoader.loadClass(clazz);
|
||||||
|
Service obj = (Service) aClass.newInstance();
|
||||||
|
obj.init(zhub);
|
||||||
|
resourceFactory.register(zhub.get("name"), aClass, obj);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (InstantiationException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preShutdown(Application application) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
@ -7,6 +7,7 @@ import com.zdemo.IProducer;
|
|||||||
import org.redkale.convert.json.JsonConvert;
|
import org.redkale.convert.json.JsonConvert;
|
||||||
import org.redkale.service.Service;
|
import org.redkale.service.Service;
|
||||||
import org.redkale.util.AnyValue;
|
import org.redkale.util.AnyValue;
|
||||||
|
import org.redkale.util.AutoLoad;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
@ -23,7 +24,8 @@ import java.util.function.BiConsumer;
|
|||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public abstract class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
|
@AutoLoad(value = false)
|
||||||
|
public class ZHubClient extends AbstractConsumer implements IConsumer, IProducer, Service {
|
||||||
|
|
||||||
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
|
public Logger logger = Logger.getLogger(ZHubClient.class.getSimpleName());
|
||||||
|
|
||||||
@ -33,6 +35,8 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
|||||||
private String password = "";
|
private String password = "";
|
||||||
@Resource(name = "property.zhub.port")
|
@Resource(name = "property.zhub.port")
|
||||||
private int port = 1216;
|
private int port = 1216;
|
||||||
|
@Resource(name = "property.zhub.groupid")
|
||||||
|
private String groupid = "";
|
||||||
|
|
||||||
private ReentrantLock lock = new ReentrantLock();
|
private ReentrantLock lock = new ReentrantLock();
|
||||||
private Socket client;
|
private Socket client;
|
||||||
@ -53,6 +57,14 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
|||||||
if (!preInit()) {
|
if (!preInit()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 自动注入
|
||||||
|
if (config != null) {
|
||||||
|
host = config.getValue("addr", host);
|
||||||
|
port = config.getIntValue("port", port);
|
||||||
|
groupid = config.getValue("groupid", groupid);
|
||||||
|
}
|
||||||
|
|
||||||
if (!initSocket()) {
|
if (!initSocket()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -194,7 +206,11 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
|||||||
writer = client.getOutputStream();
|
writer = client.getOutputStream();
|
||||||
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
|
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
|
||||||
|
|
||||||
send("groupid " + getGroupid());
|
String groupid = getGroupid();
|
||||||
|
if (groupid == null || groupid.isEmpty()) {
|
||||||
|
throw new RuntimeException("ZHubClient groupid can not is empty");
|
||||||
|
}
|
||||||
|
send("groupid " + groupid);
|
||||||
|
|
||||||
StringBuffer buf = new StringBuffer("subscribe");
|
StringBuffer buf = new StringBuffer("subscribe");
|
||||||
for (String topic : getTopics()) {
|
for (String topic : getTopics()) {
|
||||||
@ -209,6 +225,9 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
||||||
return false;
|
return false;
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.WARNING, "Zdb Consumer 初始化失败!", e);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -266,6 +285,11 @@ public abstract class ZHubClient extends AbstractConsumer implements IConsumer,
|
|||||||
delay(topic, v, delay);
|
delay(topic, v, delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getGroupid() {
|
||||||
|
return groupid;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void subscribe(String topic) {
|
protected void subscribe(String topic) {
|
||||||
send("subscribe " + topic); //新增订阅
|
send("subscribe " + topic); //新增订阅
|
||||||
|
@ -25,7 +25,6 @@ public class AppTest {
|
|||||||
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
MyConsumer consumer = Application.singleton(MyConsumer.class);
|
||||||
|
|
||||||
consumer.subscribe("a", str -> {
|
consumer.subscribe("a", str -> {
|
||||||
System.out.println(System.currentTimeMillis());
|
|
||||||
logger.info("我收到了消息 a 事件:" + str);
|
logger.info("我收到了消息 a 事件:" + str);
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -42,7 +41,6 @@ public class AppTest {
|
|||||||
System.out.println(Utility.now() + " ----------------- timer b 执行了");
|
System.out.println(Utility.now() + " ----------------- timer b 执行了");
|
||||||
});
|
});
|
||||||
//consumer.delay("a", "1", 200);
|
//consumer.delay("a", "1", 200);
|
||||||
System.out.println(System.currentTimeMillis());
|
|
||||||
consumer.delay("a", "1", "2000");
|
consumer.delay("a", "1", "2000");
|
||||||
|
|
||||||
Thread.sleep(60_000 * 60);
|
Thread.sleep(60_000 * 60);
|
||||||
|
Loading…
Reference in New Issue
Block a user