소스 검색

add ingestFCMToken

guoziyun 9 달 전
부모
커밋
9a899c4c46
2개의 변경된 파일, 363개의 추가 그리고 0개의 삭제
  1. 192 0
      oms/dist/src/scripts/ingestFCMTokens.js
  2. 171 0
      oms/src/scripts/ingestFCMTokens.ts

+ 192 - 0
oms/dist/src/scripts/ingestFCMTokens.js

@@ -0,0 +1,192 @@
+"use strict";
+// oms/scripts/ingestFCMTokens.ts
+var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { // Module-interop helper (an existing this.__createBinding is reused when present): exposes m[k] on o under the name k2.
+    if (k2 === undefined) k2 = k; // Target name defaults to the source name.
+    var desc = Object.getOwnPropertyDescriptor(m, k);
+    if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { // Keep the original descriptor only for an accessor on an __esModule object or a non-writable, non-configurable data property.
+      desc = { enumerable: true, get: function() { return m[k]; } }; // Otherwise use a getter that reads m[k] on every access.
+    }
+    Object.defineProperty(o, k2, desc);
+}) : (function(o, m, k, k2) { // Fallback when Object.create is unavailable: copy the current value instead of defining a getter.
+    if (k2 === undefined) k2 = k;
+    o[k2] = m[k];
+}));
+var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) { // Helper: stores v as the enumerable "default" property of o.
+    Object.defineProperty(o, "default", { enumerable: true, value: v });
+}) : function(o, v) { // Fallback when Object.create is unavailable: plain assignment.
+    o["default"] = v;
+});
+var __importStar = (this && this.__importStar) || (function () { // Helper backing the `__importStar(require(...))` calls below: builds a namespace-style object for a required module.
+    var ownKeys = function(o) { // Replaces itself on first call with Object.getOwnPropertyNames, or with a hasOwnProperty-filtered for...in loop when that is missing.
+        ownKeys = Object.getOwnPropertyNames || function (o) {
+            var ar = [];
+            for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
+            return ar;
+        };
+        return ownKeys(o);
+    };
+    return function (mod) {
+        if (mod && mod.__esModule) return mod; // Objects flagged __esModule are returned untouched.
+        var result = {};
+        if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]); // Bind every own key except "default" onto the result.
+        __setModuleDefault(result, mod); // The module object itself becomes result.default.
+        return result;
+    };
+})();
+var __importDefault = (this && this.__importDefault) || function (mod) { // Helper backing the `__importDefault(require(...))` calls below: wraps a module lacking the __esModule flag as { default: mod }.
+    return (mod && mod.__esModule) ? mod : { "default": mod };
+};
+Object.defineProperty(exports, "__esModule", { value: true });
+// Load environment variables for database credentials
+const dotenv = __importStar(require("dotenv"));
+dotenv.config();
+const mongoose_1 = __importDefault(require("mongoose")); // Mongoose for OMS MongoDB models
+const dayjs_1 = __importDefault(require("dayjs")); // For date manipulation
+const userModel_1 = require("../models/userModel"); // OMS User Model
+// Node.js built-in modules for file processing
+const fs = __importStar(require("fs"));
+const path = __importStar(require("path"));
+const zlib = __importStar(require("zlib"));
+const readline = __importStar(require("readline"));
+// --- Persistent Configuration (can still come from .env) ---
+const OMS_MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
+// --- Batching Configuration ---
+const MONGO_BATCH_SIZE = 1000; // MongoDB 批量写入大小
+// --- Initialize Services ---
+async function initializeServices() {
+    try {
+        // Connect to OMS MongoDB (using Mongoose)
+        await mongoose_1.default.connect(OMS_MONGO_URI);
+        console.log(`[Script] Connected to OMS MongoDB: ${OMS_MONGO_URI}`);
+    }
+    catch (error) {
+        console.error("[Script] Failed to initialize services:", error);
+        process.exit(1); // Exit if essential services cannot connect
+    }
+}
+/**
+ * Historical Data Ingestion Logic for FCM Tokens
+ * @param logFilesDir The directory containing the .log.gz files.
+ */
+async function ingestFCMTokens(logFilesDir) {
+    const overallStartTime = (0, dayjs_1.default)();
+    console.log(`[Script] Starting FCM token ingestion from local files in: ${logFilesDir}`);
+    let totalProcessedEvents = 0;
+    let totalUpdatedUsers = 0;
+    const mongoUserWriteOperations = []; // Array of Mongoose bulkWrite operations
+    // Get all files from the directory and filter for .log.gz files
+    let allFiles = [];
+    try {
+        allFiles = fs
+            .readdirSync(logFilesDir)
+            .filter((file) => file.endsWith(".log.gz"))
+            .sort(); // Sort files to ensure chronological processing
+    }
+    catch (readDirError) {
+        console.error(`[Script] Error reading directory '${logFilesDir}':`, readDirError);
+        return;
+    }
+    const totalFiles = allFiles.length;
+    if (totalFiles === 0) {
+        console.log("[Script] No .log.gz files found in the specified directory. Exiting.");
+        return;
+    }
+    console.log(`[Script] Found ${totalFiles} .log.gz files to process.`);
+    for (let i = 0; i < totalFiles; i++) {
+        const expectedFilename = allFiles[i];
+        const filePath = path.join(logFilesDir, expectedFilename);
+        const fileStartTime = (0, dayjs_1.default)();
+        console.log(`\n--- Processing file ${i + 1}/${totalFiles}: ${expectedFilename} ---`);
+        let processedEventsCount = 0;
+        let updatedUsersCount = 0;
+        const fileStream = fs.createReadStream(filePath);
+        const gunzip = zlib.createGunzip();
+        const rl = readline.createInterface({
+            input: fileStream.pipe(gunzip),
+            crlfDelay: Infinity, // Handle Windows/Unix line endings
+        });
+        try {
+            for await (const line of rl) {
+                if (!line.trim())
+                    continue; // Skip empty lines
+                let eventLog;
+                try {
+                    eventLog = JSON.parse(line);
+                }
+                catch (parseError) {
+                    console.error(`[Script] Error parsing JSON from line in '${expectedFilename}': ${line}. Error: ${parseError}`);
+                    continue; // Skip malformed JSON lines
+                }
+                const projectId = eventLog.project_id;
+                const uid = projectId === 1 ? eventLog.uid : eventLog.user_id;
+                const eventType = projectId === 1 ? eventLog.type : eventLog.name;
+                const fmToken = eventLog.token;
+                // --- Filter for the specific event type and ensure required fields exist ---
+                if (eventType === "firebase_message_token" && uid && fmToken) {
+                    // Add update operation to the batch
+                    mongoUserWriteOperations.push({
+                        updateOne: {
+                            filter: { uid: uid },
+                            update: {
+                                $set: { fmToken: fmToken },
+                            },
+                            // We only want to update existing users, not create new ones.
+                            // If we wanted to create new users, we would use upsert: true.
+                            upsert: false,
+                        },
+                    });
+                    processedEventsCount++;
+                }
+                // Execute batch write if batch size is reached
+                if (mongoUserWriteOperations.length >= MONGO_BATCH_SIZE) {
+                    try {
+                        const bulkResult = await userModel_1.User.bulkWrite(mongoUserWriteOperations, { ordered: false });
+                        updatedUsersCount += (bulkResult.upsertedCount || 0) + (bulkResult.modifiedCount || 0);
+                        console.log(`[Script] MongoDB batch written. Matched/Modified: ${bulkResult.matchedCount}/${bulkResult.modifiedCount}.`);
+                    }
+                    catch (bulkError) {
+                        console.error(`[Script] Error in MongoDB bulkWrite for file '${expectedFilename}':`, bulkError);
+                    }
+                    mongoUserWriteOperations.length = 0; // Clear buffer
+                }
+            } // End of for await (const line of rl) loop
+            // --- Flush any remaining buffers after file processing ---
+            if (mongoUserWriteOperations.length > 0) {
+                try {
+                    const bulkResult = await userModel_1.User.bulkWrite(mongoUserWriteOperations, { ordered: false });
+                    updatedUsersCount += (bulkResult.upsertedCount || 0) + (bulkResult.modifiedCount || 0);
+                    console.log(`[Script] MongoDB final batch written. Matched/Modified: ${bulkResult.matchedCount}/${bulkResult.modifiedCount}.`);
+                }
+                catch (bulkError) {
+                    console.error(`[Script] Error in final MongoDB bulkWrite for file '${expectedFilename}':`, bulkError);
+                }
+                mongoUserWriteOperations.length = 0; // Clear buffer
+            }
+            totalProcessedEvents += processedEventsCount;
+            totalUpdatedUsers += updatedUsersCount;
+            const fileDuration = dayjs_1.default.duration((0, dayjs_1.default)().diff(fileStartTime)).asSeconds();
+            console.log(`[Script] File '${expectedFilename}' processed: ${processedEventsCount} events, ${updatedUsersCount} users updated. Time taken: ${fileDuration.toFixed(2)} seconds.`);
+        }
+        catch (error) {
+            console.error(`[Script] Error processing file '${expectedFilename}':`, error);
+        }
+    } // End of for...of allFiles loop
+    const overallDuration = dayjs_1.default.duration((0, dayjs_1.default)().diff(overallStartTime)).asSeconds();
+    console.log(`\n--- FCM token ingestion complete ---`);
+    console.log(`Total processed events: ${totalProcessedEvents}`);
+    console.log(`Total users with updated FCM tokens: ${totalUpdatedUsers}`);
+    console.log(`Total time taken: ${overallDuration.toFixed(2)} seconds.`);
+}
+// --- Main execution ---
+async function main() {
+    await initializeServices(); // Initialize MongoDB connection
+    // Use the second command-line argument for the log file directory
+    const logFilesDirectory = process.argv[2] || path.join(__dirname, "../../../../../logs/");
+    await ingestFCMTokens(logFilesDirectory);
+    // Ensure mongoose connection is properly closed only if it was connected
+    if (mongoose_1.default.connection.readyState === 1)
+        await mongoose_1.default.disconnect();
+    console.log("[Script] Database connections closed.");
+    process.exit(0);
+}
+main().catch(console.error);

