guoziyun 9 meses atrás
pai
commit
a895e97e05

+ 164 - 0
oms/dist/services/cron-jobs/done-rate2.js

@@ -0,0 +1,164 @@
+"use strict";
+// oms/services/cron-jobs/done-rate.ts
+var __importDefault = (this && this.__importDefault) || function (mod) {
+    return (mod && mod.__esModule) ? mod : { "default": mod };
+};
+const dayjs_1 = __importDefault(require("dayjs"));
+const doneRateService_1 = __importDefault(require("../../src/services/doneRateService")); // 导入 DoneRateService
+const app_1 = require("../../src/app"); // 导入 ClickhouseService 实例
+const mongoose_1 = __importDefault(require("mongoose")); // 导入 mongoose 和 Connection 用于处理远程连接
+const totalDoneRateModel_1 = __importDefault(require("../../src/models/totalDoneRateModel")); // 导入新的 TotalDoneRate 模型
+// ClickHouse 表名
+const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
+/**
+ * 每日统计昨天的作品完成率。
+ * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况,并更新到 doneRateModel。
+ * 随后,根据这些日统计数据,累加更新本地的 totalDoneRate 表。
+ * @returns Promise<string> - 返回统计结果的摘要信息。
+ */
+async function run() {
+    console.log("[DoneRate Cron] Starting daily done-rate calculation for yesterday..."); // 获取昨天和今天的日期
+    // 获取昨天和今天的日期
+    const yesterday = (0, dayjs_1.default)().subtract(1, "day");
+    const yesterdayYYYYMMDD = yesterday.format("YYYYMMDD");
+    const yesterdayStart = yesterday.startOf("day").toDate();
+    const yesterdayEnd = yesterday.endOf("day").toDate();
+    // 格式化日期字符串,使其符合 ClickHouse 的 toDateTime() 函数要求
+    const yesterdayStartString = (0, dayjs_1.default)(yesterdayStart).format("YYYY-MM-DD HH:mm:ss");
+    const yesterdayEndString = (0, dayjs_1.default)(yesterdayEnd).format("YYYY-MM-DD HH:mm:ss");
+    console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
+    try {
+        // --- 1. 从 ClickHouse 中提取数据 ---
+        // 查询昨天每个作品的独立开始用户数
+        const startCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_starts
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_start'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+        const startResults = await app_1.clickhouseService.queryEvents(startCountsQuery);
+        const artworkStartCounts = new Map();
+        startResults.forEach((row) => {
+            if (row.res && mongoose_1.default.Types.ObjectId.isValid(row.res)) {
+                artworkStartCounts.set(row.res, row.unique_starts);
+            }
+            else {
+                console.warn(`[DoneRate Cron] Invalid artwork ID found in start_counts result: ${row.res}`);
+            }
+        });
+        console.log(`[DoneRate Cron] Retrieved ${startResults.length} unique start counts from ClickHouse.`); // 查询昨天每个作品的独立完成用户数
+        // 查询昨天每个作品的独立完成用户数
+        const doneCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_dones
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_done'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+        const doneResults = await app_1.clickhouseService.queryEvents(doneCountsQuery);
+        const artworkDoneCounts = new Map();
+        doneResults.forEach((row) => {
+            if (row.res && mongoose_1.default.Types.ObjectId.isValid(row.res)) {
+                artworkDoneCounts.set(row.res, row.unique_dones);
+            }
+            else {
+                console.warn(`[DoneRate Cron] Invalid artwork ID found in done_counts result: ${row.res}`);
+            }
+        });
+        console.log(`[DoneRate Cron] Retrieved ${doneResults.length} unique done counts from ClickHouse.`);
+        // --- 2. 合并数据并更新 DoneRate 模型 ---
+        let updatedRecordsCount = 0; // for DoneRate
+        let createdRecordsCount = 0; // for DoneRate
+        // 遍历所有有开始事件的作品ID
+        for (const [resIdStr, startCount] of artworkStartCounts.entries()) {
+            const doneCount = artworkDoneCounts.get(resIdStr) || 0;
+            const resObjectId = new mongoose_1.default.Types.ObjectId(resIdStr);
+            // 使用 DoneRateService 来创建或更新记录
+            const doneRateDoc = await doneRateService_1.default.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount);
+            if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
+                createdRecordsCount++;
+            }
+            else {
+                updatedRecordsCount++;
+            }
+            artworkDoneCounts.delete(resIdStr); // 已经处理过的作品ID从 doneCounts 中移除
+        }
+        // 处理只有完成事件但没有开始事件的作品 (通常不应发生,但以防万一)
+        for (const [resIdStr, doneCount] of artworkDoneCounts.entries()) {
+            const startCount = 0; // 没有开始事件,所以开始次数为0
+            const resObjectId = new mongoose_1.default.Types.ObjectId(resIdStr);
+            const doneRateDoc = await doneRateService_1.default.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount);
+            if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
+                createdRecordsCount++;
+            }
+            else {
+                updatedRecordsCount++;
+            }
+        }
+        const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
+        console.log(`[DoneRate Cron] DoneRate model update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`); // --- 3. 获取昨天的所有 DoneRate 记录,并更新本地的 totalDoneRate 表 ---
+        let updatedTotalDoneRateCount = 0;
+        const yesterdayDoneRates = await doneRateService_1.default.getDoneRatesByDate(yesterdayYYYYMMDD);
+        const totalDoneRatesToUpdate = yesterdayDoneRates.length;
+        let updateStartTime = new Date().getTime();
+        console.log(`[DoneRate Cron] 开始更新本地 TotalDoneRate 表。总计 ${totalDoneRatesToUpdate} 条记录。`);
+        for (let i = 0; i < totalDoneRatesToUpdate; i++) {
+            const doneRateDoc = yesterdayDoneRates[i];
+            try {
+                const artworkId = doneRateDoc.res; // 获取作品 ObjectId
+                // 查找现有的 TotalDoneRate 文档
+                const existingTotal = await totalDoneRateModel_1.default.findById(artworkId);
+                let newTotalStartCount = doneRateDoc.startCount;
+                let newTotalDoneCount = doneRateDoc.doneCount;
+                if (existingTotal) {
+                    // 如果已存在,则累加
+                    newTotalStartCount += existingTotal.totalStartCount || 0;
+                    newTotalDoneCount += existingTotal.totalDoneCount || 0;
+                }
+                const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
+                // 使用 findByIdAndUpdate 进行原子操作,确保数据一致性
+                // 这里的 _id 是根据 artworkId 来查找或创建的
+                const updatedDoc = await totalDoneRateModel_1.default.findByIdAndUpdate(artworkId, {
+                    $set: {
+                        totalStartCount: newTotalStartCount,
+                        totalDoneCount: newTotalDoneCount,
+                        completionRate: newCompletionRate,
+                    },
+                }, { new: true, upsert: true } // upsert: true 表示如果文档不存在则创建
+                );
+                if (updatedDoc) {
+                    updatedTotalDoneRateCount++;
+                }
+                // 进度日志
+                if ((i + 1) % 50 === 0 || i === totalDoneRatesToUpdate - 1) {
+                    const elapsed = new Date().getTime() - updateStartTime;
+                    console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${totalDoneRatesToUpdate} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
+                }
+            }
+            catch (totalUpdateError) {
+                console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${doneRateDoc.res}:`, totalUpdateError);
+            }
+        }
+        const updateTimeTaken = new Date().getTime() - updateStartTime;
+        console.log(`[DoneRate Cron] 本地 TotalDoneRate 表更新完成。总计更新 ${updatedTotalDoneRateCount} 条记录,总耗时 ${updateTimeTaken}ms。`);
+        const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated TotalDoneRate records: ${updatedTotalDoneRateCount}.`;
+        console.log(summary);
+        return summary;
+    }
+    catch (error) {
+        console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
+        throw new Error("Failed to calculate daily done-rates."); // 抛出错误以通知 cron 调度器
+    }
+    finally {
+    }
+}
+module.exports = { run };

