|
|
@@ -1,4 +1,5 @@
|
|
|
import { MessageRecord, IMessageRecord } from "../models/messageRecordModel";
|
|
|
+import { MessageStatsDailyUid } from "../models/messageStatsDailyUidModel";
|
|
|
import { redisClient } from "./clients";
|
|
|
|
|
|
export class MessageRecordService {
|
|
|
@@ -24,6 +25,7 @@ export class MessageRecordService {
|
|
|
public static readonly DEFAULT_STATS_LIMIT = 50;
|
|
|
public static readonly MAX_STATS_LIMIT = 200;
|
|
|
private static readonly STATS_CACHE_TTL_SECONDS = 300;
|
|
|
+ private static readonly PREAGG_ENABLED = process.env.MESSAGE_STATS_PREAGG_ENABLED === "1";
|
|
|
|
|
|
private logPerf(endpoint: string, stage: string, durationMs: number, extra: Record<string, unknown> = {}) {
|
|
|
console.log(
|
|
|
@@ -47,6 +49,15 @@ export class MessageRecordService {
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
|
+ private formatDateKeyInTimezone(date: Date): string {
|
|
|
+ return new Intl.DateTimeFormat("en-CA", {
|
|
|
+ timeZone: MessageRecordService.TIMEZONE,
|
|
|
+ year: "numeric",
|
|
|
+ month: "2-digit",
|
|
|
+ day: "2-digit",
|
|
|
+ }).format(date);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 创建一条新的消息推送记录
|
|
|
* @param recordData 消息记录数据
|
|
|
@@ -147,8 +158,24 @@ export class MessageRecordService {
|
|
|
return cached;
|
|
|
}
|
|
|
|
|
|
- const matchConditions = this.buildMatchConditions(startDate, endDate, strategyName);
|
|
|
- const result = await this.getStatisticsByGroup(matchConditions, []);
|
|
|
+ let result: any = null;
|
|
|
+ if (MessageRecordService.PREAGG_ENABLED && startDate && endDate) {
|
|
|
+ const preAggStartedAt = Date.now();
|
|
|
+ result = await this.getOverallFromPreAgg(startDate, endDate, strategyName);
|
|
|
+ if (result) {
|
|
|
+ this.logPerf("overall", "preagg", Date.now() - preAggStartedAt, {
|
|
|
+ startDate: startDate.toISOString(),
|
|
|
+ endDate: endDate.toISOString(),
|
|
|
+ hasStrategy: Boolean(strategyName),
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!result) {
|
|
|
+ const matchConditions = this.buildMatchConditions(startDate, endDate, strategyName);
|
|
|
+ result = await this.getStatisticsByGroup(matchConditions, []);
|
|
|
+ }
|
|
|
+
|
|
|
await this.setCache(cacheKey, result);
|
|
|
console.log(`[MessageStatsCache] miss key=${cacheKey}`);
|
|
|
return result;
|
|
|
@@ -187,9 +214,28 @@ export class MessageRecordService {
|
|
|
return cached;
|
|
|
}
|
|
|
|
|
|
- const matchConditions = this.buildMatchConditions(startDate, endDate, strategyName);
|
|
|
- const groupFields = ["strategyId", "strategyName"];
|
|
|
- const result = await this.getStatisticsByGroup(matchConditions, groupFields, { clickThroughRate: -1 }, page, limit);
|
|
|
+ let result: any[] = [];
|
|
|
+ if (MessageRecordService.PREAGG_ENABLED && startDate && endDate) {
|
|
|
+ const preAggStartedAt = Date.now();
|
|
|
+ result = await this.getByStrategyFromPreAgg(startDate, endDate, strategyName, page, limit);
|
|
|
+ if (Array.isArray(result) && result.length > 0) {
|
|
|
+ this.logPerf("by-strategy", "preagg", Date.now() - preAggStartedAt, {
|
|
|
+ startDate: startDate.toISOString(),
|
|
|
+ endDate: endDate.toISOString(),
|
|
|
+ hasStrategy: Boolean(strategyName),
|
|
|
+ page,
|
|
|
+ limit,
|
|
|
+ resultRows: result.length,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!Array.isArray(result) || result.length === 0) {
|
|
|
+ const matchConditions = this.buildMatchConditions(startDate, endDate, strategyName);
|
|
|
+ const groupFields = ["strategyId", "strategyName"];
|
|
|
+ result = await this.getStatisticsByGroup(matchConditions, groupFields, { clickThroughRate: -1 }, page, limit);
|
|
|
+ }
|
|
|
+
|
|
|
await this.setCache(cacheKey, result);
|
|
|
console.log(`[MessageStatsCache] miss key=${cacheKey}`);
|
|
|
return result;
|
|
|
@@ -300,7 +346,24 @@ export class MessageRecordService {
|
|
|
return cached;
|
|
|
}
|
|
|
|
|
|
- const result = await this.getDailyTrendsByDimensions(startDate, endDate, strategyName, []);
|
|
|
+ let result: any[] = [];
|
|
|
+ if (MessageRecordService.PREAGG_ENABLED && startDate && endDate) {
|
|
|
+ const preAggStartedAt = Date.now();
|
|
|
+ result = await this.getDailyTrendsFromPreAgg(startDate, endDate, strategyName);
|
|
|
+ if (Array.isArray(result) && result.length > 0) {
|
|
|
+ this.logPerf("daily-trends", "preagg", Date.now() - preAggStartedAt, {
|
|
|
+ startDate: startDate.toISOString(),
|
|
|
+ endDate: endDate.toISOString(),
|
|
|
+ hasStrategy: Boolean(strategyName),
|
|
|
+ resultRows: result.length,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!Array.isArray(result) || result.length === 0) {
|
|
|
+ result = await this.getDailyTrendsByDimensions(startDate, endDate, strategyName, []);
|
|
|
+ }
|
|
|
+
|
|
|
await this.setCache(cacheKey, result);
|
|
|
console.log(`[MessageStatsCache] miss key=${cacheKey}`);
|
|
|
return result;
|
|
|
@@ -367,13 +430,36 @@ export class MessageRecordService {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- // 任意 cache miss → 单次 $facet 查询替代 3 次独立聚合
|
|
|
- const matchConditions = this.buildMatchConditions(startDate, endDate, strategyName);
|
|
|
- const { overall, strategyStats, dailyTrends } = await this.runSummaryFacetQuery(
|
|
|
- matchConditions,
|
|
|
- safePage,
|
|
|
- safeLimit
|
|
|
- );
|
|
|
+ let overall: any = null;
|
|
|
+ let strategyStats: any[] = [];
|
|
|
+ let dailyTrends: any[] = [];
|
|
|
+
|
|
|
+ // 任意 cache miss:优先读取预聚合,预聚合不可用时回退实时 facet
|
|
|
+ if (MessageRecordService.PREAGG_ENABLED && startDate && endDate) {
|
|
|
+ const preAggResult = await this.runSummaryFromPreAgg(startDate, endDate, strategyName, safePage, safeLimit);
|
|
|
+ overall = preAggResult.overall;
|
|
|
+ strategyStats = preAggResult.strategyStats;
|
|
|
+ dailyTrends = preAggResult.dailyTrends;
|
|
|
+ this.logPerf("summary", "preagg-query", Date.now() - summaryStartedAt, {
|
|
|
+ overallRows: this.resolveResultRows(overall),
|
|
|
+ strategyRows: this.resolveResultRows(strategyStats),
|
|
|
+ dailyTrendRows: this.resolveResultRows(dailyTrends),
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!overall || strategyStats.length === 0 || dailyTrends.length === 0) {
|
|
|
+ const matchConditions = this.buildMatchConditions(startDate, endDate, strategyName);
|
|
|
+ const facetResult = await this.runSummaryFacetQuery(matchConditions, safePage, safeLimit);
|
|
|
+ if (!overall) {
|
|
|
+ overall = facetResult.overall;
|
|
|
+ }
|
|
|
+ if (strategyStats.length === 0) {
|
|
|
+ strategyStats = facetResult.strategyStats;
|
|
|
+ }
|
|
|
+ if (dailyTrends.length === 0) {
|
|
|
+ dailyTrends = facetResult.dailyTrends;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
// 回填各子查询缓存(未命中的才写入)
|
|
|
await Promise.all([
|
|
|
@@ -667,6 +753,189 @@ export class MessageRecordService {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ private buildPreAggMatch(startDate: Date, endDate: Date, strategyName?: string): any {
|
|
|
+ const startKey = this.formatDateKeyInTimezone(startDate);
|
|
|
+ const endKey = this.formatDateKeyInTimezone(endDate);
|
|
|
+ const match: any = {
|
|
|
+ dateKey: {
|
|
|
+ $gte: startKey,
|
|
|
+ $lte: endKey,
|
|
|
+ },
|
|
|
+ };
|
|
|
+ if (strategyName) {
|
|
|
+ match.strategyName = strategyName;
|
|
|
+ }
|
|
|
+ return match;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async getOverallFromPreAgg(startDate: Date, endDate: Date, strategyName?: string) {
|
|
|
+ const pipeline: any[] = [
|
|
|
+ { $match: this.buildPreAggMatch(startDate, endDate, strategyName) },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: { status: "$status", inforeground: "$inforeground", uid: "$uid" },
|
|
|
+ count: { $sum: "$msgCount" },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: { status: "$_id.status", inforeground: "$_id.inforeground" },
|
|
|
+ count: { $sum: "$count" },
|
|
|
+ uniqueUsers: { $sum: 1 },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: null,
|
|
|
+ totalRecords: { $sum: "$count" },
|
|
|
+ ...this.getStatusAggregationFields(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ { $project: this.getStatisticsProjectFields([]) },
|
|
|
+ ];
|
|
|
+
|
|
|
+ const [result] = await MessageStatsDailyUid.aggregate(pipeline).allowDiskUse(true);
|
|
|
+ return result || null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async getByStrategyFromPreAgg(startDate: Date, endDate: Date, strategyName: string | undefined, page: number, limit: number) {
|
|
|
+ const safePage = Math.max(1, Math.floor(page || MessageRecordService.DEFAULT_STATS_PAGE));
|
|
|
+ const safeLimit = Math.min(
|
|
|
+ MessageRecordService.MAX_STATS_LIMIT,
|
|
|
+ Math.max(1, Math.floor(limit || MessageRecordService.DEFAULT_STATS_LIMIT))
|
|
|
+ );
|
|
|
+
|
|
|
+ const pipeline: any[] = [
|
|
|
+ { $match: this.buildPreAggMatch(startDate, endDate, strategyName) },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: {
|
|
|
+ strategyId: "$strategyId",
|
|
|
+ strategyName: "$strategyName",
|
|
|
+ status: "$status",
|
|
|
+ inforeground: "$inforeground",
|
|
|
+ uid: "$uid",
|
|
|
+ },
|
|
|
+ count: { $sum: "$msgCount" },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: {
|
|
|
+ strategyId: "$_id.strategyId",
|
|
|
+ strategyName: "$_id.strategyName",
|
|
|
+ status: "$_id.status",
|
|
|
+ inforeground: "$_id.inforeground",
|
|
|
+ },
|
|
|
+ count: { $sum: "$count" },
|
|
|
+ uniqueUsers: { $sum: 1 },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: { strategyId: "$_id.strategyId", strategyName: "$_id.strategyName" },
|
|
|
+ strategyId: { $first: "$_id.strategyId" },
|
|
|
+ strategyName: { $first: "$_id.strategyName" },
|
|
|
+ totalRecords: { $sum: "$count" },
|
|
|
+ ...this.getStatusAggregationFields(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ { $project: this.getStatisticsProjectFields(["strategyId", "strategyName"]) },
|
|
|
+ { $sort: { clickThroughRate: -1 } },
|
|
|
+ { $skip: (safePage - 1) * safeLimit },
|
|
|
+ { $limit: safeLimit },
|
|
|
+ ];
|
|
|
+
|
|
|
+ return MessageStatsDailyUid.aggregate(pipeline).allowDiskUse(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private async getDailyTrendsFromPreAgg(startDate: Date, endDate: Date, strategyName?: string) {
|
|
|
+ const pipeline: any[] = [
|
|
|
+ { $match: this.buildPreAggMatch(startDate, endDate, strategyName) },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: {
|
|
|
+ date: "$date",
|
|
|
+ status: "$status",
|
|
|
+ inforeground: "$inforeground",
|
|
|
+ uid: "$uid",
|
|
|
+ },
|
|
|
+ count: { $sum: "$msgCount" },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: { date: "$_id.date", status: "$_id.status", inforeground: "$_id.inforeground" },
|
|
|
+ count: { $sum: "$count" },
|
|
|
+ uniqueUsers: { $sum: 1 },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $group: {
|
|
|
+ _id: "$_id.date",
|
|
|
+ date: { $first: "$_id.date" },
|
|
|
+ totalRecords: { $sum: "$count" },
|
|
|
+ ...this.getStatusAggregationFields(),
|
|
|
+ },
|
|
|
+ },
|
|
|
+ {
|
|
|
+ $project: {
|
|
|
+ _id: 0,
|
|
|
+ date: "$date",
|
|
|
+ totalRecords: "$totalRecords",
|
|
|
+ sent: "$sent",
|
|
|
+ delivered: "$delivered",
|
|
|
+ opened: "$opened",
|
|
|
+ failed: "$failed",
|
|
|
+ displayCount: "$displayCount",
|
|
|
+ displayedUsers: "$displayedUsers",
|
|
|
+ openedUsers: "$openedUsers",
|
|
|
+ sentSuccessRate: {
|
|
|
+ $cond: [{ $eq: ["$totalRecords", 0] }, 0, { $divide: ["$sent", "$totalRecords"] }],
|
|
|
+ },
|
|
|
+ deliveredRate: {
|
|
|
+ $cond: [{ $eq: ["$sent", 0] }, 0, { $divide: ["$delivered", "$sent"] }],
|
|
|
+ },
|
|
|
+ displayRate: {
|
|
|
+ $cond: [{ $eq: ["$delivered", 0] }, 0, { $divide: ["$displayCount", "$delivered"] }],
|
|
|
+ },
|
|
|
+ clickThroughRate: {
|
|
|
+ $cond: [{ $eq: ["$displayCount", 0] }, 0, { $divide: ["$opened", "$displayCount"] }],
|
|
|
+ },
|
|
|
+ tokenInvalidationRate: {
|
|
|
+ $cond: [{ $eq: ["$totalRecords", 0] }, 0, { $divide: ["$failed", "$totalRecords"] }],
|
|
|
+ },
|
|
|
+ actualClickThroughRate: {
|
|
|
+ $cond: [{ $eq: ["$displayedUsers", 0] }, 0, { $divide: ["$openedUsers", "$displayedUsers"] }],
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ { $sort: { date: -1 } },
|
|
|
+ ];
|
|
|
+
|
|
|
+ return MessageStatsDailyUid.aggregate(pipeline).allowDiskUse(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private async runSummaryFromPreAgg(
|
|
|
+ startDate: Date,
|
|
|
+ endDate: Date,
|
|
|
+ strategyName: string | undefined,
|
|
|
+ page: number,
|
|
|
+ limit: number
|
|
|
+ ): Promise<{ overall: any; strategyStats: any[]; dailyTrends: any[] }> {
|
|
|
+ const [overall, strategyStats, dailyTrends] = await Promise.all([
|
|
|
+ this.getOverallFromPreAgg(startDate, endDate, strategyName),
|
|
|
+ this.getByStrategyFromPreAgg(startDate, endDate, strategyName, page, limit),
|
|
|
+ this.getDailyTrendsFromPreAgg(startDate, endDate, strategyName),
|
|
|
+ ]);
|
|
|
+
|
|
|
+ return {
|
|
|
+ overall,
|
|
|
+ strategyStats: strategyStats || [],
|
|
|
+ dailyTrends: dailyTrends || [],
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* summary 首屏接口专用:单次 $facet 查询同时计算 overall / byStrategy / dailyTrends,
|
|
|
* 替代 3 次独立聚合,减少对同一时间窗口数据的重复扫描。
|