Explorar o código

消息推送完善

guoziyun hai 9 meses
pai
achega
51c9f87125

+ 16 - 0
oms/ecosystem.config.js

@@ -124,5 +124,21 @@ module.exports = {
       },
       },
       // env_development 块可以在需要时添加
       // env_development 块可以在需要时添加
     },
     },
+    {
+      name: "oms-message-worker", // 监听消息推送活动,生成消息记录
+      script: "dist/services/messageWorker.js",
+      instances: 1,
+      exec_mode: "fork",
+      autorestart: true,
+      watch: false,
+      env_production: {
+        NODE_ENV: "production",
+        RABBITMQ_URL: "amqp://coloring:coloring123.@localhost:5672",
+      },
+      env_development: {
+        NODE_ENV: "development",
+        RABBITMQ_URL: "amqp://coloring:coloring123.@localhost:5672",
+      },
+    },
   ],
   ],
 };
 };

+ 66 - 0
oms/services/cron-jobs/daily-activity-detector.ts

@@ -0,0 +1,66 @@
+// oms/src/services/dailyActivityDetector.ts
+
+import mongoose from "mongoose";
+import { isToday } from "date-fns";
+import { MessageActivity } from "../../src/models/messageActivityModel";
+import rabbitmqService from "../../src/services/rabbitmqService";
+
+const QUEUE_NAME = "message-record-generation-queue";
+
+// MongoDB connection URL. This should be configured in your environment variables.
+const MONGO_URL = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin"; // MongoDB URI
+
+/**
+ * 核心检测函数,由外部的 node-cron 任务调用
+ * 连接数据库,查找所有已发布的每日活动,并判断是否需要触发消息生成
+ */
+export async function run(): Promise<void> {
+  console.log("--- Starting daily message activity check ---");
+
+  try {
+    // 1. 连接到 MongoDB
+    await mongoose.connect(MONGO_URL);
+    console.log("Connected to MongoDB successfully.");
+
+    // 2. 查找所有符合条件的活动:已发布且设置为每日推送
+    const activities = await MessageActivity.find({
+      status: 1, // 1 表示已发布 (published)
+      everyday: true,
+    });
+
+    if (activities.length === 0) {
+      console.log("No daily published activities found.");
+    }
+
+    // 3. 遍历活动,检查是否需要推送
+    for (const activity of activities) {
+      console.log(`Checking activity: ${activity.name} (ID: ${activity._id})`);
+
+      const lastPubDate = activity.lastPubDate;
+      const needsNewPush = !lastPubDate || !isToday(lastPubDate);
+
+      if (needsNewPush) {
+        console.log(`-> Activity requires a new push. Last publish date: ${lastPubDate ? lastPubDate.toISOString() : "None"}.`);
+        // 向rabbitmq投递一条消息, 以便异步生成消息推送记录
+        await rabbitmqService.publishActivityMessage(QUEUE_NAME, { activityId: activity._id });
+      } else {
+        console.log("-> Activity has already run today. Skipping.");
+      }
+    }
+
+    console.log("--- Daily message activity check finished ---");
+  } catch (error) {
+    if (error instanceof Error) {
+      console.error("An error occurred during the daily check:", error.message);
+    } else {
+      console.error("An unknown error occurred during the daily check.");
+    }
+  } finally {
+    // 4. 关闭数据库连接
+    await mongoose.disconnect();
+    console.log("Disconnected from MongoDB.");
+  }
+}
+
+// 模拟外部的 node-cron 调用
+// run().catch(console.error);

+ 1 - 0
oms/services/cron-jobs/index.ts

