Ver Fonte

日志处理过滤掉duration非法的情况

guoziyun há 10 meses atrás
pai
commit
c13b25c444

+ 7 - 0
oms/services/ingestor-service.ts

@@ -167,6 +167,13 @@ async function processMessage(msg: Message) {
     return;
   }
 
+  // Validate eventData.duration: events with a duration above 10000 or below 0 are dropped
+  if (eventData.duration > 10000 || eventData.duration < 0) {
+    console.warn(`[Ingestor Service] Skipping event with invalid duration: ${eventData.duration}. Event: ${JSON.stringify(eventData)}`);
+    amqpChannel.ack(msg); // Acknowledge and drop invalid messages
+    return;
+  }
+
   // Filter by project_id
   const projectId: number = eventData.project_id || eventData.project; // project_id for android/ios events, project for oms_app events
   if (projectId !== 1 && projectId !== 6) {

+ 23 - 3
oms/src/scripts/ingestHistoricalData.ts

@@ -94,6 +94,7 @@ async function flushClickHouseBuffer(buffer: IEventLog[], tableName: string, pro
 // --- Historical Data Ingestion Logic ---
 // Now accepts only the log files directory as argument
 async function ingestHistoricalData(logFilesDir: string) {
+  const overallStartTime = dayjs();
   console.log(`Starting historical ingestion from local files in: ${logFilesDir}`);
 
   let totalProcessedEvents = 0;
@@ -116,10 +117,20 @@ async function ingestHistoricalData(logFilesDir: string) {
     return;
   }
 
-  for (const expectedFilename of allFiles) {
+  const totalFiles = allFiles.length;
+  if (totalFiles === 0) {
+    console.log("No .log.gz files found in the specified directory. Exiting.");
+    return;
+  }
+  console.log(`Found ${totalFiles} .log.gz files to process.`);
+
+  for (let i = 0; i < totalFiles; i++) {
+    const expectedFilename = allFiles[i];
     const filePath = path.join(logFilesDir, expectedFilename);
+    const fileStartTime = dayjs();
+
+    console.log(`\n--- Processing file ${i + 1}/${totalFiles}: ${expectedFilename} ---`);
 
-    console.log(`\n--- Processing file: ${expectedFilename} ---`);
     let processedEventsCount = 0;
     let upsertedUsersCount = 0;
 
@@ -161,6 +172,12 @@ async function ingestHistoricalData(logFilesDir: string) {
           continue; // Skip if not an allowed event type
         }
 
+        // Skip events with invalid duration
+        if (eventLog.duration > 10000 || eventLog.duration < 0) {
+          console.warn(`Skipping event with invalid duration: ${eventLog.duration}. Event: ${JSON.stringify(eventLog)}`);
+          continue;
+        }
+
         const uid = projectId === 1 ? eventLog.uid : eventLog.user_id;
         if (!uid) {
           console.warn(`Skipping event with missing UID in '${expectedFilename}': ${JSON.stringify(eventLog)}`);
@@ -395,15 +412,18 @@ async function ingestHistoricalData(logFilesDir: string) {
 
       totalProcessedEvents += processedEventsCount;
       totalUpsertedUsers += upsertedUsersCount;
-      console.log(`File '${expectedFilename}' processed: ${processedEventsCount} events, ${upsertedUsersCount} users upserted.`);
+      const fileDuration = dayjs.duration(dayjs().diff(fileStartTime)).asSeconds();
+      console.log(`File '${expectedFilename}' processed: ${processedEventsCount} events, ${upsertedUsersCount} users upserted. Time taken: ${fileDuration.toFixed(2)} seconds.`);
     } catch (error) {
       console.error(`Error processing file '${expectedFilename}':`, error);
       // If an error occurs during file processing, we still want to move to the next file
     }
   } // End of loop over allFiles
+  const overallDuration = dayjs.duration(dayjs().diff(overallStartTime)).asSeconds();
   console.log(`\n--- Historical data ingestion complete ---`);
   console.log(`Total processed events: ${totalProcessedEvents}`);
   console.log(`Total upserted users: ${totalUpsertedUsers}`);
+  console.log(`Total time taken: ${overallDuration.toFixed(2)} seconds.`);
 }
 
 // --- Main execution ---