| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508 |
- // oms/src/ingestor-service/app.ts
- // Load environment variables (e.g., RABBITMQ_URL, RABBITMQ_OMS_QUEUE, MONGO_URI, CLICKHOUSE_*)
- import * as dotenv from "dotenv";
- dotenv.config();
- import amqp, { Connection, ChannelModel, Channel, Message } from "amqplib";
- import mongoose, { mongo } from "mongoose";
- import dayjs from "dayjs"; // For date manipulation
- import duration from "dayjs/plugin/duration"; // dayjs plugin for duration
- import isSameOrBefore from "dayjs/plugin/isSameOrBefore"; // Day.js plugin for isSameOrBefore
- // Import OMS models and services
- import { User, IUser } from "../src/models/userModel"; // Assuming userModel.ts exports User
- import UserPreference, { IUserPreference } from "../src/models/userPreferenceModel"; // Assuming userPreferenceModel.ts exports UserPreference
- import { MessageRecord, IMessageRecord } from "../src/models/messageRecordModel"; // 新增导入 MessageRecord
- import { ClickhouseService, IEventLog } from "../src/services/clickhouseService"; // Assuming clickhouseService.ts exports ClickhouseService and IEventLog
- dayjs.extend(duration);
- dayjs.extend(isSameOrBefore);
// --- Environment variables ---
// Every setting falls back to a local-development default when the variable is unset or empty.
// NOTE(review): the RabbitMQ and MongoDB fallbacks embed credentials in source; consider
// requiring these variables outside local development.

/** Reads `process.env[name]`, substituting `fallback` when the variable is unset or empty. */
const envOr = (name: string, fallback: string): string => process.env[name] || fallback;

const RABBITMQ_URL = envOr("RABBITMQ_URL", "amqp://coloring:coloring123.@localhost:5672");
// Queue the ingestor subscribes to.
const RABBITMQ_OMS_QUEUE = envOr("RABBITMQ_OMS_QUEUE", "oms_event_queue");
// MongoDB connection URI (read from MONGO_URI).
const OMS_MONGO_URI = envOr("MONGO_URI", "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin");
const CLICKHOUSE_HOST = envOr("CLICKHOUSE_HOST", "http://localhost:8123");
const CLICKHOUSE_DATABASE = envOr("CLICKHOUSE_DATABASE", "omsdb");
const CLICKHOUSE_USER = envOr("CLICKHOUSE_USER", "ckuser");
const CLICKHOUSE_PASSWORD = envOr("CLICKHOUSE_PASSWORD", "ckpassword");
// Name of the ClickHouse event-log table (kept in sync with event/app.ts and the ClickHouse table schema).
const CLICKHOUSE_EVENTS_TABLE = "events";

// --- Batching configuration ---
// Buffer length at which ClickHouse events are flushed.
const CLICKHOUSE_BATCH_SIZE = 5000;
// Buffer length at which MongoDB User operations are flushed.
const MONGO_USER_BATCH_SIZE = 1000;
// Buffer length at which MongoDB UserPreference operations are flushed.
const MONGO_PREF_BATCH_SIZE = 500;
// Interval of the periodic buffer flush, in milliseconds.
const FLUSH_INTERVAL_MS = 5000;
// --- Internal state ---
// Service handles; all of them are assigned inside initializeServices().
let amqpConnection: ChannelModel | undefined;
let amqpChannel: Channel | undefined;
let mongoConnection: typeof mongoose | undefined;
let clickhouseService: ClickhouseService;

