// ingestor-service.ts
  1. // oms/src/ingestor-service/app.ts
  2. // Load environment variables (e.g., RABBITMQ_URL, RABBITMQ_OMS_QUEUE, MONGO_URI, CLICKHOUSE_*)
  3. import * as dotenv from "dotenv";
  4. dotenv.config();
  5. import amqp, { Connection, ChannelModel, Channel, Message } from "amqplib";
  6. import mongoose, { mongo } from "mongoose";
  7. import dayjs from "dayjs"; // For date manipulation
  8. import duration from "dayjs/plugin/duration"; // dayjs plugin for duration
  9. import isSameOrBefore from "dayjs/plugin/isSameOrBefore"; // Day.js plugin for isSameOrBefore
  10. // Import OMS models and services
  11. import { User, IUser } from "../src/models/userModel"; // Assuming userModel.ts exports User
  12. import UserPreference, { IUserPreference } from "../src/models/userPreferenceModel"; // Assuming userPreferenceModel.ts exports UserPreference
  13. import { MessageRecord, IMessageRecord } from "../src/models/messageRecordModel"; // 新增导入 MessageRecord
  14. import { ClickhouseService, IEventLog } from "../src/services/clickhouseService"; // Assuming clickhouseService.ts exports ClickhouseService and IEventLog
  // Register the Day.js plugins imported above (duration, isSameOrBefore).
  for (const plugin of [duration, isSameOrBefore]) {
    dayjs.extend(plugin);
  }
  // --- Environment Variables ---
  /** Returns environment variable `name`, or `fallback` when it is unset or empty. */
  const envOr = (name: string, fallback: string): string => process.env[name] || fallback;

  // NOTE(review): several fallbacks below embed credentials; they look like local-development
  // defaults — confirm they are never relied on in production.
  const RABBITMQ_URL = envOr("RABBITMQ_URL", "amqp://coloring:coloring123.@localhost:5672");
  // Queue the ingestor subscribes to.
  const RABBITMQ_OMS_QUEUE = envOr("RABBITMQ_OMS_QUEUE", "oms_event_queue");
  // MongoDB connection URI.
  const OMS_MONGO_URI = envOr("MONGO_URI", "mongodb://oms:oms123.@localhost:27717/omsdb?authSource=admin");
  const CLICKHOUSE_HOST = envOr("CLICKHOUSE_HOST", "http://localhost:8123");
  const CLICKHOUSE_DATABASE = envOr("CLICKHOUSE_DATABASE", "omsdb");
  const CLICKHOUSE_USER = envOr("CLICKHOUSE_USER", "ckuser");
  const CLICKHOUSE_PASSWORD = envOr("CLICKHOUSE_PASSWORD", "ckpassword");
  // Name of the ClickHouse log table (kept in sync with event/app.ts and the ClickHouse table schema).
  const CLICKHOUSE_EVENTS_TABLE = "events";

  // --- Batching Configuration ---
  // Buffer sizes at which processMessage triggers an immediate flush.
  const CLICKHOUSE_BATCH_SIZE = 5000; // ClickHouse events per insert
  const MONGO_USER_BATCH_SIZE = 1000; // User operations per bulkWrite
  const MONGO_PREF_BATCH_SIZE = 500; // UserPreference operations per bulkWrite
  // Interval (milliseconds) of the periodic flush of whatever is still buffered.
  const FLUSH_INTERVAL_MS = 5000;
  // --- Internal State ---
  // In-memory batches: events/operations are appended by processMessage and drained by the flush helpers.
  const clickhouseEventsBuffer: Array<IEventLog> = [];
  const mongoUserWriteOperations: Array<any> = [];
  const mongoUserPrefWriteOperations: Array<any> = [];
  // Connections and clients; assigned by initializeServices().
  let amqpConnection: ChannelModel | undefined;
  let amqpChannel: Channel | undefined;
  let mongoConnection: typeof mongoose | undefined;
  let clickhouseService: ClickhouseService;
  // Event types the ingestor processes (list reused from ingestHistoricalData.ts).
  const ALLOWED_EVENT_TYPES = [
    // General app events
    "visit", "show_deeplink_dialog", "share", "save", "revenue", "rate", "favorite",
    // Coloring events
    "color_tip", "color_start", "color_done", "color_data",
    // Ad events
    "ad_color_tip", "ad_color_float",
    // Message-related events
    "message_receive", "message_open", "firebase_message_token",
  ];
  // IUser keys that processMessage copies from an incoming event onto the User document.
  // Where the event uses a different key, the source key is noted next to the field.
  const USER_FIELDS_TO_UPDATE: Array<keyof IUser> = [
    // Attribution
    "network", "campaign", "adgroup", "creative",
    "prod",
    "libraryName", // event key: library_name
    // Locale
    "cc", "lang",
    // Device
    "manufacturer",
    "deviceModel", // event key: model
    "deviceInfo", // event key: device
    "hardware",
    "deviceMem",
    "apiLevel", // event key: android_api
    // App version
    "versionName", // event key: version_name (project 1) or library_version
    "versionCode", // event key: version_code
    "fmToken", // event key: token
    // Listed here, but the copy loop in processMessage skips it: project is written via $setOnInsert only.
    "project",
  ];
  // --- Initialize Services ---
  /**
   * Masks the userinfo part (`user:password@`) of a connection URI so it can be logged safely.
   * URIs without credentials are returned unchanged.
   */
  function redactUriCredentials(uri: string): string {
    return uri.replace(/\/\/[^@/]*@/, "//***:***@");
  }

  /**
   * Connects every backend the ingestor depends on and stores the handles in module state:
   * RabbitMQ connection and channel (asserting the durable ingestor queue), MongoDB via Mongoose,
   * and the ClickHouse service (ensuring the events table exists).
   *
   * Any failure is logged and terminates the process with exit code 1 so that a process
   * manager can restart the service.
   */
  async function initializeServices() {
    try {
      // Connect to RabbitMQ
      amqpConnection = await amqp.connect(RABBITMQ_URL);
      amqpConnection.on("error", (err) => {
        console.error("[RabbitMQ] Connection error:", err);
        // No reconnection here: recovery relies on the "close" handler below exiting the process.
      });
      amqpConnection.on("close", () => {
        console.error("[RabbitMQ] Connection closed. Restarting service...");
        process.exit(1); // Exit to allow PM2 to restart
      });
      amqpChannel = await amqpConnection.createChannel();
      console.log("[RabbitMQ] Channel created for Ingestor Service.");

      // Assert the ingestor queue
      await amqpChannel.assertQueue(RABBITMQ_OMS_QUEUE, { durable: true });
      console.log(`[RabbitMQ] Ingestor Service queue '${RABBITMQ_OMS_QUEUE}' asserted.`);

      // Connect to OMS MongoDB (using Mongoose)
      mongoConnection = await mongoose.connect(OMS_MONGO_URI);
      // The URI may carry a username/password, so credentials are masked before logging.
      console.log(`Connected to OMS MongoDB: ${redactUriCredentials(OMS_MONGO_URI)}`);

      // Initialize ClickHouse service with credentials and make sure the events table exists
      clickhouseService = new ClickhouseService(CLICKHOUSE_HOST, CLICKHOUSE_DATABASE, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD);
      await clickhouseService.ensureTable(CLICKHOUSE_EVENTS_TABLE);
      console.log(`ClickHouse Service initialized for ${CLICKHOUSE_DATABASE} at ${CLICKHOUSE_HOST}`);
    } catch (error) {
      console.error("[Ingestor Service] Failed to initialize services:", error);
      process.exit(1); // Exit to allow PM2 to restart on critical startup failure
    }
  }
  // --- Helper function to flush ClickHouse buffer ---
  /**
   * Drains the in-memory ClickHouse event buffer and inserts the drained events with one call.
   * Insert failures are logged and the drained events are dropped (no retry, no re-queue).
   */
  async function flushClickHouseBuffer() {
    // splice() empties the shared buffer and returns its previous contents in one step, so
    // events buffered while the insert is in flight are left for the next flush.
    const pendingEvents = clickhouseEventsBuffer.splice(0, clickhouseEventsBuffer.length);
    if (pendingEvents.length === 0) {
      return;
    }
    try {
      await clickhouseService.insertEvent(CLICKHOUSE_EVENTS_TABLE, pendingEvents);
      console.log(`[ClickHouse] Flushed ${pendingEvents.length} events to ClickHouse.`);
    } catch (flushError) {
      // Deliberate best-effort: log and drop rather than block the ingestor.
      console.error(`[ClickHouse] Error flushing ClickHouse buffer:`, flushError);
    }
  }
  // --- Helper function to flush MongoDB User buffer ---
  /**
   * Drains the buffered User write operations and sends them to MongoDB in one bulkWrite.
   * A failing bulkWrite is logged; the drained operations are not retried.
   */
  async function flushMongoUserBuffer() {
    // Take the current contents and leave the shared buffer empty for new operations.
    const pendingOps = mongoUserWriteOperations.splice(0, mongoUserWriteOperations.length);
    if (pendingOps.length === 0) {
      return;
    }
    try {
      const outcome = await User.bulkWrite(pendingOps);
      const touched = outcome.upsertedCount + outcome.modifiedCount;
      console.log(`[MongoDB-User] Flushed ${pendingOps.length} operations. Upserted/Modified: ${touched}`);
    } catch (writeError) {
      console.error(`[MongoDB-User] Error in MongoDB bulkWrite:`, writeError);
    }
  }
  // --- Helper function to flush MongoDB UserPreference buffer ---
  /**
   * Drains the buffered UserPreference write operations and sends them to MongoDB in one bulkWrite.
   * A failing bulkWrite is logged; the drained operations are not retried.
   */
  async function flushMongoUserPrefBuffer() {
    // Take the current contents and leave the shared buffer empty for new operations.
    const pendingOps = mongoUserPrefWriteOperations.splice(0, mongoUserPrefWriteOperations.length);
    if (pendingOps.length === 0) {
      return;
    }
    try {
      const outcome = await UserPreference.bulkWrite(pendingOps);
      const touched = outcome.upsertedCount + outcome.modifiedCount;
      console.log(`[MongoDB-UserPref] Flushed ${pendingOps.length} operations. Upserted/Modified: ${touched}`);
    } catch (writeError) {
      console.error(`[MongoDB-UserPref] Error in MongoDB UserPreference bulkWrite:`, writeError);
    }
  }
  /**
   * Applies a push-message event to the matching MessageRecord document.
   *
   * - `message_receive` sets status 2 (delivered), `deliveredAt` and `inforeground`.
   * - `message_open` sets status 3 (opened) and `openedAt`.
   *
   * The record is looked up by `_id` when the event carries a valid ObjectId in `msgid`,
   * otherwise by `fcmReceipt` using `fcmid`. Errors are logged and never thrown to the caller.
   *
   * NOTE(review): the status is written unconditionally, so a `message_receive` processed after
   * the `message_open` for the same record resets status 3 to 2 — confirm event ordering.
   *
   * @param eventData The parsed event object from RabbitMQ.
   * @param eventType The type of the event ('message_receive' or 'message_open').
   */
  async function handleMessageEvent(eventData: any, eventType: string): Promise<void> {
    const { msgid: msgId, fcmid: fcmId } = eventData;
    const eventTime = dayjs(eventData.t).toDate();
    // The flag may arrive as a boolean or as the string "true".
    const inForeground = [true, "true"].includes(eventData.inforeground);

    try {
      // Fields to write, chosen by event type.
      let updateFields: Partial<IMessageRecord>;
      switch (eventType) {
        case "message_receive":
          updateFields = { status: 2, deliveredAt: eventTime, inforeground: inForeground }; // 2: delivered
          break;
        case "message_open":
          // inforeground is written by the receive event, so it is not set here.
          updateFields = { status: 3, openedAt: eventTime }; // 3: opened
          break;
        default:
          console.warn(`[Ingestor Service] Unhandled message event type: ${eventType}`);
          return;
      }

      // Filter: prefer the record id, fall back to the FCM receipt.
      let query: any;
      if (msgId && mongoose.Types.ObjectId.isValid(msgId)) {
        query = { _id: msgId };
      } else if (fcmId) {
        query = { fcmReceipt: fcmId };
      } else {
        console.warn(`[Ingestor Service] Missing msgid or fcmid for event type: ${eventType}. Event: ${JSON.stringify(eventData)}`);
        return;
      }

      const { matchedCount, modifiedCount } = await MessageRecord.updateOne(query, { $set: updateFields });
      if (matchedCount > 0) {
        console.log(`[MongoDB-MessageRecord] Updated record for ${eventType} event. Matched: ${matchedCount}, Modified: ${modifiedCount}`);
      } else {
        console.warn(`[MongoDB-MessageRecord] No matching record found for ${eventType} event. Query: ${JSON.stringify(query)}`);
      }
    } catch (updateError) {
      console.error(`[MongoDB-MessageRecord] Error updating record for ${eventType} event:`, updateError);
    }
  }
  // --- Process a single event message ---
  /**
   * Picks the event timestamp: `t` first, then `create_at`, then the current time.
   * A value Day.js cannot parse is skipped instead of being turned into an Invalid Date,
   * which would otherwise end up in the shared ClickHouse/MongoDB batches.
   */
  function resolveEventTime(eventData: any): Date {
    for (const candidate of [eventData.t, eventData.create_at]) {
      if (!candidate) continue;
      const parsed = dayjs(candidate);
      if (parsed.isValid()) return parsed.toDate();
    }
    return new Date();
  }

  /** Maps a raw event onto the ClickHouse row shape (IEventLog). */
  function buildClickhouseEvent(eventData: any, uid: string, projectId: number, eventType: string, eventTime: Date): IEventLog {
    const isProjectOne = projectId === 1;
    // The original `cond ? a : b || null` applied the `|| null` fallback to the non-project-1
    // branch only, so a missing project-1 value stayed `undefined`. Project-1 values now default
    // to null when absent; falsy-but-present values (e.g. 0) are kept exactly as before.
    const byProject = (projectOneValue: any, otherValue: any) => (isProjectOne ? (projectOneValue ?? null) : (otherValue || null));
    return {
      // Use the existing _id or generate a new one
      log_id: eventData._id ? eventData._id.toString() : new mongoose.Types.ObjectId().toHexString(),
      uid: uid,
      project: projectId,
      os: eventData.library_name || null,
      version: byProject(eventData.version_name, eventData.library_version),
      event: eventType,
      time: eventTime, // Passed as a Date object
      res: byProject(eventData.res, eventData.sku_id),
      from: byProject(eventData.from, eventData.tab_source),
      position: byProject(eventData.position, eventData.click_position),
      duration: eventData.duration || null,
      ad_type: eventData.ad_type || null,
      ad_src: eventData.ad_src || null,
      revenue: byProject(eventData.rev, eventData.ad_revenue),
      cc: eventData.cc || null,
      raw_json_data: JSON.stringify(eventData),
    };
  }

  /**
   * Collects the `$set` fields for the User upsert: `lastActiveAt` (except for message_receive
   * events) plus every USER_FIELDS_TO_UPDATE field present on the event.
   */
  function buildUserSetData(eventData: any, projectId: number, eventType: string, lastActiveAt: Date): Partial<IUser> {
    // Event key to read for each User field whose name differs from the event's key.
    const sourceKeyByField: Record<string, string> = {
      libraryName: "library_name",
      deviceModel: "model",
      deviceInfo: "device",
      apiLevel: "android_api",
      versionName: projectId === 1 ? "version_name" : "library_version",
      versionCode: "version_code",
      fmToken: "token",
    };
    const setData: Record<string, unknown> = {};
    if (eventType !== "message_receive") {
      setData.lastActiveAt = lastActiveAt;
    }
    for (const field of USER_FIELDS_TO_UPDATE) {
      const fieldName = String(field);
      // uid and project are written through $setOnInsert only.
      if (fieldName === "uid" || fieldName === "project") continue;
      const value = eventData[sourceKeyByField[fieldName] ?? fieldName];
      if (value === undefined || value === null) continue;
      // Bad data: the version code may be -1.
      if (fieldName === "versionCode" && typeof value === "number" && value < 0) continue;
      // Bad data: the version name may be the literal "unknown".
      if (fieldName === "versionName" && value === "unknown") continue;
      setData[fieldName] = value;
    }
    return setData as Partial<IUser>;
  }

  /**
   * Validates one RabbitMQ message, buffers its ClickHouse row and MongoDB upserts, flushes any
   * buffer that reached its batch size, and settles the message.
   *
   * Settlement rules:
   * - unparseable or non-object payload, missing UID, or a processing error: reject without re-queue;
   * - invalid duration, unsupported project or unsupported event type: ack and drop;
   * - otherwise: ack once the event is buffered (buffers are written later by the flush helpers).
   *
   * @param msg The message delivered by the RabbitMQ consumer.
   */
  async function processMessage(msg: Message) {
    const channel = amqpChannel;
    if (!channel) {
      console.error("[Ingestor Service] RabbitMQ channel not available for processing message.");
      return;
    }

    let eventData: any;
    try {
      eventData = JSON.parse(msg.content.toString());
    } catch (parseError) {
      console.error(`[Ingestor Service] Error parsing message content: ${msg.content.toString()}. Error: ${parseError}`);
      channel.reject(msg, false); // Reject malformed message, do not re-queue
      return;
    }
    // JSON.parse also accepts `null` and bare scalars; reading properties of `null` would throw
    // below, outside any try block, and leave the message unsettled.
    if (eventData === null || typeof eventData !== "object") {
      console.error(`[Ingestor Service] Rejecting non-object event payload: ${msg.content.toString()}`);
      channel.reject(msg, false);
      return;
    }

    // Validate the event duration
    if (eventData.duration > 100000 || eventData.duration < 0) {
      console.warn(`[Ingestor Service] Skipping event with invalid duration: ${eventData.duration}. Event: ${JSON.stringify(eventData)}`);
      channel.ack(msg); // Acknowledge and drop invalid messages
      return;
    }

    // Filter by project: project_id for android/ios events, project for oms_app events
    const projectId: number = eventData.project_id || eventData.project;
    if (projectId !== 1 && projectId !== 6) {
      channel.ack(msg); // Acknowledge and drop unsupported events
      return;
    }

    // 'type' for Android-like events, 'name' for iOS-like events
    const eventType = eventData.type || eventData.name;

    // --- 1. Handle Message-Specific Events First ---
    if (["message_receive", "message_open"].includes(eventType)) {
      // Intentionally no ack/return here: the event continues so it is also logged to
      // ClickHouse and applied to the User document.
      await handleMessageEvent(eventData, eventType);
    }

    // Filter by allowed event types
    if (!ALLOWED_EVENT_TYPES.includes(eventType)) {
      channel.ack(msg); // Acknowledge and drop unsupported events
      return;
    }

    // uid for Android, user_id for iOS
    const uid: string = eventData.uid || eventData.user_id;
    if (!uid) {
      console.warn(`[Ingestor Service] Skipping event with missing UID: ${JSON.stringify(eventData)}`);
      channel.reject(msg, false); // Reject if UID is missing, do not re-queue
      return;
    }

    try {
      // Computed once so ClickHouse and MongoDB see the same timestamp
      const lastActiveAtDateObj = resolveEventTime(eventData);

      // --- 2. Prepare Event Data for ClickHouse Batch ---
      const clickhouseEvent = buildClickhouseEvent(eventData, uid, projectId, eventType, lastActiveAtDateObj);

      // --- 3. Prepare User Data for MongoDB Batch Update ---
      const userSetData = buildUserSetData(eventData, projectId, eventType, lastActiveAtDateObj);

      // firstLoginAt is derived from 'days' when that is a usable number, else it is the event time.
      let firstLoginAt = lastActiveAtDateObj;
      if (eventData.days !== undefined && eventData.days !== null) {
        const days = Number(eventData.days);
        if (Number.isFinite(days)) {
          firstLoginAt = dayjs(lastActiveAtDateObj).subtract(days, "day").toDate();
        }
      }

      // $setOnInsert fields only apply when the upsert creates a new document, so
      // firstLoginAt is never changed on an existing user.
      const setOnInsertFields: any = {
        uid: uid,
        createdAt: new Date(),
        project: projectId, // `project` is a required field, must be set on insert
        firstLoginAt: firstLoginAt,
      };
      const updateOperation: mongo.UpdateFilter<IUser> = { $setOnInsert: setOnInsertFields };
      // Leave $set out when there is nothing to set (e.g. a message_receive event without user
      // fields): MongoDB servers before 5.0 reject an empty $set document.
      if (Object.keys(userSetData).length > 0) {
        updateOperation.$set = userSetData;
      }

      clickhouseEventsBuffer.push(clickhouseEvent);
      mongoUserWriteOperations.push({
        updateOne: {
          filter: { uid: uid },
          update: updateOperation,
          upsert: true,
        },
      });

      // --- 4. Conditionally Update UserPreference in MongoDB ---
      // Each tag of a 'color_start' event increments that user's counter for the tag.
      if (eventType === "color_start" && Array.isArray(eventData.tags)) {
        for (const tag of eventData.tags) {
          mongoUserPrefWriteOperations.push({
            updateOne: {
              filter: { uid: uid, tag: tag },
              update: { $inc: { count: 1 }, $set: { uid: uid, tag: tag } },
              upsert: true,
              setDefaultsOnInsert: true,
            },
          });
        }
      }

      // --- Batch Flushing Logic ---
      if (clickhouseEventsBuffer.length >= CLICKHOUSE_BATCH_SIZE) {
        await flushClickHouseBuffer();
      }
      if (mongoUserWriteOperations.length >= MONGO_USER_BATCH_SIZE) {
        await flushMongoUserBuffer();
      }
      if (mongoUserPrefWriteOperations.length >= MONGO_PREF_BATCH_SIZE) {
        await flushMongoUserPrefBuffer();
      }

      channel.ack(msg); // Acknowledge message after successful buffering/processing
    } catch (error) {
      console.error(`[Ingestor Service] Error processing event from UID ${uid}:`, error, "Event:", eventData);
      // Reject message without re-queuing to prevent infinite loops on consistent processing failures
      channel.reject(msg, false);
    }
  }
  // --- Main Ingestor Service Start Function ---
  /**
   * Starts the ingestor: initializes all backends, sets the consumer prefetch to 100, starts
   * consuming the ingestor queue with manual acknowledgment, and schedules the periodic flush
   * of the in-memory buffers.
   *
   * The promise rejects when the prefetch or consume request fails, so callers can handle it.
   */
  async function startIngestorService() {
    await initializeServices();
    const channel = amqpChannel;
    if (!channel) {
      console.error("[Ingestor Service] RabbitMQ channel is not available after initialization. Exiting.");
      process.exit(1);
    }

    // Set consumer prefetch count. Awaited so a failure surfaces here instead of as an
    // unhandled promise rejection.
    await channel.prefetch(100);
    console.log(`[Ingestor Service] Waiting for messages in queue: ${RABBITMQ_OMS_QUEUE}`);

    await channel.consume(
      RABBITMQ_OMS_QUEUE,
      async (msg: Message | null) => {
        if (msg === null) return; // Channel closed or other null message
        try {
          await processMessage(msg);
        } catch (error) {
          // Nothing awaits this callback's promise, so an escaping error would become an
          // unhandled rejection and leave the message unsettled, holding a prefetch slot.
          console.error("[Ingestor Service] Unexpected error while processing message:", error);
          try {
            channel.reject(msg, false);
          } catch (rejectError) {
            console.error("[Ingestor Service] Failed to reject message after processing error:", rejectError);
          }
        }
      },
      { noAck: false } // Manual acknowledgment
    );

    // Periodically flush whatever is buffered but has not reached a batch size yet.
    setInterval(async () => {
      try {
        await flushClickHouseBuffer();
        await flushMongoUserBuffer();
        await flushMongoUserPrefBuffer();
      } catch (error) {
        console.error("[Ingestor Service] Periodic flush failed:", error);
      }
    }, FLUSH_INTERVAL_MS);
  }
  // --- Graceful Shutdown ---
  // Set once shutdown has begun so that a second signal does not start a concurrent shutdown.
  let shutdownStarted = false;

  /**
   * Flushes the in-memory buffers, closes the RabbitMQ channel and connection, disconnects
   * Mongoose and exits with code 0. Errors while closing are logged and do not stop the sequence.
   */
  async function gracefulShutdown() {
    // A repeated SIGINT/SIGTERM would otherwise run a second shutdown that sees empty buffers
    // and may call process.exit(0) while the first invocation's flush is still in flight.
    if (shutdownStarted) return;
    shutdownStarted = true;

    console.log("[Ingestor Service] Shutting down...");
    // Flush any remaining buffers before closing connections
    await flushClickHouseBuffer();
    await flushMongoUserBuffer();
    await flushMongoUserPrefBuffer();

    if (amqpChannel) {
      try {
        await amqpChannel.close();
        console.log("[Ingestor Service] RabbitMQ channel closed.");
      } catch (e) {
        console.error("[Ingestor Service] Error closing RabbitMQ channel:", e);
      }
    }

    // Messages acknowledged between the first flush and the channel close are already buffered;
    // flush again now that no further deliveries can arrive, so they are not lost on exit.
    await flushClickHouseBuffer();
    await flushMongoUserBuffer();
    await flushMongoUserPrefBuffer();

    if (amqpConnection) {
      // initializeServices registers a "close" listener that calls process.exit(1). Remove it
      // first, otherwise this intentional close would terminate the process before MongoDB is
      // disconnected and with a failure exit code.
      amqpConnection.removeAllListeners("close");
      try {
        await amqpConnection.close();
        console.log("[Ingestor Service] RabbitMQ connection closed.");
      } catch (e) {
        console.error("[Ingestor Service] Error closing RabbitMQ connection:", e);
      }
    }

    // Only disconnect when Mongoose reports an open connection (readyState 1)
    if (mongoose.connection.readyState === 1) {
      try {
        await mongoose.disconnect();
        console.log("[Ingestor Service] MongoDB connection closed.");
      } catch (e) {
        console.error("[Ingestor Service] Error closing MongoDB connection:", e);
      }
    }
    process.exit(0);
  }
  // Run the graceful shutdown on the usual termination signals.
  for (const signal of ["SIGINT", "SIGTERM"] as const) {
    process.on(signal, gracefulShutdown);
  }

  // --- Start the service if run directly ---
  if (require.main === module) {
    console.log("Ingestor Service started in standalone mode.");
    startIngestorService().catch((startError) => console.error(startError));
  }

  // Export the start function for PM2
  export default startIngestorService;