@@ -12,6 +12,7 @@ interface CronJobModule {
 const settings: [string, string, CronJobModule][] = [
 const settings: [string, string, CronJobModule][] = [
   // 假设这些文件将存在于 oms/services/cron-jobs/ 目录下
   // 假设这些文件将存在于 oms/services/cron-jobs/ 目录下
   ["done-rate", "10 0 * * *", require("./done-rate") as CronJobModule], // 每天凌晨0点10分, 统计作品完成率
   ["done-rate", "10 0 * * *", require("./done-rate") as CronJobModule], // 每天凌晨0点10分, 统计作品完成率
+  ["daily-activity-detector", "50 0 * * *", require("./daily-activity-detector") as CronJobModule], // 每天凌晨0点50分, 检查是否需要生成新的推送消息
   ["sync", "*/10 * * * *", require("./sync/sync-service") as CronJobModule], // 每10分钟跑一次同步
   ["sync", "*/10 * * * *", require("./sync/sync-service") as CronJobModule], // 每10分钟跑一次同步
 ];
 ];
 
 

+ 57 - 0
oms/services/message-worker.ts

@@ -0,0 +1,57 @@
+import amqp from "amqplib";
+import { MessageActivityService } from "../src/services/messageActivityService";
+
+// RabbitMQ连接URL,通常来自环境变量
+const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://coloring:coloring123.@localhost:5672";
+// 消息队列的名称
+const QUEUE_NAME = "message-record-generation-queue";
+
+/**
+ * 启动消息工作进程。
+ * 该函数会连接到 RabbitMQ,监听消息队列,并处理每一条消息。
+ */
+const startWorker = async () => {
+  try {
+    // 建立与 RabbitMQ 的连接
+    const connection = await amqp.connect(RABBITMQ_URL);
+    // 创建一个通道
+    const channel = await connection.createChannel();
+
+    // 声明消息队列,确保它存在
+    await channel.assertQueue(QUEUE_NAME, { durable: true });
+
+    console.log(" [*] Message worker is running and listening for messages in %s. To exit press CTRL+C", QUEUE_NAME);
+
+    // 设置预取计数,一次只从队列中取出一条消息进行处理
+    channel.prefetch(1);
+
+    // 消费消息
+    channel.consume(QUEUE_NAME, async (msg) => {
+      if (msg !== null) {
+        try {
+          const { activityId } = JSON.parse(msg.content.toString());
+          console.log(` [x] Received activityId: ${activityId}. Starting record generation...`);
+
+          // 使用 MessageActivityService 来生成记录
+          const service = new MessageActivityService();
+          const count = await service.generateRecordsForActivity(activityId);
+
+          console.log(` [x] Finished processing activity ${activityId}. Generated ${count} records.`);
+
+          // 确认消息已处理,将其从队列中移除
+          channel.ack(msg);
+        } catch (error) {
+          console.error(` [!] Error processing message: ${msg.content.toString()}`, error);
+          // 消息处理失败,将其拒绝并重新放回队列
+          channel.reject(msg, true);
+        }
+      }
+    });
+  } catch (error) {
+    console.error("Failed to connect to RabbitMQ or start worker:", error);
+    // 在这里可以实现重连逻辑,确保 worker 的健壮性
+    setTimeout(startWorker, 5000); // 5秒后尝试重连
+  }
+};
+
+startWorker();

+ 95 - 0
oms/src/controllers/messageActivityController.ts

@@ -1,5 +1,8 @@
 import { Request, Response } from "express";
 import { Request, Response } from "express";
 import { MessageActivity } from "../models/messageActivityModel";
 import { MessageActivity } from "../models/messageActivityModel";
+import rabbitmqService from "../services/rabbitmqService";
+
+const QUEUE_NAME = "message-record-generation-queue";
 
 
 class MessageActivityController {
 class MessageActivityController {
   /**
   /**
@@ -86,6 +89,98 @@ class MessageActivityController {
       return res.status(500).json({ success: false, message: "Server error", error: error.message });
       return res.status(500).json({ success: false, message: "Server error", error: error.message });
     }
     }
   }
   }
+
+  /**
+   * @route PUT /api/message-activity/:id/status
+   * @desc 更新消息活动状态
+   * @access Private
+   * @param req.body { status: number } 新状态值
+   */
+  public async updateActivityStatus(req: Request, res: Response): Promise<Response> {
+    try {
+      const { id } = req.params;
+      const { status } = req.body;
+
+      // 验证状态值是否合法
+      if (![0, 1, 2, 3].includes(status)) {
+        return res.status(400).json({
+          success: false,
+          message: "Invalid status value. Allowed values: 0 (未发布), 1 (已发布), 2 (已完成), 3 (已中止)",
+        });
+      }
+
+      // 获取当前活动
+      const activity = await MessageActivity.findById(id);
+      if (!activity) {
+        return res.status(404).json({ success: false, message: "Message activity not found" });
+      }
+
+      // 验证状态变更是否合法
+      if (!this.isStatusTransitionValid(activity.status, status)) {
+        return res.status(400).json({
+          success: false,
+          message: `Invalid status transition from ${this.getStatusName(activity.status)} to ${this.getStatusName(status)}`,
+        });
+      }
+
+      // 更新状态
+      activity.status = status;
+
+      // 如果是完成或中止状态,记录完成时间
+      if (status === 2 || status === 3) {
+        activity.completedAt = new Date();
+      }
+
+      // 如果是发布状态,记录发布时间
+      if (status === 1) {
+        activity.publishedAt = new Date();
+
+        // 立即向rabbitmq投递一条消息, 以便异步生成消息推送记录
+        await rabbitmqService.publishActivityMessage(QUEUE_NAME, { activityId: id });
+      }
+
+      await activity.save();
+
+      return res.status(200).json({
+        success: true,
+        data: activity,
+        message: `Activity status updated to ${this.getStatusName(status)}`,
+      });
+    } catch (error: any) {
+      console.error("Error updating activity status:", error);
+      return res.status(500).json({ success: false, message: "Server error", error: error.message });
+    }
+  }
+
+  /**
+   * 验证状态变更是否合法
+   * @param currentStatus 当前状态
+   * @param newStatus 新状态
+   * @returns 如果变更合法返回true,否则返回false
+   */
+  private isStatusTransitionValid(currentStatus: number, newStatus: number): boolean {
+    const validTransitions: Record<number, number[]> = {
+      0: [1], // 未发布 -> 已发布
+      1: [2, 3], // 已发布 -> 已完成/已中止
+      2: [], // 已完成 -> 不允许变更
+      3: [], // 已中止 -> 不允许变更
+    };
+
+    return validTransitions[currentStatus]?.includes(newStatus) || false;
+  }
+
+  /**
+   * 获取状态名称
+   */
+  private getStatusName(status: number): string {
+    const statusNames: Record<number, string> = {
+      0: "未发布",
+      1: "已发布",
+      2: "已完成",
+      3: "已中止",
+    };
+    return statusNames[status] || "未知状态";
+  }
 }
 }
 
 
 export default new MessageActivityController();
 export default new MessageActivityController();

+ 9 - 1
oms/src/models/messageActivityModel.ts

@@ -1,3 +1,5 @@
+// oms/src/models/messageActivityModal.ts
+
 import { Schema, model, Document } from "mongoose";
 import { Schema, model, Document } from "mongoose";
 
 
 // 定义筛选条件子文档的接口
 // 定义筛选条件子文档的接口
@@ -19,6 +21,9 @@ export interface IMessageActivity extends Document {
   strategy: number;
   strategy: number;
   filter?: IFilterCondition[]; // 将类型改为一个包含筛选条件的数组
   filter?: IFilterCondition[]; // 将类型改为一个包含筛选条件的数组
   scheduleAt?: Date;
   scheduleAt?: Date;
+  publishedAt?: Date;
+  completedAt?: Date;
+  lastPubDate?: Date;
   everyday: boolean;
   everyday: boolean;
   status: number;
   status: number;
   createdAt: Date;
   createdAt: Date;
@@ -79,12 +84,15 @@ const messageActivitySchema = new Schema<IMessageActivity>(
     scheduleAt: {
     scheduleAt: {
       type: Date,
       type: Date,
     },
     },
+    publishedAt: { type: Date }, // 发布时间
+    completedAt: { type: Date }, // 完成/中止时间
+    lastPubDate: { type: Date }, // 上次推送日期
     // 是否周期每天推送
     // 是否周期每天推送
     everyday: {
     everyday: {
       type: Boolean,
       type: Boolean,
       default: false,
       default: false,
     },
     },
-    // 消息通知活动状态:0-未发布, 1-已发布(进行中), 2-已完成。
+    // 消息通知活动状态:0-未发布, 1-已发布(进行中), 2-已完成, 3-已中止
     status: {
     status: {
       type: Number,
       type: Number,
       default: 0, // 0-unreleased; 1-published; 2-completed
       default: 0, // 0-unreleased; 1-published; 2-completed

+ 25 - 0
oms/src/models/messageRecordModel.ts

@@ -16,6 +16,10 @@ export interface IMessageRecord extends Document {
   inforeground?: boolean;
   inforeground?: boolean;
   errno?: string;
   errno?: string;
   fcmReceipt?: string;
   fcmReceipt?: string;
+  plannedSendAt?: Date; // 计划发送时间
+  actualSendAt?: Date; // 实际调用firebase admin api发送的时间
+  deliveredAt?: Date; // 客户端接收到消息的时间
+  openedAt?: Date; // 消息被打开的时间
   createdAt: Date;
   createdAt: Date;
   updatedAt: Date;
   updatedAt: Date;
 }
 }
@@ -34,6 +38,7 @@ const messageRecordSchema = new Schema<IMessageRecord>(
       type: Schema.Types.ObjectId,
       type: Schema.Types.ObjectId,
       ref: "MessageActivity",
       ref: "MessageActivity",
       required: false, // It's optional as per design
       required: false, // It's optional as per design
+      index: true, // Added index for better query performance by activity
     },
     },
     // 关联的消息模版表,不太重要了,也可能是空值,对于点对点发送消息的情况可能没有模版
     // 关联的消息模版表,不太重要了,也可能是空值,对于点对点发送消息的情况可能没有模版
     templateId: {
     templateId: {
@@ -91,10 +96,30 @@ const messageRecordSchema = new Schema<IMessageRecord>(
       type: String,
       type: String,
       index: true, // Index for faster lookup by FCM receipt
       index: true, // Index for faster lookup by FCM receipt
     },
     },
+    // 新增字段
+    plannedSendAt: {
+      type: Date,
+      index: true,
+    },
+    actualSendAt: {
+      type: Date,
+      index: true,
+    },
+    deliveredAt: {
+      type: Date,
+      index: true,
+    },
+    openedAt: {
+      type: Date,
+      index: true,
+    },
   },
   },
   {
   {
     timestamps: true, // Automatically adds createdAt and updatedAt
     timestamps: true, // Automatically adds createdAt and updatedAt
   }
   }
 );
 );
 
 
