guoziyun 7 月之前
父節點
當前提交
9a4b32fd06
共有 2 個文件被更改,包括 226 次插入38 次删除
  1. 38 38
      oms/dist/services/cron-jobs/done-rate.js
  2. 188 0
      oms/dist/services/cron-jobs/done-rate_bak.js

+ 38 - 38
oms/dist/services/cron-jobs/done-rate.js

@@ -1,5 +1,4 @@
 "use strict";
-// oms/services/cron-jobs/done-rate.ts
 var __importDefault = (this && this.__importDefault) || function (mod) {
     return (mod && mod.__esModule) ? mod : { "default": mod };
 };
@@ -8,12 +7,13 @@ const doneRateService_1 = __importDefault(require("../../src/services/doneRateSe
 const clients_1 = require("../../src/services/clients");
 const mongoose_1 = __importDefault(require("mongoose")); // 导入 mongoose 和 Connection 用于处理远程连接
 const totalDoneRateModel_1 = __importDefault(require("../../src/models/totalDoneRateModel")); // 导入 TotalDoneRate 模型 (已包含 totalTipCount)
+const doneRateModel_1 = __importDefault(require("../../src/models/doneRateModel")); // 【新增】导入 DoneRateModel 用于聚合历史数据
 // ClickHouse 表名
 const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
 /**
  * 每日统计昨天的作品完成率。
- * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况、道具使用情况,并更新到 doneRateModel
- * 随后,根据这些日统计数据,累加更新本地的 totalDoneRate 表 (包括 totalTipCount)。
+ * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况、道具使用情况,并更新到 DoneRate 模型
+ * 随后,通过聚合 DoneRate 日记录重新计算累计总数,并覆盖更新本地的 TotalDoneRate 表 (包括 totalTipCount)。
  * @returns Promise<string> - 返回统计结果的摘要信息。
  */
 async function run() {
@@ -29,6 +29,7 @@ async function run() {
     console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
     try {
         // --- 1. 从 ClickHouse 中提取数据 (Start, Done, Tip Counts) ---
+        // (此处是 ClickHouse 可能发生超时的地方)
         // 1.1 查询昨天每个作品的独立开始用户数
         const startCountsQuery = `
       SELECT
@@ -119,42 +120,40 @@ async function run() {
         }
         const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
         console.log(`[DoneRate Cron] DoneRate model (Daily) update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);
-        // --- 3. 获取昨天的 DoneRate 记录,并更新本地的 TotalDoneRate 表 (累计记录) ---
+        // --- 3. 重新聚合并更新本地的 TotalDoneRate 表 (累计记录) ---
+        // 🚨 修正:原先的 Read-Modify-Write 逻辑被替换为更健壮的 MongoDB 聚合。
+        // 通过聚合 DoneRate 日记录来计算最新的累计总数,确保幂等性。
+        // 3.1 聚合所有历史 DoneRate 记录直到昨天
+        const aggregationPipeline = [
+            // 匹配所有小于或等于昨天的日期记录
+            { $match: { date: { $lte: yesterdayYYYYMMDD } } },
+            {
+                $group: {
+                    _id: "$res", // 按作品ID分组
+                    totalStartCount: { $sum: "$startCount" },
+                    totalDoneCount: { $sum: "$doneCount" },
+                    totalTipCount: { $sum: "$tipCount" },
+                },
+            },
+        ];
+        const aggregatedTotals = await doneRateModel_1.default.aggregate(aggregationPipeline);
         let updatedTotalDoneRateCount = 0;
-        // 仅获取昨天更新或创建的记录,以保证数据源为最新
-        const yesterdayDoneRates = await doneRateService_1.default.getDoneRatesByDate(yesterdayYYYYMMDD);
-        const totalDoneRatesToUpdate = yesterdayDoneRates.length;
         let updateStartTime = new Date().getTime();
-        console.log(`[DoneRate Cron] 开始更新本地 TotalDoneRate 表。总计 ${totalDoneRatesToUpdate} 条记录。`);
-        for (let i = 0; i < totalDoneRatesToUpdate; i++) {
-            const doneRateDoc = yesterdayDoneRates[i];
+        console.log(`[DoneRate Cron] 开始通过聚合更新本地 TotalDoneRate 表。总计 ${aggregatedTotals.length} 个作品的累计数据。`);
+        // 3.2 遍历聚合结果,覆盖式更新 TotalDoneRate
+        for (let i = 0; i < aggregatedTotals.length; i++) {
+            const totalData = aggregatedTotals[i];
+            const artworkId = totalData._id;
+            const { totalStartCount, totalDoneCount, totalTipCount } = totalData;
+            // 1. 计算总完成率
+            const newCompletionRate = totalStartCount > 0 ? (totalDoneCount / totalStartCount) * 100 : 0;
             try {
-                const artworkId = doneRateDoc.res; // 获取作品 ObjectId
-                // 查找现有的 TotalDoneRate 文档
-                const existingTotal = await totalDoneRateModel_1.default.findById(artworkId).lean().exec();
-                // 1. 初始化昨天的计数
-                const dailyStartCount = doneRateDoc.startCount;
-                const dailyDoneCount = doneRateDoc.doneCount;
-                const dailyTipCount = doneRateDoc.tipCount; // 使用昨天的道具使用次数
-                // 2. 计算累计总数
-                let newTotalStartCount = dailyStartCount;
-                let newTotalDoneCount = dailyDoneCount;
-                let newTotalTipCount = dailyTipCount; // 累计总道具使用次数
-                if (existingTotal) {
-                    // 如果已存在,则累加旧的总数
-                    // 注意:假设此 cron job 每天只运行一次,否则需要更复杂的幂等逻辑
-                    newTotalStartCount += existingTotal.totalStartCount || 0;
-                    newTotalDoneCount += existingTotal.totalDoneCount || 0;
-                    newTotalTipCount += existingTotal.totalTipCount || 0; // 累加 totalTipCount
-                }
-                // 3. 计算总完成率
-                const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
-                // 4. 使用 findByIdAndUpdate 进行原子操作
+                // 2. 使用 findByIdAndUpdate 进行原子操作 (覆盖式更新)
                 const updatedDoc = await totalDoneRateModel_1.default.findByIdAndUpdate(artworkId, {
                     $set: {
-                        totalStartCount: newTotalStartCount,
-                        totalDoneCount: newTotalDoneCount,
-                        totalTipCount: newTotalTipCount, // 写入新的累计道具使用次数
+                        totalStartCount: totalStartCount,
+                        totalDoneCount: totalDoneCount,
+                        totalTipCount: totalTipCount, // 写入新的累计道具使用次数
                         completionRate: newCompletionRate,
                     },
                 }, { new: true, upsert: true } // upsert: true 表示如果文档不存在则创建
@@ -163,13 +162,13 @@ async function run() {
                     updatedTotalDoneRateCount++;
                 }
                 // 进度日志
-                if ((i + 1) % 50 === 0 || i === totalDoneRatesToUpdate - 1) {
+                if ((i + 1) % 50 === 0 || i === aggregatedTotals.length - 1) {
                     const elapsed = new Date().getTime() - updateStartTime;
-                    console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${totalDoneRatesToUpdate} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
+                    console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${aggregatedTotals.length} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
                 }
             }
             catch (totalUpdateError) {
-                console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${doneRateDoc.res}:`, totalUpdateError);
+                console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${artworkId}:`, totalUpdateError);
             }
         }
         const updateTimeTaken = new Date().getTime() - updateStartTime;
@@ -180,7 +179,8 @@ async function run() {
     }
     catch (error) {
         console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
-        throw new Error("Failed to calculate daily done-rates."); // 抛出错误以通知 cron 调度器
+        // 重新抛出错误,以便 cron 调度器能够捕获并记录失败
+        throw new Error(`Failed to calculate daily done-rates: ${error instanceof Error ? error.message : String(error)}`);
     }
     finally {
     }

+ 188 - 0
oms/dist/services/cron-jobs/done-rate_bak.js

@@ -0,0 +1,188 @@
+"use strict";
+// oms/services/cron-jobs/done-rate.ts
+var __importDefault = (this && this.__importDefault) || function (mod) {
+    return (mod && mod.__esModule) ? mod : { "default": mod };
+};
+const dayjs_1 = __importDefault(require("dayjs"));
+const doneRateService_1 = __importDefault(require("../../src/services/doneRateService")); // 导入 DoneRateService
+const clients_1 = require("../../src/services/clients");
+const mongoose_1 = __importDefault(require("mongoose")); // 导入 mongoose 和 Connection 用于处理远程连接
+const totalDoneRateModel_1 = __importDefault(require("../../src/models/totalDoneRateModel")); // 导入 TotalDoneRate 模型 (已包含 totalTipCount)
+// ClickHouse 表名
+const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
+/**
+ * 每日统计昨天的作品完成率。
+ * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况、道具使用情况,并更新到 doneRateModel。
+ * 随后,根据这些日统计数据,累加更新本地的 totalDoneRate 表 (包括 totalTipCount)。
+ * @returns Promise<string> - 返回统计结果的摘要信息。
+ */
+async function run() {
+    console.log("[DoneRate Cron] Starting daily done-rate calculation for yesterday...");
+    // 获取昨天和今天的日期
+    const yesterday = (0, dayjs_1.default)().subtract(1, "day");
+    const yesterdayYYYYMMDD = yesterday.format("YYYYMMDD");
+    const yesterdayStart = yesterday.startOf("day").toDate();
+    const yesterdayEnd = yesterday.endOf("day").toDate();
+    // 格式化日期字符串,使其符合 ClickHouse 的 toDateTime() 函数要求
+    const yesterdayStartString = (0, dayjs_1.default)(yesterdayStart).format("YYYY-MM-DD HH:mm:ss");
+    const yesterdayEndString = (0, dayjs_1.default)(yesterdayEnd).format("YYYY-MM-DD HH:mm:ss");
+    console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
+    try {
+        // --- 1. 从 ClickHouse 中提取数据 (Start, Done, Tip Counts) ---
+        // 1.1 查询昨天每个作品的独立开始用户数
+        const startCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_starts
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_start'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+        const startResults = await clients_1.clickhouseService.queryEvents(startCountsQuery);
+        const artworkStartCounts = new Map();
+        startResults.forEach((row) => {
+            if (row.res && mongoose_1.default.Types.ObjectId.isValid(row.res)) {
+                artworkStartCounts.set(row.res, row.unique_starts);
+            }
+            else {
+                console.warn(`[DoneRate Cron] Invalid artwork ID found in start_counts result: ${row.res}`);
+            }
+        });
+        console.log(`[DoneRate Cron] Retrieved ${startResults.length} unique start counts from ClickHouse.`);
+        // 1.2 查询昨天每个作品的独立完成用户数
+        const doneCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_dones
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_done'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+        const doneResults = await clients_1.clickhouseService.queryEvents(doneCountsQuery);
+        const artworkDoneCounts = new Map();
+        doneResults.forEach((row) => {
+            if (row.res && mongoose_1.default.Types.ObjectId.isValid(row.res)) {
+                artworkDoneCounts.set(row.res, row.unique_dones);
+            }
+            else {
+                console.warn(`[DoneRate Cron] Invalid artwork ID found in done_counts result: ${row.res}`);
+            }
+        });
+        console.log(`[DoneRate Cron] Retrieved ${doneResults.length} unique done counts from ClickHouse.`);
+        // 1.3 查询昨天每个作品的使用道具数
+        const tipCountsQuery = `
+      SELECT
+          res,
+          count() AS tip_count
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_tip'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+        const tipResults = await clients_1.clickhouseService.queryEvents(tipCountsQuery);
+        const artworkTipCounts = new Map();
+        tipResults.forEach((row) => {
+            if (row.res && mongoose_1.default.Types.ObjectId.isValid(row.res)) {
+                artworkTipCounts.set(row.res, row.tip_count);
+            }
+            else {
+                console.warn(`[DoneRate Cron] Invalid artwork ID found in tip_counts result: ${row.res}`);
+            }
+        });
+        console.log(`[DoneRate Cron] Retrieved ${tipResults.length} unique tip counts from ClickHouse.`);
+        // --- 2. 合并数据并更新 DoneRate 模型 (每日记录) ---
+        let updatedRecordsCount = 0;
+        let createdRecordsCount = 0;
+        // 获取所有需要处理的作品 ID 集合 (Start + Done + Tip)
+        const allResIds = new Set([...artworkStartCounts.keys(), ...artworkDoneCounts.keys(), ...artworkTipCounts.keys()]);
+        for (const resIdStr of allResIds.values()) {
+            const startCount = artworkStartCounts.get(resIdStr) || 0;
+            const doneCount = artworkDoneCounts.get(resIdStr) || 0;
+            const tipCount = artworkTipCounts.get(resIdStr) || 0;
+            const resObjectId = new mongoose_1.default.Types.ObjectId(resIdStr);
+            // 使用 DoneRateService 来创建或更新记录,并传入 tipCount
+            const doneRateDoc = await doneRateService_1.default.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount, tipCount);
+            if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
+                createdRecordsCount++;
+            }
+            else {
+                updatedRecordsCount++;
+            }
+        }
+        const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
+        console.log(`[DoneRate Cron] DoneRate model (Daily) update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);
+        // --- 3. 获取昨天的 DoneRate 记录,并更新本地的 TotalDoneRate 表 (累计记录) ---
+        let updatedTotalDoneRateCount = 0;
+        // 仅获取昨天更新或创建的记录,以保证数据源为最新
+        const yesterdayDoneRates = await doneRateService_1.default.getDoneRatesByDate(yesterdayYYYYMMDD);
+        const totalDoneRatesToUpdate = yesterdayDoneRates.length;
+        let updateStartTime = new Date().getTime();
+        console.log(`[DoneRate Cron] 开始更新本地 TotalDoneRate 表。总计 ${totalDoneRatesToUpdate} 条记录。`);
+        for (let i = 0; i < totalDoneRatesToUpdate; i++) {
+            const doneRateDoc = yesterdayDoneRates[i];
+            try {
+                const artworkId = doneRateDoc.res; // 获取作品 ObjectId
+                // 查找现有的 TotalDoneRate 文档
+                const existingTotal = await totalDoneRateModel_1.default.findById(artworkId).lean().exec();
+                // 1. 初始化昨天的计数
+                const dailyStartCount = doneRateDoc.startCount;
+                const dailyDoneCount = doneRateDoc.doneCount;
+                const dailyTipCount = doneRateDoc.tipCount; // 使用昨天的道具使用次数
+                // 2. 计算累计总数
+                let newTotalStartCount = dailyStartCount;
+                let newTotalDoneCount = dailyDoneCount;
+                let newTotalTipCount = dailyTipCount; // 累计总道具使用次数
+                if (existingTotal) {
+                    // 如果已存在,则累加旧的总数
+                    // 注意:假设此 cron job 每天只运行一次,否则需要更复杂的幂等逻辑
+                    newTotalStartCount += existingTotal.totalStartCount || 0;
+                    newTotalDoneCount += existingTotal.totalDoneCount || 0;
+                    newTotalTipCount += existingTotal.totalTipCount || 0; // 累加 totalTipCount
+                }
+                // 3. 计算总完成率
+                const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
+                // 4. 使用 findByIdAndUpdate 进行原子操作
+                const updatedDoc = await totalDoneRateModel_1.default.findByIdAndUpdate(artworkId, {
+                    $set: {
+                        totalStartCount: newTotalStartCount,
+                        totalDoneCount: newTotalDoneCount,
+                        totalTipCount: newTotalTipCount, // 写入新的累计道具使用次数
+                        completionRate: newCompletionRate,
+                    },
+                }, { new: true, upsert: true } // upsert: true 表示如果文档不存在则创建
+                );
+                if (updatedDoc) {
+                    updatedTotalDoneRateCount++;
+                }
+                // 进度日志
+                if ((i + 1) % 50 === 0 || i === totalDoneRatesToUpdate - 1) {
+                    const elapsed = new Date().getTime() - updateStartTime;
+                    console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${totalDoneRatesToUpdate} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
+                }
+            }
+            catch (totalUpdateError) {
+                console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${doneRateDoc.res}:`, totalUpdateError);
+            }
+        }
+        const updateTimeTaken = new Date().getTime() - updateStartTime;
+        console.log(`[DoneRate Cron] 本地 TotalDoneRate 表更新完成。总计更新 ${updatedTotalDoneRateCount} 条记录,总耗时 ${updateTimeTaken}ms。`);
+        const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated TotalDoneRate records: ${updatedTotalDoneRateCount}.`;
+        console.log(summary);
+        return summary;
+    }
+    catch (error) {
+        console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
+        throw new Error("Failed to calculate daily done-rates."); // 抛出错误以通知 cron 调度器
+    }
+    finally {
+    }
+}
+module.exports = { run };