diff --git a/conf/application.xml b/conf/application.xml
index ba023bc..3c2b462 100644
--- a/conf/application.xml
+++ b/conf/application.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/conf/kafak.properties b/conf/kafak.properties
index 0afae2f..dcb1935 100644
--- a/conf/kafak.properties
+++ b/conf/kafak.properties
@@ -1,9 +1,16 @@
-
-bootstrap.servers=122.112.180.156:6062
+# Producer
+bootstrap.servers=47.111.150.118:6062
 acks=all
 retries=0
 batch.size=16384
 linger.ms=1
 buffer.memory=33554432
 key.serializer=org.apache.kafka.common.serialization.StringSerializer
-value.serializer=org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+# Consumer
+enable.auto.commit=true
+auto.commit.interval.ms=1000
+group.id=
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
diff --git a/conf/redis.properties b/conf/redis.properties
new file mode 100644
index 0000000..c1775de
--- /dev/null
+++ b/conf/redis.properties
@@ -0,0 +1,4 @@
+# redis
+redis.host=47.111.150.118
+redis.password=*Zhong9307!
+redis.port=6064
\ No newline at end of file
diff --git a/test/com/zdemo/test/Event.java b/src/com/zdemo/Event.java
similarity index 74%
rename from test/com/zdemo/test/Event.java
rename to src/com/zdemo/Event.java
index e597ee9..7484b75 100644
--- a/test/com/zdemo/test/Event.java
+++ b/src/com/zdemo/Event.java
@@ -1,4 +1,4 @@
-package com.zdemo.test;
+package com.zdemo;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -6,6 +6,7 @@ import lombok.Setter;
 @Getter
 @Setter
 public class Event<V> {
+    private String topic;
     private String key;
     private V value;
 }
diff --git a/src/com/zdemo/IConsumer.java b/src/com/zdemo/IConsumer.java
index 4cb2790..20225f1 100644
--- a/src/com/zdemo/IConsumer.java
+++ b/src/com/zdemo/IConsumer.java
@@ -4,7 +4,7 @@ import org.redkale.util.TypeToken;
 
 import java.util.Collection;
 
-public interface IConsumer<T> {
+public interface IConsumer<T extends Event> {
 
     Collection<String> getSubscribes();
diff --git a/src/com/zdemo/IProducer.java b/src/com/zdemo/IProducer.java
index 4aaa215..e088548 100644
--- a/src/com/zdemo/IProducer.java
+++ b/src/com/zdemo/IProducer.java
@@ -2,13 +2,13 @@ package com.zdemo;
 
 import java.util.concurrent.CompletableFuture;
 
-public interface IProducer<T> {
+public interface IProducer<T extends Event> {
 
-    default CompletableFuture<Void> sendAsync(String topic,T... t) {
-        return CompletableFuture.runAsync(() -> send(topic, t));
+    default CompletableFuture<Void> sendAsync(String topic, T... t) {
+        return CompletableFuture.runAsync(() -> send(t));
    }
 
-    void send(String topic,T... t);
+    void send(T... t);
 }
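The interface change above is the heart of this commit: the destination topic now travels inside the Event itself rather than as a separate argument to send(...). Note that sendAsync still accepts a topic parameter, but its new body ignores it and delegates to send(t). A minimal usage sketch, assuming some concrete IProducer<Event<String>> implementation is available (the method wrapper is only for illustration):

    void demo(IProducer<Event<String>> producer) {
        Event<String> event = new Event<>();
        event.setTopic("a");      // routing now comes from the event itself
        event.setKey("k1");
        event.setValue("hello");

        producer.send(event);                 // varargs; topic is read from each event
        producer.sendAsync("unused", event);  // CompletableFuture<Void>; the topic argument is now ignored
    }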
diff --git a/src/com/zdemo/KafakService.java b/src/com/zdemo/KafakService.java
deleted file mode 100644
index bc91837..0000000
--- a/src/com/zdemo/KafakService.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package com.zdemo;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Test;
-import org.redkale.service.AbstractService;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-
-public class KafakService extends AbstractService {
-    static Properties props = new Properties();
-
-    static String kafakServices = "122.112.180.156:6062";
-
-    static {
-        // producer
-        props.put("bootstrap.servers", kafakServices);
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", StringSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-
-        // consumer
-        props.put("bootstrap.servers", kafakServices);
-        props.put("group.id", "test");
-        props.put("enable.auto.commit", "true");
-        props.put("auto.commit.interval.ms", "1000");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-    }
-
-    @Test
-    public void pull() {
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList("foo", "bar", "t1"));
-        while (true) {
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-            for (ConsumerRecord<String, String> record : records) {
-                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
-            }
-        }
-    }
-
-    @Test
-    public void push() {
-        send("t1", "this is a test data too");
-    }
-
-    public void send(String topic, String data) {
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-        for (int i = 0; i < 2; i++) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            producer.send(new ProducerRecord<>(topic, "" + i, data));
-        }
-        producer.close();
-    }
-
-}
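The deleted KafakService was a scratch class that hardcoded broker settings and mixed producer, consumer, and JUnit tests in one place. Its inline Properties survive as conf/kafak.properties (see above). A sketch of the equivalent bootstrap against the externalized file; the class name and topic are invented for illustration:

    public final class KafakBootstrap {
        public static void main(String[] args) throws java.io.IOException {
            java.util.Properties props = new java.util.Properties();
            try (java.io.FileInputStream fis = new java.io.FileInputStream("conf/kafak.properties")) {
                props.load(fis);   // broker address, serializers, etc. now live in the file
            }
            props.put("group.id", "test");   // group.id ships blank; each consumer supplies its own
            org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
                    new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
            consumer.subscribe(java.util.List.of("t1"));
        }
    }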
diff --git a/src/com/zdemo/kafak/KafakConsumer.java b/src/com/zdemo/kafak/KafakConsumer.java
index 8ad8409..52a5a17 100644
--- a/src/com/zdemo/kafak/KafakConsumer.java
+++ b/src/com/zdemo/kafak/KafakConsumer.java
@@ -1,15 +1,20 @@
 package com.zdemo.kafak;
 
+import com.zdemo.Event;
 import com.zdemo.IConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.redkale.convert.json.JsonConvert;
 import org.redkale.net.http.RestService;
 import org.redkale.service.Service;
 import org.redkale.util.AnyValue;
 
+import javax.annotation.Resource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -21,38 +26,43 @@ import java.util.logging.Logger;
  * @param <T>
  */
 @RestService
-public abstract class KafakConsumer<T> implements IConsumer<T>, Service {
+public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Service {
 
-    protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
+    private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
 
-    private String kafakServices = "122.112.180.156:6062";
-    private KafkaConsumer<String, String> consumer;
+    @Resource(name = "APP_HOME")
+    protected File APP_HOME;
+
+    public abstract String getGroupid();
 
     @Override
     public void init(AnyValue config) {
         CompletableFuture.runAsync(() -> {
-            Properties props = new Properties();
-            props.put("bootstrap.servers", kafakServices);
-            props.put("group.id", "test");
-            props.put("enable.auto.commit", "true");
-            props.put("auto.commit.interval.ms", "1000");
-            props.put("key.deserializer", StringDeserializer.class.getName());
-            props.put("value.deserializer", StringDeserializer.class.getName());
+            try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"))) {
+                Properties props = new Properties();
+                props.load(fis);
+                props.put("group.id", getGroupid());
+                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+                consumer.subscribe(getSubscribes());
 
-            consumer = new KafkaConsumer<>(props);
-            consumer.subscribe(getSubscribes());
-            while (true) {
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-                for (ConsumerRecord<String, String> record : records) {
-                    try {
-                        logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()));
-                        T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value());
-                        accept(t);
-                    } catch (Exception e) {
-                        e.printStackTrace();
+                while (true) {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                    for (ConsumerRecord<String, String> record : records) {
+                        try {
+                            logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()));
+                            T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value());
+                            accept(t);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
                     }
                 }
+            } catch (FileNotFoundException e) {
+                e.printStackTrace();
+            } catch (IOException e) {
+                e.printStackTrace();
             }
         });
    }
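KafakConsumer is now config-driven and abstract over the consumer group, so a concrete consumer only declares its group id, its topics, its payload type, and what to do with each event. A hypothetical subclass as a sketch (OrderConsumer and the "order" topic are invented; it assumes IConsumer declares the getSubscribes/getTypeToken/accept members used above, and that redkale's TypeToken is captured via an anonymous subclass):

    package com.zdemo.test;

    import com.zdemo.Event;
    import com.zdemo.kafak.KafakConsumer;
    import org.redkale.util.TypeToken;

    import java.util.Collection;
    import java.util.List;

    public class OrderConsumer extends KafakConsumer<Event<String>> {

        @Override
        public String getGroupid() {
            return "group-order";                      // written into props as group.id
        }

        @Override
        public Collection<String> getSubscribes() {
            return List.of("order");                   // topics handed to consumer.subscribe(...)
        }

        @Override
        public TypeToken<Event<String>> getTypeToken() {
            return new TypeToken<Event<String>>() {};  // lets JsonConvert rebuild Event<String>
        }

        @Override
        public void accept(Event<String> event) {
            System.out.println(event.getKey() + " -> " + event.getValue());
        }
    }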
props.put("batch.size", 16384); - props.put("linger.ms", 1); - props.put("buffer.memory", 33554432); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producer = new KafkaProducer(props); + try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) { + Properties props = new Properties(); + props.load(fis); + producer = new KafkaProducer(props); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } } @Override - public void send(String topic, T... t) { - for (T t1 : t) { - producer.send(new ProducerRecord(topic, JsonConvert.root().convertTo(t1))); + public void send(T... t) { + for (T x : t) { + logger.finest("send message: " + JsonConvert.root().convertTo(x)); + producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x))); } } diff --git a/src/com/zdemo/redis/RedisConsumer.java b/src/com/zdemo/redis/RedisConsumer.java new file mode 100644 index 0000000..65d1792 --- /dev/null +++ b/src/com/zdemo/redis/RedisConsumer.java @@ -0,0 +1,71 @@ +package com.zdemo.redis; + +import com.zdemo.Event; +import com.zdemo.IConsumer; +import org.redkale.convert.json.JsonConvert; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; + +import javax.annotation.Resource; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.net.Socket; + +public abstract class RedisConsumer implements IConsumer, Service { + + @Resource(name = "property.redis.host") + private String host = "127.0.0.1"; + @Resource(name = "property.redis.password") + private String password = ""; + @Resource(name = "property.redis.port") + private int port = 6379; + + public String getGroupid() { + return ""; + } + + @Override + public void init(AnyValue config) { + try { + Socket client = new Socket(); + client.connect(new InetSocketAddress(host, port)); + client.setKeepAlive(true); + + OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream()); + oswSub.write("AUTH " + password + "\r\n"); + oswSub.flush(); + + StringBuffer buf = new StringBuffer("SUBSCRIBE"); + for (String topic : getSubscribes()) { + buf.append(" ").append(topic); + } + buf.append(" _ping\r\n"); + oswSub.write(buf.toString()); + oswSub.flush(); + + BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream())); + String type = ""; + String readLine; + while ((readLine = br.readLine()) != null) { + if ("*3".equals(readLine)) { + br.readLine(); // $7 len() + type = br.readLine(); // message + if (!"message".equals(type)) { + continue; + } + br.readLine(); //$n len(key) + String topic = br.readLine(); // topic + + br.readLine(); //$n len(value) + String value = br.readLine(); // value + T t = JsonConvert.root().convertFrom(getTypeToken().getType(), value); + accept(t); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java new file mode 100644 index 0000000..805cc05 --- /dev/null +++ b/src/com/zdemo/redis/RedisProducer.java @@ -0,0 +1,52 @@ +package com.zdemo.redis; + +import com.zdemo.Event; +import com.zdemo.IProducer; +import org.redkale.convert.json.JsonConvert; +import org.redkale.service.Service; +import org.redkale.util.AnyValue; + +import javax.annotation.Resource; 
diff --git a/src/com/zdemo/redis/RedisProducer.java b/src/com/zdemo/redis/RedisProducer.java
new file mode 100644
index 0000000..805cc05
--- /dev/null
+++ b/src/com/zdemo/redis/RedisProducer.java
@@ -0,0 +1,52 @@
+package com.zdemo.redis;
+
+import com.zdemo.Event;
+import com.zdemo.IProducer;
+import org.redkale.convert.json.JsonConvert;
+import org.redkale.service.Service;
+import org.redkale.util.AnyValue;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public class RedisProducer<T extends Event> implements IProducer<T>, Service {
+
+    @Resource(name = "property.redis.host")
+    private String host = "127.0.0.1";
+    @Resource(name = "property.redis.password")
+    private String password = "";
+    @Resource(name = "property.redis.port")
+    private int port = 6379;
+
+    private OutputStreamWriter oswPub;
+
+    @Override
+    public void init(AnyValue config) {
+        try {
+            Socket client = new Socket();
+            client.connect(new InetSocketAddress(host, port));
+            client.setKeepAlive(true);
+
+            oswPub = new OutputStreamWriter(client.getOutputStream());
+            oswPub.write("AUTH " + password + "\r\n");
+            oswPub.flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void send(T... t) {
+        for (T x : t) {
+            try {
+                oswPub.write("PUBLISH " + x.getTopic() + " '" + JsonConvert.root().convertTo(x) + "' \r\n");
+                oswPub.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git a/test/com/zdemo/test/AppTest.java b/test/com/zdemo/test/AppTest.java
new file mode 100644
index 0000000..a120bdc
--- /dev/null
+++ b/test/com/zdemo/test/AppTest.java
@@ -0,0 +1,50 @@
+package com.zdemo.test;
+
+import com.zdemo.Event;
+import com.zdemo.redis.RedisProducer;
+import org.junit.Test;
+import org.redkale.boot.Application;
+
+/**
+ * Pub/sub messaging test
+ */
+public class AppTest {
+
+    @Test
+    public void runConsumer() {
+        try {
+            // start the app and begin listening for messages
+            Application.singleton(MyConsumer.class);
+
+            try {
+                Thread.sleep(15_000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void runProducer() {
+        try {
+            RedisProducer producer = Application.singleton(RedisProducer.class);
+
+            Event<Integer> event = new Event<>();
+            event.setTopic("c");
+            event.setKey("abx");
+            event.setValue(2314);
+
+            producer.send(event);
+
+            try {
+                Thread.sleep(1_000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
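RedisProducer likewise writes the inline form of the Redis protocol. The wire traffic produced by AppTest.runProducer above would look roughly like this (the JSON field order depends on JsonConvert and is illustrative only):

    AUTH *Zhong9307!
    PUBLISH c '{"topic":"c","key":"abx","value":2314}'

The single quotes keep the JSON, spaces and all, as a single argument to PUBLISH; a payload that itself contained a single quote would break this framing.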
diff --git a/test/com/zdemo/test/MyConsumer.java b/test/com/zdemo/test/MyConsumer.java
index 44e20d4..769535a 100644
--- a/test/com/zdemo/test/MyConsumer.java
+++ b/test/com/zdemo/test/MyConsumer.java
@@ -1,18 +1,22 @@
 package com.zdemo.test;
 
-import com.zdemo.kafak.KafakConsumer;
-import com.zdemo.kafak.KafakProducer;
-import org.junit.Test;
+import com.zdemo.Event;
+import com.zdemo.redis.RedisConsumer;
+import org.redkale.convert.json.JsonConvert;
 import org.redkale.util.TypeToken;
 
 import java.util.Collection;
 import java.util.List;
 
-public class MyConsumer extends KafakConsumer<Event<Integer>> {
+public class MyConsumer extends RedisConsumer<Event<Integer>> {
+
+    public String getGroupid() {
+        return "group-test"; // e.g. quest, user, im, live
+    }
 
     @Override
     public Collection<String> getSubscribes() {
-        return List.of("a");
+        return List.of("a", "b", "c");
    }
 
     @Override
@@ -23,32 +27,11 @@ public class MyConsumer extends KafakConsumer<Event<Integer>> {
 
     @Override
     public void accept(Event<Integer> event) {
-        System.out.println("Received message key:" + event.getKey() + " value:" + event.getValue());
-    }
-
-    @Test
-    public void run() {
-        MyConsumer consumer = new MyConsumer();
-        consumer.init(null);
-
-        try {
-            Thread.sleep(15_000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+        switch (event.getTopic()) {
+            case "a" -> System.out.println("Received message, topic A, event: " + JsonConvert.root().convertTo(event));
+            case "b" -> System.out.println("Received message, topic B, event: " + JsonConvert.root().convertTo(event));
+            case "c" -> System.out.println("Received message, topic C, event: " + JsonConvert.root().convertTo(event));
        }
-    }
-
-    @Test
-    public void runProducer() {
-        KafakProducer producer = new KafakProducer();
-        producer.init(null);
-
-        Event<Integer> event = new Event<>();
-        event.setKey("XXX");
-        event.setValue(2314);
-
-        producer.send("a", event);
-
-        producer.destroy(null);
    }
 }
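MyConsumer now exercises the Redis implementations end to end through AppTest. Because IProducer and IConsumer share one contract, the same test should work against the Kafka side by swapping the producer. A hypothetical sketch (the test name is invented; it uses only APIs shown in this diff):

    @Test
    public void runKafakProducer() throws Exception {
        KafakProducer producer = Application.singleton(KafakProducer.class);

        Event<Integer> event = new Event<>();
        event.setTopic("a");   // must be one of MyConsumer's subscribed topics
        event.setKey("abx");
        event.setValue(2314);

        producer.send(event);  // topic comes from the event, mirroring the Redis test
    }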