新增:zdb 客户端程序实现,其他修改

This commit is contained in:
lxy 2021-01-08 15:37:18 +08:00
parent ddebb8c7d6
commit d201cd3917
7 changed files with 267 additions and 13 deletions

View File

@ -5,3 +5,7 @@ redis.port=6064
# pulsar
pulsar.serviceurl=pulsar://47.113.228.247:6650
# zdb
zdb.host = 127.0.0.1
zdb.port = 1216

View File

@ -5,6 +5,7 @@ import org.redkale.convert.json.JsonConvert;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
/**
* @author Liang
@ -12,6 +13,8 @@ import java.util.Set;
*/
public abstract class AbstractConsumer implements IConsumer {
public Logger logger = Logger.getLogger(this.getClass().getSimpleName());
public final Map<String, EventType> eventMap = new HashMap<>();
public abstract String getGroupid();

View File

@ -3,16 +3,13 @@ package com.zdemo;
import org.redkale.util.TypeToken;
import java.util.Collection;
import java.util.logging.Logger;
public interface IConsumer<T extends Event> {
public interface IConsumer {
TypeToken<String> TYPE_TOKEN_STRING = new TypeToken<String>() {
};
TypeToken<Integer> TYPE_TOKEN_INT = new TypeToken<Integer>() {
};
Logger logger = Logger.getLogger(IConsumer.class.getSimpleName());
Collection<String> getTopics();
void addEventType(EventType... eventType);
@ -21,6 +18,7 @@ public interface IConsumer<T extends Event> {
/**
* 取消订阅
*
* @param topic
*/
void unsubscribe(String topic);

View File

@ -42,7 +42,7 @@ public abstract class KafakConsumer extends AbstractConsumer implements IConsume
super.addEventType(eventTypes);
// 增加变更标记
queue.add(() -> logger.info("KafakConsumer starting..."));
queue.add(() -> logger.info("KafakConsumer add new topic!"));
}
@Override

View File

@ -15,7 +15,7 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.logging.Level;
public abstract class RedisConsumer extends AbstractConsumer implements IConsumer, Service {
public class RedisConsumer extends AbstractConsumer implements IConsumer, Service {
@Resource(name = "property.redis.host")
private String host = "127.0.0.1";
@ -81,6 +81,11 @@ public abstract class RedisConsumer extends AbstractConsumer implements IConsume
}).start();
}
@Override
public String getGroupid() {
return null;
}
@Override
public void addEventType(EventType... eventType) {
for (EventType type : eventType) {

View File

@ -4,12 +4,16 @@ import com.zdemo.Event;
import com.zdemo.EventType;
import com.zdemo.IConsumer;
import com.zdemo.IProducer;
import com.zdemo.kafak.KafakProducer;
import com.zdemo.zdb.ZdbProducer;
import org.junit.Test;
import org.redkale.boot.Application;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static java.util.Arrays.asList;
@ -44,7 +48,7 @@ public class AppTest {
@Test
public void runProducer() {
try {
IProducer producer = Application.singleton(KafakProducer.class);
IProducer producer = Application.singleton(ZdbProducer.class);
// 发送不同的 事件
float v0 = 1f;
@ -57,8 +61,8 @@ public class AppTest {
/*producer.send(Event.of("game-update", 23256));
producer.send(Event.of("bx", 23256));*/
for (int i = 0; i < 5; i++) {
producer.send(Event.of("a", i + ""));
for (int i = 0; i < 10_0000; i++) {
producer.send(Event.of("a-1", i + ""));
}
try {
@ -70,4 +74,244 @@ public class AppTest {
e.printStackTrace();
}
}
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();
@Test
public void t() {
List<String> list = new ArrayList<>();
list.toArray(String[]::new);
new Thread(() -> {
while (true) {
System.out.println("accept:");
String peek = null;
try {
System.out.println(!queue.isEmpty());
peek = queue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(peek);
}
}).start();
for (int i = 0; i < 10; i++) {
try {
queue.put(i + "");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
System.out.println("---");
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void xx() {
Function<String, String> fun = x -> {
return x.toUpperCase();
};
System.out.println(fun.toString());
}
@Test
public void yy() {
IProducer producer = null;
try {
producer = Application.singleton(ZdbProducer.class);
for (int i = 0; i < 100; i++) {
producer.send(Event.of("x", "x"));
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// (27+5*23)/(63-59)
// [27+5*23] [/] [63-59]
// [27] + [5*23] [/] [63-59]
/**
* 1. 按照优先级逐一拆解运算
* 括号, 乘除加减
* 2. 逐一进行计算
*/
class C {
C A;
C B;
int c1; // 如果 A 只剩数字将c1 转为整数存放到 c1
int c2;
String x; // + - * /
}
@Test
public void x() {
// (27+5*23)/(63-59)
String str = "(27+5*23)/(63-59)";
str = "27+5*23";
str = "258/((35+17)/(5*3+18-29)+3)(138-134)*41-6+10+24";
str = "5*3+18-29";
//System.out.println("258/((35+17)/(5*3+18-29)+3)(138-134)*41-6+10+24".replaceAll("\\)\\(", ")*("));
List<String> parse = parse(str.replaceAll("\\)\\(", ")\\*("));
System.out.println(c(str));
}
// 因式分解括号 -> 乘除 -> 加减
public List<String> parse(String str) {
// 找到一对
/*
258/()()
[258, /, (35+17)/(5*3+18-29)+3, 138-134, *, 41, -, 6, +, 10, +, 24]
*/
// 258/((35+17)/(5*3+18-29)+3)(138-134)*41-6+10+24
//str = "258 / (35+17)/(5*3+18-29)+3 138-134, * 41-6+10+24";
String[] strArr = str.split("");
// 一级括号除分解
List<String> arr = new ArrayList<>();
String tmp = "";
int n1 = 0; // 括号层级开始
int m1 = 0; // 括号层级结尾
for (String s : strArr) {
if (n1 > 0) { // 一级括号分解
if (")".equals(s) && (++m1) == n1) { // 一级括号结束
arr.add(tmp);
tmp = "";
n1 = 0;
m1 = 0;
} else {
if ("(".equals(s)) {
n1++;
}
tmp += s;
}
} else { // 无括号
if ("+".equals(s) || "-".equals(s) || "*".equals(s) || "/".equals(s)) {
if (!"".equals(tmp)) {
arr.add(tmp);
}
arr.add(s);
tmp = "";
} else if ("(".equals(s)) {
n1 = 1;
} else {
tmp += s;
}
}
}
if (!"".equals(tmp)) {
arr.add(tmp);
}
return arr;
}
public int c(String str) {
List<String> arr = parse(str); // 预期 length >= 3 基数的基数[27+5*23, /, 63-59], [60, /, 2, *, 164-23*7]
System.out.println(arr);
if (arr == null || arr.size() < 3) {
return -1; // 错误码-1错误的计算式
}
List<String> _arr = new ArrayList<>();
// 按照优先级做合并计算 乘除优先
// 乘除
for (int i = 1; i < arr.size() - 1; i += 2) {
/*if ("*".equals(arr.get(i)) || "/".equals(arr.get(i))) {
if (_arr.size() > 0) {
_arr.remove(_arr.size() - 1);
}
int c = c(arr.get(i - 1), arr.get(i + 1), arr.get(i));
if (c < 0) {
return -1;
}
_arr.add(c + "");
} else {
_arr.add(arr.get(i - 1));
_arr.add(arr.get(i));
_arr.add(arr.get(i + 1));
}*/
if ("*".equals(arr.get(i)) || "/".equals(arr.get(i))) {
int c = c(arr.get(i - 1), arr.get(i + 1), arr.get(i));
if (c < 0) {
return c;
}
_arr.add(c + "");
} else {
}
}
if (_arr.size() == 1) { // 通过第一轮的 乘除计算完成结果合并
return Integer.parseInt(_arr.get(0));
}
int c = 0;
for (int i = 1; i < _arr.size(); i += 2) {
int _c = c(_arr.get(i - 1), _arr.get(i + 1), _arr.get(i));
if (_c < 0) {
return _c;
}
c += _c;
}
return c;
}
public int c(String a, String b, String x) {
int _a = 0;
if (a.contains("(") || a.contains("+") || a.contains("-") || a.contains("*") || a.contains("/")) {
_a = c(a);
} else { // 预期 ( + - * / 的结果为标准数字
_a = Integer.parseInt(a);
}
int _b = 0;
if (b.contains("(") || b.contains("+") || b.contains("-") || b.contains("*") || b.contains("/")) {
_b = c(b);
} else { // 预期 ( + - * / 的结果为标准数字
_b = Integer.parseInt(b);
}
// 如果出现负数错误码直接返回对应的负数
if (_a < 0) {
return _a;
}
if (_b < 0) {
return _b;
}
// 定义错误标识: -1错误的计算式-2除不尽-3除数为0-4大于200,
if ("+".equals(x)) {
return _a + _b;
} else if ("-".equals(x)) {
return _a - _b;
} else if ("*".equals(x)) {
return _a * _b;
} else if ("/".equals(x)) {
return _a % _b > 0 ? -2 : _a / _b; // 除不尽
}
return 0;
}
}

View File

@ -1,8 +1,8 @@
package com.zdemo.test;
import com.zdemo.kafak.KafakConsumer;
import com.zdemo.zdb.ZdbConsumer;
public class MyConsumer extends KafakConsumer {
public class MyConsumer extends ZdbConsumer {
public String getGroupid() {
return "group-test"; //消费组名称