const utils = require('../libs/utils'); const path = require('path'); const MongoClient = require('mongoose').mongo.MongoClient; const appConfig = require('../config/app'); const SyncSeq = require('./sync-seq'); const { format } = require('date-fns'); const dbs = ['coloring_ol']; // 需要同步的数据库, 主数据库(即syncevent表所在的库)放在第一个 /** * sync from remote */ async function run() { let delay = 30 * 1000; // sync freq let localDBHash = {}; let remoteDBHash = {}; const localClient = await new MongoClient(appConfig.mongodbUrl).connect(); const remoteClient = await new MongoClient(appConfig.syncUrl).connect(); for (let db of dbs) { localDBHash[db] = localClient.db(db); remoteDBHash[db] = remoteClient.db(db); } console.log('Connect to remote mongodb ' + appConfig.syncUrl + ' success!'); // 读取远程syncevent表 const synceventTB = remoteDBHash[dbs[0]].collection('syncevents'); // get current local slave sync seq (use mongoose) let seq = -1; let seqDoc = await SyncSeq.findOne(); if (seqDoc) seq = seqDoc.seq; else seqDoc = new SyncSeq({}); let count, localtb, remotetb, localdoc, remotedoc; setTimeout(async function cycleRun() { try { let eventDocs = await synceventTB.find({ _id: { $gt: seq } }).limit(200).toArray() || []; count = 0; for (let i = 0; i < eventDocs.length; i++) { let eventDoc = eventDocs[i]; console.log(eventDoc); if (eventDoc.tb == 'identitycounters') continue; // skip // 只同步特定的几张表: arts, daily-arts, users, albums, translates if (eventDoc.tb != 'arts' && eventDoc.tb != 'daily-arts' && eventDoc.tb != 'users' && eventDoc.tb != 'albums' && eventDoc.tb != 'translates') { continue; } if (!eventDoc.db) eventDoc.db = dbs[0]; // 对应的表 remotetb = remoteDBHash[eventDoc.db] ? remoteDBHash[eventDoc.db].collection(eventDoc.tb) : null; localtb = localDBHash[eventDoc.db] ? localDBHash[eventDoc.db].collection(eventDoc.tb) : null; if (!remotetb || !localtb) { console.warn('unkown sync database: ' + eventDoc.db + ', will skip'); continue; } if (eventDoc.op == 'remove') { await localtb.deleteOne({ _id: eventDoc.rid }); console.log("sync remove : " + eventDoc.tb + " " + eventDoc.rid); } else if (eventDoc.op == 'save') { remotedoc = await remotetb.findOne({ _id: eventDoc.rid }); localdoc = await localtb.findOne({ _id: eventDoc.rid }); if (!remotedoc) { console.log("remote doc not found, may be deleted, skip: " + eventDoc.tb + " " + eventDoc.rid); } else if (!localdoc) { console.log("sync add :" + eventDoc.tb + " " + eventDoc.rid); try { await localtb.insertOne(remotedoc); } catch (e) { console.error(e); } } else { console.log("sync update :" + eventDoc.tb + " " + eventDoc.rid); // 删除desc和title字段,避免覆盖 delete remotedoc.desc; await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc); } } seq = eventDoc._id; seqDoc.seq = seq; await seqDoc.save(); count++; } console.log(`${format(new Date(), 'yyyy-MM-dd HH:mm')} sync event length: ${eventDocs.length}, precessed: ${count}`); } catch (err) { console.error(err); } if (count > 0) setTimeout(cycleRun, 0); // else setTimeout(run, delay); }, 0); } module.exports = { run } if (require.main == module) { run().catch(console.error); }