done-rate.ts 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. // oms/services/cron-jobs/done-rate.ts
  2. import dayjs from "dayjs";
  3. import doneRateService from "../../src/services/doneRateService"; // 导入 DoneRateService
  4. import artService from "../../src/services/artService"; // 👈 导入 ArtService
  5. import { clickhouseService } from "../../src/app"; // 导入 ClickhouseService 实例
  6. import mongoose from "mongoose"; // 导入 mongoose 用于处理 ObjectId
  7. import Art, { IArt } from "../../src/models/artModel"; // 👈 导入 Art 模型和 IArt 接口
  /**
   * ClickHouse table holding the raw tracking events queried by this job.
   * Must stay in sync with the table name used by ClickHouseService.
   */
  const CLICKHOUSE_EVENTS_TABLE = "events_raw" as const;
  /**
   * One row of the ClickHouse aggregation of `color_start` events:
   * the number of distinct users who started a given artwork on one day.
   */
  type ClickHouseStartCountResult = {
    /** Artwork id (the `res` column). */
    res: string;
    /**
     * Distinct-user start count.
     * NOTE(review): ClickHouse JSON output formats quote UInt64 values (the result type of
     * `count()`) by default, so at runtime this may actually be a numeric string — confirm
     * against ClickHouseService's settings and coerce before doing arithmetic.
     */
    unique_starts: number;
  };
  /**
   * One row of the ClickHouse aggregation of `color_done` events:
   * the number of distinct users who finished a given artwork on one day.
   */
  type ClickHouseDoneCountResult = {
    /** Artwork id (the `res` column). */
    res: string;
    /**
     * Distinct-user completion count.
     * NOTE(review): ClickHouse JSON output formats quote UInt64 values (the result type of
     * `count()`) by default, so at runtime this may actually be a numeric string — confirm
     * against ClickHouseService's settings and coerce before doing arithmetic.
     */
    unique_dones: number;
  };
  /**
   * Builds a ClickHouse query counting, per artwork (`res`), the distinct users (`uid`) that
   * emitted `eventName` within the half-open window [fromUnixSec, toUnixSec).
   *
   * The bounds are passed as Unix timestamps: `toDateTime(<integer>)` is unambiguous and does
   * not depend on the ClickHouse server time zone, whereas an ISO string with milliseconds and
   * a `Z` suffix is not reliably parsed by `toDateTime(<string>)`.
   *
   * @param eventName - Event type to count; only internal constants are passed here.
   * @param countAlias - Column alias for the distinct-user count in the result rows.
   * @param fromUnixSec - Inclusive lower bound, in Unix seconds.
   * @param toUnixSec - Exclusive upper bound, in Unix seconds.
   */
  function buildUniqueUserCountQuery(eventName: string, countAlias: string, fromUnixSec: number, toUnixSec: number): string {
    return `
      SELECT
        res,
        count(DISTINCT uid) AS ${countAlias}
      FROM ${CLICKHOUSE_EVENTS_TABLE}
      WHERE event = '${eventName}'
        AND res IS NOT NULL
        AND time >= toDateTime(${fromUnixSec})
        AND time < toDateTime(${toUnixSec})
      GROUP BY res
      FORMAT JSONEachRow
    `;
  }

  /**
   * Collects (artwork id, count) pairs into a Map keyed by artwork id string.
   * Rows whose id is empty or not a valid ObjectId are skipped with a warning.
   * Counts are coerced with Number() because ClickHouse JSON formats quote 64-bit integers
   * (the type of `count()`) as strings by default.
   *
   * @param rows - Normalized ClickHouse rows.
   * @param label - Name of the result set, used only in the warning message.
   */
  function toArtworkCountMap(rows: ReadonlyArray<{ res: string; count: number | string }>, label: string): Map<string, number> {
    const counts = new Map<string, number>();
    for (const { res, count } of rows) {
      if (res && mongoose.Types.ObjectId.isValid(res)) {
        counts.set(res, Number(count));
      } else {
        console.warn(`[DoneRate Cron] Invalid artwork ID found in ${label} result: ${res}`);
      }
    }
    return counts;
  }

  /**
   * Computes yesterday's per-artwork completion statistics.
   *
   * 1. Reads distinct-user `color_start` / `color_done` counts per artwork from ClickHouse.
   * 2. Creates or updates one DoneRate record per artwork for that date.
   * 3. Adds yesterday's DoneRate counts onto each Art document's running totals and
   *    recomputes its overall completion rate (percentage).
   *
   * NOTE(review): step 3 accumulates, so re-running the job for the same day (e.g. a
   * scheduler retry after a late failure) adds yesterday's counts to Art a second time.
   *
   * @returns A human-readable summary of the run.
   * @throws Error when the ClickHouse queries or the DoneRate updates fail (per-artwork Art
   *   update failures are logged and skipped instead).
   */
  async function run(): Promise<string> {
    console.log("[DoneRate Cron] Starting daily done-rate calculation for yesterday...");
    // Yesterday's window is [yesterday 00:00, today 00:00) in the process's local time zone.
    // Today's start is used as the exclusive upper bound: endOf("day") (23:59:59.999) combined
    // with `<` would drop events in the final second of the day.
    const todayStart = dayjs().startOf("day");
    const yesterdayStart = todayStart.subtract(1, "day");
    const yesterdayYYYYMMDD = yesterdayStart.format("YYYYMMDD");
    const fromUnixSec = yesterdayStart.unix();
    const toUnixSec = todayStart.unix();
    console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
    try {
      // --- 1. Extract per-artwork distinct-user counts from ClickHouse ---
      const startResults = await clickhouseService.queryEvents<ClickHouseStartCountResult>(
        buildUniqueUserCountQuery("color_start", "unique_starts", fromUnixSec, toUnixSec),
      );
      const artworkStartCounts = toArtworkCountMap(
        startResults.map((row) => ({ res: row.res, count: row.unique_starts })),
        "start_counts",
      );
      console.log(`[DoneRate Cron] Retrieved ${startResults.length} unique start counts from ClickHouse.`);

      const doneResults = await clickhouseService.queryEvents<ClickHouseDoneCountResult>(
        buildUniqueUserCountQuery("color_done", "unique_dones", fromUnixSec, toUnixSec),
      );
      const artworkDoneCounts = toArtworkCountMap(
        doneResults.map((row) => ({ res: row.res, count: row.unique_dones })),
        "done_counts",
      );
      console.log(`[DoneRate Cron] Retrieved ${doneResults.length} unique done counts from ClickHouse.`);

      // --- 2. Merge the two count sets and upsert DoneRate records ---
      // Set insertion order processes artworks with start events first, then artworks that only
      // have done events (not expected, but handled with a start count of 0).
      const artworkIds = new Set<string>([...artworkStartCounts.keys(), ...artworkDoneCounts.keys()]);
      let createdRecordsCount = 0;
      let updatedRecordsCount = 0;
      for (const resIdStr of artworkIds) {
        const startCount = artworkStartCounts.get(resIdStr) ?? 0;
        const doneCount = artworkDoneCounts.get(resIdStr) ?? 0;
        const doneRateDoc = await doneRateService.createOrUpdateDoneRate(
          yesterdayYYYYMMDD,
          new mongoose.Types.ObjectId(resIdStr),
          startCount,
          doneCount,
        );
        // Heuristic: a newly inserted document is assumed to have identical createdAt/updatedAt.
        if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
          createdRecordsCount++;
        } else {
          updatedRecordsCount++;
        }
      }
      const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
      console.log(`[DoneRate Cron] DoneRate model update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);

      // --- 3. Fold yesterday's DoneRate records into the Art running totals ---
      let updatedArtworksCount = 0;
      const yesterdayDoneRates = await doneRateService.getDoneRatesByDate(yesterdayYYYYMMDD);
      console.log(`[DoneRate Cron] Found ${yesterdayDoneRates.length} DoneRate records for yesterday to update Art table.`);
      for (const doneRateDoc of yesterdayDoneRates) {
        // Per-artwork errors are logged and skipped so one bad document does not abort the run.
        try {
          const artworkId = doneRateDoc.res;
          const currentArt = await artService.getArtById(artworkId.toString());
          if (!currentArt) {
            console.warn(`[DoneRate Cron] Art document with ID ${artworkId} not found for DoneRate record (date: ${doneRateDoc.date}). Skipping Art update.`);
            continue;
          }
          const newTotalStartCount = (currentArt.totalStartCount || 0) + doneRateDoc.startCount;
          const newTotalDoneCount = (currentArt.totalDoneCount || 0) + doneRateDoc.doneCount;
          // Completion rate is stored as a percentage; 0 when nobody has started the artwork.
          const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
          await artService.updateArt(artworkId.toString(), {
            totalStartCount: newTotalStartCount,
            totalDoneCount: newTotalDoneCount,
            completionRate: newCompletionRate,
          });
          updatedArtworksCount++;
        } catch (artUpdateError) {
          console.error(`[DoneRate Cron] Error updating Art document for artwork ID ${doneRateDoc.res}:`, artUpdateError);
        }
      }

      const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated Art records: ${updatedArtworksCount}.`;
      console.log(summary);
      return summary;
    } catch (error) {
      console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
      // Rethrow so the cron scheduler sees the failure.
      throw new Error("Failed to calculate daily done-rates.");
    }
  }
  // CommonJS-style export consumed by cron-jobs/index.ts, which invokes the job's `run`.
  export = { run: run };