Change: rework the kafka subscription implementation

lxy
2020-09-07 09:51:01 +08:00
parent d45b39a309
commit f848c57e39
10 changed files with 201 additions and 127 deletions

KafakConsumer.java — View File

@@ -1,10 +1,11 @@
 package com.zdemo.kafak;
-import com.zdemo.Event;
+import com.zdemo.AbstractConsumer;
+import com.zdemo.EventType;
 import com.zdemo.IConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
 import org.redkale.net.http.RestService;
 import org.redkale.service.Service;
 import org.redkale.util.AnyValue;
@@ -16,48 +17,86 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import static java.util.Arrays.asList;

 /**
  * Consumer
  *
  * @param <T>
  */
 @RestService
-public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Service {
+public abstract class KafakConsumer extends AbstractConsumer implements IConsumer, Service {

     @Resource(name = "APP_HOME")
     protected File APP_HOME;

+    protected Properties props;

     // 0:none 1:restart -1:stop
     //private int cmd = -1;

     public abstract String getGroupid();

+    private final LinkedBlockingQueue<EventType> queue = new LinkedBlockingQueue<>();

+    @Override
+    public void addEventType(EventType... eventTypes) {
+        super.addEventType(eventTypes);
+        try {
+            for (EventType eventType : eventTypes) {
+                queue.put(eventType);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }

     @Override
     public void init(AnyValue config) {
-        new Thread(() -> {
-            try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
-                Properties props = new Properties();
-                props.load(fis);
-                props.put("group.id", getGroupid());
-                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-                consumer.subscribe(getSubscribes());
+        try (FileInputStream fis = new FileInputStream(new File(APP_HOME, "conf/kafak.properties"));) {
+            props = new Properties();
+            props.load(fis);
-                while (true) {
-                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-                    for (ConsumerRecord<String, String> record : records) {
-                        String value = record.value();
-                        try {
-                            accept(value);
-                        } catch (Exception e) {
-                            logger.warning("event accept error :" + value);
-                            e.printStackTrace();
+            if (logger.isLoggable(Level.INFO)) logger.info(getGroupid() + " consumer started!");
+            new Thread(() -> {
+                try {
+                    props.put("group.id", getGroupid());
+                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+                    consumer.subscribe(asList("_"));
+                    while (true) {
+                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
+                        records.forEach(record -> {
+                            String topic = record.topic();
+                            long offset = record.offset();
+                            String value = record.value();
+                            try {
+                                accept(topic, value);
+                            } catch (Exception e) {
+                                logger.warning(String.format("topic[%s] event accept error, offset=%s,value:%s", topic, offset, value));
+                                e.printStackTrace();
+                            }
+                        });
+                        // dynamically add new subscriptions
+                        while (!queue.isEmpty()) {
+                            queue.clear();
+                            consumer.unsubscribe();
+                            consumer.subscribe(getSubscribes());
+                        }
+                    }
+                } catch (WakeupException ex) {
+                    System.out.println("WakeupException !!!!");
+                }
-            } catch (FileNotFoundException e) {
-                e.printStackTrace();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }).start();
+            }, "thread-consumer-[" + getGroupid() + "]").start();
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
 }
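The heart of the change: subscriptions are no longer fixed at startup. addEventType() registers the handler through AbstractConsumer and also drops the EventType into a LinkedBlockingQueue; the polling thread drains that queue between polls, calls unsubscribe(), and re-subscribes with the full getSubscribes() set. The hand-off through a queue matters because KafkaConsumer is not thread-safe, so subscribe() may only be called from the thread that polls; the odd-looking subscribe(asList("_")) is a dummy first subscription, since poll() throws IllegalStateException when the consumer is subscribed to nothing. Below is a minimal self-contained sketch of the same pattern against plain kafka-clients; the broker address, group id, and topic names are illustrative placeholders, not part of this commit.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

public class DynamicSubscribeDemo {

    // Topics requested from other threads; drained only by the polling thread,
    // because the KafkaConsumer instance must never be touched concurrently.
    private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final Set<String> topics = new HashSet<>();

    // Safe to call from any thread, like addEventType() in the commit.
    public void addTopic(String topic) {
        pending.offer(topic);
    }

    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            topics.add("_"); // dummy first subscription, as in the commit, so poll() is legal
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                records.forEach(r ->
                        System.out.printf("%s[%d]: %s%n", r.topic(), r.offset(), r.value()));
                // Re-subscribe only when something new arrived, mirroring the queue check above.
                if (!pending.isEmpty()) {
                    pending.drainTo(topics);
                    consumer.unsubscribe();
                    consumer.subscribe(topics);
                }
            }
        }
    }
}

Note that unsubscribe() followed by subscribe() triggers a consumer-group rebalance, which is relatively expensive; presumably that is why the commit only re-subscribes when the queue is non-empty rather than on every loop iteration.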

KafakProducer.java — View File

@@ -50,7 +50,7 @@ public class KafakProducer<T extends Event> implements IProducer<T>, Service {
     @Override
     public void send(T... t) {
         for (T x : t) {
-            producer.send(new ProducerRecord(x.getTopic(), JsonConvert.root().convertTo(x)));
+            producer.send(new ProducerRecord(x.topic, JsonConvert.root().convertTo(x.value)));
         }
     }
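The producer side now publishes only the event's payload rather than the whole event object: the topic comes from the public field x.topic, and only x.value is JSON-encoded. This matches the consumer change above, where accept(topic, value) now receives the topic explicitly instead of recovering the event type from the serialized wrapper. A short standalone sketch of an equivalent send follows; the broker address, topic, and JSON string are placeholders standing in for the commit's x.topic and JsonConvert output.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String topic = "user-login";    // stands in for x.topic
            String json = "{\"uid\":1001}"; // stands in for JsonConvert.root().convertTo(x.value)
            producer.send(new ProducerRecord<>(topic, json));
        } // close() flushes any buffered records before returning
    }
}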