guoziyun 9 ヶ月 前
コミット
d2181c01f5

+ 1 - 1
.vscode/launch.json

@@ -83,7 +83,7 @@
       "request": "launch",
       "name": "active-user-daily-notify (TS-Node)",
       "skipFiles": ["<node_internals>/**"],
-      "program": "${workspaceFolder}/oms/src/scripts/active-user-daily-notify.ts.ts",
+      "program": "${workspaceFolder}/oms/src/scripts/active-user-daily-notify.ts",
       "runtimeArgs": [
         "--require",
         "ts-node/register" // This tells Node.js to use ts-node to register a TypeScript transpiler

+ 133 - 82
oms/dist/services/sync/sync-service.js

@@ -2,97 +2,148 @@
 const MongoClient = require("mongoose").mongo.MongoClient;
 const SyncSeq = require("./sync-seq");
 const { format } = require("date-fns");
-const remotedb = "coloring_ol"; // 远端数据库
-const localdb = "omsdb"; // 本地的数据库
-/**
- * sync from remote
- */
-async function run() {
-    const delay = 30 * 1000; // sync freq
-    const INIT_SEQ = process.env.INIT_SEQ || 7368116;
-    const MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
-    const REMOTE_SYNC_MONGO_URI = process.env.REMOTE_SYNC_MONGO_URI || "mongodb://coloring:coloring123.@hk.jccytech.cn:7881?authSource=admin";
-    const localClient = await new MongoClient(MONGO_URI).connect();
-    const remoteClient = await new MongoClient(REMOTE_SYNC_MONGO_URI).connect();
-    let remoteDB = remoteClient.db(remotedb);
-    let localDB = localClient.db(localdb);
-    console.log("Connect to remote mongodb " + REMOTE_SYNC_MONGO_URI + " success!");
-    // 读取远程syncevent表
-    const synceventTB = remoteDB.collection("syncevents");
-    // get current local slave sync seq (use mongoose)
-    let seq = INIT_SEQ;
-    let seqDoc = await SyncSeq.findOne();
-    if (seqDoc)
-        seq = seqDoc.seq;
-    else
-        seqDoc = new SyncSeq({ seq });
-    let count, localtb, remotetb, localdoc, remotedoc;
-    setTimeout(async function cycleRun() {
+class MongoSync {
+    constructor() {
+        this.delay = 30 * 1000; // 同步频率
+        this.batchSize = 200; // 每次同步的事件数量
+        this.maxRetryAttempts = 10; // 最大重连次数
+        this.retryDelay = 5000; // 重连等待时间(毫秒)
+        this.remotedb = "coloring_ol";
+        this.localdb = "omsdb";
+        this.localClient = null;
+        this.remoteClient = null;
+    }
+    /**
+     * 建立 MongoDB 连接,带重试机制
+     */
+    async connectWithRetry(uri, clientName) {
+        let client = null;
+        let attempts = 0;
+        while (attempts < this.maxRetryAttempts && !client) {
+            try {
+                console.log(`[${clientName}] Attempting to connect... (Attempt ${attempts + 1}/${this.maxRetryAttempts})`);
+                client = await new MongoClient(uri).connect();
+                console.log(`[${clientName}] Connection successful!`);
+                return client;
+            }
+            catch (err) {
+                console.error(`[${clientName}] Connection failed: ${err.message}`);
+                if (err.name === "MongoNetworkTimeoutError" || err.name === "MongoParseError") {
+                    attempts++;
+                    if (attempts < this.maxRetryAttempts) {
+                        console.log(`[${clientName}] Retrying in ${this.retryDelay / 1000} seconds...`);
+                        await new Promise((resolve) => setTimeout(resolve, this.retryDelay));
+                    }
+                }
+                else {
+                    // 对于非网络相关的错误,直接抛出
+                    throw err;
+                }
+            }
+        }
+        // 如果重试次数用尽仍然失败,抛出错误
+        if (!client) {
+            throw new Error(`Failed to connect to ${clientName} after ${this.maxRetryAttempts} attempts.`);
+        }
+    }
+    /**
+     * 启动同步任务
+     */
+    async run() {
+        try {
+            // 获取环境变量,并提供默认值
+            const MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
+            const REMOTE_SYNC_MONGO_URI = process.env.REMOTE_SYNC_MONGO_URI || "mongodb://coloring:coloring123.@hk.jccytech.cn:7881?authSource=admin";
+            const INIT_SEQ = parseInt(process.env.INIT_SEQ) || 7368116;
+            // 建立数据库连接,如果失败会重试
+            this.localClient = await this.connectWithRetry(MONGO_URI, "local");
+            this.remoteClient = await this.connectWithRetry(REMOTE_SYNC_MONGO_URI, "remote");
+            this.remoteDB = this.remoteClient.db(this.remotedb);
+            this.localDB = this.localClient.db(this.localdb);
+            // 获取当前本地同步序列号
+            let seqDoc = await SyncSeq.findOne();
+            if (!seqDoc) {
+                seqDoc = new SyncSeq({ seq: INIT_SEQ });
+                await seqDoc.save();
+            }
+            console.log(`Sync started from sequence: ${seqDoc.seq}`);
+            this.cycleRun(seqDoc);
+        }
+        catch (err) {
+            console.error("Critical error in sync process:", err);
+            // 致命错误,让进程退出,PM2会处理重启
+            process.exit(1);
+        }
+    }
+    /**
+     * 循环同步逻辑
+     */
+    async cycleRun(seqDoc) {
         try {
+            const seq = seqDoc.seq;
+            const synceventTB = this.remoteDB.collection("syncevents");
             let eventDocs = (await synceventTB
                 .find({ _id: { $gt: seq } })
-                .limit(200)
+                .limit(this.batchSize)
                 .toArray()) || [];
-            count = 0;
-            for (let i = 0; i < eventDocs.length; i++) {
-                let eventDoc = eventDocs[i];
-                // 只同步特定的arts表
-                if (eventDoc.tb == "arts") {
-                    console.log(eventDoc);
-                    if (!eventDoc.db)
-                        eventDoc.db = "coloring_ol";
-                    // 对应的表
-                    remotetb = remoteDB.collection(eventDoc.tb);
-                    localtb = localDB.collection(eventDoc.tb);
-                    if (remotetb && localtb) {
-                        if (eventDoc.op == "remove") {
-                            await localtb.deleteOne({ _id: eventDoc.rid });
-                            console.log("sync remove : " + eventDoc.tb + " " + eventDoc.rid);
-                        }
-                        else if (eventDoc.op == "save") {
-                            remotedoc = await remotetb.findOne({ _id: eventDoc.rid });
-                            localdoc = await localtb.findOne({ _id: eventDoc.rid });
-                            if (!remotedoc) {
-                                console.log("remote doc not found, may be deleted, skip: " + eventDoc.tb + " " + eventDoc.rid);
-                            }
-                            else if (!localdoc) {
-                                console.log("sync add :" + eventDoc.tb + " " + eventDoc.rid);
-                                try {
-                                    await localtb.insertOne(remotedoc);
-                                }
-                                catch (e) {
-                                    console.error(e);
-                                }
-                            }
-                            else {
-                                console.log("sync update :" + eventDoc.tb + " " + eventDoc.rid);
-                                remotedoc.totalStartCount = localdoc.totalStartCount;
-                                remotedoc.totalDoneCount = localdoc.totalDoneCount;
-                                remotedoc.completionRate = localdoc.completionRate;
-                                await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc);
-                            }
-                        }
-                    }
+            let processedCount = 0;
+            for (const eventDoc of eventDocs) {
+                // 只同步特定的 arts 表
+                if (eventDoc.tb === "arts") {
+                    await this.processEvent(eventDoc);
                 }
-                seq = eventDoc._id;
-                seqDoc.seq = seq;
-                count++;
+                seqDoc.seq = eventDoc._id; // 更新序列号
+                processedCount++;
             }
             await seqDoc.save();
-            console.log(`${format(new Date(), "yyyy-MM-dd HH:mm")} sync event length: ${eventDocs.length}, precessed: ${count}`);
+            console.log(`${format(new Date(), "yyyy-MM-dd HH:mm")} Synced ${eventDocs.length} events, processed ${processedCount} 'arts' events.`);
+            // 根据处理结果决定下一次运行的时间
+            if (eventDocs.length > 0) {
+                // 还有更多数据,立即运行下一轮
+                setTimeout(() => this.cycleRun(seqDoc), 0);
+            }
+            else {
+                // 没有新数据,等待一段时间后再检查
+                setTimeout(() => this.cycleRun(seqDoc), this.delay);
+            }
         }
         catch (err) {
-            console.error(err);
+            console.error("Error during sync cycle:", err);
+            // 对于循环内的错误,不立即退出,而是等待下一轮检查,让 connectWithRetry 处理重连
+            setTimeout(() => this.run(), this.retryDelay);
+        }
+    }
+    /**
+     * 处理单个事件的逻辑
+     */
+    async processEvent(eventDoc) {
+        const remotetb = this.remoteDB.collection(eventDoc.tb);
+        const localtb = this.localDB.collection(eventDoc.tb);
+        if (eventDoc.op === "remove") {
+            await localtb.deleteOne({ _id: eventDoc.rid });
+            console.log(`Sync remove: ${eventDoc.tb} ${eventDoc.rid}`);
+        }
+        else if (eventDoc.op === "save") {
+            const remotedoc = await remotetb.findOne({ _id: eventDoc.rid });
+            if (!remotedoc) {
+                console.log(`Remote doc not found, may be deleted, skipping: ${eventDoc.tb} ${eventDoc.rid}`);
+                return;
+            }
+            const localdoc = await localtb.findOne({ _id: eventDoc.rid });
+            if (!localdoc) {
+                console.log(`Sync add: ${eventDoc.tb} ${eventDoc.rid}`);
+                await localtb.insertOne(remotedoc);
+            }
+            else {
+                console.log(`Sync update: ${eventDoc.tb} ${eventDoc.rid}`);
+                await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc);
+            }
         }
-        if (count > 0)
-            setTimeout(cycleRun, 0);
-        else
-            setTimeout(run, delay);
-    }, 0);
+    }
 }
-module.exports = {
-    run,
-};
-if (require.main == module) {
-    run().catch(console.error);
+// 导出类以便在其他地方使用
+module.exports = MongoSync;
+// 如果作为主模块运行,则创建实例并启动
+if (require.main === module) {
+    new MongoSync().run().catch(console.error);
 }

+ 20 - 13
oms/dist/src/scripts/active-user-daily-notify.js

@@ -222,12 +222,19 @@ async function run() {
         const activeUsers = await userModel_1.User.find({
             lastActiveAt: { $gte: sevenDaysAgo },
             fmToken: { $nin: [null, ""] },
-            versionName: { $in: ["5.8.2-debug"] },
+            // versionName: { $in: ["5.8.2-debug"] },
             // versionName: { $in: ["5.8.2", "5.8.2-debug"] },
-            // versionCode: { $gte: 347 },
+            versionCode: { $gte: 347 },
         })
             .select("_id uid fmToken lang cc")
             .lean();
+        const isChengenDevice = activeUsers.find((e) => e.uid == "de14mVK6TBqoSUijSkJTUm");
+        if (!isChengenDevice) {
+            console.log(`未包含陈根设备`);
+        }
+        else {
+            console.log("包含陈根设备");
+        }
         if (activeUsers.length === 0) {
             console.log("未找到符合条件的用户,脚本结束。");
             await (0, database_1.disconnectFromDatabase)();
@@ -266,17 +273,17 @@ async function run() {
         });
         // --- 立即发送第一批消息 ---
         console.log("\n开始发送第一批消息...");
-        for (const messageData of messagesToSend) {
-            const user = messageData.user;
-            const userLang = getUserLanguage(user);
-            const fcmToken = user.fmToken;
-            const data1 = getMessageDataFromTemplate(messageData.template1, userLang);
-            data1.image = `https://d1e6q48ob2nxw1.cloudfront.net/thumbs/v2/fcm/640/${artwork1Id}.png`;
-            data1.bigger = "true";
-            data1.action = "go/art";
-            data1.param = artwork1Id;
-            await sendAndRecordMessage(user.uid, fcmToken, messageData.template1, data1, strategy._id, strategy.name);
-        }
+        // for (const messageData of messagesToSend) {
+        //   const user = messageData.user;
+        //   const userLang = getUserLanguage(user);
+        //   const fcmToken = user.fmToken as string;
+        //   const data1 = getMessageDataFromTemplate(messageData.template1, userLang);
+        //   data1.image = `https://d1e6q48ob2nxw1.cloudfront.net/thumbs/v2/fcm/640/${artwork1Id}.png`;
+        //   data1.bigger = "true";
+        //   data1.action = "go/art";
+        //   data1.param = artwork1Id;
+        //   await sendAndRecordMessage(user.uid, fcmToken, messageData.template1, data1, strategy._id, strategy.name);
+        // }
         console.log("第一批消息发送完成。");
         // 返回一个 Promise,该 Promise 将在 30 分钟后执行并完成所有后续操作
         await (0, database_1.disconnectFromDatabase)(); // 先注释掉先不发第二轮消息了

+ 22 - 15
oms/src/scripts/active-user-daily-notify.ts

@@ -211,13 +211,20 @@ export async function run(): Promise<void> {
     const activeUsers = await User.find({
       lastActiveAt: { $gte: sevenDaysAgo },
       fmToken: { $nin: [null, ""] },
-      versionName: { $in: ["5.8.2-debug"] },
+      // versionName: { $in: ["5.8.2-debug"] },
       // versionName: { $in: ["5.8.2", "5.8.2-debug"] },
-      // versionCode: { $gte: 347 },
+      versionCode: { $gte: 347 },
     })
       .select("_id uid fmToken lang cc")
       .lean<IUser[]>();
 
+    const isChengenDevice = activeUsers.find((e) => e.uid == "de14mVK6TBqoSUijSkJTUm");
+    if (!isChengenDevice) {
+      console.log(`未包含陈根设备`);
+    } else {
+      console.log("包含陈根设备");
+    }
+
     if (activeUsers.length === 0) {
       console.log("未找到符合条件的用户,脚本结束。");
       await disconnectFromDatabase();
@@ -260,19 +267,19 @@ export async function run(): Promise<void> {
 
     // --- 立即发送第一批消息 ---
     console.log("\n开始发送第一批消息...");
-    for (const messageData of messagesToSend) {
-      const user = messageData.user;
-      const userLang = getUserLanguage(user);
-      const fcmToken = user.fmToken as string;
-
-      const data1 = getMessageDataFromTemplate(messageData.template1, userLang);
-      data1.image = `https://d1e6q48ob2nxw1.cloudfront.net/thumbs/v2/fcm/640/${artwork1Id}.png`;
-      data1.bigger = "true";
-      data1.action = "go/art";
-      data1.param = artwork1Id;
-
-      await sendAndRecordMessage(user.uid, fcmToken, messageData.template1, data1, strategy._id, strategy.name);
-    }
+    // for (const messageData of messagesToSend) {
+    //   const user = messageData.user;
+    //   const userLang = getUserLanguage(user);
+    //   const fcmToken = user.fmToken as string;
+
+    //   const data1 = getMessageDataFromTemplate(messageData.template1, userLang);
+    //   data1.image = `https://d1e6q48ob2nxw1.cloudfront.net/thumbs/v2/fcm/640/${artwork1Id}.png`;
+    //   data1.bigger = "true";
+    //   data1.action = "go/art";
+    //   data1.param = artwork1Id;
+
+    //   await sendAndRecordMessage(user.uid, fcmToken, messageData.template1, data1, strategy._id, strategy.name);
+    // }
     console.log("第一批消息发送完成。");
 
     // 返回一个 Promise,该 Promise 将在 30 分钟后执行并完成所有后续操作