log-service.ts 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // oms/src/log-service/app.ts
  2. // Load environment variables (e.g., RABBITMQ_URL, RABBITMQ_LOG_QUEUE, LOG_DIR)
  3. import * as dotenv from "dotenv";
  4. dotenv.config();
  5. import amqp, { Connection, ChannelModel, Channel, Message } from "amqplib"; // 明确导入 Connection, Channel, Message 类型
  6. import * as rfs from "rotating-file-stream"; // 👈 关键修复:使用 `import * as rfs`
  7. import moment from "moment"; // 导入 moment (用于日志文件名生成)
  8. import * as path from "path"; // 导入 path 模块
  9. // --- Environment Variables ---
  10. const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://coloring:coloring123.@localhost:5672";
  11. const RABBITMQ_LOG_QUEUE = process.env.RABBITMQ_LOG_QUEUE || "log-event-queue"; // 日志服务订阅的队列
  12. const LOG_DIR = process.env.LOG_DIR || path.join(__dirname, "..", "..", "logs", "coloring"); // 日志文件存储路径
  13. let amqpConnection: ChannelModel | undefined; // 使用 undefined 初始化,因为连接是异步的
  14. let amqpChannel: Channel | undefined; // 使用 undefined 初始化,因为频道是异步的
  15. // --- Log Rotation Setup ---
  16. // 确保日志目录存在
  17. try {
  18. if (!require("fs").existsSync(LOG_DIR)) {
  19. require("fs").mkdirSync(LOG_DIR, { recursive: true });
  20. console.log(`[Log Service] Created log directory: ${LOG_DIR}`);
  21. }
  22. } catch (error) {
  23. console.error(`[Log Service] Failed to create log directory ${LOG_DIR}:`, error);
  24. process.exit(1);
  25. }
  26. // 日志文件名生成器
  27. const generator = (time: Date | number | undefined, index?: number): string => {
  28. if (!time) return "coloring.log"; // 初始文件名
  29. const suffix = moment(time).format("YYYYMMDD");
  30. return `coloring-${suffix}.log.gz`; // 每日轮换并压缩
  31. };
  32. // 创建文件写入流
  33. const logStream = rfs.createStream(generator, {
  34. interval: "1d", // 每日轮换
  35. compress: "gzip", // 使用 gzip 压缩
  36. path: LOG_DIR, // 日志文件路径
  37. intervalBoundary: true, // 确保在指定时间边界轮换
  38. // size: '10M', // 也可以配置按大小轮换,例如每 10MB
  39. // maxFiles: 10, // 最多保留 10 个文件
  40. // maxSize: '100M', // 最大总大小
  41. });
  42. // 监听日志流事件 (可选,用于调试和监控)
  43. logStream.on("external", () => console.log("[Log Stream] external"));
  44. logStream.on("history", () => console.log("[Log Stream] history"));
  45. logStream.on("open", (f: string) => console.log(`[Log Stream] Opened: ${f}`));
  46. logStream.on("removed", (f: string) => console.log(`[Log Stream] Removed: ${f}`));
  47. logStream.on("rotation", () => console.log("[Log Stream] rotation"));
  48. logStream.on("rotated", (f: string) => console.log(`[Log Stream] Rotated to: ${f}`));
  49. logStream.on("warning", (e: Error) => console.warn("[Log Stream] warning:", e));
  50. logStream.on("error", (e: Error) => console.error("[Log Stream] error:", e)); // 捕获写入错误
  51. // --- Utility Functions ---
  52. function delay(ms: number): Promise<void> {
  53. return new Promise((resolve) => setTimeout(resolve, ms));
  54. }
  55. /**
  56. * 将事件数据写入日志文件。
  57. * @param data - 要写入的日志数据字符串。
  58. * @returns Promise<void> - 写入成功或失败。
  59. */
  60. function pushToFile(data: string): Promise<void> {
  61. return new Promise((resolve, reject) => {
  62. // 使用 stream.write 的回调函数来判断写入是否成功
  63. logStream.write(`${data}\n`, (error: Error | null | undefined) => {
  64. if (error) {
  65. return reject(error);
  66. }
  67. resolve();
  68. });
  69. });
  70. }
  71. /**
  72. * 带重试机制地将消息推送到文件。
  73. * @param data - 要写入的日志数据字符串。
  74. * @param ms - 每次重试的延迟时间(毫秒)。
  75. * @param retries - 最大重试次数。
  76. * @returns Promise<void> - 如果写入成功或所有重试都失败。
  77. */
  78. async function retryPushToFile(data: string, ms: number = 1000, retries: number = 5): Promise<void> {
  79. try {
  80. await pushToFile(data);
  81. return; // 成功写入
  82. } catch (err) {
  83. console.error(`[Log Service] Failed to write to log (retries left: ${retries}):`, err);
  84. if (retries > 0) {
  85. await delay(ms);
  86. return retryPushToFile(data, ms, retries - 1); // 递归重试
  87. } else {
  88. throw new Error(`Failed to write to log after ${retries} retries.`); // 抛出最终错误
  89. }
  90. }
  91. }
  92. // --- Main Log Service Start Function ---
  93. async function startLogService() {
  94. try {
  95. amqpConnection = await amqp.connect(RABBITMQ_URL);
  96. amqpConnection.on("error", (err) => {
  97. console.error("[RabbitMQ] Connection error:", err);
  98. if (amqpChannel) amqpChannel.close();
  99. if (amqpConnection) amqpConnection.close();
  100. amqpConnection = undefined;
  101. amqpChannel = undefined;
  102. setTimeout(startLogService, 5000); // 尝试重新连接
  103. });
  104. amqpConnection.on("close", () => {
  105. console.error("[RabbitMQ] Connection closed. Reconnecting...");
  106. amqpConnection = undefined;
  107. amqpChannel = undefined;
  108. setTimeout(startLogService, 5000); // 尝试重新连接
  109. });
  110. amqpChannel = await amqpConnection.createChannel();
  111. console.log("[RabbitMQ] Channel created for Log Service.");
  112. // 确保队列存在且是持久化的
  113. await amqpChannel.assertQueue(RABBITMQ_LOG_QUEUE, { durable: true });
  114. console.log(`[RabbitMQ] Log Service queue '${RABBITMQ_LOG_QUEUE}' asserted.`);
  115. // 设置消费者预取数量,平衡吞吐量和资源使用
  116. amqpChannel.prefetch(100);
  117. console.log(`Log Service connected to RabbitMQ and waiting for messages in queue: ${RABBITMQ_LOG_QUEUE}`);
  118. amqpChannel.consume(
  119. RABBITMQ_LOG_QUEUE,
  120. async (msg: Message | null) => {
  121. if (msg === null) return; // Channel closed or other null message
  122. try {
  123. const eventDataString = msg.content.toString();
  124. console.log("[Log Service] Received event:", eventDataString); // 生产环境可以减少日志量
  125. await retryPushToFile(eventDataString, 3000, 5); // 写入文件,带重试
  126. if (amqpChannel) {
  127. amqpChannel.ack(msg); // 成功写入文件后确认消息
  128. }
  129. } catch (err) {
  130. console.error("[Log Service] Failed to process or write message to log after retries. Rejecting message:", err);
  131. if (amqpChannel) {
  132. // 拒绝消息,不重新入队,防止反复失败阻塞队列
  133. // 或者根据您的策略,可以设置为 true 重新入队
  134. amqpChannel.reject(msg, false);
  135. }
  136. }
  137. },
  138. { noAck: false } // 必须手动确认
  139. );
  140. } catch (error) {
  141. console.error("[Log Service] Failed to connect to RabbitMQ or start consuming:", error);
  142. // 在 PM2 管理下,这里不直接 exit(1),让 PM2 尝试重启整个服务
  143. // setTimeout(startLogService, 5000); // 如果是单进程运行,可以尝试重连
  144. }
  145. }
  146. // --- Graceful Shutdown ---
  147. async function gracefulShutdown() {
  148. console.log("[Log Service] Shutting down...");
  149. if (amqpChannel) {
  150. try {
  151. await amqpChannel.close();
  152. console.log("[Log Service] RabbitMQ channel closed.");
  153. } catch (e) {
  154. console.error("[Log Service] Error closing RabbitMQ channel:", e);
  155. }
  156. }
  157. if (amqpConnection) {
  158. try {
  159. await amqpConnection.close();
  160. console.log("[Log Service] RabbitMQ connection closed.");
  161. } catch (e) {
  162. console.error("[Log Service] Error closing RabbitMQ connection:", e);
  163. }
  164. }
  165. logStream.end(() => {
  166. console.log("[Log Service] Log stream closed.");
  167. process.exit(0);
  168. });
  169. }
  170. process.on("SIGINT", gracefulShutdown);
  171. process.on("SIGTERM", gracefulShutdown);
  172. // --- Start the service if run directly ---
  173. if (require.main === module) {
  174. console.log("Log Service started in standalone mode.");
  175. startLogService().catch(console.error);
  176. }
  177. // Export the start function for PM2
  178. export default startLogService;