+ 1 - 1
oms/dist/services/cron-jobs/index.js

@@ -11,7 +11,7 @@ const database_1 = require("../../src/database");
 // Each element: [name: string, schedule: string, jobModule: CronJobModule]
 const settings = [
     // 假设这些文件将存在于 oms/services/cron-jobs/ 目录下
-    ["done-rate", "10 1 * * *", require("./done-rate")], // 每天凌晨0点10分, 统计作品完成率
+    ["done-rate", "20 13 * * *", require("./done-rate2")], // 每天凌晨0点10分, 统计作品完成率
     ["daily-activity-detector", "50 0 * * *", require("./daily-activity-detector")], // 每天凌晨0点50分, 检查是否需要生成新的推送消息
     ["message-sender", "*/5 * * * *", require("./message-sender")], // 每5分钟运行一次
 ];

+ 60 - 0
oms/dist/src/models/totalDoneRateModel.js

@@ -0,0 +1,60 @@
+"use strict";
+// oms/src/models/totalDoneRateModel.ts
+var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
+    if (k2 === undefined) k2 = k;
+    var desc = Object.getOwnPropertyDescriptor(m, k);
+    if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
+      desc = { enumerable: true, get: function() { return m[k]; } };
+    }
+    Object.defineProperty(o, k2, desc);
+}) : (function(o, m, k, k2) {
+    if (k2 === undefined) k2 = k;
+    o[k2] = m[k];
+}));
+var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
+    Object.defineProperty(o, "default", { enumerable: true, value: v });
+}) : function(o, v) {
+    o["default"] = v;
+});
+var __importStar = (this && this.__importStar) || (function () {
+    var ownKeys = function(o) {
+        ownKeys = Object.getOwnPropertyNames || function (o) {
+            var ar = [];
+            for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
+            return ar;
+        };
+        return ownKeys(o);
+    };
+    return function (mod) {
+        if (mod && mod.__esModule) return mod;
+        var result = {};
+        if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
+        __setModuleDefault(result, mod);
+        return result;
+    };
+})();
+Object.defineProperty(exports, "__esModule", { value: true });
+const mongoose_1 = __importStar(require("mongoose"));
+const totalDoneRateSchema = new mongoose_1.Schema({
+    // _id 字段直接用作作品的 ObjectId,因此无需再定义一个 res 字段。
+    // MongoDB 会自动为其创建唯一索引。
+    totalStartCount: {
+        type: Number,
+        required: true,
+        default: 0,
+    },
+    totalDoneCount: {
+        type: Number,
+        required: true,
+        default: 0,
+    },
+    completionRate: {
+        type: Number,
+        required: true,
+        default: 0,
+    },
+}, {
+    timestamps: true, // 自动添加 createdAt 和 updatedAt 字段
+});
+const TotalDoneRate = mongoose_1.default.model("TotalDoneRate", totalDoneRateSchema);
+exports.default = TotalDoneRate;

