|
|
@@ -0,0 +1,144 @@
|
|
|
+"use strict";
|
|
|
+var __importDefault = (this && this.__importDefault) || function (mod) {
|
|
|
+ return (mod && mod.__esModule) ? mod : { "default": mod };
|
|
|
+};
|
|
|
+Object.defineProperty(exports, "__esModule", { value: true });
|
|
|
+exports.run = void 0;
|
|
|
+const mongoose_1 = __importDefault(require("mongoose"));
|
|
|
+const messageRecordModel_1 = require("../src/models/messageRecordModel");
|
|
|
+const userModel_1 = require("../src/models/userModel");
|
|
|
+const fcmService_1 = require("../src/services/fcmService");
|
|
|
+const utils_1 = require("../src/libs/utils");
|
|
|
+const database_1 = require("../src/database"); // 从封装的模块导入连接函数
|
|
|
+const fcmService = fcmService_1.FCMService.getInstance();
|
|
|
+const DELAY_MINUTES = 10;
|
|
|
+/**
|
|
|
+ * 等待指定分钟数。
|
|
|
+ * @param minutes 等待分钟数
|
|
|
+ * @returns Promise
|
|
|
+ */
|
|
|
+const delay = (minutes) => {
|
|
|
+ return new Promise((resolve) => setTimeout(resolve, minutes * 60 * 1000));
|
|
|
+};
|
|
|
+/**
|
|
|
+ * 核心消息发送逻辑:
|
|
|
+ * 1. 查询所有状态为0(未发送)且计划发送时间已到的消息记录。
|
|
|
+ * 2. 遍历这些消息,为每个消息找到对应的用户和设备令牌。
|
|
|
+ * 3. 顺序调用 FCMService 发送消息。
|
|
|
+ * 4. 根据发送结果更新 messageRecord 的状态和相关字段。
|
|
|
+ *
|
|
|
+ * @returns {Promise<void>}
|
|
|
+ */
|
|
|
+const processMessages = async () => {
|
|
|
+ try {
|
|
|
+ // 检查连接状态,确保在执行查询前数据库已连接
|
|
|
+ if (mongoose_1.default.connection.readyState !== 1) {
|
|
|
+ console.error("[Message Sender] Database connection is not ready. Skipping cycle.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ const recordsToSend = await messageRecordModel_1.MessageRecord.find({
|
|
|
+ status: 0,
|
|
|
+ plannedSendAt: { $lte: new Date() },
|
|
|
+ });
|
|
|
+ if (recordsToSend.length === 0) {
|
|
|
+ console.log(`[Message Sender] No messages found to send. Waiting ${DELAY_MINUTES} minutes.`);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ console.log(`[Message Sender] Found ${recordsToSend.length} messages to send. Starting sequential process.`);
|
|
|
+ // 使用 for...of 循环确保消息按顺序逐个发送,保证稳定性
|
|
|
+ for (const record of recordsToSend) {
|
|
|
+ try {
|
|
|
+ const user = await userModel_1.User.findOne({ uid: record.uid });
|
|
|
+ if (!user || !user.fmToken) {
|
|
|
+ console.warn(`[Message Sender] User ${record.uid} not found or no FCM token, updating record status to failed.`);
|
|
|
+ await messageRecordModel_1.MessageRecord.findByIdAndUpdate(record._id, {
|
|
|
+ status: -1,
|
|
|
+ actualSendAt: new Date(),
|
|
|
+ errno: "User not found or no FCM token",
|
|
|
+ });
|
|
|
+ continue; // 继续处理下一个记录
|
|
|
+ }
|
|
|
+ const messageData = (0, utils_1.filterEmptyProps)({
|
|
|
+ msgid: record._id.toString(),
|
|
|
+ title: record.title,
|
|
|
+ content: record.content,
|
|
|
+ image: record.image || "",
|
|
|
+ bigger: record.bigger?.toString() || "false",
|
|
|
+ action: record.action || "",
|
|
|
+ param: record.param || "",
|
|
|
+ extend: record.extend || "",
|
|
|
+ });
|
|
|
+ const result = await fcmService.sendMessage(user.fmToken, messageData);
|
|
|
+ if (result instanceof Error) {
|
|
|
+ const isTokenInvalid = result.message.includes("messaging/invalid-argument") || result.message.includes("messaging/registration-token-not-registered") || result.message.includes("messaging/unregistered");
|
|
|
+ let updateOps = {
|
|
|
+ status: -1,
|
|
|
+ actualSendAt: new Date(),
|
|
|
+ errno: result.message,
|
|
|
+ };
|
|
|
+ if (isTokenInvalid) {
|
|
|
+ console.warn(`[Message Sender] Invalid FCM Token for user ${record.uid}. Clearing token.`);
|
|
|
+ await userModel_1.User.findOneAndUpdate({ uid: record.uid }, { fmToken: null });
|
|
|
+ updateOps.errno = `Invalid FCM Token. Token has been cleared. Original error: ${result.message}`;
|
|
|
+ }
|
|
|
+ await messageRecordModel_1.MessageRecord.findByIdAndUpdate(record._id, updateOps);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ await messageRecordModel_1.MessageRecord.findByIdAndUpdate(record._id, {
|
|
|
+ status: 1,
|
|
|
+ actualSendAt: new Date(),
|
|
|
+ fcmReceipt: result,
|
|
|
+ errno: null,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (error) {
|
|
|
+ console.error(`[Message Sender] Failed to process message record ${record._id}:`, error);
|
|
|
+ await messageRecordModel_1.MessageRecord.findByIdAndUpdate(record._id, {
|
|
|
+ status: -1,
|
|
|
+ actualSendAt: new Date(),
|
|
|
+ errno: "Internal server error while processing message",
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ console.log(`[Message Sender] Finished processing ${recordsToSend.length} messages.`);
|
|
|
+ }
|
|
|
+ catch (err) {
|
|
|
+ console.error("[Message Sender] Error in message sender service:", err);
|
|
|
+ }
|
|
|
+};
|
|
|
+/**
|
|
|
+ * 启动消息发送服务。
|
|
|
+ * @returns Promise<void>
|
|
|
+ */
|
|
|
+const run = async () => {
|
|
|
+ console.log("[Message Sender] Service starting...");
|
|
|
+ await (0, database_1.connectToDatabase)();
|
|
|
+ while (true) {
|
|
|
+ console.log(`[Message Sender] Running a new cycle...`);
|
|
|
+ const startTime = new Date();
|
|
|
+ await processMessages();
|
|
|
+ const duration = new Date().getTime() - startTime.getTime();
|
|
|
+ console.log(`[Message Sender] Cycle finished in ${duration / 1000} seconds. Delaying for ${DELAY_MINUTES} minutes.`);
|
|
|
+ await delay(DELAY_MINUTES);
|
|
|
+ }
|
|
|
+};
|
|
|
+exports.run = run;
|
|
|
+// 如果此文件被直接运行
|
|
|
+if (require.main === module) {
|
|
|
+ // Handle graceful shutdown
|
|
|
+ process.on("SIGINT", async () => {
|
|
|
+ console.log("[Message Sender] Received SIGINT. Shutting down gracefully...");
|
|
|
+ await (0, database_1.disconnectFromDatabase)();
|
|
|
+ process.exit(0);
|
|
|
+ });
|
|
|
+ process.on("SIGTERM", async () => {
|
|
|
+ console.log("[Message Sender] Received SIGTERM. Shutting down gracefully...");
|
|
|
+ await (0, database_1.disconnectFromDatabase)();
|
|
|
+ process.exit(0);
|
|
|
+ });
|
|
|
+ (0, exports.run)().catch((err) => {
|
|
|
+ console.error("Failed to start message sender service:", err);
|
|
|
+ process.exit(1);
|
|
|
+ });
|
|
|
+}
|