| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- const MongoClient = require("mongoose").mongo.MongoClient;
- const SyncSeq = require("./sync-seq");
- const { format } = require("date-fns");
/**
 * Replays the remote `syncevents` log into the local database.
 *
 * Events are read in ascending `_id` order starting after the sequence number
 * stored in the `SyncSeq` document. Only events whose `tb` is "arts" are
 * applied locally; all other events merely advance the stored sequence.
 *
 * NOTE(review): `SyncSeq` is a model from "./sync-seq"; this class never opens
 * the connection that model uses — presumably the host application does.
 */
class MongoSync {
  constructor() {
    this.delay = 30 * 1000; // Polling interval (ms) used when no new events were found
    this.batchSize = 200; // Maximum number of events fetched per cycle
    this.maxRetryAttempts = 10; // Maximum number of connection attempts
    this.retryDelay = 5000; // Wait between connection attempts (ms)
    this.remotedb = "coloring_ol";
    this.localdb = "omsdb";
    this.localClient = null;
    this.remoteClient = null;
    this.localDB = null;
    this.remoteDB = null;
  }

  /**
   * Opens a MongoDB connection, retrying on transient network failures.
   *
   * @param {string} uri - MongoDB connection string.
   * @param {string} clientName - Label used in log and error messages.
   * @returns {Promise<object>} The connected client.
   * @throws {Error} The original error when it is not retryable, or a
   *   "Failed to connect" error once `maxRetryAttempts` attempts have failed.
   */
  async connectWithRetry(uri, clientName) {
    // Only failures that can heal on their own are worth retrying. A
    // MongoParseError means the URI itself is malformed, so it fails fast.
    const retryableErrors = new Set([
      "MongoNetworkError",
      "MongoNetworkTimeoutError",
      "MongoServerSelectionError",
    ]);
    let lastError;

    for (let attempt = 1; attempt <= this.maxRetryAttempts; attempt++) {
      try {
        console.log(`[${clientName}] Attempting to connect... (Attempt ${attempt}/${this.maxRetryAttempts})`);
        const client = await new MongoClient(uri).connect();
        console.log(`[${clientName}] Connection successful!`);
        return client;
      } catch (err) {
        console.error(`[${clientName}] Connection failed: ${err.message}`);
        if (!retryableErrors.has(err.name)) {
          // Not a connectivity problem: surface it to the caller unchanged.
          throw err;
        }
        lastError = err;
        if (attempt < this.maxRetryAttempts) {
          console.log(`[${clientName}] Retrying in ${this.retryDelay / 1000} seconds...`);
          await new Promise((resolve) => setTimeout(resolve, this.retryDelay));
        }
      }
    }

    // Every attempt failed with a retryable error.
    throw new Error(`Failed to connect to ${clientName} after ${this.maxRetryAttempts} attempts.`, { cause: lastError });
  }

  /**
   * Closes any clients left over from a previous run() and clears the cached
   * handles. Close failures are logged and otherwise ignored, because the
   * clients are being discarded anyway.
   *
   * @returns {Promise<void>}
   */
  async closeClients() {
    const staleClients = [this.localClient, this.remoteClient];
    this.localClient = null;
    this.remoteClient = null;
    this.localDB = null;
    this.remoteDB = null;

    for (const client of staleClients) {
      if (!client) {
        continue;
      }
      try {
        await client.close();
      } catch (err) {
        console.error(`Failed to close stale MongoDB client: ${err.message}`);
      }
    }
  }

  /**
   * Starts the sync task: connects to both databases, loads (or creates) the
   * sequence document and kicks off the polling loop.
   *
   * Any failure here is treated as fatal and terminates the process with
   * exit code 1 (the original author relies on PM2 to restart it).
   *
   * @returns {Promise<void>}
   */
  async run() {
    try {
      // Read settings from the environment, falling back to defaults.
      // NOTE(review): the fallback URIs embed credentials in source control;
      // consider requiring the environment variables instead.
      const MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
      const REMOTE_SYNC_MONGO_URI = process.env.REMOTE_SYNC_MONGO_URI || "mongodb://coloring:coloring123.@gogs.jccytech.cn:7881?authSource=admin";
      // Explicit radix and NaN check so that INIT_SEQ=0 is honoured.
      const parsedInitSeq = Number.parseInt(process.env.INIT_SEQ, 10);
      const INIT_SEQ = Number.isNaN(parsedInitSeq) ? 7368116 : parsedInitSeq;

      // run() is re-entered after a failed cycle; release the previous
      // clients first so each recovery does not leak two connections.
      await this.closeClients();

      // Establish the database connections (retried on network failures).
      this.localClient = await this.connectWithRetry(MONGO_URI, "local");
      this.remoteClient = await this.connectWithRetry(REMOTE_SYNC_MONGO_URI, "remote");
      this.remoteDB = this.remoteClient.db(this.remotedb);
      this.localDB = this.localClient.db(this.localdb);

      // Load the current local sync sequence, creating it on first run.
      let seqDoc = await SyncSeq.findOne();
      if (!seqDoc) {
        seqDoc = new SyncSeq({ seq: INIT_SEQ });
        await seqDoc.save();
      }
      console.log(`Sync started from sequence: ${seqDoc.seq}`);

      // Deliberately not awaited: cycleRun handles its own errors and
      // reschedules itself with timers.
      this.cycleRun(seqDoc);
    } catch (err) {
      console.error("Critical error in sync process:", err);
      // Fatal error: exit and let the process manager restart us.
      process.exit(1);
    }
  }

