This commit is contained in:
Redkale
2016-09-26 14:01:20 +08:00
parent c43e516d14
commit 92eefc872a
4 changed files with 16 additions and 239 deletions

View File

@@ -272,7 +272,6 @@ public abstract class NodeServer {
if (Modifier.isAbstract(type.getModifiers())) continue; //修饰abstract的类跳过
if (DataSource.class.isAssignableFrom(type)) continue;
if (CacheSource.class.isAssignableFrom(type)) continue;
if (DataSQLListener.class.isAssignableFrom(type)) continue;
if (DataCacheListener.class.isAssignableFrom(type)) continue;
if (WebSocketNode.class.isAssignableFrom(type)) continue;
}

View File

@@ -1,128 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.service;
import org.redkale.source.DataSQLListener;
import org.redkale.source.DataSource;
import org.redkale.source.DataDefaultSource;
import org.redkale.util.AnyValue;
import org.redkale.util.AutoLoad;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import javax.annotation.Resource;
import org.redkale.util.*;
/**
* 暂时不实现
*
* <p>
* 详情见: http://redkale.org
*
* @author zhangjx
*/
@Deprecated
@AutoLoad(false)
@ResourceType({DataSQLListenerService.class, DataSQLListener.class})
public class DataSQLListenerService implements DataSQLListener, Service {
private static final String format = "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%tL";
protected final Logger logger = Logger.getLogger(this.getClass().getSimpleName());
private final boolean finest = logger.isLoggable(Level.FINEST);
@Resource(name = "APP_HOME")
private File home;
@Resource(name = "$")
private DataSource source;
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024 * 1024);
private PrintStream syncfile;
@Override
public void init(AnyValue config) {
new Thread() {
{
setName(DataSQLListener.class.getSimpleName() + "-Thread");
setDaemon(true);
}
@Override
public void run() {
while (true) {
try {
String sql = queue.take();
send(sql);
} catch (Exception e) {
logger.log(Level.SEVERE, this.getName() + " occur error");
}
}
}
}.start();
}
public String name() {
return this.getClass().getAnnotation(Resource.class).name();
}
@Override
public void destroy(AnyValue config) {
if (syncfile != null) syncfile.close();
}
private void write(String... sqls) {
try {
if (syncfile == null) {
File root = new File(home, "dbsync");
root.mkdirs();
syncfile = new PrintStream(new FileOutputStream(new File(root, "sql-" + name() + ".sql"), true), false, "UTF-8");
}
for (String sql : sqls) {
syncfile.print(sql + ";\r\n");
}
syncfile.flush();
} catch (Exception e) {
logger.log(Level.WARNING, "write sql file error. (" + name() + ", " + Arrays.toString(sqls) + ")", e);
}
}
@Override
public void insert(String... sqls) {
put(sqls);
}
@Override
public void update(String... sqls) {
put(sqls);
}
@Override
public void delete(String... sqls) {
put(sqls);
}
private void put(String... sqls) {
String date = String.format(format, System.currentTimeMillis());
for (String sql : sqls) {
try {
queue.put("/* " + date + " */ " + sql);
} catch (Exception e) {
write(sql);
}
}
}
@MultiRun(selfrun = false, async = true)
public void send(String... sqls) {
((DataDefaultSource) source).directExecute(sqls);
}
}

View File

