guoziyun 10 mēneši atpakaļ
vecāks
revīzija
fbeae55f7e
1 mainītis faili ar 20 papildinājumiem un 28 dzēšanām
  1. 20 28
      oms/src/scripts/ingestHistoricalData.ts

+ 20 - 28
oms/src/scripts/ingestHistoricalData.ts

@@ -1,14 +1,13 @@
 // oms/scripts/ingestHistoricalData.ts
 
 // Load environment variables for database credentials (MONGO_URI, CLICKHOUSE_*)
-// Log file directory and date range are now passed as function arguments.
+// Log file directory is now passed as a function argument.
 import * as dotenv from "dotenv";
 dotenv.config();
 
 import mongoose from "mongoose"; // Mongoose for OMS MongoDB models
 import dayjs from "dayjs"; // For date manipulation
 import duration from "dayjs/plugin/duration"; // dayjs plugin for duration
-import isSameOrBefore from "dayjs/plugin/isSameOrBefore"; // Day.js plugin for isSameOrBefore
 import { v4 as uuidv4 } from "uuid"; // For generating unique IDs where needed
 import { User, IUser } from "../models/userModel"; // OMS User Model
 import { ClickhouseService, IEventLog } from "../services/clickhouseService"; // OMS ClickHouse Service
@@ -20,7 +19,6 @@ import * as zlib from "zlib";
 import * as readline from "readline";
 
 dayjs.extend(duration);
-dayjs.extend(isSameOrBefore); // Extend dayjs with the isSameOrBefore plugin
 
 // --- Persistent Configuration (can still come from .env) ---
 const OMS_MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin";
@@ -94,13 +92,9 @@ async function flushClickHouseBuffer(buffer: IEventLog[], tableName: string, pro
 }
 
 // --- Historical Data Ingestion Logic ---
-// Now accepts logFilesDir, startDateStr, endDateStr as arguments
-async function ingestHistoricalData(logFilesDir: string, startDateStr: string, endDateStr: string) {
+// Now accepts only the log files directory as argument
+async function ingestHistoricalData(logFilesDir: string) {
   console.log(`Starting historical ingestion from local files in: ${logFilesDir}`);
-  console.log(`Filtering log files for dates from ${startDateStr} to ${endDateStr}`);
-
-  const startDate = dayjs(startDateStr, "YYYYMMDD");
-  const endDate = dayjs(endDateStr, "YYYYMMDD");
 
   let totalProcessedEvents = 0;
   let totalUpsertedUsers = 0;
@@ -109,18 +103,21 @@ async function ingestHistoricalData(logFilesDir: string, startDateStr: string, e
   const clickhouseEventsBuffer: IEventLog[] = [];
   const mongoUserWriteOperations: any[] = []; // Array of Mongoose bulkWrite operations
 
-  // Iterate through each day in the specified range to construct expected filenames
-  let currentDate = dayjs(startDateStr, "YYYYMMDD");
-  while (currentDate.isSameOrBefore(endDate, "day")) {
-    const fileDateStr = currentDate.format("YYYYMMDD");
-    const expectedFilename = `coloring-${fileDateStr}.log.gz`;
-    const filePath = path.join(logFilesDir, expectedFilename);
+  // Get all files from the directory and filter for .log.gz files
+  let allFiles: string[] = [];
+  try {
+    allFiles = fs
+      .readdirSync(logFilesDir)
+      .filter((file) => file.endsWith(".log.gz"))
+      .sort(); // Sort files to ensure chronological processing
+  } catch (readDirError) {
+    console.error(`Error reading directory '${logFilesDir}':`, readDirError);
+    // If the directory can't be read, we can't proceed.
+    return;
+  }
 
-    if (!fs.existsSync(filePath)) {
-      console.warn(`File '${expectedFilename}' not found in '${logFilesDir}'. Skipping this date.`);
-      currentDate = currentDate.add(1, "day"); // 文件不存在,日期推进
-      continue;
-    }
+  for (const expectedFilename of allFiles) {
+    const filePath = path.join(logFilesDir, expectedFilename);
 
     console.log(`\n--- Processing file: ${expectedFilename} ---`);
     let processedEventsCount = 0;
@@ -399,14 +396,11 @@ async function ingestHistoricalData(logFilesDir: string, startDateStr: string, e
       totalProcessedEvents += processedEventsCount;
       totalUpsertedUsers += upsertedUsersCount;
       console.log(`File '${expectedFilename}' processed: ${processedEventsCount} events, ${upsertedUsersCount} users upserted.`);
-      // Move to the next day only after successful processing of the current day's file
-      currentDate = currentDate.add(1, "day");
     } catch (error) {
       console.error(`Error processing file '${expectedFilename}':`, error);
-      // If an error occurs during file processing, we still want to move to the next day
-      currentDate = currentDate.add(1, "day");
+      // If an error occurs during file processing, we still want to move to the next file
     }
-  }
+  } // End of for...of allFiles loop
   console.log(`\n--- Historical data ingestion complete ---`);
   console.log(`Total processed events: ${totalProcessedEvents}`);
   console.log(`Total upserted users: ${totalUpsertedUsers}`);
@@ -418,10 +412,8 @@ async function main() {
 
   // --- Hardcoded parameters for one-time ingestion ---
   const logFilesDirectory = process.argv[2] || path.join(__dirname, "../../../../../logs/"); // Fallback directory
-  const ingestionStartDate = "20250821"; // Start date for log files
-  const ingestionEndDate = "20250821"; // End date for log files
 
-  await ingestHistoricalData(logFilesDirectory, ingestionStartDate, ingestionEndDate);
+  await ingestHistoricalData(logFilesDirectory);
 
   // Ensure mongoose connection is properly closed only if it was connected
   if (mongoose.connection.readyState === 1) await mongoose.disconnect();