最初的矛盾爆发在一次产品演示中。我们的SaaS平台提供一个复杂的报表功能,前端使用 Ant Design 的 Form 和 Table 组件,允许用户自定义查询条件,然后导出一份精美的PDF报告。最初的实现简单粗暴:一个同步的 Fastify API 接口,接收前端参数,后端用 Puppeteer 实时渲染页面并生成PDF,然后直接返回文件流。在开发环境和少数用户的场景下,它工作得很好。
然而,当报告的数据量增大,渲染变得复杂时,这个接口的响应时间能轻易超过30秒,直接触发了API网关的超时。更糟糕的是,在高并发请求下,服务器上瞬间启动的大量 Chrome 无头实例会迅速耗尽 CPU 和内存资源,导致整个后端服务雪崩。这是典型的将长时间运行的计算密集型任务同步处理的后果。
敏捷开发的核心之一是快速响应变化。在下一次的迭代周期中,重构这个摇摇欲坠的功能成了最高优先级的任务。
初步构想:从同步到异步的必然演进
技术痛点非常明确:必须将PDF生成这个重度操作与主API流程解耦。用户点击“导出”后,API应该立即响应,告知“任务已提交,请稍后在通知中心查看”,而不是让用户盯着加载动画直到超时。
这自然导向了基于消息队列的异步任务处理架构。
- API 网关 (Fastify): 角色转变为一个纯粹的任务分发器。它负责接收和校验来自 Ant Design 前端的请求,然后将生成任务作为一个消息推送到队列中,立刻返回
202 Accepted。Fastify 的轻量和高性能特性非常适合这种“即发即忘”的场景。 - 消息队列 (AWS SQS): 我们选择 SQS 的原因在于它的完全托管特性、高可用性以及与 AWS 生态的无缝集成,特别是其“死信队列(Dead Letter Queue, DLQ)”机制,对于处理失败任务至关重要。相比于自建 RabbitMQ 或 Kafka,SQS 在这个场景下运维成本几乎为零。
- 任务处理器 (Worker): 一个或多个独立的 Node.js 进程,持续从 SQS 拉取任务。每个任务都驱动 Puppeteer 完成一次PDF生成。这些 Worker 可以独立于主API部署和扩缩容,实现了资源的物理隔离。
这个架构在理论上解决了同步调用的所有问题。现在,我们需要将它付诸实践。
步骤一:改造 Fastify API 为任务生产者
首先重构 Fastify 的接口。旧的代码被废弃,新的 /reports/generate-async 接口只做三件事:验证输入、构造消息体、发送到 SQS。
// src/routes/report.js
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { v4 as uuidv4 } from 'uuid';
import { config } from '../config.js';
// 初始化 SQS 客户端
// 在真实项目中,配置(region, credentials)应该通过环境变量或 IAM 角色注入
const sqsClient = new SQSClient({
region: config.aws.region,
credentials: {
accessKeyId: config.aws.accessKeyId,
secretAccessKey: config.aws.secretAccessKey,
},
});
async function routes(fastify, options) {
const reportRequestSchema = {
body: {
type: 'object',
required: ['reportType', 'parameters', 'userId'],
properties: {
reportType: { type: 'string', enum: ['sales_summary', 'user_activity'] },
parameters: { type: 'object' },
userId: { type: 'string', format: 'uuid' },
},
},
response: {
202: {
type: 'object',
properties: {
message: { type: 'string' },
taskId: { type: 'string' },
},
},
},
};
fastify.post('/reports/generate-async', { schema: reportRequestSchema }, async (request, reply) => {
const taskId = uuidv4();
const { reportType, parameters, userId } = request.body;
const messagePayload = {
taskId,
reportType,
parameters,
userId,
submittedAt: new Date().toISOString(),
};
const command = new SendMessageCommand({
QueueUrl: config.sqs.queueUrl,
MessageBody: JSON.stringify(messagePayload),
// 使用 MessageGroupId 确保 FIFO 队列中的消息分组(如果需要)
// 对于标准队列则非必需
MessageGroupId: reportType,
// 使用 taskId 进行内容去重
MessageDeduplicationId: taskId,
});
try {
await sqsClient.send(command);
fastify.log.info({ taskId, userId }, `Report generation task successfully queued.`);
reply.code(202).send({
message: 'Report generation task has been accepted.',
taskId: taskId,
});
} catch (error) {
fastify.log.error({ err: error, taskId }, 'Failed to queue report generation task.');
// 这里的错误处理很关键,如果队列服务不可用,不能欺骗客户端任务已接受
reply.code(500).send({ error: 'Failed to submit the task. Please try again later.' });
}
});
}
export default routes;
这段代码有几个生产实践要点:
- Schema 验证: Fastify 的 schema 功能能在路由处理函数执行前自动验证请求体,代码更整洁且安全。
- AWS SDK v3: 使用模块化的 AWS SDK v3,只引入必要的客户端
@aws-sdk/client-sqs。 - 唯一任务ID: 生成一个
taskId并将其放入消息体和返回给客户端。这对于后续追踪任务状态至关重要。 - 错误处理: 明确处理 SQS 发送失败的情况。如果连任务都无法入队,必须向客户端返回一个服务端错误,而不是一个虚假的成功响应。
- 日志: 关键路径上的日志记录是必须的,包括任务ID和用户ID,便于问题排查。
步骤二:构建消费 SQS 消息的 Puppeteer Worker
Worker 是这个架构的核心,也是最脆弱的部分。它是一个独立的 Node.js 服务,主要逻辑是循环地从 SQS 拉取消息并处理。
// worker/consumer.js
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import puppeteer from 'puppeteer';
import { config } from './config.js';
import { logger } from './logger.js';
const sqsClient = new SQSClient({ region: config.aws.region, ... });
// Puppeteer 浏览器实例管理
// 一个常见的错误是在每次处理消息时都启动和关闭浏览器,开销巨大。
// 更好的做法是复用一个浏览器实例。
let browserInstance;
async function getBrowser() {
if (!browserInstance || !browserInstance.isConnected()) {
logger.info('Launching new Puppeteer browser instance.');
browserInstance = await puppeteer.launch({
headless: true,
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage', // 在 Docker 环境中至关重要
],
});
}
return browserInstance;
}
async function processMessage(message) {
const task = JSON.parse(message.Body);
logger.info({ taskId: task.taskId }, 'Starting to process report task.');
let page;
try {
const browser = await getBrowser();
page = await browser.newPage();
// 这里的 URL 应该指向一个内部服务,该服务能根据参数渲染报表的 HTML 页面。
// 为了样式统一,这个内部页面可以复用 Ant Design 的样式表。
const reportUrl = `${config.reportRenderer.baseUrl}?params=${encodeURIComponent(JSON.stringify(task.parameters))}`;
await page.goto(reportUrl, { waitUntil: 'networkidle0', timeout: 60000 });
// 生成 PDF 并上传到 S3
const pdfBuffer = await page.pdf({ format: 'A4', printBackground: true });
// ... 此处省略上传 S3 和更新数据库任务状态的代码 ...
logger.info({ taskId: task.taskId }, 'Report generated and stored successfully.');
// 任务成功处理后,从 SQS 删除消息
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: config.sqs.queueUrl,
ReceiptHandle: message.ReceiptHandle,
}));
} catch (error) {
logger.error({ err: error, taskId: task.taskId }, 'Error processing report task.');
// 故意不删除消息,让它在 VisibilityTimeout 后重回队列,由 SQS 的 redrive policy 处理
throw error; // 向上抛出异常,让主循环知道处理失败
} finally {
if (page) {
await page.close(); // 确保页面被关闭,防止内存泄漏
}
}
}
async function startPolling() {
logger.info('Worker started. Polling for messages...');
while (true) {
try {
const command = new ReceiveMessageCommand({
QueueUrl: config.sqs.queueUrl,
MaxNumberOfMessages: 5, // 一次最多拉取5条
WaitTimeSeconds: 20, // 启用长轮询,降低API调用成本
MessageAttributeNames: ['All'],
});
const response = await sqsClient.send(command);
if (response.Messages && response.Messages.length > 0) {
logger.info(`Received ${response.Messages.length} messages.`);
const processingPromises = response.Messages.map(processMessage);
await Promise.allSettled(processingPromises); // 并行处理,等待所有任务完成(不论成功或失败)
}
} catch (error) {
logger.error({ err: error }, 'Error during SQS polling loop.');
await new Promise(resolve => setTimeout(resolve, 5000)); // 发生严重错误时,等待5秒再重试
}
}
}
startPolling();
这个 Worker 的实现考虑了几个生产环境中的关键点:
- 长轮询:
WaitTimeSeconds: 20告诉 SQS,如果没有消息,请保持连接打开最多20秒。这极大地减少了空轮询的次数,节省了成本和网络流量。 - 浏览器实例复用: 通过
getBrowser函数,我们确保只在需要时启动一个浏览器实例,并供所有任务处理共享,显著降低了性能开札。 - 资源清理:
finally块中的page.close()是防止内存泄漏的生命线。无论成功或失败,每个打开的页面都必须被关闭。 - 并发处理:
Promise.allSettled允许我们并行处理一批消息,提高了吞吐量,同时确保单个消息的处理失败不会中断其他消息的处理。 - 失败处理: 当
processMessage抛出异常时,我们不会删除该消息。这使得消息在“可见性超时”后会重新出现在队列中,SQS 会根据配置的重试策略进行重试。达到最大重试次数后,消息会被自动移入预先配置的死信队列(DLQ),等待人工介入。
迭代中的新风暴:毒丸消息与服务雪崩
架构上线后,初期运行平稳。但很快,我们遇到了一个更隐蔽的问题。某个特定类型的报告,由于其数据源的一个 bug,总会导致 Puppeteer 渲染超时或崩溃。
这个消息成了一个“毒丸”。它被 Worker 取出,处理失败,由于我们没有删除它,它回到队列。几分钟后,它又被同一个或另一个 Worker 取出,再次处理失败。这个过程无限循环。
更糟的是,SQS 的死信队列策略是基于接收次数的(maxReceiveCount)。如果我们将 maxReceiveCount 设置为5,那么这个毒丸消息会造成5次重量级的 Puppeteer 启动、渲染、崩溃的循环,才会被隔离到 DLQ。在此期间,它浪费了大量的计算资源。如果同时出现多个这样的毒丸消息,我们的 Worker 集群会把所有资源都浪费在处理这些注定失败的任务上,正常任务被阻塞, фактически 造成了 Worker 服务的雪崩。
我们需要一个更智能、反应更迅速的熔断机制。
解决方案:在 Worker 中实现一个轻量级状态机熔断器
我们需要一个机制,当 Worker 发现某种类型的任务(比如 reportType: 'user_activity')在短时间内连续失败多次时,能主动“熔断”,在接下来的一段时间内直接拒绝处理同类型的任务,让它们暂时留在队列里,给其他类型的任务让路。这本质上是一个客户端侧的断路器模式。
我们不引入复杂的库,而是自己实现一个简单的、基于内存的状态机。
graph TD
A[CLOSED] -- Failure Threshold Reached --> B(OPEN);
B -- Timeout --> C{HALF_OPEN};
C -- One Success --> A;
C -- One Failure --> B;
A -- Success --> A;
这个断路器有三个状态:
- CLOSED: 正常状态,所有任务都正常处理。失败会被计数。
- OPEN: 当失败次数达到阈值,断路器打开。在一段时间内,所有同类任务都会被立即拒绝,不会执行。
- HALF_OPEN: 在 OPEN 状态的超时结束后,断路器进入此状态。它会尝试处理一个任务。如果成功,断路器转为 CLOSED;如果失败,则立即回到 OPEN 状态,并重置超时。
下面是集成到 Worker 中的代码:
// worker/circuitBreaker.js
import { logger } from './logger.js';
const STATE = {
CLOSED: 'CLOSED',
OPEN: 'OPEN',
HALF_OPEN: 'HALF_OPEN',
};
// 为每种任务类型创建一个断路器实例
const breakers = new Map();
class CircuitBreaker {
constructor(name, options = {}) {
this.name = name;
this.state = STATE.CLOSED;
this.failureThreshold = options.failureThreshold || 3; // 连续3次失败则打开
this.recoveryTimeout = options.recoveryTimeout || 10000; // 10秒后进入半开状态
this.failures = 0;
this.lastFailureTime = null;
logger.info(`Circuit breaker '${name}' initialized.`);
}
exec(taskFn) {
if (this.state === STATE.OPEN) {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = STATE.HALF_OPEN;
logger.warn({ breaker: this.name }, 'Breaker state changed to HALF_OPEN.');
} else {
logger.warn({ breaker: this.name }, 'Breaker is OPEN. Skipping execution.');
return Promise.reject(new Error(`CircuitBreaker[${this.name}] is open.`));
}
}
try {
// 在 HALF_OPEN 或 CLOSED 状态下执行
const promise = taskFn();
promise.then(() => this.onSuccess()).catch((err) => this.onFailure(err));
return promise;
} catch (err) {
this.onFailure(err);
return Promise.reject(err);
}
}
onSuccess() {
if (this.state === STATE.HALF_OPEN) {
logger.info({ breaker: this.name }, 'Breaker recovery successful. State changed to CLOSED.');
}
this.failures = 0;
this.state = STATE.CLOSED;
}
onFailure(err) {
this.failures++;
this.lastFailureTime = Date.now();
if (this.state === STATE.HALF_OPEN || this.failures >= this.failureThreshold) {
if (this.state !== STATE.OPEN) {
logger.error({ breaker: this.name, failures: this.failures }, `Breaker tripped! State changed to OPEN.`);
this.state = STATE.OPEN;
}
}
}
}
export function getBreaker(name) {
if (!breakers.has(name)) {
breakers.set(name, new CircuitBreaker(name));
}
return breakers.get(name);
}
现在,我们把这个断路器整合进 consumer.js 的 processMessage 函数:
// worker/consumer.js (modified processMessage)
import { getBreaker } from './circuitBreaker.js';
async function processMessage(message) {
const task = JSON.parse(message.Body);
const breaker = getBreaker(task.reportType); // 按报告类型获取或创建断路器
try {
// 将核心处理逻辑包装在断路器的 exec 方法中
await breaker.exec(async () => {
logger.info({ taskId: task.taskId, breaker: breaker.name }, 'Breaker is CLOSED/HALF_OPEN. Processing task.');
let page;
try {
const browser = await getBrowser();
page = await browser.newPage();
const reportUrl = `${config.reportRenderer.baseUrl}?params=${encodeURIComponent(JSON.stringify(task.parameters))}`;
await page.goto(reportUrl, { waitUntil: 'networkidle0', timeout: 60000 });
const pdfBuffer = await page.pdf({ format: 'A4', printBackground: true });
// ... 上传 S3, 更新数据库 ...
logger.info({ taskId: task.taskId }, 'Report generated successfully.');
} finally {
if (page) await page.close();
}
});
// 任务成功,从 SQS 删除消息
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: config.sqs.queueUrl,
ReceiptHandle: message.ReceiptHandle,
}));
} catch (error) {
logger.error({ err: error, taskId: task.taskId, breaker: breaker.name }, 'Task processing failed or was rejected by circuit breaker.');
// 失败或被熔断器拒绝,都不删除消息,让其自然返回队列
// 断路器打开时,我们能快速失败,避免了启动 Puppeteer 的昂贵操作
}
}
通过这个改造,当 user_activity 报告连续失败3次后,对应的断路器会打开。在接下来的10秒内,任何新的 user_activity 任务都会被 breaker.exec 立即拒绝,Worker 可以跳过它们去处理其他类型的任务,例如 sales_summary。10秒后,断路器进入半开状态,尝试处理一个 user_activity 任务,如果成功,系统恢复;如果再次失败,则继续保持熔断。
这种机制极大地提升了系统的韧性,防止了局部问题(某个报告类型的bug)演变成整个 Worker 集群的全局性灾难。
遗留问题与未来迭代方向
这个架构并非终点。当前的实现依然存在一些可以优化的地方:
- 断路器状态的持久化: 目前的断路器状态是存储在单个 Worker 实例的内存中的。这意味着每个 Worker 都有自己独立的断路器状态,无法在整个集群层面共享熔断信息。一个更健壮的方案是使用 Redis 或类似的高速缓存来存储断路器的状态(状态、失败次数、最后失败时间),让所有 Worker 共享同一个视图。
- Puppeteer 进程管理: 虽然复用了浏览器实例,但在高负载下,单个浏览器实例内的多个页面也可能互相影响。可以考虑引入一个浏览器实例池,或者为每个任务启动一个隔离的 Docker 容器来运行 Puppeteer,以实现更彻底的资源隔离,但这会增加架构的复杂性。
- 死信队列的自动化处理: 消息进入 DLQ 后,目前需要人工干预。可以构建一个小型自动化服务,专门消费 DLQ 中的消息,尝试解析失败原因,对可修复的错误进行修正后重新入队,或将无法处理的任务信息推送至告警系统。
- 动态扩缩容: Worker 的数量目前是固定的。一个更云原生的做法是,根据 SQS 队列的积压消息数量(
ApproximateNumberOfMessagesVisible指标)来配置 Auto Scaling Group,实现 Worker 节点的自动弹性伸缩。