|
|
@@ -0,0 +1,110 @@
|
|
|
+const utils = require('../libs/utils');
|
|
|
+const path = require('path');
|
|
|
+const MongoClient = require('mongoose').mongo.MongoClient;
|
|
|
+const appConfig = require('../config/app');
|
|
|
+const SyncSeq = require('./sync-seq');
|
|
|
+const { format } = require('date-fns');
|
|
|
+
|
|
|
+const dbs = ['coloring_ol']; // 需要同步的数据库, 主数据库(即syncevent表所在的库)放在第一个
|
|
|
+
|
|
|
+/**
|
|
|
+ * start sync from remote
|
|
|
+ */
|
|
|
+async function start() {
|
|
|
+ let delay = 30 * 1000; // sync freq
|
|
|
+ let localDBHash = {};
|
|
|
+ let remoteDBHash = {};
|
|
|
+ const localClient = await new MongoClient(appConfig.mongodbUrl).connect();
|
|
|
+ const remoteClient = await new MongoClient(appConfig.syncUrl).connect();
|
|
|
+ for (let db of dbs) {
|
|
|
+ localDBHash[db] = localClient.db(db);
|
|
|
+ remoteDBHash[db] = remoteClient.db(db);
|
|
|
+ }
|
|
|
+ console.log('Connect to remote mongodb ' + appConfig.syncUrl + ' success!');
|
|
|
+
|
|
|
+ // 读取远程syncevent表
|
|
|
+ const synceventTB = remoteDBHash[dbs[0]].collection('syncevents');
|
|
|
+ // get current local slave sync seq (use mongoose)
|
|
|
+ let seq = -1;
|
|
|
+ let seqDoc = await SyncSeq.findOne();
|
|
|
+ if (seqDoc) seq = seqDoc.seq;
|
|
|
+ else seqDoc = new SyncSeq({});
|
|
|
+
|
|
|
+ let count, localtb, remotetb, localdoc, remotedoc;
|
|
|
+ setTimeout(async function run() {
|
|
|
+ try {
|
|
|
+ let eventDocs = await synceventTB.find({ _id: { $gt: seq } }).limit(200).toArray() || [];
|
|
|
+ count = 0;
|
|
|
+ for (let i = 0; i < eventDocs.length; i++) {
|
|
|
+ let eventDoc = eventDocs[i];
|
|
|
+ console.log(eventDoc);
|
|
|
+ if (eventDoc.tb == 'identitycounters') continue; // skip
|
|
|
+ // 只同步特定的几张表: arts, daily-arts, users, albums, translates
|
|
|
+ if (eventDoc.tb != 'arts'
|
|
|
+ && eventDoc.tb != 'daily-arts'
|
|
|
+ && eventDoc.tb != 'users'
|
|
|
+ && eventDoc.tb != 'albums'
|
|
|
+ && eventDoc.tb != 'translates') {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!eventDoc.db) eventDoc.db = dbs[0];
|
|
|
+ // 对应的表
|
|
|
+ remotetb = remoteDBHash[eventDoc.db] ? remoteDBHash[eventDoc.db].collection(eventDoc.tb) : null;
|
|
|
+ localtb = localDBHash[eventDoc.db] ? localDBHash[eventDoc.db].collection(eventDoc.tb) : null;
|
|
|
+ if (!remotetb || !localtb) {
|
|
|
+ console.warn('unkown sync database: ' + eventDoc.db + ', will skip');
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (eventDoc.op == 'remove') {
|
|
|
+ await localtb.deleteOne({ _id: eventDoc.rid });
|
|
|
+ console.log("sync remove : " + eventDoc.tb + " " + eventDoc.rid);
|
|
|
+ } else if (eventDoc.op == 'save') {
|
|
|
+ remotedoc = await remotetb.findOne({ _id: eventDoc.rid });
|
|
|
+ localdoc = await localtb.findOne({ _id: eventDoc.rid });
|
|
|
+ if (!remotedoc) {
|
|
|
+ console.log("remote doc not found, may be deleted, skip: " + eventDoc.tb + " " + eventDoc.rid);
|
|
|
+ } else if (!localdoc) {
|
|
|
+ console.log("sync add :" + eventDoc.tb + " " + eventDoc.rid);
|
|
|
+ try {
|
|
|
+ await localtb.insertOne(remotedoc);
|
|
|
+ } catch (e) {
|
|
|
+ console.error(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ console.log("sync update :" + eventDoc.tb + " " + eventDoc.rid);
|
|
|
+ // 删除desc和title字段,避免覆盖
|
|
|
+ delete remotedoc.desc;
|
|
|
+ await localtb.replaceOne({ _id: eventDoc.rid }, remotedoc);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ seq = eventDoc._id;
|
|
|
+ seqDoc.seq = seq;
|
|
|
+ await seqDoc.save();
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+
|
|
|
+ console.log(`${format(new Date(), 'yyyy-MM-dd HH:mm')} sync event length: ${eventDocs.length}, precessed: ${count}`);
|
|
|
+
|
|
|
+ } catch (err) {
|
|
|
+ console.error(err);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (count > 0) setTimeout(run, 0);
|
|
|
+ else setTimeout(run, delay);
|
|
|
+
|
|
|
+ }, 0);
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+module.exports = {
|
|
|
+ start
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+if (require.main == module) {
|
|
|
+ start().catch(console.error);
|
|
|
+}
|