From f1a97c0219e963634030db758428c7bf9d05caf8 Mon Sep 17 00:00:00 2001 From: Redkale <8730487+redkale@users.noreply.github.com> Date: Fri, 15 May 2020 21:53:21 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4DataCacheListener=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/org/redkale/boot/NodeServer.java | 15 ----- .../service/DataCacheListenerService.java | 49 ----------------- src/org/redkale/source/DataCacheListener.java | 24 -------- src/org/redkale/source/DataSqlSource.java | 55 +------------------ 4 files changed, 1 insertion(+), 142 deletions(-) delete mode 100644 src/org/redkale/service/DataCacheListenerService.java delete mode 100644 src/org/redkale/source/DataCacheListener.java diff --git a/src/org/redkale/boot/NodeServer.java b/src/org/redkale/boot/NodeServer.java index 555c4b9be..32f729285 100644 --- a/src/org/redkale/boot/NodeServer.java +++ b/src/org/redkale/boot/NodeServer.java @@ -284,20 +284,6 @@ public abstract class NodeServer { application.dataSources.add(source); appResFactory.register(resourceName, DataSource.class, source); - SncpClient client = Sncp.getSncpClient((Service) src); - final InetSocketAddress sncpAddr = client == null ? null : client.getClientAddress(); - if ((src instanceof DataSource) && sncpAddr != null && resourceFactory.find(resourceName, DataCacheListener.class) == null) { //只有DataSourceService 才能赋值 DataCacheListener - final NodeSncpServer sncpServer = application.findNodeSncpServer(sncpAddr); - final Set groups = new HashSet<>(); - if (client != null && client.getSameGroup() != null) groups.add(client.getSameGroup()); - if (client != null && client.getDiffGroups() != null) groups.addAll(client.getDiffGroups()); - Service cacheListenerService = Sncp.createLocalService(serverClassLoader, resourceName, DataCacheListenerService.class, appResFactory, appSncpTranFactory, sncpAddr, groups, Sncp.getConf((Service) src)); - appResFactory.register(resourceName, DataCacheListener.class, cacheListenerService); - localServices.add(cacheListenerService); - sncpServer.consumerAccept(cacheListenerService); - rf.inject(cacheListenerService, self); - logger.info("[" + Thread.currentThread().getName() + "] Load Service " + cacheListenerService); - } field.set(src, source); rf.inject(source, self); // 给其可能包含@Resource的字段赋值; //NodeServer.this.watchFactory.inject(src); @@ -418,7 +404,6 @@ public abstract class NodeServer { if (Modifier.isAbstract(serviceImplClass.getModifiers())) continue; //修饰abstract的类跳过 if (DataSource.class.isAssignableFrom(serviceImplClass)) continue; if (CacheSource.class.isAssignableFrom(serviceImplClass)) continue; - if (DataCacheListener.class.isAssignableFrom(serviceImplClass)) continue; } if (entry.getName().contains("$")) throw new RuntimeException(" value cannot contains '$' in " + entry.getProperty()); Service oldother = resourceFactory.find(entry.getName(), serviceImplClass); diff --git a/src/org/redkale/service/DataCacheListenerService.java b/src/org/redkale/service/DataCacheListenerService.java deleted file mode 100644 index 7d1539296..000000000 --- a/src/org/redkale/service/DataCacheListenerService.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.service; - -import java.io.*; -import javax.annotation.*; -import org.redkale.source.*; -import org.redkale.util.*; - -/** - * 实现进程间DataSource的缓存数据同步 - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -@AutoLoad(false) -@ResourceType(DataCacheListener.class) -public class DataCacheListenerService implements DataCacheListener, Service { - - @Resource(name = "$") - private DataSource source; - - @Override - @RpcMultiRun(selfrun = false, async = true) - public int insertCache(Class clazz, T... entitys) { - if (!(source instanceof DataCacheListener)) return -2; - return ((DataCacheListener) source).insertCache(clazz, entitys); - } - - @Override - @RpcMultiRun(selfrun = false, async = true) - public int updateCache(Class clazz, T... entitys) { - if (!(source instanceof DataCacheListener)) return -2; - return ((DataCacheListener) source).updateCache(clazz, entitys); - } - - @Override - @RpcMultiRun(selfrun = false, async = true) - public int deleteCache(Class clazz, Serializable... ids) { - if (!(source instanceof DataCacheListener)) return -2; - return ((DataCacheListener) source).deleteCache(clazz, ids); - } - -} diff --git a/src/org/redkale/source/DataCacheListener.java b/src/org/redkale/source/DataCacheListener.java deleted file mode 100644 index 5d840188c..000000000 --- a/src/org/redkale/source/DataCacheListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package org.redkale.source; - -import java.io.Serializable; - -/** - * - *

- * 详情见: https://redkale.org - * - * @author zhangjx - */ -public interface DataCacheListener { - - public int insertCache(Class clazz, T... entitys); - - public int updateCache(Class clazz, T... entitys); - - public int deleteCache(Class clazz, Serializable... pks); -} diff --git a/src/org/redkale/source/DataSqlSource.java b/src/org/redkale/source/DataSqlSource.java index 6e400e946..455980e38 100644 --- a/src/org/redkale/source/DataSqlSource.java +++ b/src/org/redkale/source/DataSqlSource.java @@ -15,7 +15,6 @@ import java.util.concurrent.atomic.*; import java.util.function.*; import java.util.logging.*; import java.util.stream.Stream; -import javax.annotation.Resource; import org.redkale.service.*; import static org.redkale.source.DataSources.*; import org.redkale.util.*; @@ -34,7 +33,7 @@ import org.redkale.util.*; @AutoLoad(false) @SuppressWarnings("unchecked") @ResourceType(DataSource.class) -public abstract class DataSqlSource extends AbstractService implements DataSource, DataCacheListener, Function, AutoCloseable, Resourcable { +public abstract class DataSqlSource extends AbstractService implements DataSource, Function, AutoCloseable, Resourcable { protected static final Flipper FLIPPER_ONE = new Flipper(1); @@ -56,9 +55,6 @@ public abstract class DataSqlSource extends AbstractService implement protected PoolSource writePool; - @Resource(name = "$") - protected DataCacheListener cacheListener; - protected final BiFunction sqlFormatter; protected final BiConsumer futureCompleteConsumer = (r, t) -> { @@ -387,20 +383,6 @@ public abstract class DataSqlSource extends AbstractService implement for (final T value : entitys) { c += cache.insert(value); } - if (cacheListener != null) cacheListener.insertCache(info.getType(), entitys); - return c; - } - - @Override - public int insertCache(Class clazz, T... entitys) { - if (entitys.length == 0) return 0; - final EntityInfo info = loadEntityInfo(clazz); - final EntityCache cache = info.getCache(); - if (cache == null) return -1; - int c = 0; - for (T value : entitys) { - c += cache.insert(value); - } return c; } @@ -683,7 +665,6 @@ public abstract class DataSqlSource extends AbstractService implement final EntityCache cache = info.getCache(); if (cache == null) return -1; Serializable[] ids = cache.delete(flipper, node); - if (cacheListener != null) cacheListener.deleteCache(info.getType(), ids); return count >= 0 ? count : (ids == null ? 0 : ids.length); } @@ -694,23 +675,9 @@ public abstract class DataSqlSource extends AbstractService implement for (Serializable key : pks) { c += cache.delete(key); } - if (cacheListener != null) cacheListener.deleteCache(info.getType(), pks); return count >= 0 ? count : c; } - @Override - public int deleteCache(Class clazz, Serializable... pks) { - if (pks.length == 0) return 0; - final EntityInfo info = loadEntityInfo(clazz); - final EntityCache cache = info.getCache(); - if (cache == null) return -1; - int c = 0; - for (Serializable id : pks) { - c += cache.delete(id); - } - return c; - } - protected static StringBuilder multisplit(char ch1, char ch2, String split, StringBuilder sb, String str, int from) { if (str == null) return sb; int pos1 = str.indexOf(ch1, from); @@ -1220,11 +1187,9 @@ public abstract class DataSqlSource extends AbstractService implement } if (neednode) { T[] rs = cache.update(entity, attrs, node); - if (cacheListener != null) cacheListener.updateCache(info.getType(), rs); return count >= 0 ? count : (rs == null ? 0 : rs.length); } else { T rs = cache.update(entity, attrs); - if (cacheListener != null) cacheListener.updateCache(info.getType(), rs); return count >= 0 ? count : (rs == null ? 0 : 1); } } @@ -1242,7 +1207,6 @@ public abstract class DataSqlSource extends AbstractService implement cols.add(col); } T[] rs = cache.updateColumn(node, flipper, attrs, cols); - if (cacheListener != null) cacheListener.updateCache(info.getType(), rs); return count >= 0 ? count : (rs == null ? 0 : 1); } @@ -1259,7 +1223,6 @@ public abstract class DataSqlSource extends AbstractService implement cols.add(col); } T rs = cache.updateColumn(pk, attrs, cols); - if (cacheListener != null) cacheListener.updateCache(info.getType(), rs); return count >= 0 ? count : (rs == null ? 0 : 1); } @@ -1267,7 +1230,6 @@ public abstract class DataSqlSource extends AbstractService implement final EntityCache cache = info.getCache(); if (cache == null) return count; T[] rs = cache.update(info.getAttribute(column), colval, node); - if (cacheListener != null) cacheListener.updateCache(info.getType(), rs); return count >= 0 ? count : (rs == null ? 0 : 1); } @@ -1275,7 +1237,6 @@ public abstract class DataSqlSource extends AbstractService implement final EntityCache cache = info.getCache(); if (cache == null) return count; T rs = cache.update(pk, info.getAttribute(column), colval); - if (cacheListener != null) cacheListener.updateCache(info.getType(), rs); return count >= 0 ? count : (rs == null ? 0 : 1); } @@ -1286,23 +1247,9 @@ public abstract class DataSqlSource extends AbstractService implement for (final T value : entitys) { c2 += cache.update(value); } - if (cacheListener != null) cacheListener.updateCache(info.getType(), entitys); return count >= 0 ? count : c2; } - @Override - public int updateCache(Class clazz, T... entitys) { - if (entitys.length == 0) return 0; - final EntityInfo info = loadEntityInfo(clazz); - final EntityCache cache = info.getCache(); - if (cache == null) return -1; - int c = 0; - for (T value : entitys) { - c += cache.update(value); - } - return c; - } - public int reloadCache(Class clazz, Serializable... pks) { final EntityInfo info = loadEntityInfo(clazz); final EntityCache cache = info.getCache();