sync-service.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. const utils = require('../libs/utils');
  2. const path = require('path');
  3. const MongoClient = require('mongoose').mongo.MongoClient;
  4. const appConfig = require('../config/app');
  5. const SyncSeq = require('./sync-seq');
  6. const { format } = require('date-fns');
  7. const dbs = ['coloring_ol']; // Databases to sync; the primary database (the one holding the syncevents collection) goes first
  // Collections mirrored from the remote database; events for any other collection are skipped.
  const SYNC_TABLES = new Set(['arts', 'daily-arts', 'users', 'albums', 'translates']);

  // Maximum number of sync events fetched from the remote per cycle.
  const SYNC_BATCH_SIZE = 200;

  /**
   * Closes every client in the list, logging (not throwing) close failures.
   *
   * @param {Array<object>} clients - Connected MongoClient instances.
   * @returns {Promise<void>}
   */
  async function closeClients(clients) {
    for (const client of clients) {
      try {
        await client.close();
      } catch (err) {
        console.error(err);
      }
    }
  }

  /**
   * Applies a single sync event to the local database.
   *
   * @param {object} eventDoc - Event read from the remote `syncevents` collection;
   *   the fields used are `tb` (collection), `db` (database, defaults to the first
   *   entry of `dbs`), `op` ('remove' or 'save') and `rid` (the `_id` of the target document).
   * @param {Map<string, object>} localDBHash - Local database handles keyed by database name.
   * @param {Map<string, object>} remoteDBHash - Remote database handles keyed by database name.
   * @returns {Promise<boolean>} `true` when the event was handled, `false` when it was
   *   skipped (collection not mirrored, or database not configured).
   */
  async function applySyncEvent(eventDoc, localDBHash, remoteDBHash) {
    if (!SYNC_TABLES.has(eventDoc.tb)) return false;

    const dbName = eventDoc.db || dbs[0];
    const localDB = localDBHash.get(dbName);
    const remoteDB = remoteDBHash.get(dbName);
    if (!localDB || !remoteDB) {
      console.warn('unkown sync database: ' + dbName + ', will skip');
      return false;
    }

    const localtb = localDB.collection(eventDoc.tb);
    const remotetb = remoteDB.collection(eventDoc.tb);
    const filter = { _id: eventDoc.rid };

    if (eventDoc.op === 'remove') {
      await localtb.deleteOne(filter);
      console.log(`sync remove : ${eventDoc.tb} ${eventDoc.rid}`);
    } else if (eventDoc.op === 'save') {
      const remotedoc = await remotetb.findOne(filter);
      const localdoc = await localtb.findOne(filter);
      if (!remotedoc) {
        console.log(`remote doc not found, may be deleted, skip: ${eventDoc.tb} ${eventDoc.rid}`);
      } else if (!localdoc) {
        console.log(`sync add :${eventDoc.tb} ${eventDoc.rid}`);
        try {
          await localtb.insertOne(remotedoc);
        } catch (e) {
          // Best effort: a failed insert is logged and the event still counts as handled.
          console.error(e);
        }
      } else {
        console.log(`sync update :${eventDoc.tb} ${eventDoc.rid}`);
        // The remote desc is stripped before the local document is replaced.
        // NOTE(review): replaceOne swaps the whole document, so the local desc is dropped
        // as well, and title is not stripped although the earlier comment mentioned it.
        // Confirm whether local desc/title were meant to be preserved instead.
        delete remotedoc.desc;
        await localtb.replaceOne(filter, remotedoc);
      }
    }
    return true;
  }

  /**
   * Sync from remote: replays the remote `syncevents` log onto the local databases.
   *
   * Connects to the local and remote MongoDB servers, loads the last applied event
   * sequence from `SyncSeq`, then schedules background cycles that each fetch up to
   * SYNC_BATCH_SIZE events (ascending `_id`) newer than that sequence and apply them.
   * The sequence advances past skipped events too, so a batch made up only of
   * non-mirrored collections cannot stall the loop. Cycles repeat while events are
   * being consumed; once a cycle consumes nothing (caught up, or it failed on its
   * first event) the loop ends and both connections are closed. Later remote events
   * are only picked up by another call to run().
   *
   * @returns {Promise<void>} Resolves once setup is done and the first cycle is
   *   scheduled; it does not wait for the sync itself to finish.
   * @throws Rejects when connecting or reading the stored sequence fails; any client
   *   that was already connected is closed first.
   */
  async function run() {
    const clients = [];
    // Maps (not plain objects) so a database name can never hit a prototype member.
    const localDBHash = new Map();
    const remoteDBHash = new Map();
    let synceventTB;
    let seqDoc;
    let seq = -1;

    try {
      const localClient = await new MongoClient(appConfig.mongodbUrl).connect();
      clients.push(localClient);
      const remoteClient = await new MongoClient(appConfig.syncUrl).connect();
      clients.push(remoteClient);
      for (const db of dbs) {
        localDBHash.set(db, localClient.db(db));
        remoteDBHash.set(db, remoteClient.db(db));
      }
      console.log('Connect to remote mongodb ' + appConfig.syncUrl + ' success!');

      // The event log lives in the first (primary) database on the remote side.
      synceventTB = remoteDBHash.get(dbs[0]).collection('syncevents');

      // Current local sync sequence (stored through the SyncSeq model).
      seqDoc = await SyncSeq.findOne();
      if (seqDoc) seq = seqDoc.seq;
      else seqDoc = new SyncSeq({});
    } catch (err) {
      await closeClients(clients);
      throw err;
    }

    // True while `seq` holds a value that has not been written to seqDoc yet.
    let hasUnsavedSeq = false;
    const saveSeq = async () => {
      seqDoc.seq = seq;
      await seqDoc.save();
      hasUnsavedSeq = false;
    };

    setTimeout(async function cycleRun() {
      // Whether this cycle consumed at least one event; decides if another cycle runs.
      let advanced = false;
      try {
        const eventDocs = await synceventTB
          .find({ _id: { $gt: seq } })
          .sort({ _id: 1 }) // seq is taken from the last event, so order must be ascending
          .limit(SYNC_BATCH_SIZE)
          .toArray();
        let count = 0;
        try {
          for (const eventDoc of eventDocs) {
            console.log(eventDoc);
            const applied = await applySyncEvent(eventDoc, localDBHash, remoteDBHash);
            seq = eventDoc._id;
            hasUnsavedSeq = true;
            advanced = true;
            if (applied) {
              // Persist after every applied event so a crash does not replay it.
              await saveSeq();
              count++;
            }
          }
        } finally {
          // Skipped events are persisted once per batch instead of once per event.
          if (hasUnsavedSeq) await saveSeq();
        }
        console.log(`${format(new Date(), 'yyyy-MM-dd HH:mm')} sync event length: ${eventDocs.length}, precessed: ${count}`);
      } catch (err) {
        console.error(err);
      }
      if (advanced) setTimeout(cycleRun, 0);
      else await closeClients(clients);
    }, 0);
  }
  89. module.exports = {
  90. run
  91. }
  92. if (require.main == module) {
  93. run().catch(console.error);
  94. }