+ 165 - 0
oms/dist/src/scripts/migrate-total-done-rates.js

@@ -0,0 +1,165 @@
+"use strict";
+// oms/scripts/migrate-total-done-rates.ts
+var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
+    if (k2 === undefined) k2 = k;
+    var desc = Object.getOwnPropertyDescriptor(m, k);
+    if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
+      desc = { enumerable: true, get: function() { return m[k]; } };
+    }
+    Object.defineProperty(o, k2, desc);
+}) : (function(o, m, k, k2) {
+    if (k2 === undefined) k2 = k;
+    o[k2] = m[k];
+}));
+var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
+    Object.defineProperty(o, "default", { enumerable: true, value: v });
+}) : function(o, v) {
+    o["default"] = v;
+});
+var __importStar = (this && this.__importStar) || (function () {
+    var ownKeys = function(o) {
+        ownKeys = Object.getOwnPropertyNames || function (o) {
+            var ar = [];
+            for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
+            return ar;
+        };
+        return ownKeys(o);
+    };
+    return function (mod) {
+        if (mod && mod.__esModule) return mod;
+        var result = {};
+        if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
+        __setModuleDefault(result, mod);
+        return result;
+    };
+})();
+var __importDefault = (this && this.__importDefault) || function (mod) {
+    return (mod && mod.__esModule) ? mod : { "default": mod };
+};
+Object.defineProperty(exports, "__esModule", { value: true });
+const mongoose_1 = __importStar(require("mongoose"));
+const totalDoneRateModel_1 = __importDefault(require("../models/totalDoneRateModel")); // 导入新的 TotalDoneRate 模型
+// --- 数据库配置 ---
+// 本地 OMS 数据库的连接字符串
+const LOCAL_MONGO_URI = "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
+// 远程旧 CLOGS 数据库的连接字符串
+const REMOTE_CLOGS_MONGO_URI = "mongodb://clogs:clogs123.%23%23@localhost:27017/clogs";
+const oldTotalDoneRateSchema = new mongoose_1.Schema({
+    _id: { type: mongoose_1.Schema.Types.ObjectId, required: true },
+    totalStartCount: { type: Number, required: true },
+    totalDoneCount: { type: Number, required: true },
+    completionRate: { type: Number, required: true },
+}, {
+    collection: "total_done_rate", // 指定旧的表名
+    versionKey: false,
+});
+let localConn = null;
+let remoteConn = null;
+// 定义每次处理的数据量
+const BATCH_SIZE = 5000;
+/**
+ * 迁移函数,执行 total_done_rate 数据割接
+ */
+async function migrateTotalDoneRates() {
+    console.log("[Migration] Starting data migration from clogs.total_done_rate to oms.totaldonerates...");
+    try {
+        // 1. 建立本地和远程数据库连接
+        localConn = await mongoose_1.default.createConnection(LOCAL_MONGO_URI);
+        console.log("[Migration] Connected to local OMS database.");
+        remoteConn = await mongoose_1.default.createConnection(REMOTE_CLOGS_MONGO_URI);
+        const OldTotalDoneRate = remoteConn.model("OldTotalDoneRate", oldTotalDoneRateSchema);
+        console.log("[Migration] Connected to remote CLOGS database.");
+        // 2. 使用游标分批处理数据
+        console.log(`[Migration] Processing records in batches of ${BATCH_SIZE}...`);
+        let processedCount = 0;
+        let successfulInserts = 0;
+        let batch = [];
+        // 使用游标查询,不会一次性加载所有数据到内存
+        const cursor = OldTotalDoneRate.find({}).lean().cursor();
+        // 逐条处理游标中的数据
+        for await (const doc of cursor) {
+            // 数据结构同构,可以直接推入批次
+            batch.push(doc);
+            // 如果达到批次大小,则执行批量插入
+            if (batch.length >= BATCH_SIZE) {
+                try {
+                    const result = await localConn.model("TotalDoneRate", totalDoneRateModel_1.default.schema).insertMany(batch, { ordered: false });
+                    successfulInserts += result.length;
+                }
+                catch (error) {
+                    if (error.code === 11000) {
+                        console.warn("[Migration] Duplicate key error in batch. Attempting individual inserts...");
+                        // 如果存在重复项,则尝试逐条插入以找出成功的记录
+                        for (const item of batch) {
+                            try {
+                                await localConn.model("TotalDoneRate", totalDoneRateModel_1.default.schema).create(item);
+                                successfulInserts++;
+                            }
+                            catch (e) {
+                                if (e.code !== 11000) {
+                                    console.error(`[Migration] Failed to insert record with _id ${item._id}:`, e.message);
+                                }
+                            }
+                        }
+                    }
+                    else {
+                        console.error("[Migration] An error occurred during a batch insertion:", error.message);
+                    }
+                }
+                processedCount += batch.length;
+                console.log(`[Migration] Processed ${processedCount} records. Total successfully inserted: ${successfulInserts}`);
+                // 清空批次,准备下一轮
+                batch = [];
+            }
+        }
+        // 处理最后一个未满的批次
+        if (batch.length > 0) {
+            try {
+                const result = await localConn.model("TotalDoneRate", totalDoneRateModel_1.default.schema).insertMany(batch, { ordered: false });
+                successfulInserts += result.length;
+            }
+            catch (error) {
+                if (error.code === 11000) {
+                    console.warn("[Migration] Duplicate key error in final batch. Attempting individual inserts...");
+                    for (const item of batch) {
+                        try {
+                            await localConn.model("TotalDoneRate", totalDoneRateModel_1.default.schema).create(item);
+                            successfulInserts++;
+                        }
+                        catch (e) {
+                            if (e.code !== 11000) {
+                                console.error(`[Migration] Failed to insert record with _id ${item._id}:`, e.message);
+                            }
+                        }
+                    }
+                }
+                else {
+                    console.error("[Migration] An error occurred during the final batch insertion:", error.message);
+                }
+            }
+            processedCount += batch.length;
+        }
+        console.log(`[Migration] All records processed. Total processed: ${processedCount}. Total successfully inserted: ${successfulInserts}`);
+        console.log("[Migration] Data migration completed successfully!");
+    }
+    catch (error) {
+        console.error("[Migration] A critical error occurred. Script will exit:", error);
+        throw error;
+    }
+    finally {
+        // 确保关闭所有连接
+        if (localConn) {
+            await localConn.close();
+            console.log("[Migration] Local connection closed.");
+        }
+        if (remoteConn) {
+            await remoteConn.close();
+            console.log("[Migration] Remote connection closed.");
+        }
+    }
+}
+// 运行迁移脚本
+migrateTotalDoneRates().catch((err) => {
+    console.error("[Migration] Script failed to run:", err);
+    process.exit(1); // 退出并返回错误码
+});