// In-memory write buffers: filled by processMessage() and emptied by the flush helpers.
const clickhouseEventsBuffer = new Array<IEventLog>();
const mongoUserWriteOperations = new Array<any>();
const mongoUserPrefWriteOperations = new Array<any>();
// Event types the ingestor processes; anything else is acknowledged and dropped
// by processMessage(). (List reused from ingestHistoricalData.ts.)
const ALLOWED_EVENT_TYPES: string[] = [
  // General app activity
  "visit", "show_deeplink_dialog", "share", "save", "revenue", "rate", "favorite",
  // Coloring activity
  "color_tip", "color_start", "color_done", "color_data",
  // Ads shown while coloring
  "ad_color_tip", "ad_color_float",
  // Push-message events (added later)
  "message_receive", "message_open",
  // Push-token registration
  "firebase_message_token",
];
// IUser keys that processMessage() copies from an incoming event onto the User document.
// The trailing comments give the event key each one is read from when it differs from the
// IUser key. "project" is listed here but skipped by the copy loop, which writes it through
// $setOnInsert instead.
const USER_FIELDS_TO_UPDATE: Array<keyof IUser> = [
  // Attribution
  "network",
  "campaign",
  "adgroup",
  "creative",
  // App / locale
  "prod",
  "libraryName", // read from library_name
  "cc",
  "lang",
  // Device
  "manufacturer",
  "deviceModel", // read from model
  "deviceInfo", // read from device
  "hardware",
  "deviceMem",
  "apiLevel", // read from android_api
  // Version
  "versionName", // read from version_name or library_version
  "versionCode", // read from version_code
  // Messaging / project
  "fmToken", // read from token
  "project", // read from project_id
];
// --- Initialize Services ---

/**
 * Masks the `user:password@` part of a connection URI so the URI can be written to logs.
 * URIs without credentials are returned unchanged.
 */
function redactUriCredentials(uri: string): string {
  return uri.replace(/\/\/[^/]*@/, "//***:***@");
}

/**
 * Connects to RabbitMQ (connection, channel, durable queue assertion), connects Mongoose to
 * the OMS MongoDB, and creates the ClickHouse service and ensures its events table exists.
 *
 * Any failure is logged and the process exits with code 1 so that a supervisor (PM2) can
 * restart the service.
 */
async function initializeServices(): Promise<void> {
  try {
    // Connect to RabbitMQ
    amqpConnection = await amqp.connect(RABBITMQ_URL);
    amqpConnection.on("error", (err) => {
      console.error("[RabbitMQ] Connection error:", err);
      // No reconnection here: recovery relies on the process being restarted.
    });
    amqpConnection.on("close", () => {
      console.error("[RabbitMQ] Connection closed. Restarting service...");
      // NOTE(review): this handler exits for every close event; confirm whether it should
      // also apply to a close initiated by this process during shutdown.
      process.exit(1); // Exit to allow PM2 to restart
    });

    amqpChannel = await amqpConnection.createChannel();
    console.log("[RabbitMQ] Channel created for Ingestor Service.");

    // Assert the ingestor queue
    await amqpChannel.assertQueue(RABBITMQ_OMS_QUEUE, { durable: true });
    console.log(`[RabbitMQ] Ingestor Service queue '${RABBITMQ_OMS_QUEUE}' asserted.`);

    // Connect to OMS MongoDB (using Mongoose)
    mongoConnection = await mongoose.connect(OMS_MONGO_URI);
    // The URI carries the database password, so only a redacted form is logged.
    console.log(`Connected to OMS MongoDB: ${redactUriCredentials(OMS_MONGO_URI)}`);

    // Initialize ClickHouse service with credentials and make sure the events table exists
    clickhouseService = new ClickhouseService(CLICKHOUSE_HOST, CLICKHOUSE_DATABASE, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD);
    await clickhouseService.ensureTable(CLICKHOUSE_EVENTS_TABLE);
    console.log(`ClickHouse Service initialized for ${CLICKHOUSE_DATABASE} at ${CLICKHOUSE_HOST}`);
  } catch (error) {
    console.error("[Ingestor Service] Failed to initialize services:", error);
    process.exit(1); // Exit to allow PM2 to restart on critical startup failure
  }
}
// --- Helper function to flush ClickHouse buffer ---
/**
 * Sends every buffered ClickHouse event in a single insert.
 *
 * The buffer is emptied before the insert is awaited, so events that arrive while the insert
 * is in flight go into the next batch. A failed insert is logged and that batch is dropped
 * (it is not re-queued), which keeps the ingestor from blocking.
 */
