构建一个基于消息队列与SSE的整洁架构实时任务进度反馈系统


在真实项目中,我们经常遇到需要向用户实时反馈长耗时任务进度的场景,例如数据报表生成、视频转码或批量数据处理。一个常见的错误是采用轮询(Polling)方案,即UI组件定期向后端发送HTTP请求查询任务状态。这种方式不仅给服务器和数据库带来不必要的周期性压力,还存在延迟,用户体验不佳。

一个典型的反模式实现可能如下所示:

// A controller endpoint for polling
@GetMapping("/tasks/{taskId}/status")
public ResponseEntity<TaskStatus> getTaskStatus(@PathVariable String taskId) {
    // This hits the database every few seconds for every active user.
    // It's a recipe for disaster at scale.
    Task task = taskRepository.findById(taskId);
    if (task == null) {
        return ResponseEntity.notFound().build();
    }
    TaskStatus status = new TaskStatus(task.getStatus(), task.getProgress());
    return ResponseEntity.ok(status);
}

这种设计将前端的实时性需求与后端的处理状态紧密耦合,违反了单一职责原则,且扩展性极差。一旦任务处理逻辑变得复杂,或者需要支持的用户量增加,整个系统将很快达到瓶颈。

定义问题:一个解耦、可扩展的实时反馈架构

我们需要一个架构,它必须满足以下几个核心要求:

  1. 实时推送: 任务进度的更新必须由服务器主动推送到客户端,而不是客户端轮询。
  2. 完全解耦: 任务的发起、执行与状态通知这三个环节必须是相互独立的,可以独立部署、扩展和修改。
  3. 高韧性: 系统需要能够应对客户端连接中断、服务节点故障等问题,保证消息至少被传递一次。
  4. 可维护性: 业务逻辑(任务具体做什么)必须与技术实现(如何通信)分离,以保证代码库的长期健康。

架构方案权衡

方案A:WebSocket + 数据库 Pub/Sub

一种看似直接的方案是使用WebSocket进行双向通信,后端工作节点在更新数据库状态后,利用数据库自身的发布/订阅机制(如PostgreSQL的NOTIFY/LISTEN)来触发消息推送。

  • 优势: WebSocket提供全双工通信,理论上延迟更低。
  • 劣势:
    1. 状态管理复杂: 维护大量WebSocket长连接对服务器是巨大的状态管理负担,尤其是在需要水平扩展时,连接状态的分发成为难题。
    2. 数据库耦合: 将消息系统与数据库绑定是一个严重的架构坏味道。这使得数据库成为整个实时系统的核心瓶颈和单点故障源。任何数据库的抖动都会直接冲击到推送的稳定性。
    3. 扩展性受限: 工作节点与通知节点都直接依赖数据库,无法做到真正的独立伸缩。

方案B:HTTP长轮询

即文章开头提到的反模式的变种,通过持有请求直到有新状态或超时。

  • 优势: 兼容性好,实现相对简单。
  • 劣势: 资源浪费严重,服务器需要为每个客户端维持一个挂起的请求。延迟不可控,本质上还是拉模型,并非真正的服务器推送。

最终选择:消息队列 + Server-Sent Events (SSE)

这个方案是我们最终采纳的架构。它将整个流程拆分为几个清晰的、通过消息队列进行异步通信的服务。

