移除DataCacheListener功能

This commit is contained in:
Redkale
2020-05-15 21:53:21 +08:00
parent e545010034
commit f1a97c0219
4 changed files with 1 additions and 142 deletions

View File

@@ -284,20 +284,6 @@ public abstract class NodeServer {
application.dataSources.add(source);
appResFactory.register(resourceName, DataSource.class, source);
SncpClient client = Sncp.getSncpClient((Service) src);
final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress();
if ((src instanceof DataSource) && sncpAddr != null && resourceFactory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener
final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr);
final Set<String> groups = new HashSet<>();
if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup());
if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups());
Service cacheListenerService = Sncp.createLocalService(serverClassLoader, resourceName, DataCacheListenerService.class, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf((Service) src));
appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService);
localServices.add(cacheListenerService);
sncpServer.consumerAccept(cacheListenerService);
rf.inject(cacheListenerService, self);
logger.info("[" + Thread.currentThread().getName() + "] Load Service " + cacheListenerService);
}
field.set(src, source);
rf.inject(source, self); // 给其可能包含@Resource的字段赋值;
//NodeServer.this.watchFactory.inject(src);
@@ -418,7 +404,6 @@ public abstract class NodeServer {
if (Modifier.isAbstract(serviceImplClass.getModifiers())) continue; //修饰abstract的类跳过
if (DataSource.class.isAssignableFrom(serviceImplClass)) continue;
if (CacheSource.class.isAssignableFrom(serviceImplClass)) continue;
if (DataCacheListener.class.isAssignableFrom(serviceImplClass)) continue;
}
if (entry.getName().contains("$")) throw new RuntimeException("<name> value cannot contains '$' in " + entry.getProperty());
Service oldother = resourceFactory.find(entry.getName(), serviceImplClass);

View File

@@ -1,49 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.service;
import java.io.*;
import javax.annotation.*;
import org.redkale.source.*;
import org.redkale.util.*;
/**
* 实现进程间DataSource的缓存数据同步
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
@AutoLoad(false)
@ResourceType(DataCacheListener.class)
public class DataCacheListenerService implements DataCacheListener, Service {
@Resource(name = "$")
private DataSource source;
@Override
@RpcMultiRun(selfrun = false, async = true)
public <T> int insertCache(Class<T> clazz, T... entitys) {
if (!(source instanceof DataCacheListener)) return -2;
return ((DataCacheListener) source).insertCache(clazz, entitys);
}
@Override
@RpcMultiRun(selfrun = false, async = true)
public <T> int updateCache(Class<T> clazz, T... entitys) {
if (!(source instanceof DataCacheListener)) return -2;
return ((DataCacheListener) source).updateCache(clazz, entitys);
}
@Override
@RpcMultiRun(selfrun = false, async = true)
public <T> int deleteCache(Class<T> clazz, Serializable... ids) {
if (!(source instanceof DataCacheListener)) return -2;
return ((DataCacheListener) source).deleteCache(clazz, ids);
}
}

View File

@@ -1,24 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.redkale.source;
import java.io.Serializable;
/**
*
* <p>
* 详情见: https://redkale.org
*
* @author zhangjx
*/
public interface DataCacheListener {
public <T> int insertCache(Class<T> clazz, T... entitys);
public <T> int updateCache(Class<T> clazz, T... entitys);
public <T> int deleteCache(Class<T> clazz, Serializable... pks);
}

View File

@@ -15,7 +15,6 @@ import java.util.concurrent.atomic.*;
import java.util.function.*;
import java.util.logging.*;
import java.util.stream.Stream;
import javax.annotation.Resource;
import org.redkale.service.*;
import static org.redkale.source.DataSources.*;
import org.redkale.util.*;
@@ -34,7 +33,7 @@ import org.redkale.util.*;
@AutoLoad(false)
@SuppressWarnings("unchecked")
@ResourceType(DataSource.class)
public abstract class DataSqlSource<DBChannel> extends AbstractService implements DataSource, DataCacheListener, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
public abstract class DataSqlSource<DBChannel> extends AbstractService implements DataSource, Function<Class, EntityInfo>, AutoCloseable, Resourcable {
protected static final Flipper FLIPPER_ONE = new Flipper(1);
@@ -56,9 +55,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
protected PoolSource<DBChannel> writePool;
@Resource(name = "$")
protected DataCacheListener cacheListener;
protected final BiFunction<EntityInfo, Object, CharSequence> sqlFormatter;
protected final BiConsumer futureCompleteConsumer = (r, t) -> {
@@ -387,20 +383,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
for (final T value : entitys) {
c += cache.insert(value);
}
if (cacheListener != null) cacheListener.insertCache(info.getType(), entitys);
return c;
}
@Override
public <T> int insertCache(Class<T> clazz, T... entitys) {
if (entitys.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
int c = 0;
for (T value : entitys) {
c += cache.insert(value);
}
return c;
}
@@ -683,7 +665,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
Serializable[] ids = cache.delete(flipper, node);
if (cacheListener != null) cacheListener.deleteCache(info.getType(), ids);
return count >= 0 ? count : (ids == null ? 0 : ids.length);
}
@@ -694,23 +675,9 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
for (Serializable key : pks) {
c += cache.delete(key);
}
if (cacheListener != null) cacheListener.deleteCache(info.getType(), pks);
return count >= 0 ? count : c;
}
@Override
public <T> int deleteCache(Class<T> clazz, Serializable... pks) {
if (pks.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
int c = 0;
for (Serializable id : pks) {
c += cache.delete(id);
}
return c;
}
protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) {
if (str == null) return sb;
int pos1 = str.indexOf(ch1, from);
@@ -1220,11 +1187,9 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
}
if (neednode) {
T[] rs = cache.update(entity, attrs, node);
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
return count >= 0 ? count : (rs == null ? 0 : rs.length);
} else {
T rs = cache.update(entity, attrs);
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
return count >= 0 ? count : (rs == null ? 0 : 1);
}
}
@@ -1242,7 +1207,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
cols.add(col);
}
T[] rs = cache.updateColumn(node, flipper, attrs, cols);
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
return count >= 0 ? count : (rs == null ? 0 : 1);
}
@@ -1259,7 +1223,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
cols.add(col);
}
T rs = cache.updateColumn(pk, attrs, cols);
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
return count >= 0 ? count : (rs == null ? 0 : 1);
}
@@ -1267,7 +1230,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final EntityCache<T> cache = info.getCache();
if (cache == null) return count;
T[] rs = cache.update(info.getAttribute(column), colval, node);
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
return count >= 0 ? count : (rs == null ? 0 : 1);
}
@@ -1275,7 +1237,6 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
final EntityCache<T> cache = info.getCache();
if (cache == null) return count;
T rs = cache.update(pk, info.getAttribute(column), colval);
if (cacheListener != null) cacheListener.updateCache(info.getType(), rs);
return count >= 0 ? count : (rs == null ? 0 : 1);
}
@@ -1286,23 +1247,9 @@ public abstract class DataSqlSource<DBChannel> extends AbstractService implement
for (final T value : entitys) {
c2 += cache.update(value);
}
if (cacheListener != null) cacheListener.updateCache(info.getType(), entitys);
return count >= 0 ? count : c2;
}
@Override
public <T> int updateCache(Class<T> clazz, T... entitys) {
if (entitys.length == 0) return 0;
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();
if (cache == null) return -1;
int c = 0;
for (T value : entitys) {
c += cache.update(value);
}
return c;
}
public <T> int reloadCache(Class<T> clazz, Serializable... pks) {
final EntityInfo<T> info = loadEntityInfo(clazz);
final EntityCache<T> cache = info.getCache();