guoziyun hai 9 meses
pai
achega
fb07feaedf
Modificáronse 1 ficheiros con 142 adicións e 79 borrados
  1. 142 79
      oms/services/sync/sync-service.js

+ 142 - 79
oms/services/sync/sync-service.js

@@ -2,99 +2,162 @@ const MongoClient = require("mongoose").mongo.MongoClient;
 const SyncSeq = require("./sync-seq");
 const { format } = require("date-fns");
 
-const remotedb = "coloring_ol"; // 远端数据库
-const localdb = "omsdb"; // 本地的数据库
-/**
- * sync from remote
- */
-async function run() {
-  const delay = 30 * 1000; // sync freq
-  const INIT_SEQ = process.env.INIT_SEQ || 7368116;
-  const MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
-  const REMOTE_SYNC_MONGO_URI = process.env.REMOTE_SYNC_MONGO_URI || "mongodb://coloring:coloring123.@hk.jccytech.cn:7881?authSource=admin";
-  const localClient = await new MongoClient(MONGO_URI).connect();
-  const remoteClient = await new MongoClient(REMOTE_SYNC_MONGO_URI).connect();
-
-  let remoteDB = remoteClient.db(remotedb);
-  let localDB = localClient.db(localdb);
-
-  console.log("Connect to remote mongodb " + REMOTE_SYNC_MONGO_URI + " success!");
-
-  // 读取远程syncevent表
-  const synceventTB = remoteDB.collection("syncevents");
-  // get current local slave sync seq (use mongoose)
-  let seq = INIT_SEQ;
-  let seqDoc = await SyncSeq.findOne();
-  if (seqDoc) seq = seqDoc.seq;
-  else seqDoc = new SyncSeq({ seq });
-
-  let count, localtb, remotetb, localdoc, remotedoc;
-  setTimeout(async function cycleRun() {
+class MongoSync {
+  constructor() {
+    this.delay = 30 * 1000; // 同步频率
+    this.batchSize = 200; // 每次同步的事件数量
+    this.maxRetryAttempts = 10; // 最大重连次数
+    this.retryDelay = 5000; // 重连等待时间(毫秒)
+
+    this.remotedb = "coloring_ol";
+    this.localdb = "omsdb";
+
+    this.localClient = null;
+    this.remoteClient = null;
+  }
+
+  /**
+   * 建立 MongoDB 连接,带重试机制
+   */
+  async connectWithRetry(uri, clientName) {
+    let client = null;
+    let attempts = 0;
+    while (attempts < this.maxRetryAttempts && !client) {
+      try {
+        console.log(`[${clientName}] Attempting to connect... (Attempt ${attempts + 1}/${this.maxRetryAttempts})`);
+        client = await new MongoClient(uri).connect();
+        console.log(`[${clientName}] Connection successful!`);
+        return client;
+      } catch (err) {
+        console.error(`[${clientName}] Connection failed: ${err.message}`);
+        if (err.name === "MongoNetworkTimeoutError" || err.name === "MongoParseError") {
+          attempts++;
+          if (attempts < this.maxRetryAttempts) {
+            console.log(`[${clientName}] Retrying in ${this.retryDelay / 1000} seconds...`);
+            await new Promise((resolve) => setTimeout(resolve, this.retryDelay));
+          }
+        } else {
+          // 对于非网络相关的错误,直接抛出
+          throw err;
+        }
+      }
+    }
+    // 如果重试次数用尽仍然失败,抛出错误
+    if (!client) {
+      throw new Error(`Failed to connect to ${clientName} after ${this.maxRetryAttempts} attempts.`);
+    }
+  }
+
+  /**
+   * 启动同步任务
+   */
+  async run() {
+    try {
+      // 获取环境变量,并提供默认值
+      const MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
+      const REMOTE_SYNC_MONGO_URI = process.env.REMOTE_SYNC_MONGO_URI || "mongodb://coloring:coloring123.@hk.jccytech.cn:7881?authSource=admin";
+      const INIT_SEQ = parseInt(process.env.INIT_SEQ) || 7368116;
+
+      // 建立数据库连接,如果失败会重试
+      this.localClient = await this.connectWithRetry(MONGO_URI, "local");
+      this.remoteClient = await this.connectWithRetry(REMOTE_SYNC_MONGO_URI, "remote");
+
+      this.remoteDB = this.remoteClient.db(this.remotedb);
+      this.localDB = this.localClient.db(this.localdb);
+
+      // 获取当前本地同步序列号
+      let seqDoc = await SyncSeq.findOne();
+      if (!seqDoc) {
+        seqDoc = new SyncSeq({ seq: INIT_SEQ });
+        await seqDoc.save();
+      }
+
+      console.log(`Sync started from sequence: ${seqDoc.seq}`);
+      this.cycleRun(seqDoc);
+    } catch (err) {
+      console.error("Critical error in sync process:", err);
+      // 致命错误,让进程退出,PM2会处理重启
+      process.exit(1);
+    }
+  }
+
+  /**
+   * 循环同步逻辑
+   */
+  async cycleRun(seqDoc) {
     try {
+      const seq = seqDoc.seq;
+      const synceventTB = this.remoteDB.collection("syncevents");
+
       let eventDocs =
         (await synceventTB
           .find({ _id: { $gt: seq } })
-          .limit(200)
+          .limit(this.batchSize)
           .toArray()) || [];
-      count = 0;
-      for (let i = 0; i < eventDocs.length; i++) {
-        let eventDoc = eventDocs[i];
-        // 只同步特定的arts表
-        if (eventDoc.tb == "arts") {
-          console.log(eventDoc);
-
-          if (!eventDoc.db) eventDoc.db = "coloring_ol";
-          // 对应的表
-          remotetb = remoteDB.collection(eventDoc.tb);
-          localtb = localDB.collection(eventDoc.tb);
-          if (remotetb && localtb) {
-            if (eventDoc.op == "remove") {
-              await localtb.deleteOne({ _id: eventDoc.rid });
-              console.log("sync remove : " + eventDoc.tb + " " + eventDoc.rid);
-            } else if (eventDoc.op == "save") {
-              remotedoc = await remotetb.findOne({ _id: eventDoc.rid });
-              localdoc = await localtb.findOne({ _id: eventDoc.rid });
-              if (!remotedoc) {
-                console.log("remote doc not found, may be deleted, skip: " + eventDoc.tb + " " + eventDoc.rid);
-              } else if (!localdoc) {
-                console.log("sync add :" + eventDoc.tb + " " + eventDoc.rid);
-                try {
-                  await localtb.insertOne(remotedoc);
-                } catch (e) {
-                  console.error(e);
-                }
-              } else {
-                console.log("sync update :" + eventDoc.tb + " " + eventDoc.rid);
-                remotedoc.totalStartCount = localdoc.totalStartCount;
-                remotedoc.totalDoneCount = localdoc.totalDoneCount;
-                remotedoc.completionRate = localdoc.completionRate;
-                await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc);
-              }
-            }
-          }
-        }
 
-        seq = eventDoc._id;
-        seqDoc.seq = seq;
-        count++;
+      let processedCount = 0;
+      for (const eventDoc of eventDocs) {
+        // 只同步特定的 arts 表
+        if (eventDoc.tb === "arts") {
+          await this.processEvent(eventDoc);
+        }
+        seqDoc.seq = eventDoc._id; // 更新序列号
+        processedCount++;
       }
 
       await seqDoc.save();
 
-      console.log(`${format(new Date(), "yyyy-MM-dd HH:mm")} sync event length: ${eventDocs.length}, precessed: ${count}`);
+      console.log(`${format(new Date(), "yyyy-MM-dd HH:mm")} Synced ${eventDocs.length} events, processed ${processedCount} 'arts' events.`);
+
+      // 根据处理结果决定下一次运行的时间
+      if (eventDocs.length > 0) {
+        // 还有更多数据,立即运行下一轮
+        setTimeout(() => this.cycleRun(seqDoc), 0);
+      } else {
+        // 没有新数据,等待一段时间后再检查
+        setTimeout(() => this.cycleRun(seqDoc), this.delay);
+      }
     } catch (err) {
-      console.error(err);
+      console.error("Error during sync cycle:", err);
+      // 对于循环内的错误,不立即退出,而是等待下一轮检查,让 connectWithRetry 处理重连
+      setTimeout(() => this.run(), this.retryDelay);
     }
+  }
+
+  /**
+   * 处理单个事件的逻辑
+   */
+  async processEvent(eventDoc) {
+    const remotetb = this.remoteDB.collection(eventDoc.tb);
+    const localtb = this.localDB.collection(eventDoc.tb);
+
+    if (eventDoc.op === "remove") {
+      await localtb.deleteOne({ _id: eventDoc.rid });
+      console.log(`Sync remove: ${eventDoc.tb} ${eventDoc.rid}`);
+    } else if (eventDoc.op === "save") {
+      const remotedoc = await remotetb.findOne({ _id: eventDoc.rid });
+      if (!remotedoc) {
+        console.log(`Remote doc not found, may be deleted, skipping: ${eventDoc.tb} ${eventDoc.rid}`);
+        return;
+      }
+      const localdoc = await localtb.findOne({ _id: eventDoc.rid });
 
-    if (count > 0) setTimeout(cycleRun, 0);
-    else setTimeout(run, delay);
-  }, 0);
+      if (!localdoc) {
+        console.log(`Sync add: ${eventDoc.tb} ${eventDoc.rid}`);
+        await localtb.insertOne(remotedoc);
+      } else {
+        console.log(`Sync update: ${eventDoc.tb} ${eventDoc.rid}`);
+
+        await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc);
+      }
+    }
+  }
 }
 
-module.exports = {
-  run,
-};
+// 导出类以便在其他地方使用
+module.exports = MongoSync;
 
-if (require.main == module) {
-  run().catch(console.error);
+// 如果作为主模块运行,则创建实例并启动
+if (require.main === module) {
+  new MongoSync().run().catch(console.error);
 }