+// Added compound index for common queries by user and status
+messageRecordSchema.index({ uid: 1, status: 1 });
+
 export const MessageRecord = model<IMessageRecord>("MessageRecord", messageRecordSchema);
 export const MessageRecord = model<IMessageRecord>("MessageRecord", messageRecordSchema);

+ 1 - 0
oms/src/routes/apiRoutes.ts

@@ -47,6 +47,7 @@ router.get("/message-activities", messageActivityController.getActivities);
 router.get("/message-activity/:id", messageActivityController.getActivityById);
 router.get("/message-activity/:id", messageActivityController.getActivityById);
 router.put("/message-activity/:id", messageActivityController.updateActivity);
 router.put("/message-activity/:id", messageActivityController.updateActivity);
 router.delete("/message-activity/:id", messageActivityController.deleteActivity);
 router.delete("/message-activity/:id", messageActivityController.deleteActivity);
+router.put("/message-activity/:id/status", messageActivityController.updateActivityStatus);
 
 
 // 新增:用户筛选相关路由
 // 新增:用户筛选相关路由
 router.post("/users/count", UserTargetingController.countTargetUsers);
 router.post("/users/count", UserTargetingController.countTargetUsers);

+ 168 - 1
oms/src/services/messageActivityService.ts

@@ -1,7 +1,45 @@
+// oms/src/services/messageActivityService.ts
+
 import { MessageActivity, IMessageActivity } from "../models/messageActivityModel";
 import { MessageActivity, IMessageActivity } from "../models/messageActivityModel";