async function flushClickHouseBuffer() {
  // splice(0) empties the shared buffer and returns its former contents in one step.
  const pendingEvents = clickhouseEventsBuffer.splice(0);
  if (pendingEvents.length === 0) {
    return;
  }

  try {
    await clickhouseService.insertEvent(CLICKHOUSE_EVENTS_TABLE, pendingEvents);
    console.log(`[ClickHouse] Flushed ${pendingEvents.length} events to ClickHouse.`);
  } catch (flushError) {
    // Deliberate best effort: log and drop instead of retrying.
    console.error(`[ClickHouse] Error flushing ClickHouse buffer:`, flushError);
  }
}
// --- Helper function to flush MongoDB User buffer ---
/**
 * Writes every buffered User operation with one `User.bulkWrite` call.
 *
 * The buffer is emptied before the write is awaited. The operations are sent in the order
 * they were buffered (bulkWrite is called without options). A failed bulk write is logged
 * and the batch is dropped.
 */
async function flushMongoUserBuffer() {
  // splice(0) empties the shared buffer and returns its former contents in one step.
  const pendingOps = mongoUserWriteOperations.splice(0);
  if (pendingOps.length === 0) {
    return;
  }

  try {
    const outcome = await User.bulkWrite(pendingOps);
    const touched = outcome.upsertedCount + outcome.modifiedCount;
    console.log(`[MongoDB-User] Flushed ${pendingOps.length} operations. Upserted/Modified: ${touched}`);
  } catch (writeError) {
    console.error(`[MongoDB-User] Error in MongoDB bulkWrite:`, writeError);
  }
}
// --- Helper function to flush MongoDB UserPreference buffer ---
/**
 * Writes every buffered UserPreference operation with one `UserPreference.bulkWrite` call.
 *
 * The buffer is emptied before the write is awaited. The bulk write runs unordered: the
 * buffered operations are `$inc` upserts whose result does not depend on execution order,
 * and with the default ordered mode a single failing operation would abort every operation
 * after it in a batch whose messages have already been acknowledged. Errors are logged and
 * the failing operations are dropped.
 */
async function flushMongoUserPrefBuffer() {
  if (mongoUserPrefWriteOperations.length === 0) return;
  const operationsToFlush = [...mongoUserPrefWriteOperations]; // Take a snapshot
  mongoUserPrefWriteOperations.length = 0; // Clear buffer immediately
  try {
    // ordered: false => keep applying the remaining operations when one of them fails.
    const bulkResult = await UserPreference.bulkWrite(operationsToFlush, { ordered: false });
    console.log(`[MongoDB-UserPref] Flushed ${operationsToFlush.length} operations. Upserted/Modified: ${bulkResult.upsertedCount + bulkResult.modifiedCount}`);
  } catch (bulkError) {
    console.error(`[MongoDB-UserPref] Error in MongoDB UserPreference bulkWrite:`, bulkError);
  }
}
/**
 * Updates the MessageRecord that a push-message event refers to.
 *
 * - `message_receive`: stores `deliveredAt` and `inforeground`, and raises `status` to
 *   2 (delivered) without ever lowering it.
 * - `message_open`: sets `status` to 3 (opened) and stores `openedAt`.
 *
 * The record is looked up by `msgid` when that is a valid ObjectId, otherwise by `fcmid`
 * (matched against `fcmReceipt`). Events with neither id, or with another event type, are
 * logged and ignored. Database errors are logged and never thrown to the caller.
 *
 * @param eventData The parsed event object from RabbitMQ.
 * @param eventType The type of the event ('message_receive' or 'message_open').
 */
