guoziyun 9 tháng trước cách đây
mục cha
commit
f45e9ded2e
2 tập tin đã thay đổi với 10 bổ sung89 xóa
  1. 5 42
      oms/dist/services/event-api-service.js
  2. 5 47
      oms/services/event-api-service.ts

+ 5 - 42
oms/dist/services/event-api-service.js

@@ -99,8 +99,8 @@ app.set("trust proxy", true);
 // :remote-addr 和 :req[host] 会通过 app.set("trust proxy", true) 正常工作
 app.use((0, morgan_1.default)('[:date[clf]] :remote-addr :req[host] :status :response-time ms :res[content-length] ":method :url HTTP/:http-version" ":referrer" ":user-agent"'));
 app.use(express_1.default.json()); // To parse JSON request bodies
-// --- API Endpoint: /napi/event/v2 ---
-app.post("/napi/event/v2", async (req, res) => {
+// 提取出的处理函数
+const eventHandler = async (req, res) => {
     if (!amqpChannel) {
         console.error("[Event API] RabbitMQ channel not available.");
         return res.status(500).json({ message: "Server is not ready to process events." });
@@ -137,46 +137,9 @@ app.post("/napi/event/v2", async (req, res) => {
         console.error("[Event API] Error publishing event to RabbitMQ:", error);
         res.status(500).json({ message: "Failed to process event." });
     }
-});
-// --- API Endpoint: /napi/event/ ---
-app.post("/napi/event/", async (req, res) => {
-    if (!amqpChannel) {
-        console.error("[Event API] RabbitMQ channel not available.");
-        return res.status(500).json({ message: "Server is not ready to process events." });
-    }
-    const eventData = req.body;
-    if (!eventData || Object.keys(eventData).length === 0) {
-        return res.status(400).json({ message: "Event data is required." });
-    }
-    // 添加必要字段
-    eventData.ip = req.ip;
-    eventData.t = new Date();
-    eventData.cc = req.header("x-country-code") || "nil";
-    eventData._id = new mongoose_1.default.Types.ObjectId();
-    try {
-        const message = JSON.stringify(eventData);
-        // Publish the message to the exchange for broadcast
-        // persistent: true ensures the message survives RabbitMQ restarts
-        // '' as routing key for fanout exchange means it goes to all bound queues
-        const published = amqpChannel.publish(RABBITMQ_EXCHANGE_NAME, "", // Routing key (ignored by fanout exchange)
-        Buffer.from(message), { persistent: true } // Mark message as persistent
-        );
-        if (published) {
-            // console.log('[Event API] Event published to RabbitMQ:', eventData); // Commented for high volume
-            res.status(200).json({ msg: "ok" });
-        }
-        else {
-            // This case indicates the RabbitMQ buffer is full.
-            // In a real-world scenario, you might want to implement backpressure or retry.
-            console.warn("[Event API] Failed to publish event to RabbitMQ (channel full). Event:", eventData);
-            res.status(503).json({ message: "Service temporarily unavailable, please retry." });
-        }
-    }
-    catch (error) {
-        console.error("[Event API] Error publishing event to RabbitMQ:", error);
-        res.status(500).json({ message: "Failed to process event." });
-    }
-});
+};
+app.post("/napi/event/v2", eventHandler);
+app.post("/napi/event/", eventHandler);
 // --- Start the Express Server ---
 async function startServer() {
     await connectRabbitMQ(); // Connect to RabbitMQ before starting the server

+ 5 - 47
oms/services/event-api-service.ts

@@ -77,8 +77,8 @@ app.use(morgan('[:date[clf]] :remote-addr :req[host] :status :response-time ms :
 
 app.use(express.json()); // To parse JSON request bodies
 
-// --- API Endpoint: /napi/event/v2 ---
-app.post("/napi/event/v2", async (req: Request, res: Response) => {
+// 提取出的处理函数
+const eventHandler = async (req: Request, res: Response) => {
   if (!amqpChannel) {
     console.error("[Event API] RabbitMQ channel not available.");
     return res.status(500).json({ message: "Server is not ready to process events." });
@@ -120,52 +120,10 @@ app.post("/napi/event/v2", async (req: Request, res: Response) => {
     console.error("[Event API] Error publishing event to RabbitMQ:", error);
     res.status(500).json({ message: "Failed to process event." });
   }
-});
-
-// --- API Endpoint: /napi/event/ ---
-app.post("/napi/event/", async (req: Request, res: Response) => {
-  if (!amqpChannel) {
-    console.error("[Event API] RabbitMQ channel not available.");
-    return res.status(500).json({ message: "Server is not ready to process events." });
-  }
-
-  const eventData = req.body;
-  if (!eventData || Object.keys(eventData).length === 0) {
-    return res.status(400).json({ message: "Event data is required." });
-  }
+};
 
-  // 添加必要字段
-  eventData.ip = req.ip;
-  eventData.t = new Date();
-  eventData.cc = req.header("x-country-code") || "nil";
-  eventData._id = new mongoose.Types.ObjectId();
-
-  try {
-    const message = JSON.stringify(eventData);
-    // Publish the message to the exchange for broadcast
-    // persistent: true ensures the message survives RabbitMQ restarts
-    // '' as routing key for fanout exchange means it goes to all bound queues
-    const published = amqpChannel.publish(
-      RABBITMQ_EXCHANGE_NAME,
-      "", // Routing key (ignored by fanout exchange)
-      Buffer.from(message),
-      { persistent: true } // Mark message as persistent
-    );
-
-    if (published) {
-      // console.log('[Event API] Event published to RabbitMQ:', eventData); // Commented for high volume
-      res.status(200).json({ msg: "ok" });
-    } else {
-      // This case indicates the RabbitMQ buffer is full.
-      // In a real-world scenario, you might want to implement backpressure or retry.
-      console.warn("[Event API] Failed to publish event to RabbitMQ (channel full). Event:", eventData);
-      res.status(503).json({ message: "Service temporarily unavailable, please retry." });
-    }
-  } catch (error) {
-    console.error("[Event API] Error publishing event to RabbitMQ:", error);
-    res.status(500).json({ message: "Failed to process event." });
-  }
-});
+app.post("/napi/event/v2", eventHandler);
+app.post("/napi/event/", eventHandler);
 
 // --- Start the Express Server ---
 async function startServer() {