This commit is contained in:
lxy 2020-08-15 01:06:35 +08:00
parent 60a5bd300f
commit 8f26b66e35
5 changed files with 12 additions and 9 deletions

View File

@ -3,8 +3,10 @@ package com.zdemo;
import org.redkale.util.TypeToken; import org.redkale.util.TypeToken;
import java.util.Collection; import java.util.Collection;
import java.util.logging.Logger;
public interface IConsumer<T extends Event> { public interface IConsumer<T extends Event> {
Logger logger = Logger.getLogger(IConsumer.class.getSimpleName());
Collection<String> getSubscribes(); Collection<String> getSubscribes();

View File

@ -1,11 +1,12 @@
package com.zdemo; package com.zdemo;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
public interface IProducer<T extends Event> { public interface IProducer<T extends Event> {
Logger logger = Logger.getLogger(IProducer.class.getSimpleName());
default CompletableFuture sendAsync(T... t) {
default CompletableFuture sendAsync(String topic, T... t) {
return CompletableFuture.runAsync(() -> send(t)); return CompletableFuture.runAsync(() -> send(t));
} }

View File

@ -18,7 +18,6 @@ import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
/** /**
* 消费 * 消费
@ -28,8 +27,6 @@ import java.util.logging.Logger;
@RestService @RestService
public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Service { public abstract class KafakConsumer<T extends Event> implements IConsumer<T>, Service {
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
@Resource(name = "APP_HOME") @Resource(name = "APP_HOME")
protected File APP_HOME; protected File APP_HOME;

View File

@ -15,7 +15,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Properties; import java.util.Properties;
import java.util.logging.Logger;
/** /**
* 生产 * 生产
@ -24,7 +23,6 @@ import java.util.logging.Logger;
*/ */
@RestService @RestService
public class KafakProducer<T extends Event> implements IProducer<T>, Service { public class KafakProducer<T extends Event> implements IProducer<T>, Service {
private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private KafkaProducer<String, String> producer; private KafkaProducer<String, String> producer;
@Resource(name = "APP_HOME") @Resource(name = "APP_HOME")

View File

@ -60,8 +60,13 @@ public abstract class RedisConsumer<T extends Event> implements IConsumer<T>, Se
br.readLine(); //$n len(value) br.readLine(); //$n len(value)
String value = br.readLine(); // value String value = br.readLine(); // value
try {
T t = JsonConvert.root().convertFrom(getTypeToken().getType(), value); T t = JsonConvert.root().convertFrom(getTypeToken().getType(), value);
accept(t); accept(t);
} catch (Exception e) {
logger.warning("event fmt error :" + value);
e.printStackTrace();
}
} }
} }
} catch (Exception e) { } catch (Exception e) {