Added:
1. Redis publish/subscribe
2. Publish/subscribe model rework
This commit is contained in:
parent 30291fb6e7
commit 60a5bd300f
conf/application.xml
@@ -3,7 +3,7 @@
 <application port="2001">
 
     <resources>
-
+        <properties load="redis.properties"></properties>
     </resources>
 
     <server protocol="HTTP" port="80">
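
The redis.properties entries loaded here become injectable resources for the services in this commit; the new Redis classes read them through Redkale's "property." resource names. A minimal sketch of that wiring (the class name RedisConfigDemo is hypothetical; the resource names and defaults are taken from the RedisConsumer/RedisProducer classes further down):

import org.redkale.service.Service;
import org.redkale.util.AnyValue;
import javax.annotation.Resource;

public class RedisConfigDemo implements Service {

    @Resource(name = "property.redis.host")
    private String host = "127.0.0.1"; // default used if the property is absent

    @Resource(name = "property.redis.password")
    private String password = "";

    @Resource(name = "property.redis.port")
    private int port = 6379;

    @Override
    public void init(AnyValue config) {
        // after injection, host/password/port hold the values from conf/redis.properties
    }
}
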
conf/kafak.properties
@@ -1,5 +1,5 @@
-
-bootstrap.servers=122.112.180.156:6062
+# Producer
+bootstrap.servers=47.111.150.118:6062
 acks=all
 retries=0
 batch.size=16384
@@ -7,3 +7,10 @@ linger.ms=1
 buffer.memory=33554432
 key.serializer=org.apache.kafka.common.serialization.StringSerializer
 value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+# Consumer
+enable.auto.commit=true
+auto.commit.interval.ms=1000
+group.id=
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
conf/redis.properties (new file, 4 lines)
@@ -0,0 +1,4 @@
+# redis
+redis.host=47.111.150.118
+redis.password=*Zhong9307!
+redis.port=6064
src/com/zdemo/Event.java
@@ -1,4 +1,4 @@
-package com.zdemo.test;
+package com.zdemo;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -6,6 +6,7 @@ import lombok.Setter;
 @Getter
 @Setter
 public class Event<V> {
+    private String topic;
     private String key;
     private V value;
 }
src/com/zdemo/IConsumer.java
@@ -4,7 +4,7 @@ import org.redkale.util.TypeToken;
 
 import java.util.Collection;
 
-public interface IConsumer<T> {
+public interface IConsumer<T extends Event> {
 
     Collection<String> getSubscribes();
 
src/com/zdemo/IProducer.java
@@ -2,13 +2,13 @@ package com.zdemo;
 
 import java.util.concurrent.CompletableFuture;
 
-public interface IProducer<T> {
+public interface IProducer<T extends Event> {
 
 
-    default CompletableFuture sendAsync(String topic,T... t) {
-        return CompletableFuture.runAsync(() -> send(topic, t));
+    default CompletableFuture sendAsync(String topic, T... t) {
+        return CompletableFuture.runAsync(() -> send(t));
     }
 
-    void send(String topic,T... t);
+    void send(T... t);
 
 }
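
Note that sendAsync still accepts a topic parameter but no longer forwards it; routing now comes from Event.getTopic(). A minimal caller against the new interface, mirroring the AppTest added later in this commit (SendDemo and its publish helper are hypothetical names for illustration):

import com.zdemo.Event;
import com.zdemo.IProducer;

public class SendDemo {

    static void publish(IProducer<Event<Integer>> producer) {
        Event<Integer> event = new Event<>();
        event.setTopic("c"); // the topic now travels with the event
        event.setKey("abx");
        event.setValue(2314);
        producer.send(event); // no separate topic argument anymore
    }
}
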
KafakService.java (deleted, 73 lines)
@@ -1,73 +0,0 @@
-package com.zdemo;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Test;
-import org.redkale.service.AbstractService;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-
-public class KafakService extends AbstractService {
-    static Properties props = new Properties();
-
-    static String kafakServices = "122.112.180.156:6062";
-
-    static {
-        // producer settings
-        props.put("bootstrap.servers", kafakServices);
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", StringSerializer.class.getName());
-        props.put("value.serializer", StringSerializer.class.getName());
-
-        // consumer settings
-        props.put("bootstrap.servers", kafakServices);
-        props.put("group.id", "test");
-        props.put("enable.auto.commit", "true");
-        props.put("auto.commit.interval.ms", "1000");
-        props.put("key.deserializer", StringDeserializer.class.getName());
-        props.put("value.deserializer", StringDeserializer.class.getName());
-    }
-
-    @Test
-    public void pull() {
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList("foo", "bar", "t1"));
-        while (true) {
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-            for (ConsumerRecord<String, String> record : records) {
-                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
-            }
-        }
-    }
-
-    @Test
-    public void push() {
-        send("t1", "this is a test data too");
-    }
-
-    public void send(String topic, String data) {
-        KafkaProducer<String, String> producer = new KafkaProducer(props);
-        for (int i = 0; i < 2; i++) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-            producer.send(new ProducerRecord(topic, "" + i, data));
-        }
-        producer.close();
-    }
-
-
-}
src/com/zdemo/kafak/KafakConsumer.java
@@ -1,15 +1,20 @@
 package com.zdemo.kafak;
 
+import com.zdemo.Event;
 import com.zdemo.IConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.redkale.convert.json.JsonConvert;
 import org.redkale.net.http.RestService;
 import org.redkale.service.Service;
 import org.redkale.util.AnyValue;
 
+import javax.annotation.Resource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -21,38 +26,43 @@ import java.util.logging.Logger;
  * @param <T>
  */
 @RestService
-public abstract class KafakConsumer<T> implements IConsumer<T>, Service {
+public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Service {
 
-    protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
+    private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
 
-    private String kafakServices = "122.112.180.156:6062";
-    private KafkaConsumer<String, String> consumer;
+    @Resource(name = "APP_HOME")
+    protected File APP_HOME;
+
+    public abstract String getGroupid();
 
     @Override
     public void init(AnyValue config) {
         CompletableFuture.runAsync(() -> {
-            Properties props = new Properties();
-            props.put("bootstrap.servers", kafakServices);
-            props.put("group.id", "test");
-            props.put("enable.auto.commit", "true");
-            props.put("auto.commit.interval.ms", "1000");
-            props.put("key.deserializer", StringDeserializer.class.getName());
-            props.put("value.deserializer", StringDeserializer.class.getName());
-
-            consumer = new KafkaConsumer<>(props);
-            consumer.subscribe(getSubscribes());
-            while (true) {
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-                for (ConsumerRecord<String, String> record : records) {
-                    try {
-                        logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()));
-
-                        T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value());
-                        accept(t);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
+            try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"))) {
+                Properties props = new Properties();
+                props.load(fis);
+                props.put("group.id", getGroupid());
+                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+                consumer.subscribe(getSubscribes());
+
+                while (true) {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
+                    for (ConsumerRecord<String, String> record : records) {
+                        try {
+                            logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()));
+                            T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value());
+                            accept(t);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            } catch (FileNotFoundException e) {
+                e.printStackTrace();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
         });
     }
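
A concrete Kafka consumer now only supplies the group id, the subscribed topics, the TypeToken used for JSON decoding, and the accept callback; broker settings come from conf/kafak.properties under APP_HOME. A minimal sketch, assuming IConsumer declares the getTypeToken() and accept(T) methods invoked above (OrderConsumer and its group id are hypothetical):

import com.zdemo.Event;
import com.zdemo.kafak.KafakConsumer;
import org.redkale.util.TypeToken;

import java.util.Collection;
import java.util.List;

public class OrderConsumer extends KafakConsumer<Event<Integer>> {

    @Override
    public String getGroupid() {
        return "group-order"; // hypothetical consumer group
    }

    @Override
    public Collection<String> getSubscribes() {
        return List.of("a"); // topics to subscribe to
    }

    @Override
    public TypeToken<Event<Integer>> getTypeToken() {
        return new TypeToken<Event<Integer>>() {}; // drives JsonConvert deserialization
    }

    @Override
    public void accept(Event<Integer> event) {
        System.out.println("got key=" + event.getKey() + " value=" + event.getValue());
    }
}
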
src/com/zdemo/kafak/KafakProducer.java
@@ -1,5 +1,6 @@
 package com.zdemo.kafak;
 
+import com.zdemo.Event;
 import com.zdemo.IProducer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -8,7 +9,13 @@ import org.redkale.net.http.RestService;
 import org.redkale.service.Service;
 import org.redkale.util.AnyValue;
 
+import javax.annotation.Resource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Properties;
+import java.util.logging.Logger;
 
 /**
  * Producer side
@@ -16,29 +23,31 @@ import java.util.Properties;
  * @param <T>
  */
 @RestService
-public class KafakProducer<T> implements IProducer<T>, Service {
-    private String kafakServers = "122.112.180.156:6062";
+public class KafakProducer<T extends Event> implements IProducer<T>, Service {
+    private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
     private KafkaProducer<String, String> producer;
 
+    @Resource(name = "APP_HOME")
+    protected File APP_HOME;
+
     @Override
     public void init(AnyValue config) {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", kafakServers);
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producer = new KafkaProducer(props);
+        try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"))) {
+            Properties props = new Properties();
+            props.load(fis);
+            producer = new KafkaProducer(props);
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
 
     @Override
-    public void send(String topic, T... t) {
-        for (T t1 : t) {
-            producer.send(new ProducerRecord(topic, JsonConvert.root().convertTo(t1)));
+    public void send(T... t) {
+        for (T x : t) {
+            logger.finest("send message: " + JsonConvert.root().convertTo(x));
+            producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x)));
         }
     }
src/com/zdemo/redis/RedisConsumer.java (new file, 71 lines)
@@ -0,0 +1,71 @@
+package com.zdemo.redis;
+
+import com.zdemo.Event;
+import com.zdemo.IConsumer;
+import org.redkale.convert.json.JsonConvert;
+import org.redkale.service.Service;
+import org.redkale.util.AnyValue;
+
+import javax.annotation.Resource;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Service {
+
+    @Resource(name = "property.redis.host")
+    private String host = "127.0.0.1";
+    @Resource(name = "property.redis.password")
+    private String password = "";
+    @Resource(name = "property.redis.port")
+    private int port = 6379;
+
+    public String getGroupid() {
+        return "";
+    }
+
+    @Override
+    public void init(AnyValue config) {
+        try {
+            Socket client = new Socket();
+            client.connect(new InetSocketAddress(host, port));
+            client.setKeepAlive(true);
+
+            OutputStreamWriter oswSub = new OutputStreamWriter(client.getOutputStream());
+            oswSub.write("AUTH " + password + "\r\n");
+            oswSub.flush();
+
+            StringBuffer buf = new StringBuffer("SUBSCRIBE");
+            for (String topic : getSubscribes()) {
+                buf.append(" ").append(topic);
+            }
+            buf.append(" _ping\r\n");
+            oswSub.write(buf.toString());
+            oswSub.flush();
+
+            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
+            String type = "";
+            String readLine;
+            while ((readLine = br.readLine()) != null) {
+                if ("*3".equals(readLine)) {
+                    br.readLine(); // $7 len("message")
+                    type = br.readLine(); // message
+                    if (!"message".equals(type)) {
+                        continue;
+                    }
+                    br.readLine(); // $n len(topic)
+                    String topic = br.readLine(); // topic
+
+                    br.readLine(); // $n len(value)
+                    String value = br.readLine(); // value
+                    T t = JsonConvert.root().convertFrom(getTypeToken().getType(), value);
+                    accept(t);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
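
For reference, the loop above hand-parses the RESP frames Redis pushes to a subscriber: each published message arrives as a three-element array, which is why the code keys on "*3" and then alternates length lines ($n) with payload lines. Assuming the event from AppTest below and that JsonConvert emits the fields in declaration order, the inbound frames would look like:

*3
$7
message
$1
c
$38
{"topic":"c","key":"abx","value":2314}

The discarded br.readLine() calls skip the $n length markers; only the type ("message"), the topic and the JSON value are kept. Subscription acknowledgements also arrive as arrays, which the type check filters out.
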
src/com/zdemo/redis/RedisProducer.java (new file, 52 lines)
@@ -0,0 +1,52 @@
+package com.zdemo.redis;
+
+import com.zdemo.Event;
+import com.zdemo.IProducer;
+import org.redkale.convert.json.JsonConvert;
+import org.redkale.service.Service;
+import org.redkale.util.AnyValue;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+public class RedisProducer<T extends Event> implements IProducer<T>, Service {
+
+    @Resource(name = "property.redis.host")
+    private String host = "127.0.0.1";
+    @Resource(name = "property.redis.password")
+    private String password = "";
+    @Resource(name = "property.redis.port")
+    private int port = 6379;
+
+    private OutputStreamWriter oswPub;
+
+    @Override
+    public void init(AnyValue config) {
+        try {
+            Socket client = new Socket();
+            client.connect(new InetSocketAddress(host, port));
+            client.setKeepAlive(true);
+
+            oswPub = new OutputStreamWriter(client.getOutputStream());
+            oswPub.write("AUTH " + password + "\r\n");
+            oswPub.flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void send(T... t) {
+        for (T x : t) {
+            try {
+                oswPub.write("PUBLISH " + x.getTopic() + " '" + JsonConvert.root().convertTo(x) + "' \r\n");
+                oswPub.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
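
The producer side uses Redis's inline command form rather than a RESP array, writing one line per publish. For the AppTest event below, the bytes on the wire would be roughly (assuming the same JSON field order as in the consumer example above):

PUBLISH c '{"topic":"c","key":"abx","value":2314}' \r\n

The single quotes keep the JSON together as one argument, so this framing only holds while the serialized payload never contains a single quote; payloads that might would need the RESP array form instead.
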
test/com/zdemo/test/AppTest.java (new file, 50 lines)
@@ -0,0 +1,50 @@
+package com.zdemo.test;
+
+import com.zdemo.Event;
+import com.zdemo.redis.RedisProducer;
+import org.junit.Test;
+import org.redkale.boot.Application;
+
+/**
+ * Message pub/sub test
+ */
+public class AppTest {
+
+    @Test
+    public void runConsumer() {
+        try {
+            // boot the application and start the consumer listener
+            Application.singleton(MyConsumer.class);
+
+            try {
+                Thread.sleep(15_000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void runProducer() {
+        try {
+            RedisProducer producer = Application.singleton(RedisProducer.class);
+
+            Event<Integer> event = new Event<>();
+            event.setTopic("c");
+            event.setKey("abx");
+            event.setValue(2314);
+
+            producer.send(event);
+
+            try {
+                Thread.sleep(1_000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
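
Running runConsumer in one process and runProducer in another exercises the full path: the producer PUBLISHes to topic "c", and MyConsumer (updated below to subscribe to "a", "b" and "c") should print something like the following, with the exact JSON field order depending on JsonConvert:

Received message, topic C, event: {"topic":"c","key":"abx","value":2314}
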
test/com/zdemo/test/MyConsumer.java
@@ -1,18 +1,22 @@
 package com.zdemo.test;
 
-import com.zdemo.kafak.KafakConsumer;
-import com.zdemo.kafak.KafakProducer;
-import org.junit.Test;
+import com.zdemo.Event;
+import com.zdemo.redis.RedisConsumer;
+import org.redkale.convert.json.JsonConvert;
 import org.redkale.util.TypeToken;
 
 import java.util.Collection;
 import java.util.List;
 
-public class MyConsumer extends KafakConsumer<Event<Integer>> {
+public class MyConsumer extends RedisConsumer<Event<Integer>> {
 
+    public String getGroupid() {
+        return "group-test"; // quest, user, im, live
+    }
+
     @Override
     public Collection<String> getSubscribes() {
-        return List.of("a");
+        return List.of("a", "b", "c");
     }
 
     @Override
@@ -23,32 +27,11 @@ public class MyConsumer extends KafakConsumer<Event<Integer>> {
 
     @Override
     public void accept(Event<Integer> event) {
-        System.out.println("Received message key:" + event.getKey() + " value:" + event.getValue());
+        switch (event.getTopic()) {
+            case "a" -> System.out.println("Received message, topic A, event: " + JsonConvert.root().convertTo(event));
+            case "b" -> System.out.println("Received message, topic B, event: " + JsonConvert.root().convertTo(event));
+            case "c" -> System.out.println("Received message, topic C, event: " + JsonConvert.root().convertTo(event));
+        }
     }
-
-    @Test
-    public void run() {
-        MyConsumer consumer = new MyConsumer();
-        consumer.init(null);
-
-        try {
-            Thread.sleep(15_000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Test
-    public void runProducer() {
-        KafakProducer<Event> producer = new KafakProducer();
-        producer.init(null);
-
-        Event<Integer> event = new Event<>();
-        event.setKey("XXX");
-        event.setValue(2314);
-
-        producer.send("a", event);
-
-        producer.destroy(null);
-    }
 }