+ 201 - 0
oms/services/cron-jobs/done-rate2.ts

@@ -0,0 +1,201 @@
+// oms/services/cron-jobs/done-rate.ts
+
+import dayjs from "dayjs";
+import doneRateService from "../../src/services/doneRateService"; // 导入 DoneRateService
+import { clickhouseService } from "../../src/app"; // 导入 ClickhouseService 实例
+import mongoose, { Connection } from "mongoose"; // 导入 mongoose 和 Connection 用于处理远程连接
+import TotalDoneRate from "../../src/models/totalDoneRateModel"; // 导入新的 TotalDoneRate 模型
+
+// ClickHouse 表名
+const CLICKHOUSE_EVENTS_TABLE = "events"; // 确保与 ClickHouseService 中的表名一致
+
+/**
+ * ClickHouse 查询结果接口:每日每个作品的独立开始用户数
+ */
+interface ClickHouseStartCountResult {
+  res: string; // 作品 ID
+  unique_starts: number; // 独立开始用户数
+}
+
+/**
+ * ClickHouse 查询结果接口:每日每个作品的独立完成用户数
+ */
+interface ClickHouseDoneCountResult {
+  res: string; // 作品 ID
+  unique_dones: number; // 独立完成用户数
+}
+
+/**
+ * 每日统计昨天的作品完成率。
+ * 统计逻辑从 ClickHouse 中提取数据,得到每个作品的完成情况,并更新到 doneRateModel。
+ * 随后,根据这些日统计数据,累加更新本地的 totalDoneRate 表。
+ * @returns Promise<string> - 返回统计结果的摘要信息。
+ */
+async function run(): Promise<string> {
+  console.log("[DoneRate Cron] Starting daily done-rate calculation for yesterday..."); // 获取昨天和今天的日期
+
+  // 获取昨天和今天的日期
+  const yesterday = dayjs().subtract(1, "day");
+  const yesterdayYYYYMMDD = yesterday.format("YYYYMMDD");
+  const yesterdayStart = yesterday.startOf("day").toDate();
+  const yesterdayEnd = yesterday.endOf("day").toDate();
+
+  // 格式化日期字符串,使其符合 ClickHouse 的 toDateTime() 函数要求
+  const yesterdayStartString = dayjs(yesterdayStart).format("YYYY-MM-DD HH:mm:ss");
+  const yesterdayEndString = dayjs(yesterdayEnd).format("YYYY-MM-DD HH:mm:ss");
+
+  console.log(`[DoneRate Cron] Processing data for date: ${yesterdayYYYYMMDD}`);
+
+  try {
+    // --- 1. 从 ClickHouse 中提取数据 ---
+
+    // 查询昨天每个作品的独立开始用户数
+    const startCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_starts
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_start'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+    const startResults = await clickhouseService.queryEvents<ClickHouseStartCountResult>(startCountsQuery);
+    const artworkStartCounts = new Map<string, number>();
+    startResults.forEach((row) => {
+      if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
+        artworkStartCounts.set(row.res, row.unique_starts);
+      } else {
+        console.warn(`[DoneRate Cron] Invalid artwork ID found in start_counts result: ${row.res}`);
+      }
+    });
+    console.log(`[DoneRate Cron] Retrieved ${startResults.length} unique start counts from ClickHouse.`); // 查询昨天每个作品的独立完成用户数
+
+    // 查询昨天每个作品的独立完成用户数
+    const doneCountsQuery = `
+      SELECT
+          res,
+          count(DISTINCT uid) AS unique_dones
+      FROM ${CLICKHOUSE_EVENTS_TABLE}
+      WHERE event = 'color_done'
+        AND time >= toDateTime('${yesterdayStartString}')
+        AND time < toDateTime('${yesterdayEndString}')
+      GROUP BY res
+      HAVING res IS NOT NULL
+    `;
+    const doneResults = await clickhouseService.queryEvents<ClickHouseDoneCountResult>(doneCountsQuery);
+    const artworkDoneCounts = new Map<string, number>();
+    doneResults.forEach((row) => {
+      if (row.res && mongoose.Types.ObjectId.isValid(row.res)) {
+        artworkDoneCounts.set(row.res, row.unique_dones);
+      } else {
+        console.warn(`[DoneRate Cron] Invalid artwork ID found in done_counts result: ${row.res}`);
+      }
+    });
+    console.log(`[DoneRate Cron] Retrieved ${doneResults.length} unique done counts from ClickHouse.`);
+
+    // --- 2. 合并数据并更新 DoneRate 模型 ---
+    let updatedRecordsCount = 0; // for DoneRate
+    let createdRecordsCount = 0; // for DoneRate
+
+    // 遍历所有有开始事件的作品ID
+    for (const [resIdStr, startCount] of artworkStartCounts.entries()) {
+      const doneCount = artworkDoneCounts.get(resIdStr) || 0;
+      const resObjectId = new mongoose.Types.ObjectId(resIdStr);
+
+      // 使用 DoneRateService 来创建或更新记录
+      const doneRateDoc = await doneRateService.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount);
+      if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
+        createdRecordsCount++;
+      } else {
+        updatedRecordsCount++;
+      }
+      artworkDoneCounts.delete(resIdStr); // 已经处理过的作品ID从 doneCounts 中移除
+    }
+
+    // 处理只有完成事件但没有开始事件的作品 (通常不应发生,但以防万一)
+    for (const [resIdStr, doneCount] of artworkDoneCounts.entries()) {
+      const startCount = 0; // 没有开始事件,所以开始次数为0
+      const resObjectId = new mongoose.Types.ObjectId(resIdStr);
+
+      const doneRateDoc = await doneRateService.createOrUpdateDoneRate(yesterdayYYYYMMDD, resObjectId, startCount, doneCount);
+      if (doneRateDoc.createdAt.getTime() === doneRateDoc.updatedAt.getTime()) {
+        createdRecordsCount++;
+      } else {
+        updatedRecordsCount++;
+      }
+    }
+
+    const totalProcessedArtworks = createdRecordsCount + updatedRecordsCount;
+
+    console.log(`[DoneRate Cron] DoneRate model update completed. Total artworks processed: ${totalProcessedArtworks}. Created: ${createdRecordsCount}, Updated: ${updatedRecordsCount}.`); // --- 3. 获取昨天的所有 DoneRate 记录,并更新本地的 totalDoneRate 表 ---
+
+    let updatedTotalDoneRateCount = 0;
+    const yesterdayDoneRates = await doneRateService.getDoneRatesByDate(yesterdayYYYYMMDD);
+
+    const totalDoneRatesToUpdate = yesterdayDoneRates.length;
+    let updateStartTime = new Date().getTime();
+    console.log(`[DoneRate Cron] 开始更新本地 TotalDoneRate 表。总计 ${totalDoneRatesToUpdate} 条记录。`);
+
+    for (let i = 0; i < totalDoneRatesToUpdate; i++) {
+      const doneRateDoc = yesterdayDoneRates[i];
+      try {
+        const artworkId = doneRateDoc.res; // 获取作品 ObjectId
+
+        // 查找现有的 TotalDoneRate 文档
+        const existingTotal = await TotalDoneRate.findById(artworkId);
+
+        let newTotalStartCount = doneRateDoc.startCount;
+        let newTotalDoneCount = doneRateDoc.doneCount;
+
+        if (existingTotal) {
+          // 如果已存在,则累加
+          newTotalStartCount += existingTotal.totalStartCount || 0;
+          newTotalDoneCount += existingTotal.totalDoneCount || 0;
+        }
+
+        const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
+
+        // 使用 findByIdAndUpdate 进行原子操作,确保数据一致性
+        // 这里的 _id 是根据 artworkId 来查找或创建的
+        const updatedDoc = await TotalDoneRate.findByIdAndUpdate(
+          artworkId,
+          {
+            $set: {
+              totalStartCount: newTotalStartCount,
+              totalDoneCount: newTotalDoneCount,
+              completionRate: newCompletionRate,
+            },
+          },
+          { new: true, upsert: true } // upsert: true 表示如果文档不存在则创建
+        );
+
+        if (updatedDoc) {
+          updatedTotalDoneRateCount++;
+        }
+
+        // 进度日志
+        if ((i + 1) % 50 === 0 || i === totalDoneRatesToUpdate - 1) {
+          const elapsed = new Date().getTime() - updateStartTime;
+          console.log(`[DoneRate Cron] 进度: 已更新 ${i + 1}/${totalDoneRatesToUpdate} 条本地 TotalDoneRate 记录, 当前耗时: ${elapsed}ms`);
+        }
+      } catch (totalUpdateError) {
+        console.error(`[DoneRate Cron] Error updating TotalDoneRate document for artwork ID ${doneRateDoc.res}:`, totalUpdateError);
+      }
+    }
+
+    const updateTimeTaken = new Date().getTime() - updateStartTime;
+    console.log(`[DoneRate Cron] 本地 TotalDoneRate 表更新完成。总计更新 ${updatedTotalDoneRateCount} 条记录,总耗时 ${updateTimeTaken}ms。`);
+
+    const summary = `[DoneRate Cron] Daily done-rate calculation for ${yesterdayYYYYMMDD} completed. Total DoneRate processed: ${totalProcessedArtworks}. Created DoneRate: ${createdRecordsCount}, Updated DoneRate: ${updatedRecordsCount}. Updated TotalDoneRate records: ${updatedTotalDoneRateCount}.`;
+    console.log(summary);
+    return summary;
+  } catch (error) {
+    console.error(`[DoneRate Cron] Error during done-rate calculation for ${yesterdayYYYYMMDD}:`, error);
+    throw new Error("Failed to calculate daily done-rates."); // 抛出错误以通知 cron 调度器
+  } finally {
+  }
+}
+
+export = { run }; // 导出 run 函数以供 cron-jobs/index.ts 使用

