sync-service.js 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. const MongoClient = require("mongoose").mongo.MongoClient;
  2. const SyncSeq = require("./sync-seq");
  3. const { format } = require("date-fns");
  /**
   * One-way synchronisation service: reads change events from the remote
   * `syncevents` collection and replays the ones that target the `arts`
   * collection against the local database.
   *
   * Progress is tracked in a `SyncSeq` document whose `seq` field holds the
   * `_id` of the last event that has been consumed.
   */
  class MongoSync {
    constructor() {
      this.delay = 30 * 1000; // Idle polling interval (ms), used when a cycle finds no new events.
      this.batchSize = 200; // Maximum number of sync events fetched per cycle.
      this.maxRetryAttempts = 10; // Maximum connection attempts per client.
      this.retryDelay = 5000; // Wait (ms) between connection attempts and before restarting after a failed cycle.
      this.remotedb = "coloring_ol";
      this.localdb = "omsdb";
      this.localClient = null;
      this.remoteClient = null;
    }

    /**
     * Open a MongoDB connection, retrying on transient network failures.
     *
     * @param {string} uri - MongoDB connection string.
     * @param {string} clientName - Label used in log and error messages.
     * @returns {Promise<object>} The connected MongoClient.
     * @throws The original error immediately when it is not network related
     *   (e.g. a malformed URI or bad credentials), or a generic Error (with the
     *   last failure attached as `cause`) once all attempts are used up.
     */
    async connectWithRetry(uri, clientName) {
      // Only network-level failures can succeed on a later attempt. A
      // MongoParseError (malformed URI) is deterministic, so it is rethrown
      // right away instead of being retried.
      const retryableErrors = ["MongoNetworkError", "MongoNetworkTimeoutError", "MongoServerSelectionError"];
      let lastError = null;

      for (let attempt = 1; attempt <= this.maxRetryAttempts; attempt++) {
        let candidate = null;
        try {
          console.log(`[${clientName}] Attempting to connect... (Attempt ${attempt}/${this.maxRetryAttempts})`);
          candidate = new MongoClient(uri);
          await candidate.connect();
          console.log(`[${clientName}] Connection successful!`);
          return candidate;
        } catch (err) {
          console.error(`[${clientName}] Connection failed: ${err.message}`);
          if (candidate) {
            // Best-effort cleanup of the half-open client; a failure to close
            // must not mask the connection error being handled here.
            await candidate.close().catch(() => {});
          }
          if (!retryableErrors.includes(err.name)) {
            throw err;
          }
          lastError = err;
          if (attempt < this.maxRetryAttempts) {
            console.log(`[${clientName}] Retrying in ${this.retryDelay / 1000} seconds...`);
            await new Promise((resolve) => setTimeout(resolve, this.retryDelay));
          }
        }
      }

      // Every attempt failed with a retryable error.
      throw new Error(`Failed to connect to ${clientName} after ${this.maxRetryAttempts} attempts.`, { cause: lastError });
    }

    /**
     * Close any clients left over from a previous run so that restarting the
     * service does not leak connections. Close failures are logged and ignored.
     *
     * @returns {Promise<void>}
     */
    async closeClients() {
      const stale = [
        ["local", this.localClient],
        ["remote", this.remoteClient],
      ];
      this.localClient = null;
      this.remoteClient = null;
      for (const [label, client] of stale) {
        if (!client) {
          continue;
        }
        try {
          await client.close();
        } catch (err) {
          console.error(`[${label}] Failed to close previous connection: ${err.message}`);
        }
      }
    }

    /**
     * Start (or restart) the sync task: connect both clients, load or create
     * the sequence document, then kick off the polling loop.
     *
     * Any failure here is treated as fatal and terminates the process with
     * exit code 1 (the original comment states that PM2 restarts it).
     *
     * @returns {Promise<void>}
     */
    async run() {
      try {
        // Read configuration from the environment, with fallbacks.
        // NOTE(review): the fallback URIs embed credentials in source control;
        // consider requiring the environment variables instead.
        const MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
        const REMOTE_SYNC_MONGO_URI = process.env.REMOTE_SYNC_MONGO_URI || "mongodb://coloring:coloring123.@gogs.jccytech.cn:7881?authSource=admin";
        // Explicit radix and NaN check so that INIT_SEQ=0 is honoured.
        const parsedInitSeq = Number.parseInt(process.env.INIT_SEQ, 10);
        const INIT_SEQ = Number.isNaN(parsedInitSeq) ? 7368116 : parsedInitSeq;

        // run() is re-entered after a failed cycle; drop the old clients first.
        await this.closeClients();

        // Establish both connections; each one retries on network failures.
        this.localClient = await this.connectWithRetry(MONGO_URI, "local");
        this.remoteClient = await this.connectWithRetry(REMOTE_SYNC_MONGO_URI, "remote");
        this.remoteDB = this.remoteClient.db(this.remotedb);
        this.localDB = this.localClient.db(this.localdb);

        // Load the persisted sequence number, creating it on first start.
        let seqDoc = await SyncSeq.findOne();
        if (!seqDoc) {
          seqDoc = new SyncSeq({ seq: INIT_SEQ });
          await seqDoc.save();
        }
        console.log(`Sync started from sequence: ${seqDoc.seq}`);
        // Deliberately not awaited: cycleRun handles its own errors and
        // reschedules itself.
        this.cycleRun(seqDoc);
      } catch (err) {
        console.error("Critical error in sync process:", err);
        // Fatal: exit and let the process supervisor restart the service.
        process.exit(1);
      }
    }

    /**
     * Run one sync cycle and schedule the next one.
     *
     * Fetches up to `batchSize` events newer than `seqDoc.seq`, applies the
     * ones for the `arts` collection, persists the new sequence number, and
     * then reschedules itself (immediately if events were found, after
     * `delay` ms otherwise). On error the whole service is restarted through
     * `run()` after `retryDelay` ms.
     *
     * @param {object} seqDoc - Sequence document exposing `seq` and `save()`.
     * @returns {Promise<void>}
     */
    async cycleRun(seqDoc) {
      try {
        const seq = seqDoc.seq;
        const synceventTB = this.remoteDB.collection("syncevents");
        // Sort by _id: the checkpoint is "last _id seen", which is only
        // correct when each batch is consumed in ascending _id order.
        const eventDocs = await synceventTB
          .find({ _id: { $gt: seq } })
          .sort({ _id: 1 })
          .limit(this.batchSize)
          .toArray();

        let processedCount = 0;
        for (const eventDoc of eventDocs) {
          // Only the `arts` collection is synchronised.
          if (eventDoc.tb === "arts") {
            await this.processEvent(eventDoc);
            processedCount++;
          }
          // Skipped events still advance the checkpoint.
          seqDoc.seq = eventDoc._id;
        }
        await seqDoc.save();
        console.log(`${format(new Date(), "yyyy-MM-dd HH:mm")} Synced ${eventDocs.length} events, processed ${processedCount} 'arts' events.`);

        // A non-empty batch may mean more events are waiting, so poll again
        // immediately; otherwise back off for the idle interval.
        const nextDelay = eventDocs.length > 0 ? 0 : this.delay;
        setTimeout(() => this.cycleRun(seqDoc), nextDelay);
      } catch (err) {
        console.error("Error during sync cycle:", err);
        // Do not exit on a cycle error: restart through run(), which
        // reconnects (with retries) and reloads the persisted sequence number.
        setTimeout(() => this.run(), this.retryDelay);
      }
    }

    /**
     * Apply a single sync event to the local database.
     *
     * - `remove`: deletes the local document with `_id === eventDoc.rid`.
     * - `save`: copies the current remote document over the local one,
     *   inserting it when it does not exist yet. If the remote document is
     *   gone, the event is skipped.
     * - any other `op` is ignored.
     *
     * @param {{tb: string, op: string, rid: *}} eventDoc - Event read from `syncevents`.
     * @returns {Promise<void>}
     */
    async processEvent(eventDoc) {
      const { tb, op, rid } = eventDoc;
      const localtb = this.localDB.collection(tb);

      if (op === "remove") {
        await localtb.deleteOne({ _id: rid });
        console.log(`Sync remove: ${tb} ${rid}`);
        return;
      }
      if (op !== "save") {
        return;
      }

      const remotedoc = await this.remoteDB.collection(tb).findOne({ _id: rid });
      if (!remotedoc) {
        console.log(`Remote doc not found, may be deleted, skipping: ${tb} ${rid}`);
        return;
      }

      // A single upsert replaces the former findOne + insertOne/replaceOne
      // pair: one round trip, and no duplicate-key failure if the document
      // appears between the check and the write.
      const result = await localtb.replaceOne({ _id: rid }, remotedoc, { upsert: true });
      const action = result.upsertedCount > 0 ? "add" : "update";
      console.log(`Sync ${action}: ${tb} ${rid}`);
    }
  }
  // Expose the class so that other modules can create their own sync service.
  module.exports = MongoSync;

  // When this file is executed directly (rather than required), start syncing right away.
  const isEntryPoint = require.main === module;
  if (isEntryPoint) {
    const service = new MongoSync();
    service.run().catch(console.error);
  }