Explorar o código

1.消息推送早中晚一日三发改成只发中午;2.clickhouse只保留半年数据;3.message_receive消息不存入clickhouse

guoziyun hai 6 meses
pai
achega
c24cf09890

+ 2 - 2
oms/dist/services/cron-jobs/notify/local-timezone-notify.js

@@ -37,9 +37,9 @@ const countryCodeToLanguageMap = {
 };
 // 新增:每日推送策略的配置
 const dailyStrategies = [
-    { name: "local-morning-notify", hour: 8 }, // 早上8点
+    // { name: "local-morning-notify", hour: 8 }, // 早上8点
     { name: "local-midday-notify", hour: 12 }, // 中午12点
-    { name: "local-evening-notify", hour: 20 }, // 晚上8点
+    // { name: "local-evening-notify", hour: 20 }, // 晚上8点
 ];
 /**
  * 根据用户的 lang 或 cc 字段推断其语言。

+ 44 - 23
oms/dist/services/ingestor-service.js

@@ -199,7 +199,17 @@ async function flushMongoUserPrefBuffer() {
 async function handleMessageEvent(eventData, eventType) {
     const msgId = eventData.msgid;
     const fcmId = eventData.fcmid;
-    const eventTime = (0, dayjs_1.default)(eventData.t).toDate();
+    // Ensure 't' or 'create_at' is used for event time, default to current time
+    let eventTime;
+    if (eventData.t) {
+        eventTime = (0, dayjs_1.default)(eventData.t).toDate();
+    }
+    else if (eventData.create_at) {
+        eventTime = (0, dayjs_1.default)(eventData.create_at).toDate();
+    }
+    else {
+        eventTime = new Date();
+    }
     const inForeground = eventData.inforeground === "true" || eventData.inforeground === true;
     try {
         let updateFields;
@@ -231,7 +241,7 @@ async function handleMessageEvent(eventData, eventType) {
             query = { fcmReceipt: fcmId };
         }
         else {
-            console.warn(`[Ingestor Service] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
+            console.warn(`[MongoDB-MessageRecord] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
             return;
         }
         // Perform the update
@@ -241,7 +251,11 @@ async function handleMessageEvent(eventData, eventType) {
             console.log(`[MongoDB-MessageRecord] Updated record for ${eventType} event. Matched: ${result.matchedCount}, Modified: ${result.modifiedCount}`);
         }
         else {
-            console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
+            // 消息打开(message_open)可能比消息接收(message_receive)先到达,
+            // 对于message_open找不到记录是正常的,因为message_receive是创建记录的源头。
+            if (eventType === "message_receive") {
+                console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
+            }
         }
     }
     catch (error) {
@@ -264,6 +278,30 @@ async function processMessage(msg) {
         amqpChannel.reject(msg, false); // Reject malformed message, do not re-queue
         return;
     }
+    // Determine event type field name based on project_id and event source
+    // Assuming 'type' for Android-like events, 'name' for iOS-like events,
+    const eventType = eventData.type || eventData.name;
+    // --- 1. Handle Message-Specific Events First ---
+    // 无论是 message_receive 还是 message_open,都需要先即时处理 MongoDB MessageRecord。
+    if (["message_receive", "message_open"].includes(eventType)) {
+        // Note: This function includes its own error handling and logging.
+        await handleMessageEvent(eventData, eventType);
+        // 【关键改动】对于 message_receive 事件:
+        // 1. 已在 handleMessageEvent 中处理完毕 MessageRecord。
+        // 2. 需求是不存入 ClickHouse,也不更新 User.lastActiveAt。
+        // 3. 因此,处理完毕后,直接 Acknowledge 消息并返回。
+        if (eventType === "message_receive") {
+            amqpChannel.ack(msg);
+            return; // 立即返回,跳过后续的 ClickHouse 和 User 更新逻辑
+        }
+        // message_open 将继续向下流转,以便被记录到 ClickHouse 和更新 User 表
+    }
+    // Filter by allowed event types (excluding message_receive now as it was handled above)
+    if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
+        // console.log(`[Ingestor Service] Skipping event with unsupported event_type: ${eventType}`);
+        amqpChannel.ack(msg); // Acknowledge and drop unsupported events
+        return;
+    }
     // 增加对 eventLog.duration 的校验
     if (eventData.duration > 100000 || eventData.duration < 0) {
         console.warn(`[Ingestor Service] Skipping event with invalid duration: ${eventData.duration}. Event: ${JSON.stringify(eventData)}`);
@@ -278,18 +316,6 @@ async function processMessage(msg) {
         amqpChannel.ack(msg); // Acknowledge and drop unsupported events
         return;
     }
-    // Determine event type field name based on project_id and event source
-    // Assuming 'type' for Android-like events, 'name' for iOS-like events,
-    const eventType = eventData.type || eventData.name;
-    // --- 1. Handle Message-Specific Events First ---
-    if (["message_receive", "message_open"].includes(eventType)) {
-        await handleMessageEvent(eventData, eventType); // 移除 amqpChannel.ack(msg); 和 return; // 让事件继续向下流转,以便被记录到ClickHouse和更新User表
-    } // Filter by allowed event types
-    if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
-        // console.log(`[Ingestor Service] Skipping event with unsupported event_type: ${eventType}`);
-        amqpChannel.ack(msg); // Acknowledge and drop unsupported events
-        return;
-    }
     // Determine UID field name based on project_id
     const uid = eventData.uid || eventData.user_id; // uid for Android, user_id for iOS
     if (!uid) {
@@ -310,6 +336,7 @@ async function processMessage(msg) {
             lastActiveAtDateObj = new Date();
         }
         // --- 2. Prepare Event Data for ClickHouse Batch ---
+        // Note: message_receive 已经被前面的逻辑过滤掉了,这里只处理需要存入 ClickHouse 的事件
         const clickhouseEvent = {
             log_id: eventData._id ? eventData._id.toString() : new mongoose_1.default.Types.ObjectId().toHexString(), // Use existing _id or generate new
             uid: uid,
@@ -330,11 +357,9 @@ async function processMessage(msg) {
         };
         clickhouseEventsBuffer.push(clickhouseEvent);
         // --- 3. Prepare User Data for MongoDB Batch Update ---
-        // userSetData will contain fields to be updated using $set for both new and existing documents.
-        // 'project' is now excluded here as it will be handled by $setOnInsert only.
-        //const userSetData: Partial<IUser> = { lastActiveAt: lastActiveAtDateObj };
         const userSetData = {};
-        // 👇 关键修改:仅当事件类型不是 message_receive 时,才更新 lastActiveAt
+        // 关键逻辑:只有非 message_receive 事件才更新 lastActiveAt
+        // message_receive 已在前面被 return 掉,但为了代码清晰和健壮性,保留此判断。
         if (eventType !== "message_receive") {
             userSetData.lastActiveAt = lastActiveAtDateObj;
         }
@@ -350,7 +375,6 @@ async function processMessage(msg) {
             derivedFirstLoginAt = (0, dayjs_1.default)(lastActiveAtDateObj).subtract(eventData.days, "day").toDate();
         }
         // Set firstLoginAt for $setOnInsert (for new documents)
-        // This value will only be used if the document is actually inserted (upsert: true creates a new doc)
         setOnInsertFields.firstLoginAt = derivedFirstLoginAt || lastActiveAtDateObj;
         // Copy relevant fields from event to userSetData (for $set)
         for (const field of USER_FIELDS_TO_UPDATE) {
@@ -398,9 +422,6 @@ async function processMessage(msg) {
             $set: userSetData,
             $setOnInsert: setOnInsertFields,
         };
-        // 👈 关键修改:移除 $min 操作符
-        // `firstLoginAt` 将只在 `$setOnInsert` 时被设置,
-        // 如果文档已存在,它将不会被更新,这符合您的需求。
         mongoUserWriteOperations.push({
             updateOne: {
                 filter: { uid: uid },

+ 24 - 6
oms/dist/src/services/clickhouseService.js

@@ -6,7 +6,7 @@ Object.defineProperty(exports, "__esModule", { value: true });
 exports.ClickhouseService = void 0;
 // oms/src/services/clickhouseService.ts
 const client_1 = require("@clickhouse/client");
-const dayjs_1 = __importDefault(require("dayjs")); // 👈 新增:导入 dayjs 用于日期格式化
+const dayjs_1 = __importDefault(require("dayjs")); // 导入 dayjs 用于日期格式化
 class ClickhouseService {
     constructor(host, database, username, password) {
         this.database = database;
@@ -44,11 +44,29 @@ class ClickhouseService {
       ) ENGINE = MergeTree()
       ORDER BY (uid, time, event) -- 建议的排序键
       PRIMARY KEY (uid, time) -- 主键可用于数据去重和部分查询优化
-      -- TTL time + INTERVAL 30 DAY -- 示例:数据保留30天,根据需求调整。 暂时先注释掉
+      TTL time + INTERVAL 6 MONTH -- 【新增】数据保留6个月
     `;
         try {
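+            // CREATE TABLE IF NOT EXISTS is a no-op when the table already exists:
+            // it never changes the TTL of an existing table, whatever the ClickHouse version.
+            // The TTL of an existing MergeTree table has to be changed with ALTER TABLE ... MODIFY TTL.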
             await this.client.exec({ query: createTableSql });
-            console.log(`ClickHouse table '${tableName}' ensured.`);
+            console.log(`ClickHouse table '${tableName}' ensured (with TTL 6 months).`);
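+            // Extra step: explicitly set the TTL with ALTER TABLE so that it also takes effect on a table that was created before the TTL clause was added.
+            // Note: this ALTER runs every time this code runs, right after the CREATE statement has succeeded, so the table exists at this point.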
+            const alterTableSql = `
+        ALTER TABLE ${tableName} MODIFY TTL time + INTERVAL 6 MONTH
+      `;
+            // 这是一个非关键操作,如果 ClickHouse 版本不支持或报错,不应该导致程序崩溃。
+            try {
+                await this.client.exec({ query: alterTableSql });
+                console.log(`ClickHouse table '${tableName}' TTL successfully applied/modified.`);
+            }
+            catch (alterError) {
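+                // Likely causes: insufficient privileges or an unsupported server version (the table itself exists at this point).
+                // For a freshly created table the TTL was already set by CREATE TABLE; for a pre-existing table a failure here means the 6-month TTL is NOT applied.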
+                console.warn(`Could not ALTER TABLE ${tableName} to modify TTL (May be okay if table was just created):`, alterError);
+            }
         }
         catch (error) {
             console.error(`Failed to ensure ClickHouse table '${tableName}':`, error);
@@ -61,11 +79,11 @@ class ClickhouseService {
      * @param events - 单个事件日志数据或事件日志数据数组。
      */
     async insertEvent(tableName, events) {
-        const valuesToInsert = Array.isArray(events) ? events : [events]; // <--- 关键修改:确保 values 始终是一个数组
+        const valuesToInsert = Array.isArray(events) ? events : [events]; // 确保 values 始终是一个数组
         if (valuesToInsert.length === 0) {
             return; // No events to insert
         }
-        // 👈 关键修改:在发送到 ClickHouse 之前,格式化 Date 对象
+        // 在发送到 ClickHouse 之前,格式化 Date 对象
         const formattedValues = valuesToInsert.map((event) => ({
             ...event,
             // 将 Date 对象格式化为 ClickHouse 期望的字符串格式
@@ -74,7 +92,7 @@ class ClickhouseService {
         try {
             await this.client.insert({
                 table: tableName,
-                values: formattedValues, // <--- 使用处理后的数组
+                values: formattedValues, // 使用处理后的数组
                 format: "JSONEachRow", // ClickHouse 客户端支持多种格式
             });
             // console.log(`Inserted ${valuesToInsert.length} events into ${tableName}.`); // 更适合批量插入的日志

+ 2 - 2
oms/services/cron-jobs/notify/local-timezone-notify.ts

@@ -34,9 +34,9 @@ const countryCodeToLanguageMap: { [key: string]: string } = {
 
 // 新增:每日推送策略的配置
 const dailyStrategies = [
-  { name: "local-morning-notify", hour: 8 }, // 早上8点
+  // { name: "local-morning-notify", hour: 8 }, // 早上8点
   { name: "local-midday-notify", hour: 12 }, // 中午12点
-  { name: "local-evening-notify", hour: 20 }, // 晚上8点
+  // { name: "local-evening-notify", hour: 20 }, // 晚上8点
 ];
 
 /**

+ 46 - 27
oms/services/ingestor-service.ts

@@ -176,7 +176,15 @@ async function flushMongoUserPrefBuffer() {
 async function handleMessageEvent(eventData: any, eventType: string): Promise<void> {
   const msgId = eventData.msgid;
   const fcmId = eventData.fcmid;
-  const eventTime = dayjs(eventData.t).toDate();
+  // Ensure 't' or 'create_at' is used for event time, default to current time
+  let eventTime: Date;
+  if (eventData.t) {
+    eventTime = dayjs(eventData.t).toDate();
+  } else if (eventData.create_at) {
+    eventTime = dayjs(eventData.create_at).toDate();
+  } else {
+    eventTime = new Date();
+  }
   const inForeground = eventData.inforeground === "true" || eventData.inforeground === true;
 
   try {
@@ -207,7 +215,7 @@ async function handleMessageEvent(eventData: any, eventType: string): Promise<vo
     } else if (fcmId) {
       query = { fcmReceipt: fcmId };
     } else {
-      console.warn(`[Ingestor Service] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
+      console.warn(`[MongoDB-MessageRecord] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
       return;
     }
 
@@ -218,7 +226,11 @@ async function handleMessageEvent(eventData: any, eventType: string): Promise<vo
     if (result.matchedCount > 0) {
       console.log(`[MongoDB-MessageRecord] Updated record for ${eventType} event. Matched: ${result.matchedCount}, Modified: ${result.modifiedCount}`);
     } else {
-      console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
+      // 消息打开(message_open)可能比消息接收(message_receive)先到达,
+      // 对于message_open找不到记录是正常的,因为message_receive是创建记录的源头。
+      if (eventType === "message_receive") {
+        console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
+      }
     }
   } catch (error) {
     console.error(`[MongoDB-MessageRecord] Error updating record for ${eventType} event:`, error);
@@ -242,6 +254,34 @@ async function processMessage(msg: Message) {
     return;
   }
 
+  // Determine event type field name based on project_id and event source
+  // Assuming 'type' for Android-like events, 'name' for iOS-like events,
+  const eventType = eventData.type || eventData.name;
+
+  // --- 1. Handle Message-Specific Events First ---
+  // 无论是 message_receive 还是 message_open,都需要先即时处理 MongoDB MessageRecord。
+  if (["message_receive", "message_open"].includes(eventType)) {
+    // Note: This function includes its own error handling and logging.
+    await handleMessageEvent(eventData, eventType);
+
+    // 【关键改动】对于 message_receive 事件:
+    // 1. 已在 handleMessageEvent 中处理完毕 MessageRecord。
+    // 2. 需求是不存入 ClickHouse,也不更新 User.lastActiveAt。
+    // 3. 因此,处理完毕后,直接 Acknowledge 消息并返回。
+    if (eventType === "message_receive") {
+      amqpChannel.ack(msg);
+      return; // 立即返回,跳过后续的 ClickHouse 和 User 更新逻辑
+    }
+    // message_open 将继续向下流转,以便被记录到 ClickHouse 和更新 User 表
+  }
+
+  // Filter by allowed event types (excluding message_receive now as it was handled above)
+  if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
+    // console.log(`[Ingestor Service] Skipping event with unsupported event_type: ${eventType}`);
+    amqpChannel.ack(msg); // Acknowledge and drop unsupported events
+    return;
+  }
+
   // 增加对 eventLog.duration 的校验
   if (eventData.duration > 100000 || eventData.duration < 0) {
     console.warn(`[Ingestor Service] Skipping event with invalid duration: ${eventData.duration}. Event: ${JSON.stringify(eventData)}`);
@@ -258,21 +298,6 @@ async function processMessage(msg: Message) {
     return;
   }
 
-  // Determine event type field name based on project_id and event source
-  // Assuming 'type' for Android-like events, 'name' for iOS-like events,
-  const eventType = eventData.type || eventData.name;
-
-  // --- 1. Handle Message-Specific Events First ---
-  if (["message_receive", "message_open"].includes(eventType)) {
-    await handleMessageEvent(eventData, eventType); // 移除 amqpChannel.ack(msg); 和 return; // 让事件继续向下流转,以便被记录到ClickHouse和更新User表
-  } // Filter by allowed event types
-
-  if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
-    // console.log(`[Ingestor Service] Skipping event with unsupported event_type: ${eventType}`);
-    amqpChannel.ack(msg); // Acknowledge and drop unsupported events
-    return;
-  }
-
   // Determine UID field name based on project_id
   const uid: string = eventData.uid || eventData.user_id; // uid for Android, user_id for iOS
   if (!uid) {
@@ -293,6 +318,7 @@ async function processMessage(msg: Message) {
     }
 
     // --- 2. Prepare Event Data for ClickHouse Batch ---
+    // Note: message_receive 已经被前面的逻辑过滤掉了,这里只处理需要存入 ClickHouse 的事件
     const clickhouseEvent: IEventLog = {
       log_id: eventData._id ? eventData._id.toString() : new mongoose.Types.ObjectId().toHexString(), // Use existing _id or generate new
       uid: uid,
@@ -314,12 +340,10 @@ async function processMessage(msg: Message) {
     clickhouseEventsBuffer.push(clickhouseEvent);
 
     // --- 3. Prepare User Data for MongoDB Batch Update ---
-    // userSetData will contain fields to be updated using $set for both new and existing documents.
-    // 'project' is now excluded here as it will be handled by $setOnInsert only.
-    //const userSetData: Partial<IUser> = { lastActiveAt: lastActiveAtDateObj };
     const userSetData: Partial<IUser> = {};
 
-    // 👇 关键修改:仅当事件类型不是 message_receive 时,才更新 lastActiveAt
+    // 关键逻辑:只有非 message_receive 事件才更新 lastActiveAt
+    // message_receive 已在前面被 return 掉,但为了代码清晰和健壮性,保留此判断。
     if (eventType !== "message_receive") {
       userSetData.lastActiveAt = lastActiveAtDateObj;
     }
@@ -337,7 +361,6 @@ async function processMessage(msg: Message) {
       derivedFirstLoginAt = dayjs(lastActiveAtDateObj).subtract(eventData.days, "day").toDate();
     }
     // Set firstLoginAt for $setOnInsert (for new documents)
-    // This value will only be used if the document is actually inserted (upsert: true creates a new doc)
     setOnInsertFields.firstLoginAt = derivedFirstLoginAt || lastActiveAtDateObj;
 
     // Copy relevant fields from event to userSetData (for $set)
@@ -376,10 +399,6 @@ async function processMessage(msg: Message) {
       $setOnInsert: setOnInsertFields,
     };
 
-    // 👈 关键修改:移除 $min 操作符
-    // `firstLoginAt` 将只在 `$setOnInsert` 时被设置,
-    // 如果文档已存在,它将不会被更新,这符合您的需求。
-
     mongoUserWriteOperations.push({
       updateOne: {
         filter: { uid: uid },

+ 24 - 6
oms/src/services/clickhouseService.ts

@@ -1,6 +1,6 @@
 // oms/src/services/clickhouseService.ts
 import { createClient, ClickHouseClient } from "@clickhouse/client";
-import dayjs from "dayjs"; // 👈 新增:导入 dayjs 用于日期格式化
+import dayjs from "dayjs"; // 导入 dayjs 用于日期格式化
 
 // 定义 ClickHouse Event Log 的 TypeScript 接口
 export interface IEventLog {
@@ -63,11 +63,29 @@ class ClickhouseService {
       ) ENGINE = MergeTree()
       ORDER BY (uid, time, event) -- 建议的排序键
       PRIMARY KEY (uid, time) -- 主键可用于数据去重和部分查询优化
-      -- TTL time + INTERVAL 30 DAY -- 示例:数据保留30天,根据需求调整。 暂时先注释掉
+      TTL time + INTERVAL 6 MONTH -- 【新增】数据保留6个月
     `;
     try {
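+      // CREATE TABLE IF NOT EXISTS is a no-op when the table already exists:
+      // it never changes the TTL of an existing table, whatever the ClickHouse version.
+      // The TTL of an existing MergeTree table has to be changed with ALTER TABLE ... MODIFY TTL.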
       await this.client.exec({ query: createTableSql });
-      console.log(`ClickHouse table '${tableName}' ensured.`);
+      console.log(`ClickHouse table '${tableName}' ensured (with TTL 6 months).`);
+
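+      // Extra step: explicitly set the TTL with ALTER TABLE so that it also takes effect on a table that was created before the TTL clause was added.
+      // Note: this ALTER runs every time this code runs, right after the CREATE statement has succeeded, so the table exists at this point.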
+      const alterTableSql = `
+        ALTER TABLE ${tableName} MODIFY TTL time + INTERVAL 6 MONTH
+      `;
+      // 这是一个非关键操作,如果 ClickHouse 版本不支持或报错,不应该导致程序崩溃。
+      try {
+        await this.client.exec({ query: alterTableSql });
+        console.log(`ClickHouse table '${tableName}' TTL successfully applied/modified.`);
+      } catch (alterError) {
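+        // Likely causes: insufficient privileges or an unsupported server version (the table itself exists at this point).
+        // For a freshly created table the TTL was already set by CREATE TABLE; for a pre-existing table a failure here means the 6-month TTL is NOT applied.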
+        console.warn(`Could not ALTER TABLE ${tableName} to modify TTL (May be okay if table was just created):`, alterError);
+      }
     } catch (error) {
       console.error(`Failed to ensure ClickHouse table '${tableName}':`, error);
       throw error; // Rethrow to handle in calling context
@@ -80,12 +98,12 @@ class ClickhouseService {
    * @param events - 单个事件日志数据或事件日志数据数组。
    */
   public async insertEvent(tableName: string, events: IEventLog | IEventLog[]): Promise<void> {
-    const valuesToInsert = Array.isArray(events) ? events : [events]; // <--- 关键修改:确保 values 始终是一个数组
+    const valuesToInsert = Array.isArray(events) ? events : [events]; // 确保 values 始终是一个数组
     if (valuesToInsert.length === 0) {
       return; // No events to insert
     }
 
-    // 👈 关键修改:在发送到 ClickHouse 之前,格式化 Date 对象
+    // 在发送到 ClickHouse 之前,格式化 Date 对象
     const formattedValues = valuesToInsert.map((event) => ({
       ...event,
       // 将 Date 对象格式化为 ClickHouse 期望的字符串格式
@@ -95,7 +113,7 @@ class ClickhouseService {
     try {
       await this.client.insert({
         table: tableName,
-        values: formattedValues, // <--- 使用处理后的数组
+        values: formattedValues, // 使用处理后的数组
         format: "JSONEachRow", // ClickHouse 客户端支持多种格式
       });
       // console.log(`Inserted ${valuesToInsert.length} events into ${tableName}.`); // 更适合批量插入的日志