This commit is contained in:
2018-08-29 15:38:56 +08:00
parent b7a7dae495
commit f87954899f
5 changed files with 90 additions and 64 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -14,8 +14,8 @@
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<configuration> <configuration>
<source>1.8</source> <source>10</source>
<target>1.8</target> <target>10</target>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>
@@ -25,7 +25,7 @@
<dependency> <dependency>
<groupId>org.redkale</groupId> <groupId>org.redkale</groupId>
<artifactId>redkale</artifactId> <artifactId>redkale</artifactId>
<version>1.9.5.2</version> <version>1.9.6</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.redkalex</groupId> <groupId>org.redkalex</groupId>

View File

@@ -1,26 +1,39 @@
package com.lxyer.bbs.base; package com.lxyer.bbs.base;
import com.lxyer.bbs.base.entity.Count; import com.lxyer.bbs.base.kit.LxyKit;
import com.lxyer.bbs.base.entity.VisLog;
import com.lxyer.bbs.base.user.UserInfo; import com.lxyer.bbs.base.user.UserInfo;
import com.lxyer.bbs.base.user.UserRecord;
import com.lxyer.bbs.base.user.UserService; import com.lxyer.bbs.base.user.UserService;
import com.lxyer.bbs.content.Content; import com.lxyer.bbs.content.Content;
import com.lxyer.bbs.content.ContentInfo; import com.lxyer.bbs.content.ContentInfo;
import com.lxyer.bbs.content.ContentService; import com.lxyer.bbs.content.ContentService;
import com.mongodb.Block;
import com.mongodb.MongoClient;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.redkale.net.http.RestMapping; import org.redkale.net.http.RestMapping;
import org.redkale.net.http.RestService; import org.redkale.net.http.RestService;
import org.redkale.source.ColumnValue; import org.redkale.source.ColumnValue;
import org.redkale.source.FilterExpress; import org.redkale.source.FilterExpress;
import org.redkale.source.FilterNode; import org.redkale.source.FilterNode;
import org.redkale.source.Flipper; import org.redkale.source.Flipper;
import org.redkale.util.AnyValue;
import org.redkale.util.Comment; import org.redkale.util.Comment;
import org.redkale.util.Sheet; import org.redkale.util.Sheet;
import org.redkale.util.Utility;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import static com.mongodb.client.model.Filters.*;
import static java.util.Arrays.asList;
/** /**
* Created by liangxianyou at 2018/6/20 22:54. * Created by liangxianyou at 2018/6/20 22:54.
@@ -33,12 +46,28 @@ public class TaskQueue<T extends Object> extends BaseService implements Runnable
@Resource @Resource
private UserService userService; private UserService userService;
@Resource(name = "property.mongo.host")
private String mongoHost;
@Resource(name = "property.mongo.database")
private String mongoDatabase;
protected static LinkedBlockingQueue queue = new LinkedBlockingQueue(); protected static LinkedBlockingQueue queue = new LinkedBlockingQueue();
private static MongoClient mongoClient;
private static MongoDatabase database;
private static MongoCollection<Document> visLog;
public TaskQueue() { public TaskQueue() {
new Thread(this).start(); new Thread(this).start();
} }
@Override
public void init(AnyValue config) {
mongoClient = new MongoClient(mongoHost, 27017);
database = mongoClient.getDatabase(winos ? mongoDatabase + "_dev": mongoDatabase);
visLog = database.getCollection("vis_log");
}
@RestMapping(ignore = true) @RestMapping(ignore = true)
public T take() throws InterruptedException { public T take() throws InterruptedException {
return (T) queue.take(); return (T) queue.take();
@@ -52,44 +81,44 @@ public class TaskQueue<T extends Object> extends BaseService implements Runnable
@Override @Override
@RestMapping(ignore = true, comment = "独立线程,用户访问行为记录到数据库") @RestMapping(ignore = true, comment = "独立线程,用户访问行为记录到数据库")
public void run() { public void run() {
do { try {
try { while (true){
T task = take(); Map logData = (Map) take();
//记录访问日志,如果是访问的文章详情:对文章访问数量更新 logData.put("ftime", String.format("%1$tY%1$tm%1$td%1$tH%1$tM%1$tS", logData.get("time")));
if (task instanceof VisLog) { visLog.insertOne(new Document(logData));
//System.out.println(task);
ArangoService.save(task).thenAcceptAsync((_task) -> { //在这里处理日志数据
VisLog visLog = (VisLog) _task; String uri = logData.get("uri")+"";
//[访问量]
String uri = visLog.getUri(); //[访问量]
if (uri != null && uri.startsWith("/jie/detail/")){ if (uri.startsWith("/jie/detail/")){
updateViewNum(visLog); updateViewNumAsync(logData);
}
});
} }
} catch (InterruptedException e) {
e.printStackTrace();
} }
} while (true); } catch (InterruptedException e) {
e.printStackTrace();
}
} }
@Comment("帖子阅读数处理") @Comment("帖子阅读数处理")
private void updateViewNum(VisLog visLog) { private void updateViewNumAsync(Map logData) {
CompletableFuture.runAsync(()->{
String aql = String.format("for d in vis_log_dev\n" + Bson filter = and(
" filter d.uri == '%s' and d.ip == '%s' and (d.userid == %s or d.userid==0)\n" + eq("uri", logData.get("uri"))//帖子
" collect WITH COUNT INTO total\n" + ,eq("ip", logData.get("ip"))//IP
" return total", visLog.getUri(), visLog.getIp(), visLog.getUserid()); ,or(
eq("userid", logData.get("userid"))//登录人
long total = ArangoService.findInt(aql); ,eq("userid", 0)//未登录userid=0
)
if (total <= 1) { );
String uri = visLog.getUri(); long count = visLog.count(filter);
int contentid = Integer.parseInt(uri.replace("/jie/detail/", "")); if (count <= 1){
source.updateColumn(Content.class, contentid, ColumnValue.inc("viewnum", 1)); String uri = logData.get("uri") + "";
} int contentid = Integer.parseInt(uri.replace("/jie/detail/", ""));
source.updateColumn(Content.class, contentid, ColumnValue.inc("viewnum", 1));
}
});
} }
@RestMapping(ignore = true, comment = "访问热帖数据") @RestMapping(ignore = true, comment = "访问热帖数据")
@@ -97,33 +126,31 @@ public class TaskQueue<T extends Object> extends BaseService implements Runnable
int limit = 8; int limit = 8;
String cacheKey = "hotView"; String cacheKey = "hotView";
Object ids = cacheSource.get(cacheKey); Object ids = cacheSource.get(cacheKey);
if (isEmpty.test(ids)){ if (ids == null){
Calendar cal = Calendar.getInstance(); Calendar cal = Calendar.getInstance();
cal.set(Calendar.DAY_OF_MONTH, -7); cal.set(Calendar.DAY_OF_MONTH, -7);
Map para = new HashMap();
para.put("time", cal.getTimeInMillis());
//查询一周某热帖记录 //查询一周某热帖记录
List<Count> hotArticle = ArangoService.find( Bson filter = and(ne("userid", 100001)
"for d in " + (isDev ? "vis_log_dev" : "vis_log") + "\n" + ,regex("uri", "/jie/detail/*")
" filter d.uri =~ '^/jie/detail/[0-9]+$' and d.userid != 100001 and d.time > @time\n" + ,ne("ip", "")
" COLLECT uri=d.uri WITH COUNT INTO total\n" + ,gt("time", cal.getTimeInMillis())
" sort total desc\n" + );
" limit 10\n" + List<Bson> list = asList(
" return {name: uri,total:total}", Aggregates.match(filter)
Utility.ofMap("time", cal.getTimeInMillis()), ,Aggregates.group("$uri", Accumulators.sum("count", 1))
Count.class); ,Aggregates.sort(new Document("count", -1))
,Aggregates.limit(8)
);
AggregateIterable<Document> documents = visLog.aggregate(list, Document.class);
Function<List<Count>, List<Integer>> deal = (counts) -> { List<Integer> _ids = new ArrayList<>(limit);
List<Integer> _ids = new ArrayList<>(); documents.forEach((Block<? super Document>) x->{
counts.forEach(x -> { String uri = x.getString("_id");
_ids.add(Integer.parseInt(x.getName().replace("/jie/detail/", ""))); _ids.add(Integer.parseInt(uri.replace("/jie/detail/", "")));
}); });
return _ids;
};
ids = deal.apply(hotArticle); cacheSource.set(30 * 60, cacheKey, ids = _ids);
cacheSource.set(30 * 60, cacheKey, ids);
} }
int[] contentids = new int[limit]; int[] contentids = new int[limit];
@@ -149,14 +176,14 @@ public class TaskQueue<T extends Object> extends BaseService implements Runnable
*/ */
@RestMapping(ignore = true, comment = "帖子访客记录") @RestMapping(ignore = true, comment = "帖子访客记录")
public Sheet<Map> readRecordAsync(Flipper flipper ,int contentid){ public Sheet<Map> readRecordAsync(Flipper flipper ,int contentid){
/*Bson filter = eq("uri", "/jie/detail/"+ contentid); Bson filter = eq("uri", "/jie/detail/"+ contentid);
FindIterable<Document> documents = visLog.find(filter).limit(flipper.getLimit()).skip(flipper.getOffset()); FindIterable<Document> documents = visLog.find(filter).limit(flipper.getLimit()).skip(flipper.getOffset());
long total = visLog.countDocuments(filter); long total = visLog.countDocuments(filter);
List<Map> rows = new ArrayList<>(); List<Map> rows = new ArrayList<>();
List<Integer> uids = new ArrayList<>(); List<Integer> uids = new ArrayList<>();
documents.forEach((Consumer<? super Document>) x->{ documents.forEach((Block<? super Document>) x->{
Integer userid = x.getInteger("userid"); Integer userid = x.getInteger("userid");
if (userid > 0) uids.add(userid); if (userid > 0) uids.add(userid);
@@ -178,7 +205,6 @@ public class TaskQueue<T extends Object> extends BaseService implements Runnable
sheet.setTotal(total); sheet.setTotal(total);
sheet.setRows(rows); sheet.setRows(rows);
return sheet;*/ return sheet;
return null;
} }
} }