graph TD
    subgraph Browser
        A[UI Component]
    end

    subgraph API Gateway
        B[Task Submission API]
    end

    subgraph Backend Services
        C[Task Service]
        D[Message Queue]
        E[Worker Service]
        F[SSE Service]
    end
    
    A -- 1. Submit Task (HTTP Request) --> B
    B -- 2. Validate & Publish 'TaskCreated' event --> D
    C -.-> B
    E -- 3. Consume 'TaskCreated' event --> D
    E -- 4. Process Task & Publish 'ProgressUpdate' events --> D
    F -- 5. Consume 'ProgressUpdate' events --> D
    F -- 6. Push Progress (SSE) --> A
  • 为什么是SSE?: 针对这种只需要服务器到客户端单向推送的场景,SSE比WebSocket更轻量。它基于标准HTTP协议,无需特殊的协议升级,能更好地兼容现有的网络基础设施(如负载均衡器、防火墙)。此外,浏览器EventSource API内置了自动重连机制,简化了客户端的韧性设计。
  • 为什么是消息队列?: 这是实现解耦和韧性的核心。无论是RabbitMQ、Kafka还是Pulsar,它们都提供了可靠的异步通信机制。
    • 削峰填谷: 任务提交的洪峰会被消息队列缓冲,保护下游工作节点。
    • 持久化与重试: 消息可以被持久化,即使消费节点宕机,任务也不会丢失。
    • 独立扩展: 我们可以根据负载独立地增加Worker ServiceSSE Service的实例数量。
  • 为什么是整洁架构(Clean Architecture)?: 在Worker ServiceSSE Service内部,我们需要确保核心业务逻辑(例如,一个报表生成任务的具体步骤)不依赖于任何外部框架或工具(如RabbitMQ的客户端库或Spring WebFlux的SSE实现)。这使得核心逻辑可以独立于基础设施进行单元测试,并且未来更换消息队列或通知技术时,改动范围被限制在最外层的Infrastructure层。

核心实现概览

我们将以后端采用Java/Spring Boot,消息队列使用RabbitMQ为例,展示关键代码。

1. 整洁架构的项目结构

对于Worker Service,其结构如下:

worker-service/
├── domain/
│   ├── model/
│   │   ├── Task.java
│   │   └── ProgressEvent.java
│   └── repository/
│       └── TaskRepository.java // Interface
├── application/
│   ├── usecase/
│   │   └── ProcessTaskUseCase.java
│   └── port/
│       └── out/
│           └── ProgressEventPublisher.java // Interface
├── infrastructure/
│   ├── adapter/
│   │   ├── mq/
│   │   │   ├── TaskEventListener.java // Listens for new tasks
│   │   │   └── RabbitMQProgressPublisher.java // Implements ProgressEventPublisher
│   │   └── persistence/
│   │       └── PostgresTaskRepository.java // Implements TaskRepository
│   └── config/
│       └── RabbitMQConfig.java

这种结构强制将依赖方向从外部(Infrastructure)指向内部(Application, Domain),核心业务逻辑位于中心,完全不了解外部世界。

2. 任务提交与事件发布

任务提交API接收到请求后,并不直接执行任务,而是创建一个任务实体,存入数据库,然后发布一个TaskCreated事件。

// In Task Submission Service
@Service
@RequiredArgsConstructor
public class SubmitTaskUseCase {

    private final TaskRepository taskRepository;
    private final ApplicationEventPublisher eventPublisher; // Or a direct MQ publisher port

    @Transactional
    public String handle(CreateTaskCommand command) {
        // 1. Create and persist the initial task state
        Task task = new Task(command.getTaskType(), command.getPayload());
        taskRepository.save(task);

        // 2. Publish an event to the message queue
        // In a real system, this should be part of the transaction.
        // Using an outbox pattern is a robust way to achieve this.
        TaskCreatedEvent event = new TaskCreatedEvent(task.getId(), task.getTaskType(), task.getPayload());
        eventPublisher.publishEvent(event); // This will be sent to RabbitMQ

        return task.getId();
    }
}

这里的坑在于,数据库save操作和消息发布操作需要具备事务性。如果save成功但消息发布失败,任务将永远不会被执行。一个健壮的解决方案是采用Outbox模式,即将事件写入到与业务数据同一数据库的outbox表中,通过一个独立的进程轮询outbox表并将事件可靠地发布到消息队列。

3. Worker Service:消费任务并发布进度

Worker Service监听tasks.created队列。

// In worker-service/infrastructure/adapter/mq/TaskEventListener.java
@Component
@RequiredArgsConstructor
@Slf4j
public class TaskEventListener {

    private final ProcessTaskUseCase processTaskUseCase;

    @RabbitListener(queues = "tasks.created.queue")
    public void handleTaskCreatedEvent(TaskCreatedEvent event) {
        try {
            log.info("Received new task to process: {}", event.getTaskId());
            processTaskucus.execute(event.getTaskId());
        } catch (Exception e) {
            log.error("Failed to process task {}. Moving to dead-letter queue.", event.getTaskId(), e);
            // The listener configuration should handle automatic retries and dead-lettering.
            throw new AmqpRejectAndDontRequeueException("Processing failed", e);
        }
    }
}

