14
src/com/zdemo/IConsumer.java
Normal file
@@ -0,0 +1,14 @@
package com.zdemo;

import org.redkale.util.TypeToken;

import java.util.Collection;

public interface IConsumer<T> {

    Collection<String> getSubscribes();

    TypeToken<T> getTypeToken();

    void accept(T t);
}
14
src/com/zdemo/IProducer.java
Normal file
@@ -0,0 +1,14 @@
package com.zdemo;

import java.util.concurrent.CompletableFuture;

public interface IProducer<T> {

    default CompletableFuture<Void> sendAsync(String topic, T... t) {
        return CompletableFuture.runAsync(() -> send(topic, t));
    }

    void send(String topic, T... t);

}
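To make the producer contract concrete, here is a minimal, hypothetical stdout-backed implementation (not part of this commit; the class, topic, and event values are illustrative) showing how the default sendAsync delegates to send:

package com.zdemo;

// Hypothetical demo implementation of IProducer, for illustration only.
public class DemoProducer implements IProducer<String> {

    @Override
    public void send(String topic, String... t) {
        for (String v : t) {
            System.out.println(topic + " <- " + v); // stand-in for a real broker write
        }
    }

    public static void main(String[] args) {
        DemoProducer p = new DemoProducer();
        p.send("t1", "event-a", "event-b");  // synchronous varargs fan-out
        p.sendAsync("t1", "event-c").join(); // default method wraps send(...) in a CompletableFuture
    }
}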
73
src/com/zdemo/KafakService.java
Normal file
@@ -0,0 +1,73 @@
package com.zdemo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.redkale.service.AbstractService;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafakService extends AbstractService {
    static Properties props = new Properties();

    static String kafakServices = "122.112.180.156:6062";

    static {
        // producer settings
        props.put("bootstrap.servers", kafakServices);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // consumer settings
        props.put("bootstrap.servers", kafakServices);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
    }

    @Test
    public void pull() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "t1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

    @Test
    public void push() {
        send("t1", "this is a test data too");
    }

    public void send(String topic, String data) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 2; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(new ProducerRecord<>(topic, "" + i, data));
        }
        producer.close();
    }

}
59
src/com/zdemo/kafak/KafakConsumer.java
Normal file
@@ -0,0 +1,59 @@
package com.zdemo.kafak;

import com.zdemo.IConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.RestService;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

/**
 * Consumer: polls the subscribed topics and dispatches JSON-decoded events.
 *
 * @param <T>
 */
@RestService
public abstract class KafakConsumer<T> implements IConsumer<T>, Service {

    protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());

    private String kafakServices = "122.112.180.156:6062";
    private KafkaConsumer<String, String> consumer;

    @Override
    public void init(AnyValue config) {
        CompletableFuture.runAsync(() -> {
            Properties props = new Properties();
            props.put("bootstrap.servers", kafakServices);
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(getSubscribes());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        logger.finest(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()));

                        T t = JsonConvert.root().convertFrom(getTypeToken().getType(), record.value());
                        accept(t);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}
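A concrete consumer only needs to name its topics, expose the JSON target type, and handle each event. A hedged sketch (the class name, topic, and event shape are assumptions for illustration, not part of this commit; it assumes redkale's TypeToken supports the usual anonymous-subclass type-capture pattern):

package com.zdemo.kafak;

import org.redkale.util.TypeToken;

import java.util.Arrays;
import java.util.Collection;

// Hypothetical subclass consuming UserEvent JSON from topic "t1".
public class UserEventConsumer extends KafakConsumer<UserEventConsumer.UserEvent> {

    public static class UserEvent { // illustrative payload shape
        public String name;
        public int age;
    }

    @Override
    public Collection<String> getSubscribes() {
        return Arrays.asList("t1"); // topics polled by the init() loop above
    }

    @Override
    public TypeToken<UserEvent> getTypeToken() {
        return new TypeToken<UserEvent>() {}; // captures the generic type for JsonConvert
    }

    @Override
    public void accept(UserEvent t) {
        logger.info("received event for " + t.name);
    }
}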
49
src/com/zdemo/kafak/KafakProducer.java
Normal file
@@ -0,0 +1,49 @@
package com.zdemo.kafak;

import com.zdemo.IProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.redkale.convert.json.JsonConvert;
import org.redkale.net.http.RestService;
import org.redkale.service.Service;
import org.redkale.util.AnyValue;

import java.util.Properties;

/**
 * Producer: JSON-encodes each value and sends it to the given topic.
 *
 * @param <T>
 */
@RestService
public class KafakProducer<T> implements IProducer<T>, Service {

    private String kafakServers = "122.112.180.156:6062";
    private KafkaProducer<String, String> producer;

    @Override
    public void init(AnyValue config) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafakServers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void send(String topic, T... t) {
        for (T t1 : t) {
            producer.send(new ProducerRecord<>(topic, JsonConvert.root().convertTo(t1)));
        }
    }

    @Override
    public void destroy(AnyValue config) {
        producer.close();
    }
}
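Putting it together, a hedged usage sketch of KafakProducer (in a Redkale app the container would normally call init()/destroy(); the driver class, Ping payload, and topic here are illustrative, and init() ignores its config so null is passed for the demo):

package com.zdemo.kafak;

// Hypothetical driver class, for illustration only.
public class ProducerDemo {

    public static class Ping { // illustrative payload
        public String msg = "hello";
    }

    public static void main(String[] args) {
        KafakProducer<Ping> producer = new KafakProducer<>();
        producer.init(null);             // builds the KafkaProducer from the hard-coded props
        producer.send("t1", new Ping()); // value is JSON-encoded via JsonConvert before sending
        producer.destroy(null);          // closes the underlying Kafka client
    }
}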