|
|
@@ -3,13 +3,14 @@ var __importDefault = (this && this.__importDefault) || function (mod) {
|
|
|
return (mod && mod.__esModule) ? mod : { "default": mod };
|
|
|
};
|
|
|
const dayjs_1 = __importDefault(require("dayjs"));
|
|
|
-const doneRateService_1 = __importDefault(require("../../src/services/doneRateService")); // 导入 DoneRateService
|
|
|
const clients_1 = require("../../src/services/clients");
|
|
|
-const mongoose_1 = __importDefault(require("mongoose")); // 导入 mongoose 和 Connection 用于处理远程连接
|
|
|
+const mongoose_1 = __importDefault(require("mongoose"));
|
|
|
const totalDoneRateModel_1 = __importDefault(require("../../src/models/totalDoneRateModel")); // 导入 TotalDoneRate 模型 (已包含 totalTipCount)
|
|
|
const doneRateModel_1 = __importDefault(require("../../src/models/doneRateModel")); // 导入 DoneRateModel 用于聚合历史数据
|
|
|
// ClickHouse 表名
|
|
|
const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
|
|
|
+const TOTAL_DONE_RATE_BULK_SIZE = 1000;
|
|
|
+const DAILY_DONE_RATE_BULK_SIZE = 1000;
|
|
|
/**
|
|
|
* 每日统计作品完成率。
|
|
|
* 如果提供了 dateStr (YYYY-MM-DD 或 YYYYMMDD),则统计该日的数据;否则,默认统计昨天的数据。
|
|
|
@@ -85,27 +86,49 @@ async function run(dateStr) {
|
|
|
totalTipEvents += Number(row.tip_count) || 0;
|
|
|
});
|
|
|
console.log(`[DoneRate Cron] ClickHouse aggregate query completed in ${clickhouseQueryElapsedMs}ms. Rows: ${aggregateResults.length}, valid artworks: ${artworkStartCounts.size}, invalid rows: ${invalidArtworkRowCount}, unique starts: ${totalStartUsers}, unique dones: ${totalDoneUsers}, tip events: ${totalTipEvents}.`);
|
|
|
- // --- 2. 合并数据并更新 DoneRate 模型 (每日记录) ---
|
|
|
+ // --- 2. 合并数据并批量更新 DoneRate 模型 (每日记录) ---
|
|
|
let updatedRecordsCount = 0;
|
|
|
let createdRecordsCount = 0;
|
|
|
// 获取所有需要处理的作品 ID 集合 (Start + Done + Tip)
|
|
|
- const allResIds = new Set([...artworkStartCounts.keys(), ...artworkDoneCounts.keys(), ...artworkTipCounts.keys()]);
|
|
|
- for (const resIdStr of allResIds.values()) {
|
|
|
- const startCount = artworkStartCounts.get(resIdStr) || 0;
|
|
|
- const doneCount = artworkDoneCounts.get(resIdStr) || 0;
|
|
|
- const tipCount = artworkTipCounts.get(resIdStr) || 0;
|
|
|
- const resObjectId = new mongoose_1.default.Types.ObjectId(resIdStr);
|
|
|
- // 使用 DoneRateService 来创建或更新记录,并传入 tipCount
|
|
|
- const doneRateDoc = await doneRateService_1.default.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount, tipCount);
|
|
|
- if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
|
|
|
- createdRecordsCount++;
|
|
|
- }
|
|
|
- else {
|
|
|
- updatedRecordsCount++;
|
|
|
- }
|
|
|
+ const allResIds = [...new Set([...artworkStartCounts.keys(), ...artworkDoneCounts.keys(), ...artworkTipCounts.keys()])];
|
|
|
+ const doneRateBulkStartedAt = Date.now();
|
|
|
+ console.log(`[DoneRate Cron] 开始通过 bulkWrite 更新每日 DoneRate。总计 ${allResIds.length} 个作品,batchSize=${DAILY_DONE_RATE_BULK_SIZE}。`);
|
|
|
+ for (let i = 0; i < allResIds.length; i += DAILY_DONE_RATE_BULK_SIZE) {
|
|
|
+ const chunk = allResIds.slice(i, i + DAILY_DONE_RATE_BULK_SIZE);
|
|
|
+ const ops = chunk.map((resIdStr) => {
|
|
|
+ const startCount = artworkStartCounts.get(resIdStr) || 0;
|
|
|
+ const doneCount = artworkDoneCounts.get(resIdStr) || 0;
|
|
|
+ const tipCount = artworkTipCounts.get(resIdStr) || 0;
|
|
|
+ const completionRate = startCount > 0 ? (doneCount / startCount) * 100 : 0;
|
|
|
+ const resObjectId = new mongoose_1.default.Types.ObjectId(resIdStr);
|
|
|
+ return {
|
|
|
+ updateOne: {
|
|
|
+ filter: { date: yesterdayYYYYMMDD, res: resObjectId },
|
|
|
+ update: {
|
|
|
+ $set: {
|
|
|
+ startCount,
|
|
|
+ doneCount,
|
|
|
+ tipCount,
|
|
|
+ completionRate,
|
|
|
+ },
|
|
|
+ $setOnInsert: {
|
|
|
+ date: yesterdayYYYYMMDD,
|
|
|
+ res: resObjectId,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ upsert: true,
|
|
|
+ },
|
|
|
+ };
|
|
|
+ });
|
|
|
+ const result = await doneRateModel_1.default.bulkWrite(ops, { ordered: false });
|
|
|
+ createdRecordsCount += result.upsertedCount || 0;
|
|
|
+ updatedRecordsCount += chunk.length - (result.upsertedCount || 0);
|
|
|
+ const processed = Math.min(i + DAILY_DONE_RATE_BULK_SIZE, allResIds.length);
|
|
|
+ const elapsed = Date.now() - doneRateBulkStartedAt;
|
|
|
+ console.log(`[DoneRate Cron] 每日DoneRate进度: 已处理 ${processed}/${allResIds.length} 条,耗时 ${elapsed}ms`);
|
|
|
}
|
|
|
const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
|
|
|
- console.log(`[DoneRate Cron] DoneRate model (Daily) update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`);
|
|
|
+ console.log(`[DoneRate Cron] DoneRate model (Daily) update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}. Elapsed: ${Date.now() - doneRateBulkStartedAt}ms.`);
|
|
|
// --- 3. 重新聚合并更新本地的 TotalDoneRate 表 (累计记录) ---
|
|
|
// 通过聚合 DoneRate 日记录来计算最新的累计总数,确保幂等性。
|
|
|
// 3.1 聚合所有历史 DoneRate 记录直到昨天 (即 targetDay)
|
|
|
@@ -121,43 +144,44 @@ async function run(dateStr) {
|
|
|
},
|
|
|
},
|
|
|
];
|
|
|
+ const totalAggregateStartedAt = Date.now();
|
|
|
const aggregatedTotals = await doneRateModel_1.default.aggregate(aggregationPipeline);
|
|
|
+ const totalAggregateElapsedMs = Date.now() - totalAggregateStartedAt;
|
|
|
+ console.log(`[DoneRate Cron] TotalDoneRate 聚合完成。作品数 ${aggregatedTotals.length},聚合耗时 ${totalAggregateElapsedMs}ms。`);
|
|
|
+ const totalUpdateStartedAt = Date.now();
|
|
|
let updatedTotalDoneRateCount = 0;
|
|
|
- let updateStartTime = new Date().getTime();
|
|
|
- console.log(`[DoneRate Cron] 开始通过聚合更新本地 TotalDoneRate 表。总计 ${aggregatedTotals.length} 个作品的累计数据。`);
|
|
|
- // 3.2 遍历聚合结果,覆盖式更新 TotalDoneRate
|
|
|
- for (let i = 0; i < aggregatedTotals.length; i++) {
|
|
|
- const totalData = aggregatedTotals[i];
|
|
|
- const artworkId = totalData._id;
|
|
|
- const { totalStartCount, totalDoneCount, totalTipCount } = totalData;
|
|
|
- // 1. 计算总完成率
|
|
|
- const newCompletionRate = totalStartCount > 0 ? (totalDoneCount / totalStartCount) * 100 : 0;
|
|
|
- try {
|
|
|
- // 2. 使用 findByIdAndUpdate 进行原子操作 (覆盖式更新)
|
|
|
- const updatedDoc = await totalDoneRateModel_1.default.findByIdAndUpdate(artworkId, {
|
|
|
- $set: {
|
|
|
- totalStartCount: totalStartCount,
|
|
|
- totalDoneCount: totalDoneCount,
|
|
|
- totalTipCount: totalTipCount, // 写入新的累计道具使用次数
|
|
|
- completionRate: newCompletionRate,
|
|
|
+ console.log(`[DoneRate Cron] 开始通过 bulkWrite 更新本地 TotalDoneRate 表。总计 ${aggregatedTotals.length} 个作品,batchSize=${TOTAL_DONE_RATE_BULK_SIZE}。`);
|
|
|
+ for (let i = 0; i < aggregatedTotals.length; i += TOTAL_DONE_RATE_BULK_SIZE) {
|
|
|
+ const chunk = aggregatedTotals.slice(i, i + TOTAL_DONE_RATE_BULK_SIZE);
|
|
|
+ const ops = chunk.map((totalData) => {
|
|
|
+ const artworkId = totalData._id;
|
|
|
+ const totalStartCount = Number(totalData.totalStartCount) || 0;
|
|
|
+ const totalDoneCount = Number(totalData.totalDoneCount) || 0;
|
|
|
+ const totalTipCount = Number(totalData.totalTipCount) || 0;
|
|
|
+ const completionRate = totalStartCount > 0 ? (totalDoneCount / totalStartCount) * 100 : 0;
|
|
|
+ return {
|
|
|
+ updateOne: {
|
|
|
+ filter: { _id: artworkId },
|
|
|
+ update: {
|
|
|
+ $set: {
|
|
|
+ totalStartCount,
|
|
|
+ totalDoneCount,
|
|
|
+ totalTipCount,
|
|
|
+ completionRate,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ upsert: true,
|
|
|
},
|
|
|
- }, { new: true, upsert: true } // upsert: true 表示如果文档不存在则创建
|
|
|
- );
|
|
|
- if (updatedDoc) {
|
|
|
- updatedTotalDoneRateCount++;
|
|
|
- }
|
|
|
- // 进度日志
|
|
|
- if ((i + 1) % 50 === 0 || i === aggregatedTotals.length - 1) {
|
|
|
- const elapsed = new Date().getTime() - updateStartTime;
|
|
|
- console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${aggregatedTotals.length} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
|
|
|
- }
|
|
|
- }
|
|
|
- catch (totalUpdateError) {
|
|
|
- console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${artworkId}:`, totalUpdateError);
|
|
|
- }
|
|
|
+ };
|
|
|
+ });
|
|
|
+ await totalDoneRateModel_1.default.bulkWrite(ops, { ordered: false });
|
|
|
+ updatedTotalDoneRateCount += chunk.length;
|
|
|
+ const processed = Math.min(i + TOTAL_DONE_RATE_BULK_SIZE, aggregatedTotals.length);
|
|
|
+ const elapsed = Date.now() - totalUpdateStartedAt;
|
|
|
+ console.log(`[DoneRate Cron] 进度: 已处理 ${processed}/${aggregatedTotals.length} 条 TotalDoneRate,累计耗时 ${elapsed}ms`);
|
|
|
}
|
|
|
- const updateTimeTaken = new Date().getTime() - updateStartTime;
|
|
|
- console.log(`[DoneRate Cron] 本地 TotalDoneRate 表更新完成。总计更新 ${updatedTotalDoneRateCount} 条记录,总耗时 ${updateTimeTaken}ms。`);
|
|
|
+ const totalUpdateElapsedMs = Date.now() - totalUpdateStartedAt;
|
|
|
+ console.log(`[DoneRate Cron] 本地 TotalDoneRate 表更新完成。处理记录数 ${aggregatedTotals.length},写入阶段耗时 ${totalUpdateElapsedMs}ms。`);
|
|
|
const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated TotalDoneRate records: ${updatedTotalDoneRateCount}.`;
|
|
|
console.log(summary);
|
|
|
return summary;
|