diff --git a/conf/application.xml b/conf/application.xml index 3c2b462..5139982 100644 --- a/conf/application.xml +++ b/conf/application.xml @@ -3,7 +3,7 @@ - + diff --git a/conf/redis.properties b/conf/redis.properties deleted file mode 100644 index c1775de..0000000 --- a/conf/redis.properties +++ /dev/null @@ -1,4 +0,0 @@ -# redis -redis.host=47.111.150.118 -redis.password=*Zhong9307! -redis.port=6064 \ No newline at end of file diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index 6211d68..ac338ca 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -2,9 +2,9 @@ package com.zdemo; import org.redkale.convert.json.JsonConvert; -import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * @author Liang @@ -33,7 +33,7 @@ public abstract class AbstractConsumer implements IConsumer { } @Override - public final Collection getSubscribes() { + public final Set getSubscribes() { return eventMap.keySet(); } diff --git a/src/com/zdemo/pulsar/PulsarConsumer.java b/src/com/zdemo/pulsar/PulsarConsumer.java new file mode 100644 index 0000000..abfcfa6 --- /dev/null +++ b/src/com/zdemo/pulsar/PulsarConsumer.java @@ -0,0 +1,93 @@ +package com.zdemo.pulsar; + +import com.zdemo.AbstractConsumer; +import com.zdemo.EventType; +import com.zdemo.IConsumer; +import org.apache.pulsar.client.api.*; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +public abstract class PulsarConsumer extends AbstractConsumer implements IConsumer, Service { + + @Resource(name = "property.pulsar.serviceurl") + private String serviceurl = "pulsar://127.0.0.1:6650"; + private PulsarClient client; + private Consumer consumer; + + public abstract String getGroupid(); + + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + + @Override + public void addEventType(EventType... eventTypes) { + super.addEventType(eventTypes); + try { + for (EventType eventType : eventTypes) { + queue.put(eventType); + } + } catch (InterruptedException e) { + logger.log(Level.WARNING, "", e); + } + } + + @Override + public void init(AnyValue config) { + new Thread(() -> { + try { + client = PulsarClient.builder() + .serviceUrl(serviceurl) + .build(); + + consumer = client.newConsumer() + .topics(new ArrayList<>(getSubscribes())) + .subscriptionName(getGroupid()) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + while (true) { + // 动态新增订阅 + while (!queue.isEmpty()) { + queue.clear(); + consumer.unsubscribe(); + consumer = client.newConsumer() + .topics(new ArrayList<>(getSubscribes())) + .subscriptionName(getGroupid()) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + } + + // Wait for a message + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + continue; + } + + String topic = msg.getTopicName().replace("persistent://public/default/", ""); + long offset = 0; + String value = new String(msg.getData()); + try { + accept(topic, value); + + consumer.acknowledge(msg); // Acknowledge the message so that it can be deleted by the message broker + } catch (Exception e) { + logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e); + consumer.negativeAcknowledge(msg); // Message failed to process, redeliver later + } + } + } catch (PulsarClientException e) { + e.printStackTrace(); + } + }).start(); + } + + @Override + public void destroy(AnyValue config) { + + } +} diff --git a/src/com/zdemo/pulsar/PulsarProducer.java b/src/com/zdemo/pulsar/PulsarProducer.java new file mode 100644 index 0000000..219efcc --- /dev/null +++ b/src/com/zdemo/pulsar/PulsarProducer.java @@ -0,0 +1,82 @@ +package com.zdemo.pulsar; + +import com.zdemo.Event; +import com.zdemo.IProducer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.redkale.convert.json.JsonConvert; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; +import org.redkale.util.Comment; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +public class PulsarProducer implements IProducer, Service { + + @Resource(name = "property.pulsar.serviceurl") + private String serviceurl = "pulsar://127.0.0.1:6650"; + + @Comment("消息生产者") + private Map> producerMap = new HashMap(); + private PulsarClient client; + + @Override + public void init(AnyValue config) { + try { + client = PulsarClient.builder() + .serviceUrl(serviceurl) + .build(); + } catch (PulsarClientException e) { + e.printStackTrace(); + } + } + + public Producer getProducer(String topic) { + Producer producer = producerMap.get(topic); + if (producer != null) { + return producer; + } + + synchronized (this) { + if ((producer = producerMap.get(topic)) == null) { + try { + producer = client.newProducer() + .topic(topic) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) + .sendTimeout(10, TimeUnit.SECONDS) + .blockIfQueueFull(true) + .create(); + producerMap.put(topic, producer); + + return producer; + } catch (PulsarClientException e) { + e.printStackTrace(); + } + } + } + return producer; + } + + @Override + public void send(T t) { + try { + Producer producer = getProducer(t.topic); + + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + producer.newMessage() + .key("") + .value(v.getBytes()) + .send(); + } catch (Exception e) { + logger.log(Level.WARNING, "", e); + } + } +} diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index a2c52fd..8a081bc 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -2,7 +2,8 @@ package com.zdemo.test; import com.zdemo.Event; import com.zdemo.EventType; -import com.zdemo.kafak.KafakProducer; +import com.zdemo.IProducer; +import com.zdemo.pulsar.PulsarProducer; import org.junit.Test; import org.redkale.boot.Application; import org.redkale.convert.json.JsonConvert; @@ -28,17 +29,31 @@ public class AppTest { EventType.of("a1", new TypeToken() { }, r -> { System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r)); - }), - - EventType.of("bx", str -> { - System.out.println("我收到了消息 主题bx 事件:" + str); }) , EventType.of("game-update", str -> { System.out.println("我收到了消息 主题game-update 事件:" + str); }) + + , EventType.of("http.req.hello", str -> { + System.out.println("我收到了消息 主题http.req.hello 事件:" + str); + }) + + , EventType.of("http.resp.node2004", str -> { + System.out.println("我收到了消息 主题http.resp.node2004 事件:" + str); + }) ); + // 10s 后加入 bx主题 + Thread.sleep(1_000 * 10); + System.out.println("加入新的主题订阅"); + consumer.addEventType( + EventType.of("bx", str -> { + System.out.println("我收到了消息 主题bx 事件:" + str); + }) + ); + + Thread.sleep(60_000 * 60); } catch (Exception e) { e.printStackTrace(); @@ -48,7 +63,7 @@ public class AppTest { @Test public void runProducer() { try { - KafakProducer producer = Application.singleton(KafakProducer.class); + IProducer producer = Application.singleton(PulsarProducer.class); // 发送不同的 事件 float v0 = 1f; @@ -60,12 +75,13 @@ public class AppTest { producer.send(Event.of("c1", v2));*/ producer.send(Event.of("game-update", 23256)); + producer.send(Event.of("bx", 23256)); - try { - Thread.sleep(10_000); + /*try { + Thread.sleep(1_000); } catch (InterruptedException e) { e.printStackTrace(); - } + }*/ } catch (Exception e) { e.printStackTrace(); } diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 8a308e8..d9ddd84 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,8 +1,8 @@ package com.zdemo.test; -import com.zdemo.kafak.KafakConsumer; +import com.zdemo.pulsar.PulsarConsumer; -public class MyConsumer extends KafakConsumer { +public class MyConsumer extends PulsarConsumer { public String getGroupid() { return "group-test"; //消费组名称