|
@@ -0,0 +1,507 @@
|
|
|
|
|
+"use strict";
|
|
|
|
|
+// oms/src/ingestor-service/app.ts
|
|
|
|
|
+var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
|
|
|
|
|
+ if (k2 === undefined) k2 = k;
|
|
|
|
|
+ var desc = Object.getOwnPropertyDescriptor(m, k);
|
|
|
|
|
+ if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
|
|
|
|
|
+ desc = { enumerable: true, get: function() { return m[k]; } };
|
|
|
|
|
+ }
|
|
|
|
|
+ Object.defineProperty(o, k2, desc);
|
|
|
|
|
+}) : (function(o, m, k, k2) {
|
|
|
|
|
+ if (k2 === undefined) k2 = k;
|
|
|
|
|
+ o[k2] = m[k];
|
|
|
|
|
+}));
|
|
|
|
|
+var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
|
|
|
|
|
+ Object.defineProperty(o, "default", { enumerable: true, value: v });
|
|
|
|
|
+}) : function(o, v) {
|
|
|
|
|
+ o["default"] = v;
|
|
|
|
|
+});
|
|
|
|
|
+var __importStar = (this && this.__importStar) || (function () {
|
|
|
|
|
+ var ownKeys = function(o) {
|
|
|
|
|
+ ownKeys = Object.getOwnPropertyNames || function (o) {
|
|
|
|
|
+ var ar = [];
|
|
|
|
|
+ for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
|
|
|
|
|
+ return ar;
|
|
|
|
|
+ };
|
|
|
|
|
+ return ownKeys(o);
|
|
|
|
|
+ };
|
|
|
|
|
+ return function (mod) {
|
|
|
|
|
+ if (mod && mod.__esModule) return mod;
|
|
|
|
|
+ var result = {};
|
|
|
|
|
+ if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
|
|
|
|
|
+ __setModuleDefault(result, mod);
|
|
|
|
|
+ return result;
|
|
|
|
|
+ };
|
|
|
|
|
+})();
|
|
|
|
|
+var __importDefault = (this && this.__importDefault) || function (mod) {
|
|
|
|
|
+ return (mod && mod.__esModule) ? mod : { "default": mod };
|
|
|
|
|
+};
|
|
|
|
|
+Object.defineProperty(exports, "__esModule", { value: true });
|
|
|
|
|
+// Load environment variables (e.g., RABBITMQ_URL, RABBITMQ_OMS_QUEUE, MONGO_URI, CLICKHOUSE_*)
|
|
|
|
|
+const dotenv = __importStar(require("dotenv"));
|
|
|
|
|
+dotenv.config();
|
|
|
|
|
+const amqplib_1 = __importDefault(require("amqplib"));
|
|
|
|
|
+const mongoose_1 = __importDefault(require("mongoose"));
|
|
|
|
|
+const dayjs_1 = __importDefault(require("dayjs")); // For date manipulation
|
|
|
|
|
+const duration_1 = __importDefault(require("dayjs/plugin/duration")); // dayjs plugin for duration
|
|
|
|
|
+const isSameOrBefore_1 = __importDefault(require("dayjs/plugin/isSameOrBefore")); // Day.js plugin for isSameOrBefore
|
|
|
|
|
+// Import OMS models and services
|
|
|
|
|
+const userModel_1 = require("../src/models/userModel"); // Assuming userModel.ts exports User
|
|
|
|
|
+const userPreferenceModel_1 = __importDefault(require("../src/models/userPreferenceModel")); // Assuming userPreferenceModel.ts exports UserPreference
|
|
|
|
|
+const messageRecordModel_1 = require("../src/models/messageRecordModel"); // 新增导入 MessageRecord
|
|
|
|
|
+const clickhouseService_1 = require("../src/services/clickhouseService"); // Assuming clickhouseService.ts exports ClickhouseService and IEventLog
|
|
|
|
|
+dayjs_1.default.extend(duration_1.default);
|
|
|
|
|
+dayjs_1.default.extend(isSameOrBefore_1.default);
|
|
|
|
|
+// --- Environment Variables ---
|
|
|
|
|
+const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://coloring:coloring123.@localhost:5672";
|
|
|
|
|
+const RABBITMQ_OMS_QUEUE = process.env.RABBITMQ_OMS_QUEUE || "oms_event_queue"; // 摄取器订阅的队列
|
|
|
|
|
+const OMS_MONGO_URI = process.env.MONGO_URI || "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin"; // MongoDB URI
|
|
|
|
|
+const CLICKHOUSE_HOST = process.env.CLICKHOUSE_HOST || "http://localhost:8123";
|
|
|
|
|
+const CLICKHOUSE_DATABASE = process.env.CLICKHOUSE_DATABASE || "omsdb";
|
|
|
|
|
+const CLICKHOUSE_USER = process.env.CLICKHOUSE_USER || "ckuser";
|
|
|
|
|
+const CLICKHOUSE_PASSWORD = process.env.CLICKHOUSE_PASSWORD || "ckpassword";
|
|
|
|
|
+const CLICKHOUSE_EVENTS_TABLE = "events"; // ClickHouse 日志表的名称 (与 event/app.ts 和 ClickHouse table schema 保持一致)
|
|
|
|
|
+// --- Batching Configuration ---
|
|
|
|
|
+const CLICKHOUSE_BATCH_SIZE = 5000; // ClickHouse 批量插入大小
|
|
|
|
|
+const MONGO_USER_BATCH_SIZE = 1000; // MongoDB User 批量写入大小
|
|
|
|
|
+const MONGO_PREF_BATCH_SIZE = 500; // MongoDB UserPreference 批量写入大小
|
|
|
|
|
+const FLUSH_INTERVAL_MS = 5000; // 定时刷新缓冲区间隔 (毫秒)
|
|
|
|
|
+// --- Internal State ---
|
|
|
|
|
+let amqpConnection;
|
|
|
|
|
+let amqpChannel;
|
|
|
|
|
+let mongoConnection;
|
|
|
|
|
+let clickhouseService;
|
|
|
|
|
+const clickhouseEventsBuffer = [];
|
|
|
|
|
+const mongoUserWriteOperations = [];
|
|
|
|
|
+const mongoUserPrefWriteOperations = [];
|
|
|
|
|
+// List of event types to process (reused from ingestHistoricalData.ts)
|
|
|
|
|
+const ALLOWED_EVENT_TYPES = [
|
|
|
|
|
+ "visit",
|
|
|
|
|
+ "show_deeplink_dialog",
|
|
|
|
|
+ "share",
|
|
|
|
|
+ "save",
|
|
|
|
|
+ "revenue",
|
|
|
|
|
+ "rate",
|
|
|
|
|
+ "favorite",
|
|
|
|
|
+ "color_tip",
|
|
|
|
|
+ "color_start",
|
|
|
|
|
+ "color_done",
|
|
|
|
|
+ "color_data",
|
|
|
|
|
+ "ad_color_tip",
|
|
|
|
|
+ "ad_color_float",
|
|
|
|
|
+ "message_receive",
|
|
|
|
|
+ "message_open", // 新增消息相关事件
|
|
|
|
|
+];
|
|
|
|
|
+// Define an array of valid IUser keys for copying from event log to User model
|
|
|
|
|
+const USER_FIELDS_TO_UPDATE = [
|
|
|
|
|
+ "network",
|
|
|
|
|
+ "campaign",
|
|
|
|
|
+ "adgroup",
|
|
|
|
|
+ "creative",
|
|
|
|
|
+ "prod",
|
|
|
|
|
+ "libraryName", // maps to library_name
|
|
|
|
|
+ "cc",
|
|
|
|
|
+ "lang",
|
|
|
|
|
+ "manufacturer",
|
|
|
|
|
+ "deviceModel", // maps to model
|
|
|
|
|
+ "deviceInfo", // maps to device
|
|
|
|
|
+ "hardware",
|
|
|
|
|
+ "deviceMem",
|
|
|
|
|
+ "apiLevel", // maps to android_api
|
|
|
|
|
+ "versionName", // maps to version_name or library_version
|
|
|
|
|
+ "versionCode", // maps to version_code
|
|
|
|
|
+ "fmToken", // map to token
|
|
|
|
|
+ "project", // Add project field for updating, map to project_id
|
|
|
|
|
+];
|
|
|
|
|
+// --- Initialize Services ---
|
|
|
|
|
+async function initializeServices() {
|
|
|
|
|
+ try {
|
|
|
|
|
+ // Connect to RabbitMQ
|
|
|
|
|
+ amqpConnection = await amqplib_1.default.connect(RABBITMQ_URL);
|
|
|
|
|
+ amqpConnection.on("error", (err) => {
|
|
|
|
|
+ console.error("[RabbitMQ] Connection error:", err);
|
|
|
|
|
+ // Reconnection logic will be handled by PM2 restarting the service, or a dedicated handler
|
|
|
|
|
+ });
|
|
|
|
|
+ amqpConnection.on("close", () => {
|
|
|
|
|
+ console.error("[RabbitMQ] Connection closed. Restarting service...");
|
|
|
|
|
+ // In a production setup, consider graceful shutdown and PM2 restarting
|
|
|
|
|
+ process.exit(1); // Exit to allow PM2 to restart
|
|
|
|
|
+ });
|
|
|
|
|
+ amqpChannel = await amqpConnection.createChannel();
|
|
|
|
|
+ console.log("[RabbitMQ] Channel created for Ingestor Service.");
|
|
|
|
|
+ // Assert the ingestor queue
|
|
|
|
|
+ await amqpChannel.assertQueue(RABBITMQ_OMS_QUEUE, { durable: true });
|
|
|
|
|
+ console.log(`[RabbitMQ] Ingestor Service queue '${RABBITMQ_OMS_QUEUE}' asserted.`);
|
|
|
|
|
+ // Connect to OMS MongoDB (using Mongoose)
|
|
|
|
|
+ mongoConnection = await mongoose_1.default.connect(OMS_MONGO_URI);
|
|
|
|
|
+ console.log(`Connected to OMS MongoDB: ${OMS_MONGO_URI}`);
|
|
|
|
|
+ // Initialize ClickHouse service with credentials
|
|
|
|
|
+ clickhouseService = new clickhouseService_1.ClickhouseService(CLICKHOUSE_HOST, CLICKHOUSE_DATABASE, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD);
|
|
|
|
|
+ // Ensure ClickHouse table exists
|
|
|
|
|
+ await clickhouseService.ensureTable(CLICKHOUSE_EVENTS_TABLE);
|
|
|
|
|
+ console.log(`ClickHouse Service initialized for ${CLICKHOUSE_DATABASE} at ${CLICKHOUSE_HOST}`);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (error) {
|
|
|
|
|
+ console.error("[Ingestor Service] Failed to initialize services:", error);
|
|
|
|
|
+ process.exit(1); // Exit to allow PM2 to restart on critical startup failure
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+// --- Helper function to flush ClickHouse buffer ---
|
|
|
|
|
+async function flushClickHouseBuffer() {
|
|
|
|
|
+ if (clickhouseEventsBuffer.length === 0)
|
|
|
|
|
+ return;
|
|
|
|
|
+ const eventsToFlush = [...clickhouseEventsBuffer]; // Take a snapshot
|
|
|
|
|
+ clickhouseEventsBuffer.length = 0; // Clear buffer immediately
|
|
|
|
|
+ try {
|
|
|
|
|
+ await clickhouseService.insertEvent(CLICKHOUSE_EVENTS_TABLE, eventsToFlush);
|
|
|
|
|
+ console.log(`[ClickHouse] Flushed ${eventsToFlush.length} events to ClickHouse.`);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (error) {
|
|
|
|
|
+ console.error(`[ClickHouse] Error flushing ClickHouse buffer:`, error);
|
|
|
|
|
+ // On error, decide whether to re-queue (if transient) or log and drop (if data issue)
|
|
|
|
|
+ // For now, we log and drop to avoid blocking the ingestor.
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+// --- Helper function to flush MongoDB User buffer ---
|
|
|
|
|
+async function flushMongoUserBuffer() {
|
|
|
|
|
+ if (mongoUserWriteOperations.length === 0)
|
|
|
|
|
+ return;
|
|
|
|
|
+ const operationsToFlush = [...mongoUserWriteOperations]; // Take a snapshot
|
|
|
|
|
+ mongoUserWriteOperations.length = 0; // Clear buffer immediately
|
|
|
|
|
+ try {
|
|
|
|
|
+ const bulkResult = await userModel_1.User.bulkWrite(operationsToFlush);
|
|
|
|
|
+ console.log(`[MongoDB-User] Flushed ${operationsToFlush.length} operations. Upserted/Modified: ${bulkResult.upsertedCount + bulkResult.modifiedCount}`);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (bulkError) {
|
|
|
|
|
+ console.error(`[MongoDB-User] Error in MongoDB bulkWrite:`, bulkError);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+// --- Helper function to flush MongoDB UserPreference buffer ---
|
|
|
|
|
+async function flushMongoUserPrefBuffer() {
|
|
|
|
|
+ if (mongoUserPrefWriteOperations.length === 0)
|
|
|
|
|
+ return;
|
|
|
|
|
+ const operationsToFlush = [...mongoUserPrefWriteOperations]; // Take a snapshot
|
|
|
|
|
+ mongoUserPrefWriteOperations.length = 0; // Clear buffer immediately
|
|
|
|
|
+ try {
|
|
|
|
|
+ const bulkResult = await userPreferenceModel_1.default.bulkWrite(operationsToFlush);
|
|
|
|
|
+ console.log(`[MongoDB-UserPref] Flushed ${operationsToFlush.length} operations. Upserted/Modified: ${bulkResult.upsertedCount + bulkResult.modifiedCount}`);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (bulkError) {
|
|
|
|
|
+ console.error(`[MongoDB-UserPref] Error in MongoDB UserPreference bulkWrite:`, bulkError);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+/**
|
|
|
|
|
+ * Handles updating a MessageRecord based on a message event (receive or open).
|
|
|
|
|
+ * @param eventData The parsed event object from RabbitMQ.
|
|
|
|
|
+ * @param eventType The type of the event ('message_receive' or 'message_open').
|
|
|
|
|
+ */
|
|
|
|
|
+async function handleMessageEvent(eventData, eventType) {
|
|
|
|
|
+ const msgId = eventData.msgid;
|
|
|
|
|
+ const fcmId = eventData.fcmid;
|
|
|
|
|
+ const eventTime = (0, dayjs_1.default)(eventData.t).toDate();
|
|
|
|
|
+ const inForeground = eventData.inforeground === "true" || eventData.inforeground === true;
|
|
|
|
|
+ try {
|
|
|
|
|
+ let updateFields;
|
|
|
|
|
+ let query;
|
|
|
|
|
+ // Determine update fields based on event type
|
|
|
|
|
+ if (eventType === "message_receive") {
|
|
|
|
|
+ updateFields = {
|
|
|
|
|
+ status: 2, // 2: delivered
|
|
|
|
|
+ deliveredAt: eventTime,
|
|
|
|
|
+ inforeground: inForeground,
|
|
|
|
|
+ };
|
|
|
|
|
+ }
|
|
|
|
|
+ else if (eventType === "message_open") {
|
|
|
|
|
+ updateFields = {
|
|
|
|
|
+ status: 3, // 3: opened
|
|
|
|
|
+ openedAt: eventTime,
|
|
|
|
|
+ // inforeground will be updated by the receive event, so no need to set here
|
|
|
|
|
+ };
|
|
|
|
|
+ }
|
|
|
|
|
+ else {
|
|
|
|
|
+ console.warn(`[Ingestor Service] Unhandled message event type: ${eventType}`);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // Determine query filter based on available IDs
|
|
|
|
|
+ if (msgId && mongoose_1.default.Types.ObjectId.isValid(msgId)) {
|
|
|
|
|
+ query = { _id: msgId };
|
|
|
|
|
+ }
|
|
|
|
|
+ else if (fcmId) {
|
|
|
|
|
+ query = { fcmReceipt: fcmId };
|
|
|
|
|
+ }
|
|
|
|
|
+ else {
|
|
|
|
|
+ console.warn(`[Ingestor Service] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // Perform the update
|
|
|
|
|
+ const result = await messageRecordModel_1.MessageRecord.updateOne(query, { $set: updateFields });
|
|
|
|
|
+ // Log the result of the update operation
|
|
|
|
|
+ if (result.matchedCount > 0) {
|
|
|
|
|
+ console.log(`[MongoDB-MessageRecord] Updated record for ${eventType} event. Matched: ${result.matchedCount}, Modified: ${result.modifiedCount}`);
|
|
|
|
|
+ }
|
|
|
|
|
+ else {
|
|
|
|
|
+ console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (error) {
|
|
|
|
|
+ console.error(`[MongoDB-MessageRecord] Error updating record for ${eventType} event:`, error);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+// --- Process a single event message ---
|
|
|
|
|
+async function processMessage(msg) {
|
|
|
|
|
+ if (!amqpChannel) {
|
|
|
|
|
+ console.error("[Ingestor Service] RabbitMQ channel not available for processing message.");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ let eventData;
|
|
|
|
|
+ try {
|
|
|
|
|
+ eventData = JSON.parse(msg.content.toString());
|
|
|
|
|
+ console.log("[Ingestor Service] Received raw event:", eventData); // Log for debugging, but be cautious with high volume
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (parseError) {
|
|
|
|
|
+ console.error(`[Ingestor Service] Error parsing message content: ${msg.content.toString()}. Error: ${parseError}`);
|
|
|
|
|
+ amqpChannel.reject(msg, false); // Reject malformed message, do not re-queue
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 增加对 eventLog.duration 的校验
|
|
|
|
|
+ if (eventData.duration > 100000 || eventData.duration < 0) {
|
|
|
|
|
+ console.warn(`[Ingestor Service] Skipping event with invalid duration: ${eventData.duration}. Event: ${JSON.stringify(eventData)}`);
|
|
|
|
|
+ amqpChannel.ack(msg); // Acknowledge and drop invalid messages
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // Filter by project_id
|
|
|
|
|
+ const projectId = eventData.project_id || eventData.project; // project_id for android/ios events, project for oms_app events
|
|
|
|
|
+ if (projectId !== 1 && projectId !== 6) {
|
|
|
|
|
+ // Assuming project_id 1 and 6 are relevant
|
|
|
|
|
+ // console.log(`[Ingestor Service] Skipping event with unsupported project_id: ${projectId}`);
|
|
|
|
|
+ amqpChannel.ack(msg); // Acknowledge and drop unsupported events
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // Determine event type field name based on project_id and event source
|
|
|
|
|
+ // Assuming 'type' for Android-like events, 'name' for iOS-like events,
|
|
|
|
|
+ const eventType = eventData.type || eventData.name;
|
|
|
|
|
+ // --- 1. Handle Message-Specific Events First ---
|
|
|
|
|
+ if (["message_receive", "message_open"].includes(eventType)) {
|
|
|
|
|
+ await handleMessageEvent(eventData, eventType);
|
|
|
|
|
+ amqpChannel.ack(msg); // Acknowledge message after processing message events
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // Filter by allowed event types
|
|
|
|
|
+ if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
|
|
|
|
|
+ // console.log(`[Ingestor Service] Skipping event with unsupported event_type: ${eventType}`);
|
|
|
|
|
+ amqpChannel.ack(msg); // Acknowledge and drop unsupported events
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // Determine UID field name based on project_id
|
|
|
|
|
+ const uid = eventData.uid || eventData.user_id; // uid for Android, user_id for iOS
|
|
|
|
|
+ if (!uid) {
|
|
|
|
|
+ console.warn(`[Ingestor Service] Skipping event with missing UID: ${JSON.stringify(eventData)}`);
|
|
|
|
|
+ amqpChannel.reject(msg, false); // Reject if UID is missing, do not re-queue
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ try {
|
|
|
|
|
+ // Calculate lastActiveAtDateObj once for consistency
|
|
|
|
|
+ let lastActiveAtDateObj;
|
|
|
|
|
+ if (eventData.t) {
|
|
|
|
|
+ lastActiveAtDateObj = (0, dayjs_1.default)(eventData.t).toDate();
|
|
|
|
|
+ }
|
|
|
|
|
+ else if (eventData.create_at) {
|
|
|
|
|
+ lastActiveAtDateObj = (0, dayjs_1.default)(eventData.create_at).toDate();
|
|
|
|
|
+ }
|
|
|
|
|
+ else {
|
|
|
|
|
+ lastActiveAtDateObj = new Date();
|
|
|
|
|
+ }
|
|
|
|
|
+ // --- 2. Prepare Event Data for ClickHouse Batch ---
|
|
|
|
|
+ const clickhouseEvent = {
|
|
|
|
|
+ log_id: eventData._id ? eventData._id.toString() : new mongoose_1.default.Types.ObjectId().toHexString(), // Use existing _id or generate new
|
|
|
|
|
+ uid: uid,
|
|
|
|
|
+ project: projectId,
|
|
|
|
|
+ os: eventData.library_name || null,
|
|
|
|
|
+ version: projectId === 1 ? eventData.version_name : eventData.library_version || null,
|
|
|
|
|
+ event: eventType,
|
|
|
|
|
+ time: lastActiveAtDateObj, // Directly pass Date object, ClickhouseService handles formatting
|
|
|
|
|
+ res: projectId === 1 ? eventData.res : eventData.sku_id || null,
|
|
|
|
|
+ from: projectId === 1 ? eventData.from : eventData.tab_source || null,
|
|
|
|
|
+ position: projectId === 1 ? eventData.position : eventData.click_position || null,
|
|
|
|
|
+ duration: eventData.duration || null,
|
|
|
|
|
+ ad_type: eventData.ad_type || null,
|
|
|
|
|
+ ad_src: eventData.ad_src || null,
|
|
|
|
|
+ revenue: projectId === 1 ? eventData.rev : eventData.ad_revenue || null,
|
|
|
|
|
+ cc: eventData.cc || null,
|
|
|
|
|
+ raw_json_data: JSON.stringify(eventData),
|
|
|
|
|
+ };
|
|
|
|
|
+ clickhouseEventsBuffer.push(clickhouseEvent);
|
|
|
|
|
+ // --- 3. Prepare User Data for MongoDB Batch Update ---
|
|
|
|
|
+ // userSetData will contain fields to be updated using $set for both new and existing documents.
|
|
|
|
|
+ // 'project' is now excluded here as it will be handled by $setOnInsert only.
|
|
|
|
|
+ const userSetData = { lastActiveAt: lastActiveAtDateObj };
|
|
|
|
|
+ // SetOnInsert fields will only apply when a new document is created
|
|
|
|
|
+ const setOnInsertFields = {
|
|
|
|
|
+ uid: uid,
|
|
|
|
|
+ createdAt: new Date(),
|
|
|
|
|
+ project: projectId, // `project` is a required field, must be set on insert
|
|
|
|
|
+ };
|
|
|
|
|
+ // Derive firstLoginAt from 'days' field if available
|
|
|
|
|
+ let derivedFirstLoginAt;
|
|
|
|
|
+ if (eventData.days !== undefined && eventData.days !== null) {
|
|
|
|
|
+ derivedFirstLoginAt = (0, dayjs_1.default)(lastActiveAtDateObj).subtract(eventData.days, "day").toDate();
|
|
|
|
|
+ }
|
|
|
|
|
+ // Set firstLoginAt for $setOnInsert (for new documents)
|
|
|
|
|
+ // This value will only be used if the document is actually inserted (upsert: true creates a new doc)
|
|
|
|
|
+ setOnInsertFields.firstLoginAt = derivedFirstLoginAt || lastActiveAtDateObj;
|
|
|
|
|
+ // Copy relevant fields from event to userSetData (for $set)
|
|
|
|
|
+ for (const field of USER_FIELDS_TO_UPDATE) {
|
|
|
|
|
+ if (field === "uid" || field === "project")
|
|
|
|
|
+ continue;
|
|
|
|
|
+ let sourceFieldName;
|
|
|
|
|
+ if (field === "libraryName")
|
|
|
|
|
+ sourceFieldName = "library_name";
|
|
|
|
|
+ else if (field === "deviceModel")
|
|
|
|
|
+ sourceFieldName = "model";
|
|
|
|
|
+ else if (field === "deviceInfo")
|
|
|
|
|
+ sourceFieldName = "device";
|
|
|
|
|
+ else if (field === "apiLevel")
|
|
|
|
|
+ sourceFieldName = "android_api";
|
|
|
|
|
+ else if (field === "versionName")
|
|
|
|
|
+ sourceFieldName = projectId === 1 ? "version_name" : "library_version";
|
|
|
|
|
+ else if (field === "versionCode")
|
|
|
|
|
+ sourceFieldName = "version_code";
|
|
|
|
|
+ else if (field === "deviceMem")
|
|
|
|
|
+ sourceFieldName = "deviceMem";
|
|
|
|
|
+ else if (field === "fmToken")
|
|
|
|
|
+ sourceFieldName = "token";
|
|
|
|
|
+ else
|
|
|
|
|
+ sourceFieldName = field; // Default to same name
|
|
|
|
|
+ if (sourceFieldName && eventData[sourceFieldName] !== undefined && eventData[sourceFieldName] !== null) {
|
|
|
|
|
+ if (field === "deviceMem" && typeof eventData[sourceFieldName] === "number") {
|
|
|
|
|
+ userSetData.deviceMem = eventData[sourceFieldName];
|
|
|
|
|
+ }
|
|
|
|
|
+ else {
|
|
|
|
|
+ // Type assertion needed as Partial<IUser> doesn't guarantee all keys are assignable at runtime
|
|
|
|
|
+ userSetData[field] = eventData[sourceFieldName];
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // Initialize update object with $set and $setOnInsert
|
|
|
|
|
+ const updateOperation = {
|
|
|
|
|
+ $set: userSetData,
|
|
|
|
|
+ $setOnInsert: setOnInsertFields,
|
|
|
|
|
+ };
|
|
|
|
|
+ // 👈 关键修改:移除 $min 操作符
|
|
|
|
|
+ // `firstLoginAt` 将只在 `$setOnInsert` 时被设置,
|
|
|
|
|
+ // 如果文档已存在,它将不会被更新,这符合您的需求。
|
|
|
|
|
+ mongoUserWriteOperations.push({
|
|
|
|
|
+ updateOne: {
|
|
|
|
|
+ filter: { uid: uid },
|
|
|
|
|
+ update: updateOperation, // 使用构建好的 updateOperation
|
|
|
|
|
+ upsert: true,
|
|
|
|
|
+ },
|
|
|
|
|
+ });
|
|
|
|
|
+ // --- 4. Conditionally Update UserPreference in MongoDB ---
|
|
|
|
|
+ // If 'color_start' event and it has tags (or can derive from prod)
|
|
|
|
|
+ const tagsToProcess = [];
|
|
|
|
|
+ if (eventType === "color_start") {
|
|
|
|
|
+ if (Array.isArray(eventData.tags) && eventData.tags.length > 0) {
|
|
|
|
|
+ tagsToProcess.push(...eventData.tags);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (tagsToProcess.length > 0) {
|
|
|
|
|
+ for (const tag of tagsToProcess) {
|
|
|
|
|
+ mongoUserPrefWriteOperations.push({
|
|
|
|
|
+ updateOne: {
|
|
|
|
|
+ filter: { uid: uid, tag: tag },
|
|
|
|
|
+ update: { $inc: { count: 1 }, $set: { uid: uid, tag: tag } },
|
|
|
|
|
+ upsert: true,
|
|
|
|
|
+ setDefaultsOnInsert: true, // Ensures defaults are applied on upsert (like count: 0)
|
|
|
|
|
+ },
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // --- Batch Flushing Logic ---
|
|
|
|
|
+ if (clickhouseEventsBuffer.length >= CLICKHOUSE_BATCH_SIZE) {
|
|
|
|
|
+ await flushClickHouseBuffer();
|
|
|
|
|
+ }
|
|
|
|
|
+ if (mongoUserWriteOperations.length >= MONGO_USER_BATCH_SIZE) {
|
|
|
|
|
+ await flushMongoUserBuffer();
|
|
|
|
|
+ }
|
|
|
|
|
+ if (mongoUserPrefWriteOperations.length >= MONGO_PREF_BATCH_SIZE) {
|
|
|
|
|
+ await flushMongoUserPrefBuffer();
|
|
|
|
|
+ }
|
|
|
|
|
+ amqpChannel.ack(msg); // Acknowledge message after successful buffering/processing
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (error) {
|
|
|
|
|
+ console.error(`[Ingestor Service] Error processing event from UID ${uid}:`, error, "Event:", eventData);
|
|
|
|
|
+ // Reject message without re-queuing to prevent infinite loops on consistent processing failures
|
|
|
|
|
+ amqpChannel.reject(msg, false);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+// --- Main Ingestor Service Start Function ---
|
|
|
|
|
+async function startIngestorService() {
|
|
|
|
|
+ await initializeServices();
|
|
|
|
|
+ if (!amqpChannel) {
|
|
|
|
|
+ console.error("[Ingestor Service] RabbitMQ channel is not available after initialization. Exiting.");
|
|
|
|
|
+ process.exit(1);
|
|
|
|
|
+ }
|
|
|
|
|
+ // Set consumer prefetch count
|
|
|
|
|
+ amqpChannel.prefetch(100);
|
|
|
|
|
+ console.log(`[Ingestor Service] Waiting for messages in queue: ${RABBITMQ_OMS_QUEUE}`);
|
|
|
|
|
+ amqpChannel.consume(RABBITMQ_OMS_QUEUE, async (msg) => {
|
|
|
|
|
+ if (msg === null)
|
|
|
|
|
+ return; // Channel closed or other null message
|
|
|
|
|
+ await processMessage(msg);
|
|
|
|
|
+ }, { noAck: false } // Manual acknowledgment
|
|
|
|
|
+ );
|
|
|
|
|
+ // Set up periodic flushing for any remaining buffered events
|
|
|
|
|
+ setInterval(async () => {
|
|
|
|
|
+ // console.log("[Ingestor Service] Flushing any remaining buffers...");
|
|
|
|
|
+ await flushClickHouseBuffer();
|
|
|
|
|
+ await flushMongoUserBuffer();
|
|
|
|
|
+ await flushMongoUserPrefBuffer();
|
|
|
|
|
+ }, FLUSH_INTERVAL_MS);
|
|
|
|
|
+}
|
|
|
|
|
+// --- Graceful Shutdown ---
|
|
|
|
|
+async function gracefulShutdown() {
|
|
|
|
|
+ console.log("[Ingestor Service] Shutting down...");
|
|
|
|
|
+ // Flush any remaining buffers before closing connections
|
|
|
|
|
+ await flushClickHouseBuffer();
|
|
|
|
|
+ await flushMongoUserBuffer();
|
|
|
|
|
+ await flushMongoUserPrefBuffer();
|
|
|
|
|
+ if (amqpChannel) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ await amqpChannel.close();
|
|
|
|
|
+ console.log("[Ingestor Service] RabbitMQ channel closed.");
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (e) {
|
|
|
|
|
+ console.error("[Ingestor Service] Error closing RabbitMQ channel:", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (amqpConnection) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ await amqpConnection.close();
|
|
|
|
|
+ console.log("[Ingestor Service] RabbitMQ connection closed.");
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (e) {
|
|
|
|
|
+ console.error("[Ingestor Service] Error closing RabbitMQ connection:", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if (mongoose_1.default.connection.readyState === 1) {
|
|
|
|
|
+ // Check if connected before trying to disconnect
|
|
|
|
|
+ try {
|
|
|
|
|
+ await mongoose_1.default.disconnect();
|
|
|
|
|
+ console.log("[Ingestor Service] MongoDB connection closed.");
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (e) {
|
|
|
|
|
+ console.error("[Ingestor Service] Error closing MongoDB connection:", e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ process.exit(0);
|
|
|
|
|
+}
|
|
|
|
|
+process.on("SIGINT", gracefulShutdown);
|
|
|
|
|
+process.on("SIGTERM", gracefulShutdown);
|
|
|
|
|
+// --- Start the service if run directly ---
|
|
|
|
|
+if (require.main === module) {
|
|
|
|
|
+ console.log("Ingestor Service started in standalone mode.");
|
|
|
|
|
+ startIngestorService().catch(console.error);
|
|
|
|
|
+}
|
|
|
|
|
+// Export the start function for PM2
|
|
|
|
|
+exports.default = startIngestorService;
|