  /**
   * Runs one sync cycle and schedules the next one.
   *
   * Fetches up to `batchSize` events with `_id` greater than `seqDoc.seq`,
   * applies the "arts" ones, persists the new sequence, then reschedules
   * itself: immediately if anything was fetched, after `delay` ms otherwise.
   * On error, run() is scheduled after `retryDelay` ms to reconnect and
   * resume from the last persisted sequence.
   *
   * @param {object} seqDoc - Sequence document exposing `seq` and `save()`.
   * @returns {Promise<void>}
   */
  async cycleRun(seqDoc) {
    try {
      // Sort explicitly: the last fetched _id becomes the new checkpoint, so
      // an unordered batch could silently skip events.
      const eventDocs = await this.remoteDB
        .collection("syncevents")
        .find({ _id: { $gt: seqDoc.seq } })
        .sort({ _id: 1 })
        .limit(this.batchSize)
        .toArray();

      let processedCount = 0;
      for (const eventDoc of eventDocs) {
        // Only the "arts" table is synchronised.
        if (eventDoc.tb === "arts") {
          await this.processEvent(eventDoc);
          processedCount++;
        }
        seqDoc.seq = eventDoc._id; // Advance the checkpoint past this event
      }
      await seqDoc.save();
      console.log(`${format(new Date(), "yyyy-MM-dd HH:mm")} Synced ${eventDocs.length} events, processed ${processedCount} 'arts' events.`);

      // Something was fetched, so more may be waiting: run again right away.
      // Otherwise wait for the polling interval before checking again.
      const nextDelay = eventDocs.length > 0 ? 0 : this.delay;
      setTimeout(() => this.cycleRun(seqDoc), nextDelay);
    } catch (err) {
      console.error("Error during sync cycle:", err);
      // Do not exit on a failed cycle; restart through run(), which
      // reconnects and reloads the last persisted sequence.
      setTimeout(() => this.run(), this.retryDelay);
    }
  }

  /**
   * Applies a single sync event to the local database.
   *
   * - `op === "remove"`: deletes the local document with `_id === rid`.
   * - `op === "save"`: copies the remote document over the local one
   *   (insert when absent, replace when present); skipped when the remote
   *   document no longer exists.
   * - any other `op`: ignored.
   *
   * @param {{tb: string, op: string, rid: *}} eventDoc - Event to apply.
   * @returns {Promise<void>}
   */
  async processEvent(eventDoc) {
    const { tb, op, rid } = eventDoc;
    const remoteTable = this.remoteDB.collection(tb);
    const localTable = this.localDB.collection(tb);

    if (op === "remove") {
      await localTable.deleteOne({ _id: rid });
      console.log(`Sync remove: ${tb} ${rid}`);
      return;
    }
    if (op !== "save") {
      return;
    }

    const remoteDoc = await remoteTable.findOne({ _id: rid });
    if (!remoteDoc) {
      console.log(`Remote doc not found, may be deleted, skipping: ${tb} ${rid}`);
      return;
    }

    const localDoc = await localTable.findOne({ _id: rid });
    if (!localDoc) {
      console.log(`Sync add: ${tb} ${rid}`);
      await localTable.insertOne(remoteDoc);
    } else {
      console.log(`Sync update: ${tb} ${rid}`);
      await localTable.replaceOne({ _id: rid }, remoteDoc);
    }
  }
}
// Make the class available to other modules.
module.exports = MongoSync;

// When this file is executed directly (rather than required by another
// module), create an instance and start syncing; a rejection from run() is
// reported through console.error.
const isEntryPoint = require.main === module;
if (isEntryPoint) {
  const sync = new MongoSync();
  sync.run().catch(console.error);
}
|