@@ -69,9 +69,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
@Resource(name = "property.datasource.nodeid")
private int nodeid;
@Resource(name = "$")
private DataSQLListener writeListener;
@Resource(name = "$")
private DataCacheListener cacheListener;
@@ -338,7 +335,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Attribute primary = info.getPrimary();
final boolean distributed = info.distributed;
Attribute<T, Serializable>[] attrs = info.insertAttributes;
String[] sqls = new String[values.length];
if (distributed && !info.initedPrimaryValue && primaryType.isPrimitive()) { //由DataSource生成主键
synchronized (info) {
if (!info.initedPrimaryValue) { //初始化最大主键值
@@ -363,7 +359,7 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
}
}
}
PreparedStatement prestmt = createInsertPreparedStatement(conn, sql, info, sqls, values);
PreparedStatement prestmt = createInsertPreparedStatement(conn, sql, info, values);
try {
prestmt.executeBatch();
} catch (SQLException se) {
@@ -397,10 +393,9 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
}
}
prestmt.close();
prestmt = createInsertPreparedStatement(conn, sql, info, sqls, values);
prestmt = createInsertPreparedStatement(conn, sql, info, values);
prestmt.executeBatch();
}
if (writeListener != null) writeListener.insert(sqls);
if (info.autoGenerated) { //由数据库自动生成主键值
ResultSet set = prestmt.getGeneratedKeys();
int i = -1;
@@ -452,44 +447,18 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
}
private <T> PreparedStatement createInsertPreparedStatement(final Connection conn, final String sql,
final EntityInfo<T> info, final String[] sqls, T... values) throws SQLException {
final EntityInfo<T> info, T... values) throws SQLException {
Attribute<T, Serializable>[] attrs = info.insertAttributes;
final PreparedStatement prestmt = info.autoGenerated
? conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : conn.prepareStatement(sql);
if (writeListener == null) {
for (final T value : values) {
int i = 0;
if (info.distributed || info.autouuid) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.addBatch();
}
} else { //调用writeListener回调接口
char[] sqlchars = sql.toCharArray();
CharSequence[] ps = new CharSequence[attrs.length];
int index = 0;
for (final T value : values) {
int i = 0;
if (info.distributed || info.autouuid) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
Object a = attr.get(value);
ps[i] = FilterNode.formatToString(a);
prestmt.setObject(++i, a);
}
prestmt.addBatch();
//-----------------------------
StringBuilder sb = new StringBuilder(128);
i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
sb.append(ps[i++]);
} else {
sb.append(ch);
}
}
sqls[index++] = sb.toString();
for (final T value : values) {
int i = 0;
if (info.distributed || info.autouuid) info.createPrimaryValue(value);
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.addBatch();
}
return prestmt;
}
@@ -558,17 +527,13 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
try {
if (!info.isVirtualEntity()) {
final Statement stmt = conn.createStatement();
String[] sqls = new String[keys.length];
int index = -1;
for (Serializable key : keys) {
String sql = "DELETE FROM " + info.getTable(keys) + " WHERE " + info.getPrimarySQLColumn() + " = " + FilterNode.formatToString(key);
sqls[++index] = sql;
if (debug.get()) logger.finest(info.getType().getSimpleName() + " delete sql=" + sql);
stmt.addBatch(sql);
}
stmt.executeBatch();
stmt.close();
if (writeListener != null) writeListener.delete(sqls);
}
//------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -608,7 +573,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.delete(sql);
}
//------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -662,46 +626,16 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Attribute<T, Serializable> primary = info.getPrimary();
final PreparedStatement prestmt = conn.prepareStatement(updateSQL);
Attribute<T, Serializable>[] attrs = info.updateAttributes;
String[] sqls = null;
if (writeListener == null) {
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
}
} else {
char[] sqlchars = updateSQL.toCharArray();
sqls = new String[values.length];
CharSequence[] ps = new CharSequence[attrs.length];
int index = 0;
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
Object a = attr.get(value);
ps[i] = FilterNode.formatToString(a);
prestmt.setObject(++i, a);
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
//-----------------------------
StringBuilder sb = new StringBuilder(128);
i = 0;
for (char ch : sqlchars) {
if (ch == '?') {
sb.append(ps[i++]);
} else {
sb.append(ch);
}
}
sqls[index++] = sb.toString();
for (final T value : values) {
int i = 0;
for (Attribute<T, Serializable> attr : attrs) {
prestmt.setObject(++i, attr.get(value));
}
prestmt.setObject(++i, primary.get(value));
prestmt.addBatch();
}
prestmt.executeBatch();
prestmt.close();
if (writeListener != null) writeListener.update(sqls);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -748,7 +682,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -799,7 +732,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -848,7 +780,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -898,7 +829,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -948,7 +878,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -1008,7 +937,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();
@@ -1071,7 +999,6 @@ public final class DataDefaultSource implements DataSource, Function<Class, Enti
final Statement stmt = conn.createStatement();
stmt.execute(sql);
stmt.close();
if (writeListener != null) writeListener.update(sql);
}
//---------------------------------------------------
final EntityCache<T> cache = info.getCache();

View File

@@ -1,21 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.source;
/**
* &#64;Resource(name = "property.datasource.nodeid")
*
* <p> 详情见: http://redkale.org
* @author zhangjx
*/
public interface DataSQLListener {
public void insert(String... sqls);
public void update(String... sqls);
public void delete(String... sqls);
}