核心的ProcessTaskUseCase则完全是纯粹的业务逻辑,它通过一个抽象的ProgressEventPublisher接口来发布进度,而不知道背后是RabbitMQ。

// In worker-service/application/usecase/ProcessTaskUseCase.java
@Service
@RequiredArgsConstructor
public class ProcessTaskUseCase {

    private final TaskRepository taskRepository;
    private final ProgressEventPublisher progressPublisher;

    public void execute(String taskId) {
        Task task = taskRepository.findById(taskId).orElseThrow(() -> new TaskNotFoundException(taskId));

        try {
            // Step 1
            task.updateStatus("PROCESSING_STEP_1", 10);
            taskRepository.save(task);
            progressPublisher.publish(new ProgressEvent(taskId, "Step 1: Data ingestion started.", 10));
            Thread.sleep(2000); // Simulate work

            // Step 2
            task.updateStatus("PROCESSING_STEP_2", 50);
            taskRepository.save(task);
            progressPublisher.publish(new ProgressEvent(taskId, "Step 2: Analysis complete.", 50));
            Thread.sleep(3000);

            // ... more steps ...

            // Final step
            task.updateStatus("COMPLETED", 100);
            taskRepository.save(task);
            progressPublisher.publish(new ProgressEvent(taskId, "Task completed successfully.", 100, true));

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            task.updateStatus("FAILED", task.getProgress());
            taskRepository.save(task);
            progressPublisher.publish(new ProgressEvent(taskId, "Task failed due to interruption.", task.getProgress(), true));
        }
    }
}

RabbitMQProgressPublisher是这个接口的具体实现,它负责将ProgressEvent对象序列化并发送到RabbitMQ的fanout交换机,以便所有SSE Service实例都能收到。

// In worker-service/infrastructure/adapter/mq/RabbitMQProgressPublisher.java
@Component
@RequiredArgsConstructor
public class RabbitMQProgressPublisher implements ProgressEventPublisher {

    private final RabbitTemplate rabbitTemplate;
    private final ObjectMapper objectMapper;

    // We use a fanout exchange to broadcast progress to all SSE service instances.
    private static final String PROGRESS_EXCHANGE = "tasks.progress.exchange";

    @Override
    public void publish(ProgressEvent event) {
        try {
            String message = objectMapper.writeValueAsString(event);
            rabbitTemplate.convertAndSend(PROGRESS_EXCHANGE, "", message);
        } catch (JsonProcessingException e) {
            // In a production system, this should trigger an alert.
            // If serialization fails, we risk losing progress updates.
            log.error("Failed to serialize ProgressEvent for task {}", event.getTaskId(), e);
        }
    }
}

4. SSE Service: 广播进度到客户端

SSE Service的核心职责是维护客户端连接,并作为消息队列和客户端之间的桥梁。

// In sse-service/SseController.java
@RestController
@Slf4j
public class SseController {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    @GetMapping("/tasks/{taskId}/progress")
    public SseEmitter streamProgress(@PathVariable String taskId) {
        // Create an emitter with a long timeout.
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        
        // A common pitfall is not handling timeouts, errors, and completion.
        emitter.onCompletion(() -> {
            log.info("Emitter completed for task {}", taskId);
            emitters.remove(taskId);
        });
        emitter.onTimeout(() -> {
            log.warn("Emitter timed out for task {}", taskId);
            emitter.complete();
            emitters.remove(taskId);
        });
        emitter.onError(e -> {
            log.error("Emitter error for task {}", taskId, e);
            emitter.complete();
            emitters.remove(taskId);
        });

        emitters.put(taskId, emitter);

        // Send a confirmation event to establish the connection.
        try {
            emitter.send(SseEmitter.event().name("connected").data("Connection established."));
        } catch (IOException e) {
            log.error("Failed to send initial connection event for task {}", taskId, e);
            emitter.complete();
            emitters.remove(taskId);
        }

        return emitter;
    }