-import { MessageTemplate } from "../models/messageTemplateModel";
+import { IMessageTemplate, MessageTemplate } from "../models/messageTemplateModel";
+import { MessageRecord, IMessageRecord } from "../models/messageRecordModel";
+import { UserTargetingService } from "./userTargetingService";
+import rabbitmqService from "./rabbitmqService";
+import { addHours, isPast, differenceInHours } from "date-fns";
+
+// 语言映射,用于根据国家代码确定语言
+const ccToLangMap: { [key: string]: string } = {
+  US: "en",
+  GB: "en",
+  CA: "en",
+  AU: "en",
+  CN: "zh-cn",
+  TW: "zh-tw",
+  JP: "ja",
+  KR: "ko",
+  FR: "fr",
+  DE: "de",
+  ES: "es",
+  PT: "pt",
+  RU: "ru",
+  IT: "it",
+};
+
+// 检查传入的语言是否在消息模板中定义
+const getLocalizedText = (template: IMessageTemplate, lang: string, key: "messageTitle" | "messageContent"): string => {
+  if (template[key][lang]) {
+    return template[key][lang];
+  }
+  return template[key]["en"] || ""; // 如果找不到语言,则默认使用 'en'
+};
 
 
 export class MessageActivityService {
 export class MessageActivityService {
+  private userTargetingService: UserTargetingService;
+
+  constructor() {
+    this.userTargetingService = new UserTargetingService();
+  }
+
   /**
   /**
    * 创建一个新的消息活动
    * 创建一个新的消息活动
    * @param activityData 消息活动数据
    * @param activityData 消息活动数据
@@ -68,4 +106,133 @@ export class MessageActivityService {
   public async deleteMessageActivity(activityId: string): Promise<IMessageActivity | null> {
   public async deleteMessageActivity(activityId: string): Promise<IMessageActivity | null> {
     return await MessageActivity.findByIdAndDelete(activityId);
     return await MessageActivity.findByIdAndDelete(activityId);
   }
   }
+
+  /**
+   * 根据消息活动生成消息记录。
+   *
+   * @param activityId 要生成记录的消息活动ID
+   * @returns 成功生成的记录数量
+   */
+  public async generateRecordsForActivity(activityId: string): Promise<number> {
+    try {
+      // 1. 获取消息活动和模板
+      const activity = await MessageActivity.findById(activityId);
+      if (!activity || !activity.templateId) {
+        console.error(`Message activity or template not found for id: ${activityId}`);
+        return 0;
+      }
+
+      const template = await MessageTemplate.findById(activity.templateId);
+      if (!template) {
+        console.error(`Message template not found for id: ${activity.templateId}`);
+        return 0;
+      }
+
+      // 2. 查找所有目标用户
+      const targetUsers = await this.userTargetingService.findTargetUsers(activity.filter || []);
+      if (targetUsers.length === 0) {
+        console.log(`No users found for activity: ${activity.name}`);
+        return 0;
+      }
+
+      // 3. 找出最近3天内(以 plannedSendAt 计)已收到过消息的用户
+      const threeDaysAgo = addHours(new Date(), -72);
+      const recentMessages = await MessageRecord.aggregate([
+        { $match: { uid: { $in: targetUsers.map((u) => u.uid) } } },
+        { $sort: { plannedSendAt: -1 } },
+        { $group: { _id: "$uid", lastPlannedSendAt: { $first: "$plannedSendAt" } } },
+      ]);
+      const lastPlannedSendMap = new Map<string, Date>();
+      recentMessages.forEach((item) => {
+        if (item.lastPlannedSendAt >= threeDaysAgo) {
+          lastPlannedSendMap.set(item._id, item.lastPlannedSendAt);
+        }
+      });
+
+      const recordsToInsert = [];
+
+      // 4. 遍历目标用户,生成消息记录
+      for (const user of targetUsers) {
+        // 检查用户是否在最近3天内收到过消息
+        if (lastPlannedSendMap.has(user.uid)) {
+          console.log(`User ${user.uid} has a recent message scheduled. Skipping.`);
+          continue;
+        }
+
+        // 5. 确定消息语言
+        let userLang = "en"; // 默认语言
+        if (user.lang) {
+          userLang = user.lang;
+        } else if (user.cc) {
+          userLang = ccToLangMap[user.cc] || "en";
+        }
+
+        // 6. 从模板中获取本地化文本
+        const messageTitle = getLocalizedText(template, userLang, "messageTitle");
+        const messageContent = getLocalizedText(template, userLang, "messageContent");
+
+        if (!messageTitle || !messageContent) {
+          console.error(`No message title or content found for user ${user.uid} with lang/cc: ${userLang}/${user.cc}. Skipping.`);
+          continue;
+        }
+
+        // 7. 计算 plannedSendAt
+        let baseDate = activity.scheduleAt || new Date();
+        // 如果活动计划时间已过期,则使用当前日期作为基础
+        if (isPast(baseDate) && !activity.everyday) {
+          baseDate = new Date();
+        }
+
+        let plannedSendAt = baseDate;
+        if (user.lastActiveAt) {
+          // 获取用户上次活跃时间的小时和分钟
+          const lastActiveHour = user.lastActiveAt.getHours();
+          const lastActiveMinute = user.lastActiveAt.getMinutes();
+
+          // 将基础日期的时间调整为用户上次活跃时间的前一小时
+          plannedSendAt = new Date(baseDate.getFullYear(), baseDate.getMonth(), baseDate.getDate(), lastActiveHour - 1, lastActiveMinute, 0, 0);
+
+          // 如果调整后的时间在过去,则将日期推到第二天
+          if (isPast(plannedSendAt) && differenceInHours(new Date(), plannedSendAt) > 2) {
+            plannedSendAt = addHours(plannedSendAt, 24);
+          }
+        }
+
+        // 8. 创建消息记录对象
+        recordsToInsert.push({
+          uid: user.uid,
+          activityId: activity._id,
+          templateId: template._id,
+          title: messageTitle,
+          content: messageContent,
+          image: activity.image,
+          bigger: activity.bigger,
+          action: activity.action,
+          param: activity.param,
+          extend: activity.extend,
+          status: 0, // 0: 未发送
+          plannedSendAt: plannedSendAt,
+        });
+      }
+
+      // 9. 批量插入消息记录
+      let count = 0;
+      if (recordsToInsert.length > 0) {
+        const result = await MessageRecord.insertMany(recordsToInsert);
+        console.log(`Successfully generated ${result.length} message records.`);
+        count = result.length;
+      } else {
+        console.log("No new message records to generate.");
+      }
+
+      // 10. 更新activity的lastPubDate字段
+      activity.lastPubDate = new Date();
+      await activity.save();
+
+      return count;
+    } catch (error) {
+      console.error("Error generating message records for activity:", error);
+      return 0;
+    }
+  }
 }
 }