async function handleMessageEvent(eventData: any, eventType: string): Promise<void> {
  const msgId = eventData.msgid;
  const fcmId = eventData.fcmid;
  const eventTime = dayjs(eventData.t).toDate();
  const inForeground = eventData.inforeground === "true" || eventData.inforeground === true;

  try {
    let update: { $set: Partial<IMessageRecord>; $max?: Partial<IMessageRecord> };

    // Determine the update based on event type
    if (eventType === "message_receive") {
      update = {
        $set: {
          deliveredAt: eventTime,
          inforeground: inForeground,
        },
        // Events can be processed out of order. $max only writes when 2 is greater than the
        // stored status, so a receive event handled after the open event no longer
        // downgrades an opened record (3) back to delivered (2).
        // NOTE(review): assumes status codes increase with lifecycle progress - confirm
        // against the MessageRecord schema.
        $max: { status: 2 }, // 2: delivered
      };
    } else if (eventType === "message_open") {
      update = {
        $set: {
          status: 3, // 3: opened
          openedAt: eventTime,
          // inforeground is written by the receive event, so it is not set here
        },
      };
    } else {
      console.warn(`[Ingestor Service] Unhandled message event type: ${eventType}`);
      return;
    }

    // Determine query filter based on available IDs
    let query: any;
    if (msgId && mongoose.Types.ObjectId.isValid(msgId)) {
      query = { _id: msgId };
    } else if (fcmId) {
      query = { fcmReceipt: fcmId };
    } else {
      console.warn(`[Ingestor Service] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
      return;
    }

    // Perform the update and log its outcome
    const result = await MessageRecord.updateOne(query, update);
    if (result.matchedCount > 0) {
      console.log(`[MongoDB-MessageRecord] Updated record for ${eventType} event. Matched: ${result.matchedCount}, Modified: ${result.modifiedCount}`);
    } else {
      console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
    }
  } catch (error) {
    console.error(`[MongoDB-MessageRecord] Error updating record for ${eventType} event:`, error);
  }
}
// --- Process a single event message ---

// Event key each User field is read from, for the fields whose event key differs from the
// IUser key. "versionName" depends on the project and is resolved inside processMessage().
const EVENT_KEY_BY_USER_FIELD: { [userField: string]: string | undefined } = {
  libraryName: "library_name",
  deviceModel: "model",
  deviceInfo: "device",
  apiLevel: "android_api",
  versionCode: "version_code",
  fmToken: "token",
};

/** Parses `value` with dayjs; returns `undefined` when it is falsy or not a valid date. */
function toValidDate(value: any): Date | undefined {
  if (!value) return undefined;
  const parsed = dayjs(value);
  return parsed.isValid() ? parsed.toDate() : undefined;
}

/**
 * Handles one RabbitMQ message: validates and filters the event, then buffers a ClickHouse
 * row, a User upsert and (for `color_start` events with tags) UserPreference upserts, and
 * flushes any buffer that reached its batch size.
 *
 * Acknowledgement rules:
 * - malformed JSON, a non-object payload, a missing UID, or an unexpected processing error:
 *   rejected without re-queue;
 * - out-of-range duration, unsupported project or unsupported event type: acked and dropped;
 * - otherwise: acked once the event is buffered, which may be before it is written to
 *   ClickHouse or MongoDB.
 *
 * @param msg The raw message delivered by the consumer.
 */
async function processMessage(msg: Message) {
  if (!amqpChannel) {
    console.error("[Ingestor Service] RabbitMQ channel not available for processing message.");
    return;
  }

  let eventData: any;
  try {
    eventData = JSON.parse(msg.content.toString());
  } catch (parseError) {
    console.error(`[Ingestor Service] Error parsing message content: ${msg.content.toString()}. Error: ${parseError}`);
    amqpChannel.reject(msg, false); // Reject malformed message, do not re-queue
    return;
  }

  // Valid JSON that is not an object (e.g. `null` or a number) would make the property reads
  // below throw outside of any try block, leaving the message neither acked nor rejected.
  if (eventData === null || typeof eventData !== "object") {
    console.error(`[Ingestor Service] Rejecting non-object event payload: ${msg.content.toString()}`);
    amqpChannel.reject(msg, false);
    return;
  }

  // Validate the event duration
  if (eventData.duration > 100000 || eventData.duration < 0) {
    console.warn(`[Ingestor Service] Skipping event with invalid duration: ${eventData.duration}. Event: ${JSON.stringify(eventData)}`);
    amqpChannel.ack(msg); // Acknowledge and drop invalid messages
    return;
  }

  // Filter by project: project_id for android/ios events, project for oms_app events.
  // Only projects 1 and 6 are ingested.
  const projectId: number = eventData.project_id || eventData.project;
  if (projectId !== 1 && projectId !== 6) {
    amqpChannel.ack(msg); // Acknowledge and drop unsupported events
    return;
  }

  // The event type is carried in 'type' (Android-like events) or 'name' (iOS-like events)
  const eventType = eventData.type || eventData.name;

  // --- 1. Handle Message-Specific Events First ---
  // No ack/return here: the event continues below so it is also recorded in ClickHouse and
  // applied to the User collection.
  if (["message_receive", "message_open"].includes(eventType)) {
    await handleMessageEvent(eventData, eventType);
  }

  // Filter by allowed event types
  if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
    amqpChannel.ack(msg); // Acknowledge and drop unsupported events
    return;
  }

  // uid for Android, user_id for iOS
  const uid: string = eventData.uid || eventData.user_id;
  if (!uid) {
    console.warn(`[Ingestor Service] Skipping event with missing UID: ${JSON.stringify(eventData)}`);
    amqpChannel.reject(msg, false); // Reject if UID is missing, do not re-queue
    return;
  }

  try {
    // Event time: `t`, then `create_at`, then the current time. Unparseable values are
    // skipped so that an Invalid Date never reaches a buffer, where it could make the whole
    // batch fail at flush time.
    const lastActiveAtDateObj: Date = toValidDate(eventData.t) ?? toValidDate(eventData.create_at) ?? new Date();

    // Project 1 and the other supported project use different keys for the same column.
    // The original `cond ? a : b || null` parsed as `cond ? a : (b || null)`, so a missing
    // project-1 value stayed `undefined`; both branches now resolve to null when missing.
    // `??` on the project-1 branch keeps values such as 0 that it already passed through.
    const isProjectOne = projectId === 1;
    const byProject = (forProjectOne: any, forOthers: any): any =>
      isProjectOne ? (forProjectOne ?? null) : (forOthers || null);

    // --- 2. Prepare Event Data for ClickHouse Batch ---
    const clickhouseEvent: IEventLog = {
      log_id: eventData._id ? eventData._id.toString() : new mongoose.Types.ObjectId().toHexString(), // Use existing _id or generate new
      uid: uid,
      project: projectId,
      os: eventData.library_name || null,
      version: byProject(eventData.version_name, eventData.library_version),
      event: eventType,
      time: lastActiveAtDateObj, // Passed as a Date; formatting is left to ClickhouseService
      res: byProject(eventData.res, eventData.sku_id),
      from: byProject(eventData.from, eventData.tab_source),
      position: byProject(eventData.position, eventData.click_position),
      duration: eventData.duration || null,
      ad_type: eventData.ad_type || null,
      ad_src: eventData.ad_src || null,
      revenue: byProject(eventData.rev, eventData.ad_revenue),
      cc: eventData.cc || null,
      raw_json_data: JSON.stringify(eventData),
    };
    clickhouseEventsBuffer.push(clickhouseEvent);

    // --- 3. Prepare User Data for MongoDB Batch Update ---
    // userSetData is applied with $set to new and existing documents alike.
    // lastActiveAt is not touched by message_receive events.
    const userSetData: Partial<IUser> = {};
    if (eventType !== "message_receive") {
      userSetData.lastActiveAt = lastActiveAtDateObj;
    }

    // $setOnInsert fields only apply when the upsert creates a new document
    const setOnInsertFields: any = {
      uid: uid,
      createdAt: new Date(),
      project: projectId, // `project` is a required field, must be set on insert
    };

    // Derive firstLoginAt from the 'days' field when it is a usable number; otherwise fall
    // back to the event time. A non-numeric 'days' previously produced an Invalid Date.
    let derivedFirstLoginAt: Date | undefined;
    if (eventData.days !== undefined && eventData.days !== null) {
      const days = Number(eventData.days);
      if (Number.isFinite(days)) {
        derivedFirstLoginAt = dayjs(lastActiveAtDateObj).subtract(days, "day").toDate();
      }
    }
    setOnInsertFields.firstLoginAt = derivedFirstLoginAt || lastActiveAtDateObj;

    // Copy relevant fields from the event to userSetData (for $set)
    for (const field of USER_FIELDS_TO_UPDATE) {
      if (field === "uid" || field === "project") continue;

      const sourceFieldName: string =
        field === "versionName"
          ? isProjectOne
            ? "version_name"
            : "library_version"
          : EVENT_KEY_BY_USER_FIELD[field] ?? field; // default: same name as the user field

      const value = eventData[sourceFieldName];
      if (value === undefined || value === null) continue;
      // Bad data: version code may be reported as -1
      if (field === "versionCode" && typeof value === "number" && value < 0) continue;
      // Bad data: version name may be reported as "unknown"
      if (field === "versionName" && value === "unknown") continue;

      userSetData[field] = value;
    }

    // firstLoginAt is written through $setOnInsert only (no $min), so it is never changed
    // on an existing document.
    const updateOperation: mongo.UpdateFilter<IUser> = {
      $set: userSetData,
      $setOnInsert: setOnInsertFields,
    };
    mongoUserWriteOperations.push({
      updateOne: {
        filter: { uid: uid },
        update: updateOperation,
        upsert: true,
      },
    });

    // --- 4. Conditionally Update UserPreference in MongoDB ---
    // Only 'color_start' events that carry a non-empty tags array contribute.
    const tagsToProcess: string[] = [];
    if (eventType === "color_start" && Array.isArray(eventData.tags) && eventData.tags.length > 0) {
      tagsToProcess.push(...eventData.tags);
    }
    for (const tag of tagsToProcess) {
      mongoUserPrefWriteOperations.push({
        updateOne: {
          filter: { uid: uid, tag: tag },
          update: { $inc: { count: 1 }, $set: { uid: uid, tag: tag } },
          upsert: true,
          setDefaultsOnInsert: true, // Ensures defaults are applied on upsert (like count: 0)
        },
      });
    }

    // --- Batch Flushing Logic ---
    if (clickhouseEventsBuffer.length >= CLICKHOUSE_BATCH_SIZE) {
      await flushClickHouseBuffer();
    }
    if (mongoUserWriteOperations.length >= MONGO_USER_BATCH_SIZE) {
      await flushMongoUserBuffer();
    }
    if (mongoUserPrefWriteOperations.length >= MONGO_PREF_BATCH_SIZE) {
      await flushMongoUserPrefBuffer();
    }

    amqpChannel.ack(msg); // Acknowledge message after successful buffering/processing
  } catch (error) {
    console.error(`[Ingestor Service] Error processing event from UID ${uid}:`, error, "Event:", eventData);
    // Reject message without re-queuing to prevent infinite loops on consistent processing failures
    amqpChannel.reject(msg, false);
  }
}
// --- Main Ingestor Service Start Function ---
/**
 * Starts the ingestor: initializes all services, begins consuming the OMS queue with manual
 * acknowledgement and a prefetch of 100, and schedules a periodic flush of all buffers.
 *
 * The returned promise rejects when the prefetch or consume request fails. Errors escaping
 * the per-message handler or the periodic flush are logged instead of becoming unhandled
 * promise rejections.
 */
async function startIngestorService(): Promise<void> {
  await initializeServices();

  if (!amqpChannel) {
    console.error("[Ingestor Service] RabbitMQ channel is not available after initialization. Exiting.");
    process.exit(1);
  }
  const channel = amqpChannel;

  // Set consumer prefetch count; awaited so a failure surfaces to the caller.
  await channel.prefetch(100);
  console.log(`[Ingestor Service] Waiting for messages in queue: ${RABBITMQ_OMS_QUEUE}`);

  await channel.consume(
    RABBITMQ_OMS_QUEUE,
    (msg: Message | null) => {
      if (msg === null) return; // Channel closed or other null message
      // The consumer callback's return value is not awaited by the caller, so a rejection
      // has to be handled here.
      processMessage(msg).catch((err) => {
        console.error("[Ingestor Service] Unhandled error while processing message:", err);
      });
    },
    { noAck: false } // Manual acknowledgment
  );

  // Periodically flush whatever is buffered below the batch-size thresholds
  const flushAllBuffers = async (): Promise<void> => {
    await flushClickHouseBuffer();
    await flushMongoUserBuffer();
    await flushMongoUserPrefBuffer();
  };
  setInterval(() => {
    flushAllBuffers().catch((err) => {
      console.error("[Ingestor Service] Periodic flush failed:", err);
    });
  }, FLUSH_INTERVAL_MS);
}
// --- Graceful Shutdown ---
// Set once shutdown has begun, so a repeated signal cannot start a second shutdown that
// reaches process.exit() while the first one is still flushing.
let shutdownInProgress = false;

/**
 * Flushes all buffers, closes the RabbitMQ channel and connection, disconnects Mongoose when
 * it is connected, and exits the process with code 0. Errors while closing a resource are
 * logged and do not stop the remaining steps. Calls made after the first one return
 * immediately.
 */
async function gracefulShutdown() {
  if (shutdownInProgress) return;
  shutdownInProgress = true;
  console.log("[Ingestor Service] Shutting down...");

  // The connection's "close" listener registered at startup calls process.exit(1). The
  // close event also fires for the close() issued below, which would end the process with
  // a failure code before MongoDB is disconnected, so the listener is detached first.
  if (amqpConnection) {
    amqpConnection.removeAllListeners("close");
  }

  // Flush any remaining buffers before closing connections
  await flushClickHouseBuffer();
  await flushMongoUserBuffer();
  await flushMongoUserPrefBuffer();

  if (amqpChannel) {
    try {
      await amqpChannel.close();
      console.log("[Ingestor Service] RabbitMQ channel closed.");
    } catch (e) {
      console.error("[Ingestor Service] Error closing RabbitMQ channel:", e);
    }
  }

  if (amqpConnection) {
    try {
      await amqpConnection.close();
      console.log("[Ingestor Service] RabbitMQ connection closed.");
    } catch (e) {
      console.error("[Ingestor Service] Error closing RabbitMQ connection:", e);
    }
  }

  // readyState 1 means connected; only then is there something to disconnect
  if (mongoose.connection.readyState === 1) {
    try {
      await mongoose.disconnect();
      console.log("[Ingestor Service] MongoDB connection closed.");
    } catch (e) {
      console.error("[Ingestor Service] Error closing MongoDB connection:", e);
    }
  }

  process.exit(0);
}
// Both termination signals trigger the same shutdown routine.
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, gracefulShutdown);
}

// --- Start the service if run directly ---
const isStandaloneRun = require.main === module;
if (isStandaloneRun) {
  console.log("Ingestor Service started in standalone mode.");
  startIngestorService().catch((startupError) => console.error(startupError));
}

// Export the start function for PM2
export default startIngestorService;
|