- // oms/services/cron-jobs/done-rate.ts
- import dayjs from "dayjs";
- import doneRateService from "../../src/services/doneRateService"; // DoneRateService
- import artService from "../../src/services/artService"; // ArtService
- import { clickhouseService } from "../../src/app"; // shared ClickhouseService instance
- import mongoose from "mongoose"; // used for ObjectId handling
- import Art, { IArt } from "../../src/models/artModel"; // Art model and IArt interface
- // ClickHouse table name
- const CLICKHOUSE_EVENTS_TABLE = "events_raw"; // must match the table name used in ClickhouseService
- /**
- * ClickHouse query result: unique users who started each artwork on a given day.
- */
- interface ClickHouseStartCountResult {
- res: string; // artwork ID
- unique_starts: number; // unique users who started
- }
- /**
- * ClickHouse query result: unique users who completed each artwork on a given day.
- */
- interface ClickHouseDoneCountResult {
- res: string; // artwork ID
- unique_dones: number; // unique users who completed
- }
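- /**
- * Computes yesterday's per-artwork completion rate; intended to run once a day.
- * Pulls the counts from ClickHouse and writes one record per artwork to doneRateModel.
- * Then adds those daily counts to the running totals on the Art table.
- * @returns Promise<string> - a summary of the run.
- */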
- async function run(): Promise<string> {
- console.log("[DoneRate Cron] Starting daily done-rate calculation for yesterday...");
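- // Yesterday's date and the half-open window [yesterdayStart, yesterdayEnd)
- const yesterday = dayjs().subtract(1, "day");
- const yesterdayYYYYMMDD = yesterday.format("YYYYMMDD");
- const yesterdayStart = yesterday.startOf("day").toDate();
- // Exclusive upper bound: midnight at the start of today, so events in the last second of yesterday still satisfy `time < yesterdayEnd`
- const yesterdayEnd = yesterday.add(1, "day").startOf("day").toDate();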
- console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
- try {
- // --- 1. Pull data from ClickHouse ---
- // Unique users who started each artwork yesterday.
- // parseDateTimeBestEffort accepts the ISO 8601 strings produced by toISOString() (milliseconds, trailing "Z").
- const startCountsQuery = `
- SELECT
- res,
- count(DISTINCT uid) AS unique_starts
- FROM ${CLICKHOUSE_EVENTS_TABLE}
- WHERE event = 'color_start'
- AND res IS NOT NULL
- AND time >= parseDateTimeBestEffort('${yesterdayStart.toISOString()}')
- AND time < parseDateTimeBestEffort('${yesterdayEnd.toISOString()}')
- GROUP BY res
- FORMAT JSONEachRow
- `;
- const startResults = await clickhouseService.queryEvents<ClickHouseStartCountResult>(startCountsQuery);
- const artworkStartCounts = new Map<string, number>();
- startResults.forEach((row) => {
- if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
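- // Depending on output settings, ClickHouse can return 64-bit integers as JSON strings, so coerce to a number
- artworkStartCounts.set(row.res, Number(row.unique_starts));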
- } else {
- console.warn(`[DoneRate Cron] Invalid artwork ID found in start_counts result: ${row.res}`);
- }
- });
- console.log(`[DoneRate Cron] Retrieved ${startResults.length} unique start counts from ClickHouse.`);
- // Unique users who completed each artwork yesterday.
- // parseDateTimeBestEffort accepts the ISO 8601 strings produced by toISOString() (milliseconds, trailing "Z").
- const doneCountsQuery = `
- SELECT
- res,
- count(DISTINCT uid) AS unique_dones
- FROM ${CLICKHOUSE_EVENTS_TABLE}
- WHERE event = 'color_done'
- AND res IS NOT NULL
- AND time >= parseDateTimeBestEffort('${yesterdayStart.toISOString()}')
- AND time < parseDateTimeBestEffort('${yesterdayEnd.toISOString()}')
- GROUP BY res
- FORMAT JSONEachRow
- `;
- const doneResults = await clickhouseService.queryEvents<ClickHouseDoneCountResult>(doneCountsQuery);
- const artworkDoneCounts = new Map<string, number>();
- doneResults.forEach((row) => {
- if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
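- // Depending on output settings, ClickHouse can return 64-bit integers as JSON strings, so coerce to a number
- artworkDoneCounts.set(row.res, Number(row.unique_dones));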
- } else {
- console.warn(`[DoneRate Cron] Invalid artwork ID found in done_counts result: ${row.res}`);
- }
- });
- console.log(`[DoneRate Cron] Retrieved ${doneResults.length} unique done counts from ClickHouse.`);
- // --- 2. Merge the counts and update the DoneRate model ---
- let updatedRecordsCount = 0; // for DoneRate
- let createdRecordsCount = 0; // for DoneRate
- // Iterate over every artwork ID that has start events
- for (const [resIdStr, startCount] of artworkStartCounts.entries()) {
- const doneCount = artworkDoneCounts.get(resIdStr) || 0;
- const resObjectId = new mongoose.Types.ObjectId(resIdStr);
- // Create or update the record through DoneRateService
- const doneRateDoc = await doneRateService.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount);
- if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
- createdRecordsCount++;
- } else {
- updatedRecordsCount++;
- }
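- artworkDoneCounts.delete(resIdStr); // drop handled IDs so only done-only artworks remain
- }
- // Handle artworks with done events but no start events (should not normally happen; handled defensively)
- for (const [resIdStr, doneCount] of artworkDoneCounts.entries()) {
- const startCount = 0; // no start events, so the start count is 0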
- const resObjectId = new mongoose.Types.ObjectId(resIdStr);
- const doneRateDoc = await doneRateService.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount);
- if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
- createdRecordsCount++;
- } else {
- updatedRecordsCount++;
- }
- }
- const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
- console.log(`[DoneRate Cron] DoneRate model update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);
- // --- 3. Load yesterday's DoneRate records and update the statistics fields on the Art table ---
- let updatedArtworksCount = 0; // for Art model
- const yesterdayDoneRates = await doneRateService.getDoneRatesByDate(yesterdayYYYYMMDD);
- console.log(`[DoneRate Cron] Found ${yesterdayDoneRates.length} DoneRate records for yesterday to update Art table.`);
- for (const doneRateDoc of yesterdayDoneRates) {
- try {
- const artworkId = doneRateDoc.res; // artwork ObjectId
- const currentArt = await artService.getArtById(artworkId.toString());
- if (currentArt) {
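- // Add yesterday's counts to the running totals.
- // NOTE: this step is not idempotent; re-running the job for the same date adds the same counts again.
- const newTotalStartCount = (currentArt.totalStartCount || 0) + doneRateDoc.startCount;
- const newTotalDoneCount = (currentArt.totalDoneCount || 0) + doneRateDoc.doneCount;
- // Recompute the overall completion rate as a percentage
- const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
- // Update the Art document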
- await artService.updateArt(artworkId.toString(), {
- totalStartCount: newTotalStartCount,
- totalDoneCount: newTotalDoneCount,
- completionRate: newCompletionRate,
- });
- updatedArtworksCount++;
- } else {
- console.warn(`[DoneRate Cron] Art document with ID ${artworkId} not found for DoneRate record (date: ${doneRateDoc.date}). Skipping Art update.`);
- }
- } catch (artUpdateError) {
- console.error(`[DoneRate Cron] Error updating Art document for artwork ID ${doneRateDoc.res}:`, artUpdateError);
- }
- }
- const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated Art records: ${updatedArtworksCount}.`;
- console.log(summary);
- return summary;
- } catch (error) {
- console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
- throw new Error("Failed to calculate daily done-rates."); // rethrow so the cron scheduler is notified
- }
- }
- export = { run }; // exported for use by cron-jobs/index.ts
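The merge in step 2 and the percentage formula in step 3 can be pulled out as pure functions, which makes them testable without ClickHouse or MongoDB. The following is a minimal sketch under that assumption; `DailyCounts`, `mergeCounts`, and `completionRate` are hypothetical names that do not exist in the job above.

```typescript
// Hypothetical helpers (not part of the original job).
interface DailyCounts {
  res: string; // artwork ID
  startCount: number; // unique users who started
  doneCount: number; // unique users who completed
}

// Merge the per-artwork start and done maps into one row per artwork.
// Taking the union of keys keeps artworks that only have done events.
function mergeCounts(
  starts: Map<string, number>,
  dones: Map<string, number>,
): DailyCounts[] {
  const ids = new Set<string>(
    Array.from(starts.keys()).concat(Array.from(dones.keys())),
  );
  return Array.from(ids).map((res) => ({
    res,
    startCount: starts.get(res) ?? 0,
    doneCount: dones.get(res) ?? 0,
  }));
}

// Completion rate as a percentage; 0 when there are no starts,
// mirroring the guard used when updating the Art document.
function completionRate(startCount: number, doneCount: number): number {
  return startCount > 0 ? (doneCount / startCount) * 100 : 0;
}
```

With helpers like these, the two loops in step 2 would collapse into a single loop over `mergeCounts(artworkStartCounts, artworkDoneCounts)`, with no need to delete entries from the done map along the way.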
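The time filter in both queries is easiest to reason about as a half-open interval `[start, end)`, where `end` is midnight at the start of the current day. The sketch below shows that idea with the built-in `Date` only, to stay self-contained. It is an assumption-laden illustration: `DayWindow` and `previousUtcDayWindow` are hypothetical names, and it works in UTC, whereas the job above uses dayjs in the server's local time zone.

```typescript
// Hypothetical helper (not part of the original job).
interface DayWindow {
  start: Date; // inclusive lower bound: 00:00:00 UTC of the previous day
  end: Date; // exclusive upper bound: 00:00:00 UTC of the current day
  label: string; // previous day formatted as YYYYMMDD
}

function previousUtcDayWindow(now: Date): DayWindow {
  // Midnight (UTC) at the start of the current day.
  const end = new Date(
    Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()),
  );
  // UTC days are always 24 hours long, so plain subtraction is safe here.
  const start = new Date(end.getTime() - 24 * 60 * 60 * 1000);
  // "2024-02-29T00:00:00.000Z" -> "20240229"
  const label = start.toISOString().slice(0, 10).replace(/-/g, "");
  return { start, end, label };
}
```

Filtering with `time >= start AND time < end` then covers every event of the previous day exactly once, and consecutive daily windows neither overlap nor leave a gap.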