+ 1 - 1
oms/services/cron-jobs/index.ts

@@ -13,7 +13,7 @@ interface CronJobModule {
 // Each element: [name: string, schedule: string, jobModule: CronJobModule]
 const settings: [string, string, CronJobModule][] = [
   // 假设这些文件将存在于 oms/services/cron-jobs/ 目录下
-  ["done-rate", "10 1 * * *", require("./done-rate") as CronJobModule], // 每天凌晨0点10分, 统计作品完成率
+  ["done-rate", "20 13 * * *", require("./done-rate2") as CronJobModule], // 每天凌晨0点10分, 统计作品完成率
   ["daily-activity-detector", "50 0 * * *", require("./daily-activity-detector") as CronJobModule], // 每天凌晨0点50分, 检查是否需要生成新的推送消息
   ["message-sender", "*/5 * * * *", require("./message-sender") as CronJobModule], // 每5分钟运行一次
 ];

+ 43 - 0
oms/src/models/totalDoneRateModel.ts

@@ -0,0 +1,43 @@
+// oms/src/models/totalDoneRateModel.ts
+
+import mongoose, { Schema, Document } from "mongoose";
+
+/**
+ * 累计作品完成率统计数据的接口。
+ * 这个模型用于存储每个作品的总开始数、总完成数和总完成率。
+ */
+export interface ITotalDoneRate extends Document {
+  _id: mongoose.Schema.Types.ObjectId; // 直接使用作品的 ObjectId 作为 _id
+  totalStartCount: number; // 作品总的开始次数
+  totalDoneCount: number; // 作品总的完成次数
+  completionRate: number; // 作品的总完成率 (百分比, 0-100)
+}
+
+const totalDoneRateSchema: Schema = new Schema(
+  {
+    // _id 字段直接用作作品的 ObjectId,因此无需再定义一个 res 字段。
+    // MongoDB 会自动为其创建唯一索引。
+    totalStartCount: {
+      type: Number,
+      required: true,
+      default: 0,
+    },
+    totalDoneCount: {
+      type: Number,
+      required: true,
+      default: 0,
+    },
+    completionRate: {
+      type: Number,
+      required: true,
+      default: 0,
+    },
+  },
+  {
+    timestamps: true, // 自动添加 createdAt 和 updatedAt 字段
+  }
+);
+
+const TotalDoneRate = mongoose.model<ITotalDoneRate>("TotalDoneRate", totalDoneRateSchema);
+
+export default TotalDoneRate;

+ 149 - 0
oms/src/scripts/migrate-total-done-rates.ts

@@ -0,0 +1,149 @@
+// oms/scripts/migrate-total-done-rates.ts
+
+import mongoose, { Schema, Document, Connection } from "mongoose";
+import TotalDoneRate, { ITotalDoneRate } from "../models/totalDoneRateModel"; // 导入新的 TotalDoneRate 模型
+
+// --- 数据库配置 ---
+// 本地 OMS 数据库的连接字符串
+const LOCAL_MONGO_URI = "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
+// 远程旧 CLOGS 数据库的连接字符串
+const REMOTE_CLOGS_MONGO_URI = "mongodb://clogs:clogs123.%23%23@localhost:27017/clogs";
+
+// --- 旧数据库 total_done_rate 表的 Schema 和 Model ---
+// 接口与新模型的 ITotalDoneRate 相同,因为数据结构是同构的
+interface IOldTotalDoneRate extends Document {
+  _id: mongoose.Types.ObjectId;
+  totalStartCount: number;
+  totalDoneCount: number;
+  completionRate: number;
+}
+
+const oldTotalDoneRateSchema: Schema<IOldTotalDoneRate> = new Schema(
+  {
+    _id: { type: Schema.Types.ObjectId, required: true },
+    totalStartCount: { type: Number, required: true },
+    totalDoneCount: { type: Number, required: true },
+    completionRate: { type: Number, required: true },
+  },
+  {
+    collection: "total_done_rate", // 指定旧的表名
+    versionKey: false,
+  }
+);
+
+let localConn: Connection | null = null;
+let remoteConn: Connection | null = null;
+
+// 定义每次处理的数据量
+const BATCH_SIZE = 5000;
+
+/**
+ * 迁移函数,执行 total_done_rate 数据割接
+ */
+async function migrateTotalDoneRates() {
+  console.log("[Migration] Starting data migration from clogs.total_done_rate to oms.totaldonerates...");
+
+  try {
+    // 1. 建立本地和远程数据库连接
+    localConn = await mongoose.createConnection(LOCAL_MONGO_URI);
+    console.log("[Migration] Connected to local OMS database.");
+
+    remoteConn = await mongoose.createConnection(REMOTE_CLOGS_MONGO_URI);
+    const OldTotalDoneRate = remoteConn.model<IOldTotalDoneRate>("OldTotalDoneRate", oldTotalDoneRateSchema);
+    console.log("[Migration] Connected to remote CLOGS database.");
+
+    // 2. 使用游标分批处理数据
+    console.log(`[Migration] Processing records in batches of ${BATCH_SIZE}...`);
+
+    let processedCount = 0;
+    let successfulInserts = 0;
+    let batch: any[] = [];
+
+    // 使用游标查询,不会一次性加载所有数据到内存
+    const cursor = OldTotalDoneRate.find({}).lean().cursor();
+
+    // 逐条处理游标中的数据
+    for await (const doc of cursor) {
+      // 数据结构同构,可以直接推入批次
+      batch.push(doc);
+
+      // 如果达到批次大小,则执行批量插入
+      if (batch.length >= BATCH_SIZE) {
+        try {
+          const result = await localConn.model("TotalDoneRate", TotalDoneRate.schema).insertMany(batch, { ordered: false });
+          successfulInserts += result.length;
+        } catch (error: any) {
+          if (error.code === 11000) {
+            console.warn("[Migration] Duplicate key error in batch. Attempting individual inserts...");
+            // 如果存在重复项,则尝试逐条插入以找出成功的记录
+            for (const item of batch) {
+              try {
+                await localConn.model("TotalDoneRate", TotalDoneRate.schema).create(item);
+                successfulInserts++;
+              } catch (e: any) {
+                if (e.code !== 11000) {
+                  console.error(`[Migration] Failed to insert record with _id ${item._id}:`, e.message);
+                }
+              }
+            }
+          } else {
+            console.error("[Migration] An error occurred during a batch insertion:", error.message);
+          }
+        }
+
+        processedCount += batch.length;
+        console.log(`[Migration] Processed ${processedCount} records. Total successfully inserted: ${successfulInserts}`);
+
+        // 清空批次,准备下一轮
+        batch = [];
+      }
+    }
+
+    // 处理最后一个未满的批次
+    if (batch.length > 0) {
+      try {
+        const result = await localConn.model("TotalDoneRate", TotalDoneRate.schema).insertMany(batch, { ordered: false });
+        successfulInserts += result.length;
+      } catch (error: any) {
+        if (error.code === 11000) {
+          console.warn("[Migration] Duplicate key error in final batch. Attempting individual inserts...");
+          for (const item of batch) {
+            try {
+              await localConn.model("TotalDoneRate", TotalDoneRate.schema).create(item);
+              successfulInserts++;
+            } catch (e: any) {
+              if (e.code !== 11000) {
+                console.error(`[Migration] Failed to insert record with _id ${item._id}:`, e.message);
+              }
+            }
+          }
+        } else {
+          console.error("[Migration] An error occurred during the final batch insertion:", error.message);
+        }
+      }
+      processedCount += batch.length;
+    }
+
+    console.log(`[Migration] All records processed. Total processed: ${processedCount}. Total successfully inserted: ${successfulInserts}`);
+    console.log("[Migration] Data migration completed successfully!");
+  } catch (error: any) {
+    console.error("[Migration] A critical error occurred. Script will exit:", error);
+    throw error;
+  } finally {
+    // 确保关闭所有连接
+    if (localConn) {
+      await localConn.close();
+      console.log("[Migration] Local connection closed.");
+    }
+    if (remoteConn) {
+      await remoteConn.close();
+      console.log("[Migration] Remote connection closed.");
+    }
+  }
+}
+
+// 运行迁移脚本
+migrateTotalDoneRates().catch((err) => {
+  console.error("[Migration] Script failed to run:", err);
+  process.exit(1); // 退出并返回错误码
+});