在真实项目中,我们经常遇到需要向用户实时反馈长耗时任务进度的场景,例如数据报表生成、视频转码或批量数据处理。一个常见的错误是采用轮询(Polling)方案,即UI组件定期向后端发送HTTP请求查询任务状态。这种方式不仅给服务器和数据库带来不必要的周期性压力,还存在延迟,用户体验不佳。
一个典型的反模式实现可能如下所示:
// A controller endpoint for polling
@GetMapping("/tasks/{taskId}/status")
public ResponseEntity<TaskStatus> getTaskStatus(@PathVariable String taskId) {
// This hits the database every few seconds for every active user.
// It's a recipe for disaster at scale.
Task task = taskRepository.findById(taskId);
if (task == null) {
return ResponseEntity.notFound().build();
}
TaskStatus status = new TaskStatus(task.getStatus(), task.getProgress());
return ResponseEntity.ok(status);
}
这种设计将前端的实时性需求与后端的处理状态紧密耦合,违反了单一职责原则,且扩展性极差。一旦任务处理逻辑变得复杂,或者需要支持的用户量增加,整个系统将很快达到瓶颈。
定义问题:一个解耦、可扩展的实时反馈架构
我们需要一个架构,它必须满足以下几个核心要求:
- 实时推送: 任务进度的更新必须由服务器主动推送到客户端,而不是客户端轮询。
- 完全解耦: 任务的发起、执行与状态通知这三个环节必须是相互独立的,可以独立部署、扩展和修改。
- 高韧性: 系统需要能够应对客户端连接中断、服务节点故障等问题,保证消息至少被传递一次。
- 可维护性: 业务逻辑(任务具体做什么)必须与技术实现(如何通信)分离,以保证代码库的长期健康。
架构方案权衡
方案A:WebSocket + 数据库 Pub/Sub
一种看似直接的方案是使用WebSocket进行双向通信,后端工作节点在更新数据库状态后,利用数据库自身的发布/订阅机制(如PostgreSQL的NOTIFY/LISTEN)来触发消息推送。
- 优势: WebSocket提供全双工通信,理论上延迟更低。
- 劣势:
- 状态管理复杂: 维护大量WebSocket长连接对服务器是巨大的状态管理负担,尤其是在需要水平扩展时,连接状态的分发成为难题。
- 数据库耦合: 将消息系统与数据库绑定是一个严重的架构坏味道。这使得数据库成为整个实时系统的核心瓶颈和单点故障源。任何数据库的抖动都会直接冲击到推送的稳定性。
- 扩展性受限: 工作节点与通知节点都直接依赖数据库,无法做到真正的独立伸缩。
方案B:HTTP长轮询
即文章开头提到的反模式的变种,通过持有请求直到有新状态或超时。
- 优势: 兼容性好,实现相对简单。
- 劣势: 资源浪费严重,服务器需要为每个客户端维持一个挂起的请求。延迟不可控,本质上还是拉模型,并非真正的服务器推送。
最终选择:消息队列 + Server-Sent Events (SSE)
这个方案是我们最终采纳的架构。它将整个流程拆分为几个清晰的、通过消息队列进行异步通信的服务。
graph TD
subgraph Browser
A[UI Component]
end
subgraph API Gateway
B[Task Submission API]
end
subgraph Backend Services
C[Task Service]
D[Message Queue]
E[Worker Service]
F[SSE Service]
end
A -- 1. Submit Task (HTTP Request) --> B
B -- 2. Validate & Publish 'TaskCreated' event --> D
C -.-> B
E -- 3. Consume 'TaskCreated' event --> D
E -- 4. Process Task & Publish 'ProgressUpdate' events --> D
F -- 5. Consume 'ProgressUpdate' events --> D
F -- 6. Push Progress (SSE) --> A
- 为什么是SSE?: 针对这种只需要服务器到客户端单向推送的场景,SSE比WebSocket更轻量。它基于标准HTTP协议,无需特殊的协议升级,能更好地兼容现有的网络基础设施(如负载均衡器、防火墙)。此外,浏览器
EventSourceAPI内置了自动重连机制,简化了客户端的韧性设计。 - 为什么是消息队列?: 这是实现解耦和韧性的核心。无论是RabbitMQ、Kafka还是Pulsar,它们都提供了可靠的异步通信机制。
- 削峰填谷: 任务提交的洪峰会被消息队列缓冲,保护下游工作节点。
- 持久化与重试: 消息可以被持久化,即使消费节点宕机,任务也不会丢失。
- 独立扩展: 我们可以根据负载独立地增加
Worker Service或SSE Service的实例数量。
- 为什么是整洁架构(Clean Architecture)?: 在
Worker Service和SSE Service内部,我们需要确保核心业务逻辑(例如,一个报表生成任务的具体步骤)不依赖于任何外部框架或工具(如RabbitMQ的客户端库或Spring WebFlux的SSE实现)。这使得核心逻辑可以独立于基础设施进行单元测试,并且未来更换消息队列或通知技术时,改动范围被限制在最外层的Infrastructure层。
核心实现概览
我们将以后端采用Java/Spring Boot,消息队列使用RabbitMQ为例,展示关键代码。
1. 整洁架构的项目结构
对于Worker Service,其结构如下:
worker-service/
├── domain/
│ ├── model/
│ │ ├── Task.java
│ │ └── ProgressEvent.java
│ └── repository/
│ └── TaskRepository.java // Interface
├── application/
│ ├── usecase/
│ │ └── ProcessTaskUseCase.java
│ └── port/
│ └── out/
│ └── ProgressEventPublisher.java // Interface
├── infrastructure/
│ ├── adapter/
│ │ ├── mq/
│ │ │ ├── TaskEventListener.java // Listens for new tasks
│ │ │ └── RabbitMQProgressPublisher.java // Implements ProgressEventPublisher
│ │ └── persistence/
│ │ └── PostgresTaskRepository.java // Implements TaskRepository
│ └── config/
│ └── RabbitMQConfig.java
这种结构强制将依赖方向从外部(Infrastructure)指向内部(Application, Domain),核心业务逻辑位于中心,完全不了解外部世界。
2. 任务提交与事件发布
任务提交API接收到请求后,并不直接执行任务,而是创建一个任务实体,存入数据库,然后发布一个TaskCreated事件。
// In Task Submission Service
@Service
@RequiredArgsConstructor
public class SubmitTaskUseCase {
private final TaskRepository taskRepository;
private final ApplicationEventPublisher eventPublisher; // Or a direct MQ publisher port
@Transactional
public String handle(CreateTaskCommand command) {
// 1. Create and persist the initial task state
Task task = new Task(command.getTaskType(), command.getPayload());
taskRepository.save(task);
// 2. Publish an event to the message queue
// In a real system, this should be part of the transaction.
// Using an outbox pattern is a robust way to achieve this.
TaskCreatedEvent event = new TaskCreatedEvent(task.getId(), task.getTaskType(), task.getPayload());
eventPublisher.publishEvent(event); // This will be sent to RabbitMQ
return task.getId();
}
}
这里的坑在于,数据库save操作和消息发布操作需要具备事务性。如果save成功但消息发布失败,任务将永远不会被执行。一个健壮的解决方案是采用Outbox模式,即将事件写入到与业务数据同一数据库的outbox表中,通过一个独立的进程轮询outbox表并将事件可靠地发布到消息队列。
3. Worker Service:消费任务并发布进度
Worker Service监听tasks.created队列。
// In worker-service/infrastructure/adapter/mq/TaskEventListener.java
@Component
@RequiredArgsConstructor
@Slf4j
public class TaskEventListener {
private final ProcessTaskUseCase processTaskUseCase;
@RabbitListener(queues = "tasks.created.queue")
public void handleTaskCreatedEvent(TaskCreatedEvent event) {
try {
log.info("Received new task to process: {}", event.getTaskId());
processTaskucus.execute(event.getTaskId());
} catch (Exception e) {
log.error("Failed to process task {}. Moving to dead-letter queue.", event.getTaskId(), e);
// The listener configuration should handle automatic retries and dead-lettering.
throw new AmqpRejectAndDontRequeueException("Processing failed", e);
}
}
}
核心的ProcessTaskUseCase则完全是纯粹的业务逻辑,它通过一个抽象的ProgressEventPublisher接口来发布进度,而不知道背后是RabbitMQ。
// In worker-service/application/usecase/ProcessTaskUseCase.java
@Service
@RequiredArgsConstructor
public class ProcessTaskUseCase {
private final TaskRepository taskRepository;
private final ProgressEventPublisher progressPublisher;
public void execute(String taskId) {
Task task = taskRepository.findById(taskId).orElseThrow(() -> new TaskNotFoundException(taskId));
try {
// Step 1
task.updateStatus("PROCESSING_STEP_1", 10);
taskRepository.save(task);
progressPublisher.publish(new ProgressEvent(taskId, "Step 1: Data ingestion started.", 10));
Thread.sleep(2000); // Simulate work
// Step 2
task.updateStatus("PROCESSING_STEP_2", 50);
taskRepository.save(task);
progressPublisher.publish(new ProgressEvent(taskId, "Step 2: Analysis complete.", 50));
Thread.sleep(3000);
// ... more steps ...
// Final step
task.updateStatus("COMPLETED", 100);
taskRepository.save(task);
progressPublisher.publish(new ProgressEvent(taskId, "Task completed successfully.", 100, true));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
task.updateStatus("FAILED", task.getProgress());
taskRepository.save(task);
progressPublisher.publish(new ProgressEvent(taskId, "Task failed due to interruption.", task.getProgress(), true));
}
}
}
RabbitMQProgressPublisher是这个接口的具体实现,它负责将ProgressEvent对象序列化并发送到RabbitMQ的fanout交换机,以便所有SSE Service实例都能收到。
// In worker-service/infrastructure/adapter/mq/RabbitMQProgressPublisher.java
@Component
@RequiredArgsConstructor
public class RabbitMQProgressPublisher implements ProgressEventPublisher {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
// We use a fanout exchange to broadcast progress to all SSE service instances.
private static final String PROGRESS_EXCHANGE = "tasks.progress.exchange";
@Override
public void publish(ProgressEvent event) {
try {
String message = objectMapper.writeValueAsString(event);
rabbitTemplate.convertAndSend(PROGRESS_EXCHANGE, "", message);
} catch (JsonProcessingException e) {
// In a production system, this should trigger an alert.
// If serialization fails, we risk losing progress updates.
log.error("Failed to serialize ProgressEvent for task {}", event.getTaskId(), e);
}
}
}
4. SSE Service: 广播进度到客户端
SSE Service的核心职责是维护客户端连接,并作为消息队列和客户端之间的桥梁。
// In sse-service/SseController.java
@RestController
@Slf4j
public class SseController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@GetMapping("/tasks/{taskId}/progress")
public SseEmitter streamProgress(@PathVariable String taskId) {
// Create an emitter with a long timeout.
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
// A common pitfall is not handling timeouts, errors, and completion.
emitter.onCompletion(() -> {
log.info("Emitter completed for task {}", taskId);
emitters.remove(taskId);
});
emitter.onTimeout(() -> {
log.warn("Emitter timed out for task {}", taskId);
emitter.complete();
emitters.remove(taskId);
});
emitter.onError(e -> {
log.error("Emitter error for task {}", taskId, e);
emitter.complete();
emitters.remove(taskId);
});
emitters.put(taskId, emitter);
// Send a confirmation event to establish the connection.
try {
emitter.send(SseEmitter.event().name("connected").data("Connection established."));
} catch (IOException e) {
log.error("Failed to send initial connection event for task {}", taskId, e);
emitter.complete();
emitters.remove(taskId);
}
return emitter;
}
// This method is called by the RabbitMQ listener
public void dispatchProgressEvent(ProgressEvent event) {
SseEmitter emitter = emitters.get(event.getTaskId());
if (emitter != null) {
try {
emitter.send(SseEmitter.event().name("progress").id(String.valueOf(System.currentTimeMillis())).data(event));
if (event.isTerminal()) {
emitter.complete();
emitters.remove(event.getTaskId());
}
} catch (IOException e) {
log.warn("Failed to send progress to client for task {}. Client might have disconnected.", event.getTaskId());
// The emitter's error/completion handlers will clean up.
}
}
}
}
一个关键的细节是,上述emitters的ConcurrentHashMap实现是单实例的。在生产环境中,SSE Service会部署多个实例,一个用户的请求可能落在实例A,而处理其任务进度的消息可能被实例B的MQ消费者接收。为了解决这个问题,需要一个外部的、共享的订阅机制,例如Redis Pub/Sub。Worker Service将进度发布到Redis Channel,所有SSE Service实例都订阅这个Channel。当一个实例收到消息时,它检查自己管理的连接中是否有匹配的taskId,如果有,则推送。
5. 客户端UI组件的实现
客户端使用EventSource API,代码非常简洁。
// A simple UI component using vanilla JavaScript
class TaskProgressComponent {
constructor(taskId, elementId) {
this.taskId = taskId;
this.element = document.getElementById(elementId);
this.eventSource = null;
}
connect() {
if (this.eventSource) {
this.eventSource.close();
}
// The EventSource API handles reconnections automatically.
this.eventSource = new EventSource(`/api/tasks/${this.taskId}/progress`);
this.eventSource.addEventListener('connected', (event) => {
console.log('SSE connection established:', event.data);
this.updateUI('Connecting to task...');
});
this.eventSource.addEventListener('progress', (event) => {
const progressData = JSON.parse(event.data);
console.log('Received progress:', progressData);
// This logic must be idempotent.
// Receiving the same 50% update twice should not cause issues.
this.updateUI(`[${progressData.percentage}%] ${progressData.message}`);
if (progressData.terminal) {
this.eventSource.close();
this.updateUI(`Task finished: ${progressData.message}`);
}
});
this.eventSource.onerror = (err) => {
console.error('EventSource failed:', err);
// Browser will attempt to reconnect automatically.
// We might want to update the UI to show a "reconnecting" state.
this.updateUI('Connection lost. Reconnecting...');
};
}
updateUI(message) {
this.element.innerHTML = message;
}
}
// Usage:
// const progressComponent = new TaskProgressComponent('some-task-id-123', 'progress-container');
// progressComponent.connect();
这里的核心是客户端逻辑需要具备幂等性。由于消息队列通常提供“至少一次”的投递保证,SSE Service可能会收到重复的进度消息并转发给客户端。客户端UI的更新逻辑不应因此产生副作用。
架构的局限性与未来迭代路径
这个架构虽然健壮,但并非没有缺点。
- 最终一致性: 这是一个事件驱动的异步系统,客户端看到的状态与数据库中的状态之间存在毫秒级的延迟。对于大多数进度反馈场景来说,这不是问题,但对于需要强一致性的业务则不适用。
- 消息顺序: 虽然单个RabbitMQ队列内的消息是有序的,但如果
Worker Service因为重试或并发处理而乱序发布进度事件,客户端可能会看到进度的“回跳”(例如从50%跳回到40%)。在事件中加入一个单调递增的序列号,让客户端忽略旧的事件,是一种常见的解决方案。 - 连接恢复后的状态同步:
EventSource的自动重连只会接收到它断开连接之后的新事件。如果用户断网5分钟再回来,他会错过中间的所有进度。一个完善的方案是,客户端在connected事件后,可以主动发起一次HTTP请求获取任务的当前完整状态(快照),然后再开始监听实时事件流,确保UI展示的是最新、最完整的状态。 - SSE的连接数限制: 浏览器对每个域名下的并发HTTP连接数有限制(通常是6个),这包括SSE连接。如果一个页面需要同时展示大量任务的进度,可能会耗尽连接池。此时,可以考虑将多个任务的进度更新复用到同一个SSE连接上,通过事件内容来区分不同的任务。