diff --git a/lib/redkale-plugins.jar b/lib/redkale-plugins.jar index 7cad5f2..214224d 100644 Binary files a/lib/redkale-plugins.jar and b/lib/redkale-plugins.jar differ diff --git a/lib/redkale.jar b/lib/redkale.jar index 05f86f9..a720365 100644 Binary files a/lib/redkale.jar and b/lib/redkale.jar differ diff --git a/libs/redbbs.jar b/libs/redbbs.jar index 8e86e30..7d89a57 100644 Binary files a/libs/redbbs.jar and b/libs/redbbs.jar differ diff --git a/pom.xml b/pom.xml index e64f3c2..0bfe40e 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.8 - 1.8 + 10 + 10 @@ -25,7 +25,7 @@ org.redkale redkale - 1.9.5.2 + 1.9.6 org.redkalex diff --git a/src/com/lxyer/bbs/base/TaskQueue.java b/src/com/lxyer/bbs/base/TaskQueue.java index a51ead1..f16e2e9 100644 --- a/src/com/lxyer/bbs/base/TaskQueue.java +++ b/src/com/lxyer/bbs/base/TaskQueue.java @@ -1,26 +1,39 @@ package com.lxyer.bbs.base; -import com.lxyer.bbs.base.entity.Count; -import com.lxyer.bbs.base.entity.VisLog; +import com.lxyer.bbs.base.kit.LxyKit; import com.lxyer.bbs.base.user.UserInfo; +import com.lxyer.bbs.base.user.UserRecord; import com.lxyer.bbs.base.user.UserService; import com.lxyer.bbs.content.Content; import com.lxyer.bbs.content.ContentInfo; import com.lxyer.bbs.content.ContentService; +import com.mongodb.Block; +import com.mongodb.MongoClient; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Accumulators; +import com.mongodb.client.model.Aggregates; +import org.bson.Document; +import org.bson.conversions.Bson; import org.redkale.net.http.RestMapping; import org.redkale.net.http.RestService; import org.redkale.source.ColumnValue; import org.redkale.source.FilterExpress; import org.redkale.source.FilterNode; import org.redkale.source.Flipper; +import org.redkale.util.AnyValue; import org.redkale.util.Comment; import org.redkale.util.Sheet; -import org.redkale.util.Utility; import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Function; + +import static com.mongodb.client.model.Filters.*; +import static java.util.Arrays.asList; /** * Created by liangxianyou at 2018/6/20 22:54. @@ -33,12 +46,28 @@ public class TaskQueue extends BaseService implements Runnable @Resource private UserService userService; + @Resource(name = "property.mongo.host") + private String mongoHost; + @Resource(name = "property.mongo.database") + private String mongoDatabase; + protected static LinkedBlockingQueue queue = new LinkedBlockingQueue(); + private static MongoClient mongoClient; + private static MongoDatabase database; + private static MongoCollection visLog; + public TaskQueue() { new Thread(this).start(); } + @Override + public void init(AnyValue config) { + mongoClient = new MongoClient(mongoHost, 27017); + database = mongoClient.getDatabase(winos ? mongoDatabase + "_dev": mongoDatabase); + visLog = database.getCollection("vis_log"); + } + @RestMapping(ignore = true) public T take() throws InterruptedException { return (T) queue.take(); @@ -52,44 +81,44 @@ public class TaskQueue extends BaseService implements Runnable @Override @RestMapping(ignore = true, comment = "独立线程,用户访问行为记录到数据库") public void run() { - do { - try { - T task = take(); + try { + while (true){ + Map logData = (Map) take(); - //记录访问日志,如果是访问的文章详情:对文章访问数量更新 - if (task instanceof VisLog) { - //System.out.println(task); - ArangoService.save(task).thenAcceptAsync((_task) -> { - VisLog visLog = (VisLog) _task; - //[访问量] - String uri = visLog.getUri(); - if (uri != null && uri.startsWith("/jie/detail/")){ - updateViewNum(visLog); - } - }); + logData.put("ftime", String.format("%1$tY%1$tm%1$td%1$tH%1$tM%1$tS", logData.get("time"))); + visLog.insertOne(new Document(logData)); + + //在这里处理日志数据 + String uri = logData.get("uri")+""; + + //[访问量] + if (uri.startsWith("/jie/detail/")){ + updateViewNumAsync(logData); } - - } catch (InterruptedException e) { - e.printStackTrace(); } - } while (true); + } catch (InterruptedException e) { + e.printStackTrace(); + } } @Comment("帖子阅读数处理") - private void updateViewNum(VisLog visLog) { - - String aql = String.format("for d in vis_log_dev\n" + - " filter d.uri == '%s' and d.ip == '%s' and (d.userid == %s or d.userid==0)\n" + - " collect WITH COUNT INTO total\n" + - " return total", visLog.getUri(), visLog.getIp(), visLog.getUserid()); - - long total = ArangoService.findInt(aql); - - if (total <= 1) { - String uri = visLog.getUri(); - int contentid = Integer.parseInt(uri.replace("/jie/detail/", "")); - source.updateColumn(Content.class, contentid, ColumnValue.inc("viewnum", 1)); - } + private void updateViewNumAsync(Map logData) { + CompletableFuture.runAsync(()->{ + Bson filter = and( + eq("uri", logData.get("uri"))//帖子 + ,eq("ip", logData.get("ip"))//IP + ,or( + eq("userid", logData.get("userid"))//登录人 + ,eq("userid", 0)//未登录userid=0 + ) + ); + long count = visLog.count(filter); + if (count <= 1){ + String uri = logData.get("uri") + ""; + int contentid = Integer.parseInt(uri.replace("/jie/detail/", "")); + source.updateColumn(Content.class, contentid, ColumnValue.inc("viewnum", 1)); + } + }); } @RestMapping(ignore = true, comment = "访问热帖数据") @@ -97,33 +126,31 @@ public class TaskQueue extends BaseService implements Runnable int limit = 8; String cacheKey = "hotView"; Object ids = cacheSource.get(cacheKey); - if (isEmpty.test(ids)){ + if (ids == null){ Calendar cal = Calendar.getInstance(); cal.set(Calendar.DAY_OF_MONTH, -7); - Map para = new HashMap(); - para.put("time", cal.getTimeInMillis()); //查询一周某热帖记录 - List hotArticle = ArangoService.find( - "for d in " + (isDev ? "vis_log_dev" : "vis_log") + "\n" + - " filter d.uri =~ '^/jie/detail/[0-9]+$' and d.userid != 100001 and d.time > @time\n" + - " COLLECT uri=d.uri WITH COUNT INTO total\n" + - " sort total desc\n" + - " limit 10\n" + - " return {name: uri,total:total}", - Utility.ofMap("time", cal.getTimeInMillis()), - Count.class); + Bson filter = and(ne("userid", 100001) + ,regex("uri", "/jie/detail/*") + ,ne("ip", "") + ,gt("time", cal.getTimeInMillis()) + ); + List list = asList( + Aggregates.match(filter) + ,Aggregates.group("$uri", Accumulators.sum("count", 1)) + ,Aggregates.sort(new Document("count", -1)) + ,Aggregates.limit(8) + ); + AggregateIterable documents = visLog.aggregate(list, Document.class); - Function, List> deal = (counts) -> { - List _ids = new ArrayList<>(); - counts.forEach(x -> { - _ids.add(Integer.parseInt(x.getName().replace("/jie/detail/", ""))); - }); - return _ids; - }; + List _ids = new ArrayList<>(limit); + documents.forEach((Block) x->{ + String uri = x.getString("_id"); + _ids.add(Integer.parseInt(uri.replace("/jie/detail/", ""))); + }); - ids = deal.apply(hotArticle); - cacheSource.set(30 * 60, cacheKey, ids); + cacheSource.set(30 * 60, cacheKey, ids = _ids); } int[] contentids = new int[limit]; @@ -149,14 +176,14 @@ public class TaskQueue extends BaseService implements Runnable */ @RestMapping(ignore = true, comment = "帖子访客记录") public Sheet readRecordAsync(Flipper flipper ,int contentid){ - /*Bson filter = eq("uri", "/jie/detail/"+ contentid); + Bson filter = eq("uri", "/jie/detail/"+ contentid); FindIterable documents = visLog.find(filter).limit(flipper.getLimit()).skip(flipper.getOffset()); long total = visLog.countDocuments(filter); List rows = new ArrayList<>(); List uids = new ArrayList<>(); - documents.forEach((Consumer) x->{ + documents.forEach((Block) x->{ Integer userid = x.getInteger("userid"); if (userid > 0) uids.add(userid); @@ -178,7 +205,6 @@ public class TaskQueue extends BaseService implements Runnable sheet.setTotal(total); sheet.setRows(rows); - return sheet;*/ - return null; + return sheet; } }