构建基于 Fastify SQS 与 Puppeteer 的高韧性异步报告生成服务


最初的矛盾爆发在一次产品演示中。我们的SaaS平台提供一个复杂的报表功能,前端使用 Ant Design 的 FormTable 组件,允许用户自定义查询条件,然后导出一份精美的PDF报告。最初的实现简单粗暴:一个同步的 Fastify API 接口,接收前端参数,后端用 Puppeteer 实时渲染页面并生成PDF,然后直接返回文件流。在开发环境和少数用户的场景下,它工作得很好。

然而,当报告的数据量增大,渲染变得复杂时,这个接口的响应时间能轻易超过30秒,直接触发了API网关的超时。更糟糕的是,在高并发请求下,服务器上瞬间启动的大量 Chrome 无头实例会迅速耗尽 CPU 和内存资源,导致整个后端服务雪崩。这是典型的将长时间运行的计算密集型任务同步处理的后果。

敏捷开发的核心之一是快速响应变化。在下一次的迭代周期中,重构这个摇摇欲坠的功能成了最高优先级的任务。

初步构想:从同步到异步的必然演进

技术痛点非常明确:必须将PDF生成这个重度操作与主API流程解耦。用户点击“导出”后,API应该立即响应,告知“任务已提交,请稍后在通知中心查看”,而不是让用户盯着加载动画直到超时。

这自然导向了基于消息队列的异步任务处理架构。

  1. API 网关 (Fastify): 角色转变为一个纯粹的任务分发器。它负责接收和校验来自 Ant Design 前端的请求,然后将生成任务作为一个消息推送到队列中,立刻返回 202 Accepted。Fastify 的轻量和高性能特性非常适合这种“即发即忘”的场景。
  2. 消息队列 (AWS SQS): 我们选择 SQS 的原因在于它的完全托管特性、高可用性以及与 AWS 生态的无缝集成,特别是其“死信队列(Dead Letter Queue, DLQ)”机制,对于处理失败任务至关重要。相比于自建 RabbitMQ 或 Kafka,SQS 在这个场景下运维成本几乎为零。
  3. 任务处理器 (Worker): 一个或多个独立的 Node.js 进程,持续从 SQS 拉取任务。每个任务都驱动 Puppeteer 完成一次PDF生成。这些 Worker 可以独立于主API部署和扩缩容,实现了资源的物理隔离。

这个架构在理论上解决了同步调用的所有问题。现在,我们需要将它付诸实践。

步骤一:改造 Fastify API 为任务生产者

首先重构 Fastify 的接口。旧的代码被废弃,新的 /reports/generate-async 接口只做三件事:验证输入、构造消息体、发送到 SQS。

// src/routes/report.js

import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { v4 as uuidv4 } from 'uuid';
import { config } from '../config.js';

// 初始化 SQS 客户端
// 在真实项目中,配置(region, credentials)应该通过环境变量或 IAM 角色注入
const sqsClient = new SQSClient({
  region: config.aws.region,
  credentials: {
    accessKeyId: config.aws.accessKeyId,
    secretAccessKey: config.aws.secretAccessKey,
  },
});

async function routes(fastify, options) {
  const reportRequestSchema = {
    body: {
      type: 'object',
      required: ['reportType', 'parameters', 'userId'],
      properties: {
        reportType: { type: 'string', enum: ['sales_summary', 'user_activity'] },
        parameters: { type: 'object' },
        userId: { type: 'string', format: 'uuid' },
      },
    },
    response: {
      202: {
        type: 'object',
        properties: {
          message: { type: 'string' },
          taskId: { type: 'string' },
        },
      },
    },
  };

  fastify.post('/reports/generate-async', { schema: reportRequestSchema }, async (request, reply) => {
    const taskId = uuidv4();
    const { reportType, parameters, userId } = request.body;

    const messagePayload = {
      taskId,
      reportType,
      parameters,
      userId,
      submittedAt: new Date().toISOString(),
    };

    const command = new SendMessageCommand({
      QueueUrl: config.sqs.queueUrl,
      MessageBody: JSON.stringify(messagePayload),
      // 使用 MessageGroupId 确保 FIFO 队列中的消息分组(如果需要)
      // 对于标准队列则非必需
      MessageGroupId: reportType, 
      // 使用 taskId 进行内容去重
      MessageDeduplicationId: taskId, 
    });

    try {
      await sqsClient.send(command);
      fastify.log.info({ taskId, userId }, `Report generation task successfully queued.`);
      
      reply.code(202).send({
        message: 'Report generation task has been accepted.',
        taskId: taskId,
      });

    } catch (error) {
      fastify.log.error({ err: error, taskId }, 'Failed to queue report generation task.');
      // 这里的错误处理很关键,如果队列服务不可用,不能欺骗客户端任务已接受
      reply.code(500).send({ error: 'Failed to submit the task. Please try again later.' });
    }
  });
}

