| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- const utils = require('../libs/utils');
- const path = require('path');
- const MongoClient = require('mongoose').mongo.MongoClient;
- const appConfig = require('../config/app');
- const SyncSeq = require('./sync-seq');
- const { format } = require('date-fns');
- const dbs = ['coloring_ol']; // Databases to sync; the primary database (the one holding the syncevents collection) must come first
/**
 * Replay change events from the remote MongoDB onto the local one.
 *
 * Opens one client for `appConfig.mongodbUrl` (local) and one for
 * `appConfig.syncUrl` (remote). It then repeatedly reads batches of events
 * from the remote `syncevents` collection (looked up in the first database
 * listed in `dbs`) whose `_id` is greater than the last handled sequence
 * number, and applies each event to the matching local collection. The
 * sequence number is persisted through the `SyncSeq` model.
 *
 * Scheduling: a cycle that fetched events without failing is followed
 * immediately by the next cycle; an empty or failed cycle is followed by the
 * next one after `delay` milliseconds, reusing the already-open connections.
 *
 * @returns {Promise<void>} Resolves once both connections are open, the stored
 *   sequence has been read and the first cycle has been scheduled. The cycles
 *   themselves run in the background on timers.
 * @throws Rejects if connecting or reading the stored sequence fails.
 */
async function run() {
  const delay = 30 * 1000; // sync freq: wait between polls once caught up (ms)
  const batchSize = 200; // max events fetched per cycle
  // Only events for these collections are replayed; everything else
  // (e.g. `identitycounters`) is skipped.
  const syncedTables = new Set(['arts', 'daily-arts', 'users', 'albums', 'translates']);

  const localDBHash = {};
  const remoteDBHash = {};
  const localClient = await new MongoClient(appConfig.mongodbUrl).connect();
  const remoteClient = await new MongoClient(appConfig.syncUrl).connect();
  for (const db of dbs) {
    localDBHash[db] = localClient.db(db);
    remoteDBHash[db] = remoteClient.db(db);
  }
  // NOTE(review): this prints the full connection URL, which may embed
  // credentials — confirm that is acceptable for the log destination.
  console.log('Connect to remote mongodb ' + appConfig.syncUrl + ' success!');

  // Event log lives in the primary (first) remote database.
  const synceventTB = remoteDBHash[dbs[0]].collection('syncevents');

  // Last handled event id; -1 means nothing has been synced yet.
  let seq = -1;
  let seqDoc = await SyncSeq.findOne();
  if (seqDoc) seq = seqDoc.seq;
  else seqDoc = new SyncSeq({});

  /**
   * Apply a single sync event to the local database.
   *
   * @param {object} eventDoc Event with `tb`, `op`, `rid` and optional `db`.
   * @returns {Promise<boolean>} `true` when the event targeted a synced
   *   collection in a known database, `false` when it was skipped.
   */
  async function applyEvent(eventDoc) {
    if (!syncedTables.has(eventDoc.tb)) return false;

    // Events without an explicit database belong to the primary one.
    const dbName = eventDoc.db || dbs[0];
    const remoteDB = remoteDBHash[dbName];
    const localDB = localDBHash[dbName];
    if (!remoteDB || !localDB) {
      console.warn('unkown sync database: ' + dbName + ', will skip');
      return false;
    }

    const remotetb = remoteDB.collection(eventDoc.tb);
    const localtb = localDB.collection(eventDoc.tb);

    if (eventDoc.op === 'remove') {
      await localtb.deleteOne({ _id: eventDoc.rid });
      console.log("sync remove : " + eventDoc.tb + " " + eventDoc.rid);
    } else if (eventDoc.op === 'save') {
      // The two lookups are independent, so run them concurrently.
      const [remotedoc, localdoc] = await Promise.all([
        remotetb.findOne({ _id: eventDoc.rid }),
        localtb.findOne({ _id: eventDoc.rid }),
      ]);
      if (!remotedoc) {
        console.log("remote doc not found, may be deleted, skip: " + eventDoc.tb + " " + eventDoc.rid);
      } else if (!localdoc) {
        console.log("sync add :" + eventDoc.tb + " " + eventDoc.rid);
        try {
          await localtb.insertOne(remotedoc);
        } catch (e) {
          // Best effort: a failed insert is logged and the event is still
          // treated as handled.
          console.error(e);
        }
      } else {
        console.log("sync update :" + eventDoc.tb + " " + eventDoc.rid);
        // NOTE(review): the original note says `desc` and `title` are dropped
        // "to avoid overwriting", but only `desc` is removed, and because
        // replaceOne swaps the whole document the local `desc` is removed
        // rather than preserved — confirm the intended behaviour.
        delete remotedoc.desc;
        await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc);
      }
    }
    return true;
  }

  /**
   * Fetch and apply one batch of events, then schedule the next cycle.
   *
   * @returns {Promise<void>}
   */
  async function cycleRun() {
    // Per-cycle state, so a failed fetch can never reuse stale counters.
    let fetched = 0;
    let failed = false;
    try {
      // Paging by `_id > seq` is only correct when events arrive in
      // ascending `_id` order, hence the explicit sort.
      const eventDocs = (await synceventTB
        .find({ _id: { $gt: seq } })
        .sort({ _id: 1 })
        .limit(batchSize)
        .toArray()) || [];
      fetched = eventDocs.length;

      let count = 0;
      let unsaved = false;
      for (const eventDoc of eventDocs) {
        console.log(eventDoc);
        const applied = await applyEvent(eventDoc);
        // Advance past skipped events too; otherwise a batch consisting only
        // of skipped events would be fetched again and again.
        seq = eventDoc._id;
        seqDoc.seq = seq;
        if (applied) {
          await seqDoc.save();
          unsaved = false;
          count++;
        } else {
          unsaved = true;
        }
      }
      // Persist progress made over trailing skipped events in one write.
      if (unsaved) await seqDoc.save();

      console.log(`${format(new Date(), 'yyyy-MM-dd HH:mm')} sync event length: ${eventDocs.length}, precessed: ${count}`);
    } catch (err) {
      failed = true;
      console.error(err);
    }
    // Keep draining while events are available; otherwise (caught up or
    // failed) poll again after the configured delay.
    setTimeout(cycleRun, fetched > 0 && !failed ? 0 : delay);
  }

  setTimeout(cycleRun, 0);
}
- module.exports = {
- run
- }
- if (require.main == module) {
- run().catch(console.error);
- }
|