guoziyun 9 månader sedan
förälder
incheckning
f707094729
2 ändrade filer med 92 tillägg och 37 borttagningar
  1. 7 6
      oms/services/cron-jobs/done-rate.ts
  2. 85 31
      oms/src/scripts/migrate-done-rates.ts

+ 7 - 6
oms/services/cron-jobs/done-rate.ts

@@ -161,12 +161,13 @@ async function run(): Promise<string> {
           const newCompletionRate = newTotalStartCount > 0 ? (newTotalDoneCount / newTotalStartCount) * 100 : 0;
 
           // **【新增】** 更新本地 Art 文档
-          await artService.updateArt(artworkId.toString(), {
-            totalStartCount: newTotalStartCount,
-            totalDoneCount: newTotalDoneCount,
-            completionRate: newCompletionRate,
-          });
-          updatedLocalArtworksCount++;
+          // 不更新本地art了
+          // await artService.updateArt(artworkId.toString(), {
+          //   totalStartCount: newTotalStartCount,
+          //   totalDoneCount: newTotalDoneCount,
+          //   completionRate: newCompletionRate,
+          // });
+          // updatedLocalArtworksCount++;
 
           // **【新增】** 同步更新远程 Art 文档
           const remoteArtDoc = await RemoteArt.findById(artworkId);

+ 85 - 31
oms/src/scripts/migrate-done-rates.ts

@@ -1,14 +1,14 @@
 // oms/scripts/migrate-done-rates.ts
 
 import mongoose, { Schema, Document, Connection } from "mongoose";
+import DoneRate from "../models/doneRateModel"; // 导入本地 DoneRate 模型
 import dayjs from "dayjs";
 import customParseFormat from "dayjs/plugin/customParseFormat";
-import DoneRate from "../models/doneRateModel"; // 导入本地 DoneRate 模型
 
 dayjs.extend(customParseFormat);
 
 // --- 数据库配置 ---
-// 本地 OMS 数据库的连接字符串,请根据你的实际配置修改
+// 本地 OMS 数据库的连接字符串
 const LOCAL_MONGO_URI = "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
 // 远程旧 CLOGS 数据库的连接字符串
 const REMOTE_CLOGS_MONGO_URI = "mongodb://clogs:clogs123.%23%23@localhost:27017/clogs";
@@ -42,6 +42,9 @@ const oldDoneRateSchema: Schema<IOldDoneRate> = new Schema(
 let localConn: Connection | null = null;
 let remoteConn: Connection | null = null;
 
+// 定义每次处理的数据量
+const BATCH_SIZE = 5000;
+
 /**
  * 迁移函数,执行数据割接
  */
@@ -57,41 +60,92 @@ async function migrateDoneRates() {
     const OldDoneRate = remoteConn.model<IOldDoneRate>("OldDoneRate", oldDoneRateSchema);
     console.log("[Migration] Connected to remote CLOGS database.");
 
-    // 2. 从旧表中查询所有数据
-    console.log("[Migration] Fetching all records from remote 'done_rate' table...");
-    const oldRecords = await OldDoneRate.find({}).lean(); // 使用 lean() 获得原生 JS 对象,性能更高
-    console.log(`[Migration] Found ${oldRecords.length} records to migrate.`);
+    // 2. 使用游标分批处理数据
+    console.log(`[Migration] Processing records in batches of ${BATCH_SIZE}...`);
+
+    let processedCount = 0;
+    let successfulInserts = 0;
+    let batch: any[] = [];
+
+    // 使用游标查询,不会一次性加载所有数据到内存
+    const cursor = OldDoneRate.find({}).lean().cursor();
+
+    // 逐条处理游标中的数据
+    for await (const doc of cursor) {
+      // 格式化数据以匹配本地模型
+      const newRecord = {
+        date: doc.collectionName,
+        res: doc.res,
+        startCount: doc.startCount,
+        doneCount: doc.doneCount,
+        completionRate: doc.completionRate,
+      };
+      batch.push(newRecord);
 
-    if (oldRecords.length === 0) {
-      console.log("[Migration] No data to migrate. Exiting.");
-      return;
+      // 如果达到批次大小,则执行批量插入
+      if (batch.length >= BATCH_SIZE) {
+        try {
+          const result = await localConn.model("DoneRate", DoneRate.schema).insertMany(batch, { ordered: false });
+          successfulInserts += result.length;
+        } catch (error: any) {
+          if (error.code === 11000) {
+            console.warn("[Migration] Duplicate key error in batch. Continuing...");
+            // 如果存在重复项,则尝试逐条插入以找出成功的记录
+            for (const item of batch) {
+              try {
+                await localConn.model("DoneRate", DoneRate.schema).create(item);
+                successfulInserts++;
+              } catch (e: any) {
+                if (e.code !== 11000) {
+                  console.error(`[Migration] Failed to insert record ${JSON.stringify(item)}:`, e.message);
+                }
+              }
+            }
+          } else {
+            console.error("[Migration] An error occurred during a batch insertion:", error.message);
+          }
+        }
+
+        processedCount += batch.length;
+        console.log(`[Migration] Processed ${processedCount} records. Total successfully inserted: ${successfulInserts}`);
+
+        // 清空批次,准备下一轮
+        batch = [];
+      }
     }
 
-    // 3. 格式化数据以匹配本地模型
-    const newRecords = oldRecords.map((record) => ({
-      date: record.collectionName,
-      res: record.res,
-      startCount: record.startCount,
-      doneCount: record.doneCount,
-      completionRate: record.completionRate,
-    }));
-
-    // 4. 将格式化后的数据批量插入到本地表中
-    console.log("[Migration] Inserting records into local 'doneRates' collection...");
-    const localDoneRateModel = localConn.model("DoneRate", DoneRate.schema);
-    const result = await localDoneRateModel.insertMany(newRecords, { ordered: false });
-
-    console.log(`[Migration] Successfully migrated ${result.length} records.`);
+    // 处理最后一个未满的批次
+    if (batch.length > 0) {
+      try {
+        const result = await localConn.model("DoneRate", DoneRate.schema).insertMany(batch, { ordered: false });
+        successfulInserts += result.length;
+      } catch (error: any) {
+        if (error.code === 11000) {
+          console.warn("[Migration] Duplicate key error in final batch. Continuing...");
+          for (const item of batch) {
+            try {
+              await localConn.model("DoneRate", DoneRate.schema).create(item);
+              successfulInserts++;
+            } catch (e: any) {
+              if (e.code !== 11000) {
+                console.error(`[Migration] Failed to insert record ${JSON.stringify(item)}:`, e.message);
+              }
+            }
+          }
+        } else {
+          console.error("[Migration] An error occurred during the final batch insertion:", error.message);
+        }
+      }
+      processedCount += batch.length;
+    }
+
+    console.log(`[Migration] All records processed. Total processed: ${processedCount}. Total successfully inserted: ${successfulInserts}`);
     console.log("[Migration] Data migration completed successfully!");
   } catch (error: any) {
-    console.error("[Migration] An error occurred during migration:", error);
-    // 检查是否是重复键错误,这是正常的
-    if (error.code === 11000) {
-      console.warn("[Migration] Some records may already exist (duplicate key error). This is expected if the script is run multiple times.");
-    }
-    throw error; // 抛出错误以在控制台中显示
+    console.error("[Migration] A critical error occurred. Script will exit:", error);
+    throw error;
   } finally {
-    // 5. 确保关闭所有连接
+    // 确保关闭所有连接
     if (localConn) {
       await localConn.close();
       console.log("[Migration] Local connection closed.");