export default routes;

这段代码有几个生产实践要点:

  • Schema 验证: Fastify 的 schema 功能能在路由处理函数执行前自动验证请求体,代码更整洁且安全。
  • AWS SDK v3: 使用模块化的 AWS SDK v3,只引入必要的客户端 @aws-sdk/client-sqs
  • 唯一任务ID: 生成一个 taskId 并将其放入消息体和返回给客户端。这对于后续追踪任务状态至关重要。
  • 错误处理: 明确处理 SQS 发送失败的情况。如果连任务都无法入队,必须向客户端返回一个服务端错误,而不是一个虚假的成功响应。
  • 日志: 关键路径上的日志记录是必须的,包括任务ID和用户ID,便于问题排查。

步骤二:构建消费 SQS 消息的 Puppeteer Worker

Worker 是这个架构的核心,也是最脆弱的部分。它是一个独立的 Node.js 服务,主要逻辑是循环地从 SQS 拉取消息并处理。

// worker/consumer.js

import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import puppeteer from 'puppeteer';
import { config } from './config.js';
import { logger } from './logger.js';

const sqsClient = new SQSClient({ region: config.aws.region, ... });

// Puppeteer 浏览器实例管理
// 一个常见的错误是在每次处理消息时都启动和关闭浏览器,开销巨大。
// 更好的做法是复用一个浏览器实例。
let browserInstance;
async function getBrowser() {
  if (!browserInstance || !browserInstance.isConnected()) {
    logger.info('Launching new Puppeteer browser instance.');
    browserInstance = await puppeteer.launch({
      headless: true,
      args: [
        '--no-sandbox',
        '--disable-setuid-sandbox',
        '--disable-dev-shm-usage', // 在 Docker 环境中至关重要
      ],
    });
  }
  return browserInstance;
}

async function processMessage(message) {
  const task = JSON.parse(message.Body);
  logger.info({ taskId: task.taskId }, 'Starting to process report task.');
  
  let page;
  try {
    const browser = await getBrowser();
    page = await browser.newPage();
    
    // 这里的 URL 应该指向一个内部服务,该服务能根据参数渲染报表的 HTML 页面。
    // 为了样式统一,这个内部页面可以复用 Ant Design 的样式表。
    const reportUrl = `${config.reportRenderer.baseUrl}?params=${encodeURIComponent(JSON.stringify(task.parameters))}`;
    
    await page.goto(reportUrl, { waitUntil: 'networkidle0', timeout: 60000 });
    
    // 生成 PDF 并上传到 S3
    const pdfBuffer = await page.pdf({ format: 'A4', printBackground: true });
    
    // ... 此处省略上传 S3 和更新数据库任务状态的代码 ...
    
    logger.info({ taskId: task.taskId }, 'Report generated and stored successfully.');
    
    // 任务成功处理后,从 SQS 删除消息
    await sqsClient.send(new DeleteMessageCommand({
      QueueUrl: config.sqs.queueUrl,
      ReceiptHandle: message.ReceiptHandle,
    }));

  } catch (error) {
    logger.error({ err: error, taskId: task.taskId }, 'Error processing report task.');
    // 故意不删除消息,让它在 VisibilityTimeout 后重回队列,由 SQS 的 redrive policy 处理
    throw error; // 向上抛出异常,让主循环知道处理失败
  } finally {
    if (page) {
      await page.close(); // 确保页面被关闭,防止内存泄漏
    }
  }
}

async function startPolling() {
  logger.info('Worker started. Polling for messages...');
  while (true) {
    try {
      const command = new ReceiveMessageCommand({
        QueueUrl: config.sqs.queueUrl,
        MaxNumberOfMessages: 5, // 一次最多拉取5条
        WaitTimeSeconds: 20, // 启用长轮询,降低API调用成本
        MessageAttributeNames: ['All'],
      });
      
      const response = await sqsClient.send(command);

      if (response.Messages && response.Messages.length > 0) {
        logger.info(`Received ${response.Messages.length} messages.`);
        const processingPromises = response.Messages.map(processMessage);
        await Promise.allSettled(processingPromises); // 并行处理,等待所有任务完成(不论成功或失败)
      }
    } catch (error) {
      logger.error({ err: error }, 'Error during SQS polling loop.');
      await new Promise(resolve => setTimeout(resolve, 5000)); // 发生严重错误时,等待5秒再重试
    }
  }
}

startPolling();

