| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- // oms/src/log-service/app.ts
- // Load environment variables (e.g., RABBITMQ_URL, RABBITMQ_LOG_QUEUE, LOG_DIR)
- import * as dotenv from "dotenv";
- dotenv.config();
- import amqp, { Connection, ChannelModel, Channel, Message } from "amqplib"; // 明确导入 Connection, Channel, Message 类型
- import * as rfs from "rotating-file-stream"; // 👈 关键修复:使用 `import * as rfs`
- import moment from "moment"; // 导入 moment (用于日志文件名生成)
- import * as path from "path"; // 导入 path 模块
- // --- Environment Variables ---
- const RABBITMQ_URL = process.env.RABBITMQ_URL || "amqp://coloring:coloring123.@localhost:5672";
- const RABBITMQ_LOG_QUEUE = process.env.RABBITMQ_LOG_QUEUE || "log-event-queue"; // 日志服务订阅的队列
- const LOG_DIR = process.env.LOG_DIR || path.join(__dirname, "..", "..", "logs", "coloring"); // 日志文件存储路径
- let amqpConnection: ChannelModel | undefined; // 使用 undefined 初始化,因为连接是异步的
- let amqpChannel: Channel | undefined; // 使用 undefined 初始化,因为频道是异步的
- // --- Log Rotation Setup ---
- // 确保日志目录存在
- try {
- if (!require("fs").existsSync(LOG_DIR)) {
- require("fs").mkdirSync(LOG_DIR, { recursive: true });
- console.log(`[Log Service] Created log directory: ${LOG_DIR}`);
- }
- } catch (error) {
- console.error(`[Log Service] Failed to create log directory ${LOG_DIR}:`, error);
- process.exit(1);
- }
- // 日志文件名生成器
- const generator = (time: Date | number | undefined, index?: number): string => {
- if (!time) return "coloring.log"; // 初始文件名
- const suffix = moment(time).format("YYYYMMDD");
- return `coloring-${suffix}.log.gz`; // 每日轮换并压缩
- };
- // 创建文件写入流
- const logStream = rfs.createStream(generator, {
- interval: "1d", // 每日轮换
- compress: "gzip", // 使用 gzip 压缩
- path: LOG_DIR, // 日志文件路径
- intervalBoundary: true, // 确保在指定时间边界轮换
- // size: '10M', // 也可以配置按大小轮换,例如每 10MB
- // maxFiles: 10, // 最多保留 10 个文件
- // maxSize: '100M', // 最大总大小
- });
- // 监听日志流事件 (可选,用于调试和监控)
- logStream.on("external", () => console.log("[Log Stream] external"));
- logStream.on("history", () => console.log("[Log Stream] history"));
- logStream.on("open", (f: string) => console.log(`[Log Stream] Opened: ${f}`));
- logStream.on("removed", (f: string) => console.log(`[Log Stream] Removed: ${f}`));
- logStream.on("rotation", () => console.log("[Log Stream] rotation"));
- logStream.on("rotated", (f: string) => console.log(`[Log Stream] Rotated to: ${f}`));
- logStream.on("warning", (e: Error) => console.warn("[Log Stream] warning:", e));
- logStream.on("error", (e: Error) => console.error("[Log Stream] error:", e)); // 捕获写入错误
- // --- Utility Functions ---
- function delay(ms: number): Promise<void> {
- return new Promise((resolve) => setTimeout(resolve, ms));
- }
- /**
- * 将事件数据写入日志文件。
- * @param data - 要写入的日志数据字符串。
- * @returns Promise<void> - 写入成功或失败。
- */
- function pushToFile(data: string): Promise<void> {
- return new Promise((resolve, reject) => {
- // 使用 stream.write 的回调函数来判断写入是否成功
- logStream.write(`${data}\n`, (error: Error | null | undefined) => {
- if (error) {
- return reject(error);
- }
- resolve();
- });
- });
- }
- /**
- * 带重试机制地将消息推送到文件。
- * @param data - 要写入的日志数据字符串。
- * @param ms - 每次重试的延迟时间(毫秒)。
- * @param retries - 最大重试次数。
- * @returns Promise<void> - 如果写入成功或所有重试都失败。
- */
- async function retryPushToFile(data: string, ms: number = 1000, retries: number = 5): Promise<void> {
- try {
- await pushToFile(data);
- return; // 成功写入
- } catch (err) {
- console.error(`[Log Service] Failed to write to log (retries left: ${retries}):`, err);
- if (retries > 0) {
- await delay(ms);
- return retryPushToFile(data, ms, retries - 1); // 递归重试
- } else {
- throw new Error(`Failed to write to log after ${retries} retries.`); // 抛出最终错误
- }
- }
- }
- // --- Main Log Service Start Function ---
- async function startLogService() {
- try {
- amqpConnection = await amqp.connect(RABBITMQ_URL);
- amqpConnection.on("error", (err) => {
- console.error("[RabbitMQ] Connection error:", err);
- if (amqpChannel) amqpChannel.close();
- if (amqpConnection) amqpConnection.close();
- amqpConnection = undefined;
- amqpChannel = undefined;
- setTimeout(startLogService, 5000); // 尝试重新连接
- });
- amqpConnection.on("close", () => {
- console.error("[RabbitMQ] Connection closed. Reconnecting...");
- amqpConnection = undefined;
- amqpChannel = undefined;
- setTimeout(startLogService, 5000); // 尝试重新连接
- });
- amqpChannel = await amqpConnection.createChannel();
- console.log("[RabbitMQ] Channel created for Log Service.");
- // 确保队列存在且是持久化的
- await amqpChannel.assertQueue(RABBITMQ_LOG_QUEUE, { durable: true });
- console.log(`[RabbitMQ] Log Service queue '${RABBITMQ_LOG_QUEUE}' asserted.`);
- // 设置消费者预取数量,平衡吞吐量和资源使用
- amqpChannel.prefetch(100);
- console.log(`Log Service connected to RabbitMQ and waiting for messages in queue: ${RABBITMQ_LOG_QUEUE}`);
- amqpChannel.consume(
- RABBITMQ_LOG_QUEUE,
- async (msg: Message | null) => {
- if (msg === null) return; // Channel closed or other null message
- try {
- const eventDataString = msg.content.toString();
- console.log("[Log Service] Received event:", eventDataString); // 生产环境可以减少日志量
- await retryPushToFile(eventDataString, 3000, 5); // 写入文件,带重试
- if (amqpChannel) {
- amqpChannel.ack(msg); // 成功写入文件后确认消息
- }
- } catch (err) {
- console.error("[Log Service] Failed to process or write message to log after retries. Rejecting message:", err);
- if (amqpChannel) {
- // 拒绝消息,不重新入队,防止反复失败阻塞队列
- // 或者根据您的策略,可以设置为 true 重新入队
- amqpChannel.reject(msg, false);
- }
- }
- },
- { noAck: false } // 必须手动确认
- );
- } catch (error) {
- console.error("[Log Service] Failed to connect to RabbitMQ or start consuming:", error);
- // 在 PM2 管理下,这里不直接 exit(1),让 PM2 尝试重启整个服务
- // setTimeout(startLogService, 5000); // 如果是单进程运行,可以尝试重连
- }
- }
- // --- Graceful Shutdown ---
- async function gracefulShutdown() {
- console.log("[Log Service] Shutting down...");
- if (amqpChannel) {
- try {
- await amqpChannel.close();
- console.log("[Log Service] RabbitMQ channel closed.");
- } catch (e) {
- console.error("[Log Service] Error closing RabbitMQ channel:", e);
- }
- }
- if (amqpConnection) {
- try {
- await amqpConnection.close();
- console.log("[Log Service] RabbitMQ connection closed.");
- } catch (e) {
- console.error("[Log Service] Error closing RabbitMQ connection:", e);
- }
- }
- logStream.end(() => {
- console.log("[Log Service] Log stream closed.");
- process.exit(0);
- });
- }
- process.on("SIGINT", gracefulShutdown);
- process.on("SIGTERM", gracefulShutdown);
- // --- Start the service if run directly ---
- if (require.main === module) {
- console.log("Log Service started in standalone mode.");
- startLogService().catch(console.error);
- }
- // Export the start function for PM2
- export default startLogService;
|