+ 48 - 0
oms/src/services/rabbitmqService.ts

@@ -0,0 +1,48 @@
+// services/rabbitmqService.ts
+import amqp, { Channel, ChannelModel } from "amqplib";
+
+class RabbitmqService {
+  /**
+   * 将消息发布到指定的 RabbitMQ 队列
+   * @param {string} queueName - 消息将要发送到的队列名称
+   * @param {object} messagePayload - 要发送的 JSON 消息体
+   */
+  public async publishActivityMessage(queueName: string, messagePayload: object): Promise<void> {
+    let connection: ChannelModel | null = null;
+    try {
+      const rabbitMqUrl = process.env.RABBITMQ_URL;
+      if (!rabbitMqUrl) {
+        throw new Error("RABBITMQ_URL not defined in environment variables.");
+      }
+
+      connection = await amqp.connect(rabbitMqUrl);
+      const channel: Channel = await connection.createChannel();
+
+      // 确保队列存在
+      await channel.assertQueue(queueName, {
+        durable: true,
+      });
+
+      // 将消息转换为 Buffer 并发送
+      const msg = JSON.stringify(messagePayload);
+      channel.sendToQueue(queueName, Buffer.from(msg), {
+        persistent: true,
+      });
+
+      console.log(`[x] Sent message to queue '${queueName}': '${msg}'`);
+    } catch (error) {
+      if (error instanceof Error) {
+        console.error(`Failed to publish message: ${error.message}`);
+      } else {
+        console.error("An unknown error occurred while publishing the message.");
+      }
+    } finally {
+      if (connection) {
+        // 确保连接在消息发送后关闭
+        await connection.close();
+      }
+    }
+  }
+}
+
+export default new RabbitmqService();

+ 2 - 0
oms/src/services/userTargetingService.ts

@@ -1,3 +1,5 @@
+// oms/src/services/userTargetingServices
+
 import { User, IUser } from "../models/userModel"; // 假设你有一个User模型
 import { User, IUser } from "../models/userModel"; // 假设你有一个User模型
 import { FilterQuery } from "mongoose";
 import { FilterQuery } from "mongoose";