这个 Worker 的实现考虑了几个生产环境中的关键点:

  • 长轮询: WaitTimeSeconds: 20 告诉 SQS,如果没有消息,请保持连接打开最多20秒。这极大地减少了空轮询的次数,节省了成本和网络流量。
  • 浏览器实例复用: 通过 getBrowser 函数,我们确保只在需要时启动一个浏览器实例,并供所有任务处理共享,显著降低了性能开札。
  • 资源清理: finally 块中的 page.close() 是防止内存泄漏的生命线。无论成功或失败,每个打开的页面都必须被关闭。
  • 并发处理: Promise.allSettled 允许我们并行处理一批消息,提高了吞吐量,同时确保单个消息的处理失败不会中断其他消息的处理。
  • 失败处理:processMessage 抛出异常时,我们不会删除该消息。这使得消息在“可见性超时”后会重新出现在队列中,SQS 会根据配置的重试策略进行重试。达到最大重试次数后,消息会被自动移入预先配置的死信队列(DLQ),等待人工介入。

迭代中的新风暴:毒丸消息与服务雪崩

架构上线后,初期运行平稳。但很快,我们遇到了一个更隐蔽的问题。某个特定类型的报告,由于其数据源的一个 bug,总会导致 Puppeteer 渲染超时或崩溃。

这个消息成了一个“毒丸”。它被 Worker 取出,处理失败,由于我们没有删除它,它回到队列。几分钟后,它又被同一个或另一个 Worker 取出,再次处理失败。这个过程无限循环。

更糟的是,SQS 的死信队列策略是基于接收次数的(maxReceiveCount)。如果我们将 maxReceiveCount 设置为5,那么这个毒丸消息会造成5次重量级的 Puppeteer 启动、渲染、崩溃的循环,才会被隔离到 DLQ。在此期间,它浪费了大量的计算资源。如果同时出现多个这样的毒丸消息,我们的 Worker 集群会把所有资源都浪费在处理这些注定失败的任务上,正常任务被阻塞, фактически 造成了 Worker 服务的雪崩。

我们需要一个更智能、反应更迅速的熔断机制。

解决方案:在 Worker 中实现一个轻量级状态机熔断器

我们需要一个机制,当 Worker 发现某种类型的任务(比如 reportType: 'user_activity')在短时间内连续失败多次时,能主动“熔断”,在接下来的一段时间内直接拒绝处理同类型的任务,让它们暂时留在队列里,给其他类型的任务让路。这本质上是一个客户端侧的断路器模式。

我们不引入复杂的库,而是自己实现一个简单的、基于内存的状态机。

graph TD
    A[CLOSED] -- Failure Threshold Reached --> B(OPEN);
    B -- Timeout --> C{HALF_OPEN};
    C -- One Success --> A;
    C -- One Failure --> B;
    A -- Success --> A;

这个断路器有三个状态:

  • CLOSED: 正常状态,所有任务都正常处理。失败会被计数。
  • OPEN: 当失败次数达到阈值,断路器打开。在一段时间内,所有同类任务都会被立即拒绝,不会执行。
  • HALF_OPEN: 在 OPEN 状态的超时结束后,断路器进入此状态。它会尝试处理一个任务。如果成功,断路器转为 CLOSED;如果失败,则立即回到 OPEN 状态,并重置超时。

下面是集成到 Worker 中的代码:

// worker/circuitBreaker.js

import { logger } from './logger.js';

const STATE = {
  CLOSED: 'CLOSED',
  OPEN: 'OPEN',
  HALF_OPEN: 'HALF_OPEN',
};

// 为每种任务类型创建一个断路器实例
const breakers = new Map();

class CircuitBreaker {
  constructor(name, options = {}) {
    this.name = name;
    this.state = STATE.CLOSED;
    this.failureThreshold = options.failureThreshold || 3; // 连续3次失败则打开
    this.recoveryTimeout = options.recoveryTimeout || 10000; // 10秒后进入半开状态
    this.failures = 0;
    this.lastFailureTime = null;
    
    logger.info(`Circuit breaker '${name}' initialized.`);
  }

  exec(taskFn) {
    if (this.state === STATE.OPEN) {
      if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
        this.state = STATE.HALF_OPEN;
        logger.warn({ breaker: this.name }, 'Breaker state changed to HALF_OPEN.');
      } else {
        logger.warn({ breaker: this.name }, 'Breaker is OPEN. Skipping execution.');
        return Promise.reject(new Error(`CircuitBreaker[${this.name}] is open.`));
      }
    }