+ 171 - 0
oms/src/scripts/ingestFCMTokens.ts

@@ -0,0 +1,171 @@
+// oms/scripts/ingestFCMTokens.ts
+
+// Load environment variables for database credentials
+import * as dotenv from "dotenv";
+dotenv.config();
+
+import mongoose from "mongoose"; // Mongoose for OMS MongoDB models
+import dayjs from "dayjs"; // For date manipulation
+import { User, IUser } from "../models/userModel"; // OMS User Model
+
+// Node.js built-in modules for file processing
+import * as fs from "fs";
+import * as path from "path";
+import * as zlib from "zlib";
+import * as readline from "readline";
+
+// --- Persistent Configuration (can still come from .env) ---
+const OMS_MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
+
+// --- Batching Configuration ---
+const MONGO_BATCH_SIZE = 1000; // MongoDB 批量写入大小
+
+// --- Initialize Services ---
+/**
+ * Connects Mongoose to the MongoDB instance addressed by OMS_MONGO_URI.
+ *
+ * On failure the error is logged and the process exits with code 1.
+ * The URI is logged with its userinfo section masked so that database
+ * credentials never appear in the script output.
+ */
+async function initializeServices() {
+  try {
+    // Connect to OMS MongoDB (using Mongoose)
+    await mongoose.connect(OMS_MONGO_URI);
+    // Mask "user:password@" before logging; the raw URI carries credentials.
+    const redactedUri = OMS_MONGO_URI.replace(/\/\/[^/]*@/, "//***@");
+    console.log(`[Script] Connected to OMS MongoDB: ${redactedUri}`);
+  } catch (error) {
+    console.error("[Script] Failed to initialize services:", error);
+    process.exit(1); // Exit if essential services cannot connect
+  }
+}
+
+/**
+ * Historical Data Ingestion Logic for FCM Tokens
+ *
+ * Reads every `.log.gz` file in `logFilesDir` in sorted filename order,
+ * parses each line as a JSON event and, for `firebase_message_token` events
+ * that carry both a user id and a token, queues an update of the matching
+ * user's `fmToken`. Updates are sent through `User.bulkWrite` in batches of
+ * MONGO_BATCH_SIZE with `upsert: false`, so missing users are not created.
+ *
+ * Failures are logged rather than thrown: malformed lines are skipped, a
+ * failed batch is dropped, and an error while reading one file does not stop
+ * the remaining files from being processed.
+ *
+ * @param logFilesDir The directory containing the .log.gz files.
+ */
+async function ingestFCMTokens(logFilesDir: string) {
+  const overallStartTime = dayjs();
+  console.log(`[Script] Starting FCM token ingestion from local files in: ${logFilesDir}`);
+
+  let totalProcessedEvents = 0;
+  let totalUpdatedUsers = 0;
+  const mongoUserWriteOperations: any[] = []; // Array of Mongoose bulkWrite operations
+
+  // Get all files from the directory and filter for .log.gz files
+  let allFiles: string[] = [];
+  try {
+    allFiles = fs
+      .readdirSync(logFilesDir)
+      .filter((file) => file.endsWith(".log.gz"))
+      .sort(); // Sort files to ensure chronological processing
+  } catch (readDirError) {
+    console.error(`[Script] Error reading directory '${logFilesDir}':`, readDirError);
+    return;
+  }
+
+  const totalFiles = allFiles.length;
+  if (totalFiles === 0) {
+    console.log("[Script] No .log.gz files found in the specified directory. Exiting.");
+    return;
+  }
+  console.log(`[Script] Found ${totalFiles} .log.gz files to process.`);
+
+  for (let i = 0; i < totalFiles; i++) {
+    const expectedFilename = allFiles[i];
+    const filePath = path.join(logFilesDir, expectedFilename);
+    const fileStartTime = dayjs();
+    console.log(`\n--- Processing file ${i + 1}/${totalFiles}: ${expectedFilename} ---`);
+
+    let processedEventsCount = 0;
+    let updatedUsersCount = 0;
+
+    const fileStream = fs.createReadStream(filePath);
+    const gunzip = zlib.createGunzip();
+    // pipe() does not forward errors, so a failed read would otherwise be an
+    // unhandled 'error' event. Hand it to gunzip, which is readline's input.
+    fileStream.on("error", (streamError) => gunzip.destroy(streamError));
+    const rl = readline.createInterface({
+      input: fileStream.pipe(gunzip),
+      crlfDelay: Infinity, // Handle Windows/Unix line endings
+    });
+
+    try {
+      for await (const line of rl) {
+        if (!line.trim()) continue; // Skip empty lines
+
+        let eventLog: any;
+        try {
+          eventLog = JSON.parse(line);
+        } catch (parseError) {
+          console.error(`[Script] Error parsing JSON from line in '${expectedFilename}': ${line}. Error: ${parseError}`);
+          continue; // Skip malformed JSON lines
+        }
+
+        // Valid JSON that is not an object (e.g. `null`) has no event fields;
+        // reading a property of null would throw and abort the whole file.
+        if (eventLog === null || typeof eventLog !== "object") continue;
+
+        // Field names depend on the project: project 1 uses uid/type,
+        // every other project uses user_id/name.
+        const projectId = eventLog.project_id;
+        const uid = projectId === 1 ? eventLog.uid : eventLog.user_id;
+        const eventType = projectId === 1 ? eventLog.type : eventLog.name;
+        const fmToken = eventLog.token;
+
+        // --- Filter for the specific event type and ensure required fields exist ---
+        if (eventType === "firebase_message_token" && uid && fmToken) {
+          // Add update operation to the batch
+          mongoUserWriteOperations.push({
+            updateOne: {
+              filter: { uid: uid },
+              update: {
+                $set: { fmToken: fmToken },
+              },
+              // We only want to update existing users, not create new ones.
+              // If we wanted to create new users, we would use upsert: true.
+              upsert: false,
+            },
+          });
+          processedEventsCount++;
+        }
+
+        // Execute batch write if batch size is reached
+        if (mongoUserWriteOperations.length >= MONGO_BATCH_SIZE) {
+          try {
+            const bulkResult = await User.bulkWrite(mongoUserWriteOperations, { ordered: false });
+            updatedUsersCount += (bulkResult.upsertedCount || 0) + (bulkResult.modifiedCount || 0);
+            console.log(`[Script] MongoDB batch written. Matched/Modified: ${bulkResult.matchedCount}/${bulkResult.modifiedCount}.`);
+          } catch (bulkError) {
+            console.error(`[Script] Error in MongoDB bulkWrite for file '${expectedFilename}':`, bulkError);
+          }
+          mongoUserWriteOperations.length = 0; // Clear buffer
+        }
+      } // End of for await (const line of rl) loop
+
+      // --- Flush any remaining buffers after file processing ---
+      if (mongoUserWriteOperations.length > 0) {
+        try {
+          const bulkResult = await User.bulkWrite(mongoUserWriteOperations, { ordered: false });
+          updatedUsersCount += (bulkResult.upsertedCount || 0) + (bulkResult.modifiedCount || 0);
+          console.log(`[Script] MongoDB final batch written. Matched/Modified: ${bulkResult.matchedCount}/${bulkResult.modifiedCount}.`);
+        } catch (bulkError) {
+          console.error(`[Script] Error in final MongoDB bulkWrite for file '${expectedFilename}':`, bulkError);
+        }
+        mongoUserWriteOperations.length = 0; // Clear buffer
+      }
+
+      totalProcessedEvents += processedEventsCount;
+      totalUpdatedUsers += updatedUsersCount;
+      // diff(..., "second", true) yields fractional seconds using core dayjs;
+      // dayjs.duration is only defined once the duration plugin is registered.
+      const fileDuration = dayjs().diff(fileStartTime, "second", true);
+      console.log(`[Script] File '${expectedFilename}' processed: ${processedEventsCount} events, ${updatedUsersCount} users updated. Time taken: ${fileDuration.toFixed(2)} seconds.`);
+    } catch (error) {
+      console.error(`[Script] Error processing file '${expectedFilename}':`, error);
+    }
+  } // End of loop over allFiles
+  const overallDuration = dayjs().diff(overallStartTime, "second", true);
+  console.log(`\n--- FCM token ingestion complete ---`);
+  console.log(`Total processed events: ${totalProcessedEvents}`);
+  console.log(`Total users with updated FCM tokens: ${totalUpdatedUsers}`);
+  console.log(`Total time taken: ${overallDuration.toFixed(2)} seconds.`);
+}
+
+// --- Main execution ---
+/**
+ * Script entry point: connects to MongoDB, ingests FCM tokens from the log
+ * directory and closes the database connection again.
+ *
+ * The log directory is taken from the first user-supplied command-line
+ * argument (process.argv[2]); without one, a path relative to this file is
+ * used. The process exits with code 0 after a successful run.
+ */
+async function main() {
+  await initializeServices(); // Initialize MongoDB connection
+
+  // process.argv[2] is the first argument after the script path
+  const logFilesDirectory = process.argv[2] || path.join(__dirname, "../../../../../logs/");
+
+  try {
+    await ingestFCMTokens(logFilesDirectory);
+  } finally {
+    // Always release the connection, even when ingestion throws; an open
+    // connection would otherwise keep the process alive.
+    if (mongoose.connection.readyState === 1) await mongoose.disconnect();
+    console.log("[Script] Database connections closed.");
+  }
+  process.exit(0);
+}
+
+// Report fatal errors and signal failure to the caller through the exit code.
+main().catch((error) => {
+  console.error(error);
+  process.exit(1);
+});