guoziyun 7 bulan lalu
induk
melakukan
8ad4d811a7
2 mengubah file dengan 278 tambahan dan 46 penghapusan
  1. 43 46
      oms/services/cron-jobs/done-rate.ts
  2. 235 0
      oms/services/cron-jobs/done-rate_bak.ts

+ 43 - 46
oms/services/cron-jobs/done-rate.ts

@@ -1,10 +1,9 @@
-// oms/services/cron-jobs/done-rate.ts
-
 import dayjs from "dayjs";
 import doneRateService from "../../src/services/doneRateService"; // 导入 DoneRateService
 import { clickhouseService } from "../../src/services/clients";
 import mongoose, { Connection } from "mongoose"; // 导入 mongoose 和 Connection 用于处理远程连接
 import TotalDoneRate from "../../src/models/totalDoneRateModel"; // 导入 TotalDoneRate 模型 (已包含 totalTipCount)
+import DoneRateModel from "../../src/models/doneRateModel"; // 【新增】导入 DoneRateModel 用于聚合历史数据
 
 // ClickHouse 表名
 const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
@@ -35,8 +34,8 @@ interface ClickHouseTipCountResult {
 
 /**
  * 每日统计昨天的作品完成率。
- * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况、道具使用情况,并更新到 doneRateModel
- * 随后,根据这些日统计数据,累加更新本地的 totalDoneRate 表 (包括 totalTipCount)。
+ * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况、道具使用情况,并更新到 DoneRate 模型
+ * 随后,根据这些日统计数据,通过聚合 TotalDoneRate 表 (包括 totalTipCount)。
  * @returns Promise<string> - 返回统计结果的摘要信息。
  */
 async function run(): Promise<string> {
@@ -56,6 +55,7 @@ async function run(): Promise<string> {
 
   try {
     // --- 1. 从 ClickHouse 中提取数据 (Start, Done, Tip Counts) ---
+    // (此处是 ClickHouse 可能发生超时的地方)
 
     // 1.1 查询昨天每个作品的独立开始用户数
     const startCountsQuery = `
@@ -152,53 +152,49 @@ async function run(): Promise<string> {
 
     console.log(`[DoneRate Cron] DoneRate model (Daily) update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);
 
-    // --- 3. 获取昨天的 DoneRate 记录,并更新本地的 TotalDoneRate 表 (累计记录) ---
-
+    // --- 3. 重新聚合并更新本地的 TotalDoneRate 表 (累计记录) ---
+    // 🚨 修正:原先的 Read-Modify-Write 逻辑被替换为更健壮的 MongoDB 聚合。
+    // 通过聚合 DoneRate 日记录来计算最新的累计总数,确保幂等性。
+
+    // 3.1 聚合所有历史 DoneRate 记录直到昨天
+    const aggregationPipeline = [
+      // 匹配所有小于或等于昨天的日期记录
+      { $match: { date: { $lte: yesterdayYYYYMMDD } } },
+      {
+        $group: {
+          _id: "$res", // 按作品ID分组
+          totalStartCount: { $sum: "$startCount" },
+          totalDoneCount: { $sum: "$doneCount" },
+          totalTipCount: { $sum: "$tipCount" },
+        },
+      },
+    ];
+
+    const aggregatedTotals = await DoneRateModel.aggregate(aggregationPipeline);
     let updatedTotalDoneRateCount = 0;
-    // 仅获取昨天更新或创建的记录,以保证数据源为最新
-    const yesterdayDoneRates = await doneRateService.getDoneRatesByDate(yesterdayYYYYMMDD);
-
-    const totalDoneRatesToUpdate = yesterdayDoneRates.length;
     let updateStartTime = new Date().getTime();
-    console.log(`[DoneRate Cron] 开始更新本地 TotalDoneRate 表。总计 ${totalDoneRatesToUpdate} 条记录。`);
 
-    for (let i = 0; i < totalDoneRatesToUpdate; i++) {
-      const doneRateDoc = yesterdayDoneRates[i];
-      try {
-        const artworkId = doneRateDoc.res; // 获取作品 ObjectId
-
-        // 查找现有的 TotalDoneRate 文档
-        const existingTotal = await TotalDoneRate.findById(artworkId).lean().exec();
-
-        // 1. 初始化昨天的计数
-        const dailyStartCount = doneRateDoc.startCount;
-        const dailyDoneCount = doneRateDoc.doneCount;
-        const dailyTipCount = doneRateDoc.tipCount; // 使用昨天的道具使用次数
-
-        // 2. 计算累计总数
-        let newTotalStartCount = dailyStartCount;
-        let newTotalDoneCount = dailyDoneCount;
-        let newTotalTipCount = dailyTipCount; // 累计总道具使用次数
-
-        if (existingTotal) {
-          // 如果已存在,则累加旧的总数
-          // 注意:假设此 cron job 每天只运行一次,否则需要更复杂的幂等逻辑
-          newTotalStartCount += existingTotal.totalStartCount || 0;
-          newTotalDoneCount += existingTotal.totalDoneCount || 0;
-          newTotalTipCount += existingTotal.totalTipCount || 0; // 累加 totalTipCount
-        }
+    console.log(`[DoneRate Cron] 开始通过聚合更新本地 TotalDoneRate 表。总计 ${aggregatedTotals.length} 个作品的累计数据。`);
+
+    // 3.2 遍历聚合结果,覆盖式更新 TotalDoneRate
+    for (let i = 0; i < aggregatedTotals.length; i++) {
+      const totalData = aggregatedTotals[i];
+      const artworkId = totalData._id;
 
-        // 3. 计算总完成率
-        const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
+      const { totalStartCount, totalDoneCount, totalTipCount } = totalData;
 
-        // 4. 使用 findByIdAndUpdate 进行原子操作
+      // 1. 计算总完成率
+      const newCompletionRate = totalStartCount > 0 ? (totalDoneCount / totalStartCount) * 100 : 0;
+
+      try {
+        // 2. 使用 findByIdAndUpdate 进行原子操作 (覆盖式更新)
         const updatedDoc = await TotalDoneRate.findByIdAndUpdate(
           artworkId,
           {
             $set: {
-              totalStartCount: newTotalStartCount,
-              totalDoneCount: newTotalDoneCount,
-              totalTipCount: newTotalTipCount, // 写入新的累计道具使用次数
+              totalStartCount: totalStartCount,
+              totalDoneCount: totalDoneCount,
+              totalTipCount: totalTipCount, // 写入新的累计道具使用次数
               completionRate: newCompletionRate,
             },
           },
@@ -210,12 +206,12 @@ async function run(): Promise<string> {
         }
 
         // 进度日志
-        if ((i + 1) % 50 === 0 || i === totalDoneRatesToUpdate - 1) {
+        if ((i + 1) % 50 === 0 || i === aggregatedTotals.length - 1) {
           const elapsed = new Date().getTime() - updateStartTime;
-          console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${totalDoneRatesToUpdate} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
+          console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${aggregatedTotals.length} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
         }
       } catch (totalUpdateError) {
-        console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${doneRateDoc.res}:`, totalUpdateError);
+        console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${artworkId}:`, totalUpdateError);
       }
     }
 
@@ -227,7 +223,8 @@ async function run(): Promise<string> {
     return summary;
   } catch (error) {
     console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
-    throw new Error("Failed to calculate daily done-rates."); // 抛出错误以通知 cron 调度器
+    // 重新抛出错误,以便 cron 调度器能够捕获并记录失败
+    throw new Error(`Failed to calculate daily done-rates: ${error instanceof Error ? error.message : String(error)}`);
   } finally {
   }
 }

+ 235 - 0
oms/services/cron-jobs/done-rate_bak.ts

@@ -0,0 +1,235 @@
+// oms/services/cron-jobs/done-rate.ts
+
+import dayjs from "dayjs";
+import doneRateService from "../../src/services/doneRateService"; // 导入 DoneRateService
+import { clickhouseService } from "../../src/services/clients";
+import mongoose, { Connection } from "mongoose"; // 导入 mongoose 和 Connection 用于处理远程连接
+import TotalDoneRate from "../../src/models/totalDoneRateModel"; // 导入 TotalDoneRate 模型 (已包含 totalTipCount)
+
+// ClickHouse 表名
+const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
+
+/**
+ * ClickHouse 查询结果接口:每日每个作品的独立开始用户数
+ */
+interface ClickHouseStartCountResult {
+  res: string; // 作品 ID
+  unique_starts: number; // 独立开始用户数
+}
+
+/**
+ * ClickHouse 查询结果接口:每日每个作品的独立完成用户数
+ */
+interface ClickHouseDoneCountResult {
+  res: string; // 作品 ID
+  unique_dones: number; // 独立完成用户数
+}
+
+/**
+ * ClickHouse 查询结果接口:每日每个作品的使用道具数 (对应 tipCount)
+ */
+interface ClickHouseTipCountResult {
+  res: string; // 作品 ID
+  tip_count: number; // 道具使用次数
+}
+
+/**
+ * 每日统计昨天的作品完成率。
+ * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况、道具使用情况,并更新到 doneRateModel。
+ * 随后,根据这些日统计数据,累加更新本地的 totalDoneRate 表 (包括 totalTipCount)。
+ * @returns Promise<string> - 返回统计结果的摘要信息。
+ */
+async function run(): Promise<string> {
+  console.log("[DoneRate Cron] Starting daily done-rate calculation for yesterday...");
+
+  // 获取昨天和今天的日期
+  const yesterday = dayjs().subtract(1, "day");
+  const yesterdayYYYYMMDD = yesterday.format("YYYYMMDD");
+  const yesterdayStart = yesterday.startOf("day").toDate();
+  const yesterdayEnd = yesterday.endOf("day").toDate();
+
+  // 格式化日期字符串,使其符合 ClickHouse 的 toDateTime() 函数要求
+  const yesterdayStartString = dayjs(yesterdayStart).format("YYYY-MM-DD HH:mm:ss");
+  const yesterdayEndString = dayjs(yesterdayEnd).format("YYYY-MM-DD HH:mm:ss");
+
+  console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
+
+  try {
+    // --- 1. 从 ClickHouse 中提取数据 (Start, Done, Tip Counts) ---
+
+    // 1.1 查询昨天每个作品的独立开始用户数
+    const startCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_starts
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_start'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+    const startResults = await clickhouseService.queryEvents<ClickHouseStartCountResult>(startCountsQuery);
+    const artworkStartCounts = new Map<string, number>();
+    startResults.forEach((row) => {
+      if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
+        artworkStartCounts.set(row.res, row.unique_starts);
+      } else {
+        console.warn(`[DoneRate Cron] Invalid artwork ID found in start_counts result: ${row.res}`);
+      }
+    });
+    console.log(`[DoneRate Cron] Retrieved ${startResults.length} unique start counts from ClickHouse.`);
+
+    // 1.2 查询昨天每个作品的独立完成用户数
+    const doneCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_dones
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_done'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+    const doneResults = await clickhouseService.queryEvents<ClickHouseDoneCountResult>(doneCountsQuery);
+    const artworkDoneCounts = new Map<string, number>();
+    doneResults.forEach((row) => {
+      if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
+        artworkDoneCounts.set(row.res, row.unique_dones);
+      } else {
+        console.warn(`[DoneRate Cron] Invalid artwork ID found in done_counts result: ${row.res}`);
+      }
+    });
+    console.log(`[DoneRate Cron] Retrieved ${doneResults.length} unique done counts from ClickHouse.`);
+
+    // 1.3 查询昨天每个作品的使用道具数
+    const tipCountsQuery = `
+      SELECT
+          res,
+          count() AS tip_count
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_tip'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+    const tipResults = await clickhouseService.queryEvents<ClickHouseTipCountResult>(tipCountsQuery);
+    const artworkTipCounts = new Map<string, number>();
+    tipResults.forEach((row) => {
+      if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
+        artworkTipCounts.set(row.res, row.tip_count);
+      } else {
+        console.warn(`[DoneRate Cron] Invalid artwork ID found in tip_counts result: ${row.res}`);
+      }
+    });
+    console.log(`[DoneRate Cron] Retrieved ${tipResults.length} unique tip counts from ClickHouse.`);
+
+    // --- 2. 合并数据并更新 DoneRate 模型 (每日记录) ---
+    let updatedRecordsCount = 0;
+    let createdRecordsCount = 0;
+
+    // 获取所有需要处理的作品 ID 集合 (Start + Done + Tip)
+    const allResIds = new Set([...artworkStartCounts.keys(), ...artworkDoneCounts.keys(), ...artworkTipCounts.keys()]);
+
+    for (const resIdStr of allResIds.values()) {
+      const startCount = artworkStartCounts.get(resIdStr) || 0;
+      const doneCount = artworkDoneCounts.get(resIdStr) || 0;
+      const tipCount = artworkTipCounts.get(resIdStr) || 0;
+      const resObjectId = new mongoose.Types.ObjectId(resIdStr);
+
+      // 使用 DoneRateService 来创建或更新记录,并传入 tipCount
+      const doneRateDoc = await doneRateService.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount, tipCount);
+      if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
+        createdRecordsCount++;
+      } else {
+        updatedRecordsCount++;
+      }
+    }
+
+    const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
+
+    console.log(`[DoneRate Cron] DoneRate model (Daily) update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);
+
+    // --- 3. 获取昨天的 DoneRate 记录,并更新本地的 TotalDoneRate 表 (累计记录) ---
+
+    let updatedTotalDoneRateCount = 0;
+    // 仅获取昨天更新或创建的记录,以保证数据源为最新
+    const yesterdayDoneRates = await doneRateService.getDoneRatesByDate(yesterdayYYYYMMDD);
+
+    const totalDoneRatesToUpdate = yesterdayDoneRates.length;
+    let updateStartTime = new Date().getTime();
+    console.log(`[DoneRate Cron] 开始更新本地 TotalDoneRate 表。总计 ${totalDoneRatesToUpdate} 条记录。`);
+
+    for (let i = 0; i < totalDoneRatesToUpdate; i++) {
+      const doneRateDoc = yesterdayDoneRates[i];
+      try {
+        const artworkId = doneRateDoc.res; // 获取作品 ObjectId
+
+        // 查找现有的 TotalDoneRate 文档
+        const existingTotal = await TotalDoneRate.findById(artworkId).lean().exec();
+
+        // 1. 初始化昨天的计数
+        const dailyStartCount = doneRateDoc.startCount;
+        const dailyDoneCount = doneRateDoc.doneCount;
+        const dailyTipCount = doneRateDoc.tipCount; // 使用昨天的道具使用次数
+
+        // 2. 计算累计总数
+        let newTotalStartCount = dailyStartCount;
+        let newTotalDoneCount = dailyDoneCount;
+        let newTotalTipCount = dailyTipCount; // 累计总道具使用次数
+
+        if (existingTotal) {
+          // 如果已存在,则累加旧的总数
+          // 注意:假设此 cron job 每天只运行一次,否则需要更复杂的幂等逻辑
+          newTotalStartCount += existingTotal.totalStartCount || 0;
+          newTotalDoneCount += existingTotal.totalDoneCount || 0;
+          newTotalTipCount += existingTotal.totalTipCount || 0; // 累加 totalTipCount
+        }
+
+        // 3. 计算总完成率
+        const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
+
+        // 4. 使用 findByIdAndUpdate 进行原子操作
+        const updatedDoc = await TotalDoneRate.findByIdAndUpdate(
+          artworkId,
+          {
+            $set: {
+              totalStartCount: newTotalStartCount,
+              totalDoneCount: newTotalDoneCount,
+              totalTipCount: newTotalTipCount, // 写入新的累计道具使用次数
+              completionRate: newCompletionRate,
+            },
+          },
+          { new: true, upsert: true } // upsert: true 表示如果文档不存在则创建
+        );
+
+        if (updatedDoc) {
+          updatedTotalDoneRateCount++;
+        }
+
+        // 进度日志
+        if ((i + 1) % 50 === 0 || i === totalDoneRatesToUpdate - 1) {
+          const elapsed = new Date().getTime() - updateStartTime;
+          console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${totalDoneRatesToUpdate} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
+        }
+      } catch (totalUpdateError) {
+        console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${doneRateDoc.res}:`, totalUpdateError);
+      }
+    }
+
+    const updateTimeTaken = new Date().getTime() - updateStartTime;
+    console.log(`[DoneRate Cron] 本地 TotalDoneRate 表更新完成。总计更新 ${updatedTotalDoneRateCount} 条记录,总耗时 ${updateTimeTaken}ms。`);
+
+    const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated TotalDoneRate records: ${updatedTotalDoneRateCount}.`;
+    console.log(summary);
+    return summary;
+  } catch (error) {
+    console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
+    throw new Error("Failed to calculate daily done-rates."); // 抛出错误以通知 cron 调度器
+  } finally {
+  }
+}
+
+export = { run }; // 导出 run 函数以供 cron-jobs/index.ts 使用