    try {
      // 在 HALF_OPEN 或 CLOSED 状态下执行
      const promise = taskFn();
      promise.then(() => this.onSuccess()).catch((err) => this.onFailure(err));
      return promise;
    } catch (err) {
      this.onFailure(err);
      return Promise.reject(err);
    }
  }

  onSuccess() {
    if (this.state === STATE.HALF_OPEN) {
      logger.info({ breaker: this.name }, 'Breaker recovery successful. State changed to CLOSED.');
    }
    this.failures = 0;
    this.state = STATE.CLOSED;
  }

  onFailure(err) {
    this.failures++;
    this.lastFailureTime = Date.now();
    
    if (this.state === STATE.HALF_OPEN || this.failures >= this.failureThreshold) {
      if (this.state !== STATE.OPEN) {
        logger.error({ breaker: this.name, failures: this.failures }, `Breaker tripped! State changed to OPEN.`);
        this.state = STATE.OPEN;
      }
    }
  }
}

export function getBreaker(name) {
  if (!breakers.has(name)) {
    breakers.set(name, new CircuitBreaker(name));
  }
  return breakers.get(name);
}

现在,我们把这个断路器整合进 consumer.jsprocessMessage 函数:

// worker/consumer.js (modified processMessage)
import { getBreaker } from './circuitBreaker.js';

async function processMessage(message) {
  const task = JSON.parse(message.Body);
  const breaker = getBreaker(task.reportType); // 按报告类型获取或创建断路器

  try {
    // 将核心处理逻辑包装在断路器的 exec 方法中
    await breaker.exec(async () => {
      logger.info({ taskId: task.taskId, breaker: breaker.name }, 'Breaker is CLOSED/HALF_OPEN. Processing task.');
      
      let page;
      try {
        const browser = await getBrowser();
        page = await browser.newPage();
        const reportUrl = `${config.reportRenderer.baseUrl}?params=${encodeURIComponent(JSON.stringify(task.parameters))}`;
        await page.goto(reportUrl, { waitUntil: 'networkidle0', timeout: 60000 });
        const pdfBuffer = await page.pdf({ format: 'A4', printBackground: true });
        // ... 上传 S3, 更新数据库 ...
        logger.info({ taskId: task.taskId }, 'Report generated successfully.');
      } finally {
        if (page) await page.close();
      }
    });

    // 任务成功,从 SQS 删除消息
    await sqsClient.send(new DeleteMessageCommand({
      QueueUrl: config.sqs.queueUrl,
      ReceiptHandle: message.ReceiptHandle,
    }));

  } catch (error) {
    logger.error({ err: error, taskId: task.taskId, breaker: breaker.name }, 'Task processing failed or was rejected by circuit breaker.');
    // 失败或被熔断器拒绝,都不删除消息,让其自然返回队列
    // 断路器打开时,我们能快速失败,避免了启动 Puppeteer 的昂贵操作
  }
}

通过这个改造,当 user_activity 报告连续失败3次后,对应的断路器会打开。在接下来的10秒内,任何新的 user_activity 任务都会被 breaker.exec 立即拒绝,Worker 可以跳过它们去处理其他类型的任务,例如 sales_summary。10秒后,断路器进入半开状态,尝试处理一个 user_activity 任务,如果成功,系统恢复;如果再次失败,则继续保持熔断。

这种机制极大地提升了系统的韧性,防止了局部问题(某个报告类型的bug)演变成整个 Worker 集群的全局性灾难。

遗留问题与未来迭代方向

这个架构并非终点。当前的实现依然存在一些可以优化的地方:

  1. 断路器状态的持久化: 目前的断路器状态是存储在单个 Worker 实例的内存中的。这意味着每个 Worker 都有自己独立的断路器状态,无法在整个集群层面共享熔断信息。一个更健壮的方案是使用 Redis 或类似的高速缓存来存储断路器的状态(状态、失败次数、最后失败时间),让所有 Worker 共享同一个视图。
  2. Puppeteer 进程管理: 虽然复用了浏览器实例,但在高负载下,单个浏览器实例内的多个页面也可能互相影响。可以考虑引入一个浏览器实例池,或者为每个任务启动一个隔离的 Docker 容器来运行 Puppeteer,以实现更彻底的资源隔离,但这会增加架构的复杂性。
  3. 死信队列的自动化处理: 消息进入 DLQ 后,目前需要人工干预。可以构建一个小型自动化服务,专门消费 DLQ 中的消息,尝试解析失败原因,对可修复的错误进行修正后重新入队,或将无法处理的任务信息推送至告警系统。
  4. 动态扩缩容: Worker 的数量目前是固定的。一个更云原生的做法是,根据 SQS 队列的积压消息数量(ApproximateNumberOfMessagesVisible 指标)来配置 Auto Scaling Group,实现 Worker 节点的自动弹性伸缩。

  目录