From 3c084723eddf7cc2849b62c757f39e1d39419406 Mon Sep 17 00:00:00 2001 From: lxy <237809796@qq.com> Date: Thu, 29 Oct 2020 12:29:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Apulsar=20=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=AE=A2=E9=98=85=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/application.xml | 2 +- conf/config.properties | 7 ++ conf/redis.properties | 4 - src/com/zdemo/AbstractConsumer.java | 4 +- src/com/zdemo/pulsar/PulsarConsumer.java | 93 ++++++++++++++++++++++++ src/com/zdemo/pulsar/PulsarProducer.java | 82 +++++++++++++++++++++ test/com/zdemo/test/AppTest.java | 34 ++++++--- test/com/zdemo/test/MyConsumer.java | 4 +- 8 files changed, 212 insertions(+), 18 deletions(-) create mode 100644 conf/config.properties delete mode 100644 conf/redis.properties create mode 100644 src/com/zdemo/pulsar/PulsarConsumer.java create mode 100644 src/com/zdemo/pulsar/PulsarProducer.java diff --git a/conf/application.xml b/conf/application.xml index 3c2b462..5139982 100644 --- a/conf/application.xml +++ b/conf/application.xml @@ -3,7 +3,7 @@ - + diff --git a/conf/config.properties b/conf/config.properties new file mode 100644 index 0000000..a064824 --- /dev/null +++ b/conf/config.properties @@ -0,0 +1,7 @@ +# redis +redis.host=47.111.150.118 +redis.password=*Zhong9307! +redis.port=6064 + +# pulsar +pulsar.serviceurl=pulsar://47.113.228.247:6650 \ No newline at end of file diff --git a/conf/redis.properties b/conf/redis.properties deleted file mode 100644 index c1775de..0000000 --- a/conf/redis.properties +++ /dev/null @@ -1,4 +0,0 @@ -# redis -redis.host=47.111.150.118 -redis.password=*Zhong9307! -redis.port=6064 \ No newline at end of file diff --git a/src/com/zdemo/AbstractConsumer.java b/src/com/zdemo/AbstractConsumer.java index 6211d68..ac338ca 100644 --- a/src/com/zdemo/AbstractConsumer.java +++ b/src/com/zdemo/AbstractConsumer.java @@ -2,9 +2,9 @@ package com.zdemo; import org.redkale.convert.json.JsonConvert; -import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * @author Liang @@ -33,7 +33,7 @@ public abstract class AbstractConsumer implements IConsumer { } @Override - public final Collection getSubscribes() { + public final Set getSubscribes() { return eventMap.keySet(); } diff --git a/src/com/zdemo/pulsar/PulsarConsumer.java b/src/com/zdemo/pulsar/PulsarConsumer.java new file mode 100644 index 0000000..abfcfa6 --- /dev/null +++ b/src/com/zdemo/pulsar/PulsarConsumer.java @@ -0,0 +1,93 @@ +package com.zdemo.pulsar; + +import com.zdemo.AbstractConsumer; +import com.zdemo.EventType; +import com.zdemo.IConsumer; +import org.apache.pulsar.client.api.*; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +public abstract class PulsarConsumer extends AbstractConsumer implements IConsumer, Service { + + @Resource(name = "property.pulsar.serviceurl") + private String serviceurl = "pulsar://127.0.0.1:6650"; + private PulsarClient client; + private Consumer consumer; + + public abstract String getGroupid(); + + private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + + @Override + public void addEventType(EventType... eventTypes) { + super.addEventType(eventTypes); + try { + for (EventType eventType : eventTypes) { + queue.put(eventType); + } + } catch (InterruptedException e) { + logger.log(Level.WARNING, "", e); + } + } + + @Override + public void init(AnyValue config) { + new Thread(() -> { + try { + client = PulsarClient.builder() + .serviceUrl(serviceurl) + .build(); + + consumer = client.newConsumer() + .topics(new ArrayList<>(getSubscribes())) + .subscriptionName(getGroupid()) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + while (true) { + // 动态新增订阅 + while (!queue.isEmpty()) { + queue.clear(); + consumer.unsubscribe(); + consumer = client.newConsumer() + .topics(new ArrayList<>(getSubscribes())) + .subscriptionName(getGroupid()) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + } + + // Wait for a message + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + continue; + } + + String topic = msg.getTopicName().replace("persistent://public/default/", ""); + long offset = 0; + String value = new String(msg.getData()); + try { + accept(topic, value); + + consumer.acknowledge(msg); // Acknowledge the message so that it can be deleted by the message broker + } catch (Exception e) { + logger.log(Level.WARNING, String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value), e); + consumer.negativeAcknowledge(msg); // Message failed to process, redeliver later + } + } + } catch (PulsarClientException e) { + e.printStackTrace(); + } + }).start(); + } + + @Override + public void destroy(AnyValue config) { + + } +} diff --git a/src/com/zdemo/pulsar/PulsarProducer.java b/src/com/zdemo/pulsar/PulsarProducer.java new file mode 100644 index 0000000..219efcc --- /dev/null +++ b/src/com/zdemo/pulsar/PulsarProducer.java @@ -0,0 +1,82 @@ +package com.zdemo.pulsar; + +import com.zdemo.Event; +import com.zdemo.IProducer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.redkale.convert.json.JsonConvert; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; +import org.redkale.util.Comment; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +public class PulsarProducer implements IProducer, Service { + + @Resource(name = "property.pulsar.serviceurl") + private String serviceurl = "pulsar://127.0.0.1:6650"; + + @Comment("消息生产者") + private Map> producerMap = new HashMap(); + private PulsarClient client; + + @Override + public void init(AnyValue config) { + try { + client = PulsarClient.builder() + .serviceUrl(serviceurl) + .build(); + } catch (PulsarClientException e) { + e.printStackTrace(); + } + } + + public Producer getProducer(String topic) { + Producer producer = producerMap.get(topic); + if (producer != null) { + return producer; + } + + synchronized (this) { + if ((producer = producerMap.get(topic)) == null) { + try { + producer = client.newProducer() + .topic(topic) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) + .sendTimeout(10, TimeUnit.SECONDS) + .blockIfQueueFull(true) + .create(); + producerMap.put(topic, producer); + + return producer; + } catch (PulsarClientException e) { + e.printStackTrace(); + } + } + } + return producer; + } + + @Override + public void send(T t) { + try { + Producer producer = getProducer(t.topic); + + String v = JsonConvert.root().convertTo(t.value); + if (v.startsWith("\"") && v.endsWith("\"")) { + v = v.substring(1, v.length() - 1); + } + producer.newMessage() + .key("") + .value(v.getBytes()) + .send(); + } catch (Exception e) { + logger.log(Level.WARNING, "", e); + } + } +} diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java index a2c52fd..8a081bc 100644 --- a/test/com/zdemo/test/AppTest.java +++ b/test/com/zdemo/test/AppTest.java @@ -2,7 +2,8 @@ package com.zdemo.test; import com.zdemo.Event; import com.zdemo.EventType; -import com.zdemo.kafak.KafakProducer; +import com.zdemo.IProducer; +import com.zdemo.pulsar.PulsarProducer; import org.junit.Test; import org.redkale.boot.Application; import org.redkale.convert.json.JsonConvert; @@ -28,17 +29,31 @@ public class AppTest { EventType.of("a1", new TypeToken() { }, r -> { System.out.println("我收到了消息 主题a1 事件:" + JsonConvert.root().convertTo(r)); - }), - - EventType.of("bx", str -> { - System.out.println("我收到了消息 主题bx 事件:" + str); }) , EventType.of("game-update", str -> { System.out.println("我收到了消息 主题game-update 事件:" + str); }) + + , EventType.of("http.req.hello", str -> { + System.out.println("我收到了消息 主题http.req.hello 事件:" + str); + }) + + , EventType.of("http.resp.node2004", str -> { + System.out.println("我收到了消息 主题http.resp.node2004 事件:" + str); + }) ); + // 10s 后加入 bx主题 + Thread.sleep(1_000 * 10); + System.out.println("加入新的主题订阅"); + consumer.addEventType( + EventType.of("bx", str -> { + System.out.println("我收到了消息 主题bx 事件:" + str); + }) + ); + + Thread.sleep(60_000 * 60); } catch (Exception e) { e.printStackTrace(); @@ -48,7 +63,7 @@ public class AppTest { @Test public void runProducer() { try { - KafakProducer producer = Application.singleton(KafakProducer.class); + IProducer producer = Application.singleton(PulsarProducer.class); // 发送不同的 事件 float v0 = 1f; @@ -60,12 +75,13 @@ public class AppTest { producer.send(Event.of("c1", v2));*/ producer.send(Event.of("game-update", 23256)); + producer.send(Event.of("bx", 23256)); - try { - Thread.sleep(10_000); + /*try { + Thread.sleep(1_000); } catch (InterruptedException e) { e.printStackTrace(); - } + }*/ } catch (Exception e) { e.printStackTrace(); } diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java index 8a308e8..d9ddd84 100644 --- a/test/com/zdemo/test/MyConsumer.java +++ b/test/com/zdemo/test/MyConsumer.java @@ -1,8 +1,8 @@ package com.zdemo.test; -import com.zdemo.kafak.KafakConsumer; +import com.zdemo.pulsar.PulsarConsumer; -public class MyConsumer extends KafakConsumer { +public class MyConsumer extends PulsarConsumer { public String getGroupid() { return "group-test"; //消费组名称