    // This method is called by the RabbitMQ listener
    public void dispatchProgressEvent(ProgressEvent event) {
        SseEmitter emitter = emitters.get(event.getTaskId());
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event().name("progress").id(String.valueOf(System.currentTimeMillis())).data(event));
                if (event.isTerminal()) {
                    emitter.complete();
                    emitters.remove(event.getTaskId());
                }
            } catch (IOException e) {
                log.warn("Failed to send progress to client for task {}. Client might have disconnected.", event.getTaskId());
                // The emitter's error/completion handlers will clean up.
            }
        }
    }
}

一个关键的细节是,上述emittersConcurrentHashMap实现是单实例的。在生产环境中,SSE Service会部署多个实例,一个用户的请求可能落在实例A,而处理其任务进度的消息可能被实例B的MQ消费者接收。为了解决这个问题,需要一个外部的、共享的订阅机制,例如Redis Pub/SubWorker Service将进度发布到Redis Channel,所有SSE Service实例都订阅这个Channel。当一个实例收到消息时,它检查自己管理的连接中是否有匹配的taskId,如果有,则推送。

5. 客户端UI组件的实现

客户端使用EventSource API,代码非常简洁。

// A simple UI component using vanilla JavaScript
class TaskProgressComponent {
    constructor(taskId, elementId) {
        this.taskId = taskId;
        this.element = document.getElementById(elementId);
        this.eventSource = null;
    }

    connect() {
        if (this.eventSource) {
            this.eventSource.close();
        }

        // The EventSource API handles reconnections automatically.
        this.eventSource = new EventSource(`/api/tasks/${this.taskId}/progress`);

        this.eventSource.addEventListener('connected', (event) => {
            console.log('SSE connection established:', event.data);
            this.updateUI('Connecting to task...');
        });

        this.eventSource.addEventListener('progress', (event) => {
            const progressData = JSON.parse(event.data);
            console.log('Received progress:', progressData);
            
            // This logic must be idempotent.
            // Receiving the same 50% update twice should not cause issues.
            this.updateUI(`[${progressData.percentage}%] ${progressData.message}`);

            if (progressData.terminal) {
                this.eventSource.close();
                this.updateUI(`Task finished: ${progressData.message}`);
            }
        });

        this.eventSource.onerror = (err) => {
            console.error('EventSource failed:', err);
            // Browser will attempt to reconnect automatically.
            // We might want to update the UI to show a "reconnecting" state.
            this.updateUI('Connection lost. Reconnecting...');
        };
    }

    updateUI(message) {
        this.element.innerHTML = message;
    }
}

// Usage:
// const progressComponent = new TaskProgressComponent('some-task-id-123', 'progress-container');
// progressComponent.connect();

这里的核心是客户端逻辑需要具备幂等性。由于消息队列通常提供“至少一次”的投递保证,SSE Service可能会收到重复的进度消息并转发给客户端。客户端UI的更新逻辑不应因此产生副作用。

架构的局限性与未来迭代路径

这个架构虽然健壮,但并非没有缺点。

  1. 最终一致性: 这是一个事件驱动的异步系统,客户端看到的状态与数据库中的状态之间存在毫秒级的延迟。对于大多数进度反馈场景来说,这不是问题,但对于需要强一致性的业务则不适用。
  2. 消息顺序: 虽然单个RabbitMQ队列内的消息是有序的,但如果Worker Service因为重试或并发处理而乱序发布进度事件,客户端可能会看到进度的“回跳”(例如从50%跳回到40%)。在事件中加入一个单调递增的序列号,让客户端忽略旧的事件,是一种常见的解决方案。
  3. 连接恢复后的状态同步: EventSource的自动重连只会接收到它断开连接之后的新事件。如果用户断网5分钟再回来,他会错过中间的所有进度。一个完善的方案是,客户端在connected事件后,可以主动发起一次HTTP请求获取任务的当前完整状态(快照),然后再开始监听实时事件流,确保UI展示的是最新、最完整的状态。
  4. SSE的连接数限制: 浏览器对每个域名下的并发HTTP连接数有限制(通常是6个),这包括SSE连接。如果一个页面需要同时展示大量任务的进度,可能会耗尽连接池。此时,可以考虑将多个任务的进度更新复用到同一个SSE连接上,通过事件内容来区分不同的任务。

  目录