This commit is contained in:
2019-04-22 00:27:04 +08:00
parent db8c9b3182
commit ebbd490913
19 changed files with 329 additions and 107 deletions

View File

@@ -29,6 +29,11 @@ public class Kv<K,V> extends LinkedHashMap<K,V> {
return this;
}
public Kv<K,V> putAll(Kv<K,V> kv) {
kv.forEach((k,v) -> put(k, v));
return this;
}
// 将obj 属性映射到Kv 中
public static Kv toKv(Object m, String ... fields) {
Kv kv = Kv.of();

View File

@@ -44,4 +44,8 @@ public class DbKit implements DbSource{
public void dropTable(String tableName) {
dbSource.dropTable(tableName);
}
public void exetute(String sql) {
dbSource.exetute(sql);
}
}

View File

@@ -35,4 +35,6 @@ public interface DbSource {
void createTable(String sql);
void dropTable(String tableName);
void exetute(String sql);
}

View File

@@ -121,6 +121,20 @@ public class DbSourceMysql implements DbSource {
new RuntimeException("[DbSourceMysql.dropTable] NOT SUPPORT right now" ); // todo:
}
@Override
public void exetute(String sql) {
Connection connection = connection();
try (
PreparedStatement ps = connection.prepareStatement(sql);
){
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
release(connection);
}
}
private Connection connection() {
return connection(0);
}

View File

@@ -19,6 +19,7 @@ public class MetaService extends Doc<MetaService> {
private String comment; //业务中文名
//private List<String> links; //["link._key1","link._key2"]
private String sysPlatId; //平台id
private List<String> shows = new ArrayList<>();
private List<Map> edits = new ArrayList<>();
@@ -56,6 +57,13 @@ public class MetaService extends Doc<MetaService> {
this.comment = comment;
}
public String getSysPlatId() {
return sysPlatId;
}
public void setSysPlatId(String sysPlatId) {
this.sysPlatId = sysPlatId;
}
/*public List<String> getLinks() {
return links;
}

View File

@@ -7,16 +7,18 @@ import net.tccn.qtask.impl.QTaskMysql;
public class QRuner {
public static Object query(Task e) {
switch (e.cate.toLowerCase()) {
public static Object query(Task task) {
String cate = task.getDbAccount().getCate();
switch (cate.toLowerCase()) {
case "mysql":
return new QTaskMysql(e).execute();
return new QTaskMysql(task).execute();
case "method":
return new QTaskMethod(e).execute();
return new QTaskMethod(task).execute();
case "http":
return new QTaskHttp(e).execute();
return new QTaskHttp(task).execute();
case "es":
return new QTaskEs(e).execute();
return new QTaskEs(task).execute();
}
return null;
}

View File

@@ -1,7 +1,7 @@
package net.tccn.qtask;
public interface QTask {
Task getE();
Task getTask();
Object execute();
}

View File

@@ -1,68 +1,75 @@
package net.tccn.qtask;
import net.tccn.base.Kv;
import net.tccn.base.arango.Doc;
import javax.persistence.Table;
import net.tccn.dbq.jdbc.api.DbAccount;
/**
* cate:mysql
* queryId: select * from user where userid=#(userid)
* comment: 查询用户列表
* para: {userid:1}
*
* cate:method
* queryId:com.test.abc.take
* comment:调用Java函数take
* para:{name:xxx,age:12}
*
* cate:http
* query:http://127.0.0.1/meta/db_plat_list
* comment:查询数据平台列表
* para:{platToken:3421432}
*
*
*
* 任务对象
*/
@Table(name = "qtask", catalog = "db_dev")
public class Task extends Doc<Task> {
public static Task dao = dao(Task.class);
public class Task {
/*
|- dbp: 调用谁, 参数,
|- 谁:谁(干什么) => dbPlatId + content (在程序的世界每个个体,往往都有其明确的职责)
|-
public String cate;//MYSQL,ES,METHOD,HTTP
public String queryId;
public String dbPlatId;// MYSQL, ES,METHOD,http
public String comment;
public Kv para;
*/
public String restType;//List, Map
public Task() {
private String name; // 任务名,同一系统唯一
private String dbPlatId;// 数据源id
private String content;
private String comment;
private Kv para;
private DbAccount dbAccount;
//------------------
public String getName() {
return name;
}
public Task(String cate, String queryId, String comment, Kv para) {
this.cate = cate;
this.queryId = queryId;
public void setName(String name) {
this.name = name;
}
public String getDbPlatId() {
return dbPlatId;
}
public void setDbPlatId(String dbPlatId) {
this.dbPlatId = dbPlatId;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
public Kv getPara() {
return para;
}
public void setPara(Kv para) {
this.para = para;
}
public void t() {
Task task = new Task();
public DbAccount getDbAccount() {
return dbAccount;
}
public static void add(String x) {
x = x + 1;
System.out.println("add: x= " + x);
public void setDbAccount(DbAccount dbAccount) {
this.dbAccount = dbAccount;
}
public static void main(String[] args) {
String x = "x";
add(x);
System.out.println("main: x=" + x);
}
}

View File

@@ -0,0 +1,74 @@
package net.tccn.qtask;
import net.tccn.base.Kv;
import net.tccn.base.arango.Doc;
import javax.persistence.Table;
/**
* Created by liangxianyou at 2019/4/20 20:04.
*/
@Table(name = "qtask", catalog = "db_dev")
public class TaskEntity extends Doc<TaskEntity> {
public static TaskEntity dao = dao(TaskEntity.class);
private String name; // 任务名称
//private String cate; // 任务类型
private String dbPlatId; // 数据平台id
private String comment; // 任务名称
private String content; //任务内容
private Kv<String, String> para; //任务参数
private String sysPlatId; // 平台id
// ---------------------
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDbPlatId() {
return dbPlatId;
}
public void setDbPlatId(String dbPlatId) {
this.dbPlatId = dbPlatId;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Kv<String, String> getPara() {
return para;
}
public void setPara(Kv<String, String> para) {
this.para = para;
}
public String getSysPlatId() {
return sysPlatId;
}
public void setSysPlatId(String sysPlatId) {
this.sysPlatId = sysPlatId;
}
//------------
}

View File

@@ -0,0 +1,47 @@
package net.tccn.qtask;
import net.tccn.base.Kv;
import net.tccn.base.MetaKit;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Created by liangxianyou at 2019/4/20 19:59.
*/
public class TaskKit {
private static List<TaskEntity> taskEntities;
static {
taskEntities = TaskEntity.dao.find();
}
public static Task buildTask(String name, String platToken, Kv para) {
TaskEntity taskEntity = getTaskEntity(name, platToken);
Task task = new Task();
task.setName(taskEntity.getName());
task.setComment(taskEntity.getComment());
task.setDbPlatId(taskEntity.getDbPlatId());
task.setPara(taskEntity.getPara().putAll(para));
task.setDbAccount(MetaKit.getDbPlat(taskEntity.getDbPlatId()));
return task;
}
public static TaskEntity getTaskEntity(String name, String platToken) {
Objects.requireNonNull(name);
Objects.requireNonNull(platToken);
Optional<TaskEntity> any = taskEntities.stream()
.filter(x -> name.equals(x.getName()) && MetaKit.getPlatId(platToken).equals(x.getSysPlatId()))
.findAny();
return any.get();
}
public static Object taskRun(String name, String platToken, Kv para) {
Task task = buildTask(name, platToken, para);
return QRuner.query(task);
}
}

View File

@@ -1,18 +1,27 @@
package net.tccn.qtask.impl;
import net.tccn.base.Kv;
import net.tccn.qtask.QTask;
import net.tccn.qtask.Task;
public abstract class QTaskAbs implements QTask {
private Task e;
protected Task task;
public QTaskAbs(Task e) {
this.e = e;
public QTaskAbs(Task task) {
this.task = task;
}
@Override
public Task getE() {
return e;
public Task getTask() {
return task;
}
public Kv getPara() {
return getTask().getPara();
}
public String getContent() {
return getTask().getContent();
}
}

View File

@@ -1,7 +1,5 @@
package net.tccn.qtask.impl;
import net.tccn.base.Kv;
import net.tccn.qtask.QRuner;
import net.tccn.qtask.Task;
public class QTaskEs extends QTaskAbs {
@@ -12,13 +10,7 @@ public class QTaskEs extends QTaskAbs {
@Override
public Object execute() {
StringBuilder url = new StringBuilder(getE().queryId);
if (!url.toString().contains("?")) url.append("?");
getE().para.forEach((k, v) -> {
url.append("&" + k + "=" + v);
});
String _url = url.toString().replaceAll(" ", "%20");
return QRuner.query(new Task("http", _url, "", Kv.of()));
return null;
}
}

View File

@@ -16,20 +16,20 @@ import java.util.Map;
*/
public class QTaskHttp extends QTaskAbs implements QTask {
public QTaskHttp(Task e) {
super(e);
public QTaskHttp(Task task) {
super(task);
}
private static Map<String, HttpURLConnection> connetions = new HashMap();
private URLConnection getConnection() {
try {
URI uri = URI.create(getE().queryId);
URI uri = URI.create(getTask().getDbAccount().getUrl());
URL url = uri.toURL();
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
StringBuilder content = new StringBuilder();
getE().para.forEach((k,v) -> {
getPara().forEach((k, v) -> {
content.append(k).append("=").append(v).append("&");
});
if (content.length() > 0) {
@@ -47,7 +47,7 @@ public class QTaskHttp extends QTaskAbs implements QTask {
//connection.setRequestProperty("Content-Length", String.valueOf(content.length()));
//connection.getOutputStream().write(encode.getBytes());
connetions.put(getE().queryId, connection);
connetions.put(getTask().getDbAccount().getUrl(), connection);
return connection;
} catch (MalformedURLException e) {
e.printStackTrace();

View File

@@ -14,19 +14,15 @@ public class QTaskMethod extends QTaskAbs implements QTask {
}
public String getClazz(){
String queryId = getE().queryId;
return queryId.substring(0, queryId.lastIndexOf("."));
String content = getContent(); //class + method
return content.substring(0, content.lastIndexOf("."));
}
public String getMethod(){
String queryId = getE().queryId;
return queryId.substring(queryId.lastIndexOf(".") + 1);
}
public Kv getPara() {
return getE().para;
String content = getTask().getContent();
return content.substring(content.lastIndexOf(".") + 1);
}
// execute JAVA method by CLASS AND METHOD
@Override
public Object execute() {
try {

View File

@@ -1,31 +1,64 @@
package net.tccn.qtask.impl;
import com.jfinal.plugin.activerecord.PageSqlKit;
import com.jfinal.plugin.activerecord.dialect.MysqlDialect;
import com.jfinal.template.Engine;
import com.jfinal.template.Template;
import net.tccn.base.MetaKit;
import net.tccn.base.PageBean;
import net.tccn.dbq.jdbc.api.DbKit;
import net.tccn.qtask.QTask;
import net.tccn.qtask.Task;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class QTaskMysql extends QTaskAbs implements QTask {
private Object mapper;
public QTaskMysql(Task e) {
super(e);
public static Engine engine = Engine.create("sqlTpl");
private static MysqlDialect dialect = new MysqlDialect();
static {
engine.setDevMode(true);
}
public String getSQL(){
String sql = getE().para.get("sql")+"";
return sql;
public QTaskMysql(Task task) {
super(task);
}
@Override
public Object execute() {
System.out.println("QTaskMysql.execute: " + getE().queryId);
DbKit dbKit = MetaKit.getDbKit(getTask().getDbPlatId());
/*if ("Map".equalsIgnoreCase(getE().queryType)){
// return mapper.findFirst(getSQL()); todo:
} else if ("list".equalsIgnoreCase(getE().queryType)){
// return mapper.findList(getSQL()); todo:
}*/
Template tpl = engine.getTemplateByString(task.getContent());
String sql = tpl.renderToString(getTask().getPara()).replaceAll("[\\s]+", " ");
if (sql.startsWith("select count")) {
return dbKit.queryInt(sql);
} else if (sql.startsWith("select ")) {
String[] sqls = PageSqlKit.parsePageSql(sql);
String findTotal = "select count(*) " + dialect.replaceOrderBy(sqls[1]);
String findList = sqls[0] + " " + sqls[1];
CompletableFuture<Integer> countFuture = CompletableFuture.supplyAsync(() -> dbKit.queryColumn(findTotal, int.class));
CompletableFuture<List<Map>> listFuture = CompletableFuture.supplyAsync(() -> dbKit.findList(findList, Map.class));
try {
return PageBean.by(listFuture.get(), countFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} else if (sql.startsWith("update ") || sql.startsWith("UPDATE ") ||
sql.startsWith("delete ") || sql.startsWith("DELETE ") ||
sql.startsWith("delete ") || sql.startsWith("DELETE ") ||
sql.startsWith("insert ") || sql.startsWith("INSERT ")){
dbKit.exetute(sql);
}
return null;
}
}

View File

@@ -105,7 +105,9 @@ public class MetadataService extends BaseService { //arango
public JBean serviceList(@RestParam(name = "platToken") String token) {
JBean jBean = new JBean();
List<MetaService> list = MetaKit.getMetaServices();
List<MetaService> list = MetaKit.getMetaServices().stream()
.filter(x -> platId(token).equals(x.getSysPlatId()))
.collect(Collectors.toList());
jBean.setBody(list);
return jBean;
@@ -155,6 +157,7 @@ public class MetadataService extends BaseService { //arango
if (service.getKey() != null) {
service.update();
} else {
service.setSysPlatId(platId(token));
service.save();
}
MetaKit.reload(MetaService.class);

View File

@@ -1,8 +1,9 @@
package net.tccn.service;
import net.tccn.base.JBean;
import net.tccn.base.MetaKit;
import net.tccn.base.PageBean;
import net.tccn.qtask.Task;
import net.tccn.qtask.TaskEntity;
import org.redkale.net.http.RestParam;
import org.redkale.net.http.RestService;
import org.redkale.source.Flipper;
@@ -12,17 +13,36 @@ import org.redkale.util.Comment;
public class _QtaskService extends BaseService{
@Comment("qtask列表")
public JBean list(Task task, Flipper flipper, @RestParam(name = "platToken") String token) {
PageBean<Task> page = Task.dao.findPage(task, flipper);
public JBean list(TaskEntity task, Flipper flipper, @RestParam(name = "platToken") String token) {
PageBean<TaskEntity> page = TaskEntity.dao.findPage(task, flipper);
return JBean.by(0, "", page);
}
@Comment("qtask保存")
public JBean save() {
public JBean save(TaskEntity task, @RestParam(name = "platToken") String token) {
JBean jBean = new JBean();
return JBean.by(0, "");
// 同平台name 唯一校验
TaskEntity bean = new TaskEntity();
bean.setSysPlatId(platId(token));
bean.setName(task.getName());
long count = bean.count();
if (count > 0) {
jBean.set(-1, "任务名称占用");
} else {
if (task.getKey() != null) {
task.update();
} else {
task.setSysPlatId(platId(token));
task.save();
}
MetaKit.reload(task);
}
return jBean;
}
}

View File

@@ -26,14 +26,15 @@ import static java.util.Arrays.asList;
*/
public class RunTest<T> {
public static Task A = new Task("mysql", "select * from user where userid=#(userid)", "查询用户列表", Kv.of("userid", 1));
/*public static Task A = new Task("mysql", "select * from user where userid=#(userid)", "查询用户列表", Kv.of("userid", 1));
public static Task B = new Task("method", "User.say", "user调用", Kv.of("name", "张三").set("age", 13));
public static Task C = new Task("http", "http://127.0.0.1/meta/db_plat_list?platToken=3421432", "查询数据平台列表", Kv.of("abx", "abx111"));
public static Task d = new Task("es", "http://192.168.91.5:9200/_sql?", "查询数据平台列表", Kv.of("sql", "select * from basic_iotdevice_all limit 10"));
public static Task e = new Task("http", "http://192.168.91.5:9200/_sql?sql=select%20*%20from%20basic_iotdevice_all%20limit%2010", "查询数据平台列表", Kv.of());
*/
//@Test
public void qtaskTest() {
/*public void qtaskTest() {
long start = System.currentTimeMillis();
Object query = QRuner.query(d);
System.out.printf("耗时:%s MS" ,System.currentTimeMillis() - start);
@@ -43,7 +44,7 @@ public class RunTest<T> {
//System.out.println(query.getClass());
}
}*/
ParseMysql parser = new ParseMysql();
//@Test
public void parseFBeanTest() {