构建异构分布式任务编排系统以调度高性能C++计算核心


我们面临一个典型的工程困境:一个团队维护着一个极其高效的C++计算库。它是一个命令行工具,接收配置文件,执行数小时的密集计算,然后输出结果。这个工具是业务核心,但它也是个黑盒,脆弱且难以集成。每次任务失败,都需要工程师手动介入,检查日志,清理半成品文件,然后重试。将其服务化的需求变得异常迫切,但简单的用一层API包裹起来,只会把问题从线下运维转移到线上。

任务是长周期的,涉及多个计算阶段,任何一个阶段都可能失败。这意味着我们需要一个可靠的编排层来管理整个生命周期,保证任务的最终一致性。一个直接的RESTful API调用,然后等待C++进程返回结果的同步模型,在这里是行不通的。客户端会超时,服务本身在等待期间也会大量占用资源。

初步的构想是构建一个完全解耦的异步系统。前端UI负责提交任务,一个Go语言编写的编排服务负责接收请求、将任务分解为多个阶段,并调度底层的C++工作节点。任务状态的流转和一致性保证,必须遵循分布式系统中的BASE理论,而非强一致性的ACID。

技术选型决策与权衡

  1. 编排层: Go
    Go的并发模型(goroutine和channel)非常适合构建处理高并发I/O的中间件。编排服务的主要职责是网络通信、状态管理和事件流转,而不是CPU密集型计算。Go的静态编译、快速启动和简洁的语法,使其成为这个角色的理想选择。

  2. 一致性模型: BASE 与 Saga 模式
    由于任务的长时间运行特性,跨多个阶段的分布式事务(如两阶段提交)会长时间锁定资源,严重影响系统可用性。我们选择了基于BASE理论的最终一致性方案。Saga模式是实现这一点的经典方法。我们将采用“编排式Saga”(Orchestration-based Saga),由Go编排器作为中心协调者,负责调用每个服务,并在失败时调用相应的补偿操作。这比“协同式Saga”(Choreography-based Saga)更容易追踪和管理复杂流程。

  3. 异构通信: gRPC
    Go编排器与C++计算核心之间的通信需要一个高性能、强类型的RPC框架。gRPC基于HTTP/2,使用Protocol Buffers进行序列化,性能优越,且天然支持跨语言调用。定义清晰的.proto接口文件,可以作为两者之间不可动摇的契约。

  4. 前端测试: React Testing Library
    前端需要一个可靠的仪表盘来实时展示任务状态。这些状态(PENDING, STAGE_1_RUNNING, STAGE_1_FAILED, COMPENSATING, COMPLETED)是异步更新的,通常通过WebSocket从后端推送。测试这种复杂的UI行为,如果过度依赖组件内部实现,会导致测试非常脆弱。React Testing Library (RTL) 提倡从用户交互的角度进行测试,模拟用户所见所为,这使得我们的测试用例在重构UI实现时依然保持稳定。

  5. 跨语言代码规范: Prettier
    这个项目横跨Go、C++、TypeScript/JavaScript。在真实项目中,代码风格的争论会消耗大量精力。我们决定采用Prettier作为统一的代码格式化工具,并在CI流程中强制执行。虽然Prettier对JavaScript/TypeScript的支持是原生的,但对Go和C++可以通过插件或与其他工具(如gofmt, clang-format)集成,形成一个统一的格式化命令。这保证了代码库的整洁和可读性,降低了代码审查的认知负担。

步骤化实现

1. 定义服务契约: Protocol Buffers

一切从定义Go和C++之间的通信接口开始。这是我们整个系统的基石。

protos/task.proto:

syntax = "proto3";

package task;

option go_package = ".;pb";

// C++ 计算工作节点的gRPC服务定义
service ComputationWorker {
  // 执行一个计算阶段
  rpc ExecuteStage(ExecuteStageRequest) returns (ExecuteStageResponse);
}

// 任务阶段的定义
message ComputationStage {
  string stage_name = 1;    // 阶段名称, e.g., "preprocessing", "core_computation"
  map<string, string> parameters = 2; // 该阶段所需的参数
}

// 请求体
message ExecuteStageRequest {
  string job_id = 1; // 整个任务的唯一ID
  ComputationStage stage = 2; // 当前需要执行的阶段
}

// 响应体
message ExecuteStageResponse {
  string job_id = 1;
  bool success = 2;
  string message = 3; // 成功或失败的信息
  string output_path = 4; // 结果文件的路径
}

这个.proto文件清晰地定义了ComputationWorker服务,它只有一个ExecuteStage方法。这种简洁性是刻意为之的,它将C++核心封装成一个无状态的“函数”,所有状态管理都在Go编排器中进行。

2. 实现C++ gRPC工作节点

C++工作节点需要实现ComputationWorker服务。这里的核心是接收请求,执行模拟的长时间计算,并返回结果。在真实项目中,这里会调用那个核心的计算库。

worker/main.cc:

#include <iostream>
#include <string>
#include <memory>
#include <thread>
#include <chrono>

#include <grpcpp/grpcpp.h>
#include "task.grpc.pb.h"

// 模拟日志记录
void Log(const std::string& message) {
    auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::cout << std::ctime(&now) << " [Worker] " << message << std::endl;
}

class ComputationWorkerImpl final : public task::ComputationWorker::Service {
public:
    grpc::Status ExecuteStage(grpc::ServerContext* context,
                              const task::ExecuteStageRequest* request,
                              task::ExecuteStageResponse* response) override {
        
        std::string job_id = request->job_id();
        std::string stage_name = request->stage().stage_name();

        Log("Received request for job " + job_id + ", stage " + stage_name);

        response->set_job_id(job_id);

        try {
            // ----- 核心计算逻辑 -----
            // 这是一个模拟,实际项目中会调用复杂的C++库
            Log("Starting computation for stage: " + stage_name);
            if (stage_name == "preprocessing") {
                std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟IO密集型任务
            } else if (stage_name == "core_computation") {
                // 模拟一个可能失败的计算
                if (request->stage().parameters().count("force_fail") > 0) {
                     Log("Forcing failure for core_computation stage.");
                     throw std::runtime_error("Simulated computation failure.");
                }
                std::this_thread::sleep_for(std::chrono::seconds(10)); // 模拟CPU密集型任务
            }
            Log("Computation finished for stage: " + stage_name);
            // ------------------------

            response->set_success(true);
            response->set_message("Stage " + stage_name + " completed successfully.");
            response->set_output_path("/path/to/results/" + job_id + "/" + stage_name);

        } catch (const std::exception& e) {
            Log("Error executing stage " + stage_name + ": " + e.what());
            response->set_success(false);
            response->set_message(e.what());
        }

        return grpc::Status::OK;
    }
};

void RunServer() {
    std::string server_address("0.0.0.0:50051");
    ComputationWorkerImpl service;

    grpc::ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service);

    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    Log("Server listening on " + server_address);
    server->Wait();
}

int main(int argc, char** argv) {
    RunServer();
    return 0;
}

这个C++服务是无状态的。它只负责执行给定的阶段,成功或失败都明确地通过gRPC响应返回。这种设计简化了C++端的逻辑,所有复杂的流程控制和状态管理都上移到了Go编排器。

3. Go编排器的Saga实现

这是系统的核心。我们使用一个内存中的map来存储Saga实例的状态,并在一个集中的地方处理状态转换。

orchestrator/saga.go:

package main

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "orchestrator/pb" // 生成的 Go protobuf 代码
)

// 定义Saga的状态
type JobStatus string
const (
	StatusPending       JobStatus = "PENDING"
	StatusPreprocessing JobStatus = "PREPROCESSING"
	StatusCoreCompute   JobStatus = "CORE_COMPUTING"
	StatusFailed        JobStatus = "FAILED"
	StatusCompensating  JobStatus = "COMPENSATING"
	StatusCompleted     JobStatus = "COMPLETED"
)

// 定义Saga的每个阶段
type Stage struct {
	Name        string
	Action      func(job *Job) error
	Compensation func(job *Job) error
}

// Job 代表一个Saga实例
type Job struct {
	ID         string
	Status     JobStatus
	Stages     []Stage
	currentStage int
	Error      error
	// 用于存储每个阶段的结果,以便补偿时使用
	stageOutputs map[string]string
	mu         sync.Mutex
}

// Saga管理器
type SagaManager struct {
	jobs map[string]*Job
	mu   sync.RWMutex
	// gRPC 客户端连接
	workerConn *grpc.ClientConn
}

func NewSagaManager() *SagaManager {
	// 在真实项目中,这里应该有重连和负载均衡逻辑
	conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect to worker: %v", err)
	}
	
	return &SagaManager{
		jobs: make(map[string]*Job),
		workerConn: conn,
	}
}

// 创建并执行一个新的Job (Saga)
func (sm *SagaManager) ExecuteJob(params map[string]string) *Job {
	job := &Job{
		ID:           uuid.New().String(),
		Status:       StatusPending,
		currentStage: -1,
		stageOutputs: make(map[string]string),
	}

	// 定义Saga的步骤和补偿逻辑
	job.Stages = []Stage{
		{
			Name: "preprocessing",
			Action: func(j *Job) error {
				return sm.callWorker(j, "preprocessing", params)
			},
			Compensation: func(j *Job) error {
				log.Printf("[Job %s] Compensating for preprocessing. Deleting artifacts...", j.ID)
				// 实际的补偿逻辑,例如删除预处理生成的文件
				time.Sleep(1 * time.Second)
				return nil
			},
		},
		{
			Name: "core_computation",
			Action: func(j *Job) error {
				return sm.callWorker(j, "core_computation", params)
			},
			Compensation: func(j *Job) error {
				log.Printf("[Job %s] Compensating for core_computation.", j.ID)
				// 核心计算没有需要显式补偿的操作
				return nil
			},
		},
	}

	sm.mu.Lock()
	sm.jobs[job.ID] = job
	sm.mu.Unlock()

	// 异步执行Saga
	go sm.runSaga(job)

	return job
}

// Saga的执行引擎
func (sm *SagaManager) runSaga(job *Job) {
	for i, stage := range job.Stages {
		job.mu.Lock()
		job.currentStage = i
		// 更新状态以反映当前正在执行的阶段
		switch stage.Name {
		case "preprocessing":
			job.Status = StatusPreprocessing
		case "core_computation":
			job.Status = StatusCoreCompute
		}
		job.mu.Unlock()

		// websocket.BroadcastStatusUpdate(job) // 向前端广播状态更新

		err := stage.Action(job)
		if err != nil {
			job.mu.Lock()
			job.Error = err
			job.Status = StatusFailed
			job.mu.Unlock()
			// websocket.BroadcastStatusUpdate(job)
			
			log.Printf("[Job %s] Stage '%s' failed: %v. Starting compensation.", job.ID, stage.Name, err)
			sm.compensate(job)
			return
		}
	}

	job.mu.Lock()
	job.Status = StatusCompleted
	job.mu.Unlock()
	// websocket.BroadcastStatusUpdate(job)
	log.Printf("[Job %s] All stages completed successfully.", job.ID)
}

// 补偿逻辑
func (sm *SagaManager) compensate(job *Job) {
	job.mu.Lock()
	job.Status = StatusCompensating
	job.mu.Unlock()
	// websocket.BroadcastStatusUpdate(job)
	
	for i := job.currentStage; i >= 0; i-- {
		stage := job.Stages[i]
		if stage.Compensation != nil {
			err := stage.Compensation(job)
			if err != nil {
				// 补偿失败是一个严重问题,需要告警和人工干预
				log.Printf("[CRITICAL] [Job %s] Compensation for stage '%s' failed: %v", job.ID, stage.Name, err)
			}
		}
	}
	log.Printf("[Job %s] Compensation finished.", job.ID)
}

// 辅助函数:调用C++ worker
func (sm *SagaManager) callWorker(job *Job, stageName string, params map[string]string) error {
	log.Printf("[Job %s] Executing stage: %s", job.ID, stageName)
	client := pb.NewComputationWorkerClient(sm.workerConn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30) // 长超时
	defer cancel()

	req := &pb.ExecuteStageRequest{
		JobId: job.ID,
		Stage: &pb.ComputationStage{
			StageName: stageName,
			Parameters: params,
		},
	}

	res, err := client.ExecuteStage(ctx, req)
	if err != nil {
		return err
	}
	if !res.Success {
		return errors.New(res.Message)
	}

	job.mu.Lock()
	job.stageOutputs[stageName] = res.OutputPath
	job.mu.Unlock()
	
	return nil
}

这段Go代码实现了一个简单的编排式Saga管理器。它定义了任务的各个阶段及其对应的补偿操作。当一个新任务进来时,它会异步地按顺序执行每个Action。一旦有Action失败,它会从失败的阶段开始,反向执行所有已完成阶段的Compensation操作。

4. 前端仪表盘与RTL测试

前端使用React来展示任务列表和状态。关键在于如何测试这些异步更新的UI。

frontend/src/JobDashboard.tsx:

import React, { useState, useEffect } from 'react';

// 模拟API/WebSocket客户端
import { api, Job } from './api';

export const JobDashboard = () => {
  const [jobs, setJobs] = useState<Job[]>([]);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    // 初始加载和设置WebSocket监听
    const handleUpdate = (updatedJob: Job) => {
      setJobs(prevJobs => 
        prevJobs.map(j => j.id === updatedJob.id ? updatedJob : j)
      );
    };
    api.loadInitialJobs().then(setJobs);
    api.onJobUpdate(handleUpdate);

    return () => api.offJobUpdate(handleUpdate);
  }, []);

  const handleSubmit = async () => {
    try {
      const newJob = await api.submitNewJob({ force_fail: 'false' });
      setJobs(prevJobs => [...prevJobs, newJob]);
    } catch (e) {
      setError('Failed to submit job.');
    }
  };

  return (
    <div>
      <h1>Job Dashboard</h1>
      <button onClick={handleSubmit}>Submit New Job</button>
      {error && <p role="alert">{error}</p>}
      <table>
        <thead>
          <tr>
            <th>Job ID</th>
            <th>Status</th>
          </tr>
        </thead>
        <tbody>
          {jobs.map(job => (
            <tr key={job.id}>
              <td>{job.id}</td>
              <td>{job.status}</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

现在,我们用React Testing Library来测试这个组件的完整生命周期。注意,测试代码不关心组件内部的useStateuseEffect,只关心用户看到的结果。

frontend/src/JobDashboard.test.tsx:

import React from 'react';
import { render, screen, waitFor } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { JobDashboard } from './JobDashboard';
import { api, mockApi } from './api'; // 模拟的api

// 使用jest来模拟我们的API模块
jest.mock('./api');

const mockedApi = api as jest.Mocked<typeof mockApi>;

describe('JobDashboard', () => {
  
  beforeEach(() => {
    // 每次测试前重置所有模拟
    mockedApi.loadInitialJobs.mockResolvedValue([]);
    mockedApi.submitNewJob.mockClear();
    mockedApi.listeners = []; // 清空监听器
  });

  test('should display job status transitioning from PENDING to COMPLETED', async () => {
    const user = userEvent.setup();
    const initialJob = { id: 'job-1', status: 'PENDING' };
    mockedApi.submitNewJob.mockResolvedValue(initialJob);

    render(<JobDashboard />);

    // 1. 用户点击提交按钮
    const submitButton = screen.getByRole('button', { name: /submit new job/i });
    await user.click(submitButton);

    // 2. 初始状态应该是 PENDING
    // 使用 findBy* 查询,因为它会等待元素出现
    expect(await screen.findByText('job-1')).toBeInTheDocument();
    expect(screen.getByText('PENDING')).toBeInTheDocument();

    // 3. 模拟WebSocket推送第一个阶段的状态更新
    mockApi.simulateJobUpdate({ id: 'job-1', status: 'PREPROCESSING' });
    
    // 4. 等待UI更新为 PREPROCESSING
    await waitFor(() => {
      expect(screen.getByText('PREPROCESSING')).toBeInTheDocument();
    });

    // 5. 模拟WebSocket推送最终状态
    mockApi.simulateJobUpdate({ id: 'job-1', status: 'COMPLETED' });

    // 6. 验证最终状态
    await waitFor(() => {
      expect(screen.getByText('COMPLETED')).toBeInTheDocument();
    });
  });

  test('should display job status transitioning to FAILED and COMPENSATING', async () => {
    const user = userEvent.setup();
    const initialJob = { id: 'job-2', status: 'PENDING' };
    mockedApi.submitNewJob.mockResolvedValue(initialJob);

    render(<JobDashboard />);
    
    await user.click(screen.getByRole('button', { name: /submit new job/i }));
    expect(await screen.findByText('job-2')).toBeInTheDocument();

    // 模拟运行到某个阶段
    mockApi.simulateJobUpdate({ id: 'job-2', status: 'CORE_COMPUTING' });
    expect(await screen.findByText('CORE_COMPUTING')).toBeInTheDocument();
    
    // 模拟失败
    mockApi.simulateJobUpdate({ id: 'job-2', status: 'FAILED' });
    expect(await screen.findByText('FAILED')).toBeInTheDocument();
    
    // 模拟补偿
    mockApi.simulateJobUpdate({ id: 'job-2', status: 'COMPENSATING' });
    expect(await screen.findByText('COMPENSATING')).toBeInTheDocument();
  });
});

这个测试完美地模拟了用户场景:点击按钮,然后观察状态的异步变化。我们通过模拟WebSocket推送来驱动UI更新,并使用waitFor来处理异步断言。这使得测试既健壮又易于理解。

5. 统一代码风格

最后,为了管理这个异构代码库,我们在package.json中配置一个统一的格式化脚本。

package.json:

{
  "name": "polyglot-project",
  "scripts": {
    "format": "prettier --write \"**/*.{js,jsx,ts,tsx,json,md}\" && gofmt -w . && clang-format -i -style=file $(find . -name '*.cc' -or -name '*.h')"
  },
  "devDependencies": {
    "prettier": "^2.8.0"
  }
}

并且在CI流程中加入一个步骤来运行npm run format -- --check,确保所有提交到主分支的代码都符合规范。这是一个工程实践上的小细节,但在多人协作的大型项目中,其价值不可估量。

架构流程图

整个系统的交互流程可以用下面的图来表示:

sequenceDiagram
    participant User
    participant Frontend (React)
    participant Orchestrator (Go)
    participant Worker (C++)

    User->>+Frontend: 1. Click "Submit Job"
    Frontend->>+Orchestrator: 2. POST /jobs (HTTP API)
    Orchestrator->>Orchestrator: 3. Create Saga instance (Status: PENDING)
    Orchestrator-->>-Frontend: 4. Respond with Job ID
    Note right of Orchestrator: (Async) Saga Execution Starts
    Orchestrator->>Orchestrator: 5. Execute Stage 1: preprocessing
    Orchestrator->>+Worker: 6. gRPC Call: ExecuteStage(preprocessing)
    Worker-->>-Orchestrator: 7. gRPC Response: Success
    Orchestrator->>Orchestrator: 8. Execute Stage 2: core_computation
    Orchestrator->>+Worker: 9. gRPC Call: ExecuteStage(core_computation)
    Worker-->>-Orchestrator: 10. gRPC Response: Failure
    Note right of Orchestrator: Saga failed, start compensation
    Orchestrator->>Orchestrator: 11. Compensate Stage 1: preprocessing
    Note right of Orchestrator: (e.g. Delete intermediate files)
    Orchestrator->>Orchestrator: 12. Final Status: FAILED

这个架构虽然只展示了核心部分,但它已经具备了生产级系统的雏形。它解耦了计算和调度,通过Saga模式保证了长周期任务的最终一致性,并为整个异构系统提供了健壮的测试和开发规范。

当前方案的局限性也显而易见。Saga的状态目前存储在内存中,如果编排器服务重启,所有进行中的任务状态都会丢失。一个生产级的实现需要将Saga状态持久化到数据库(如PostgreSQL或etcd)中。此外,C++工作节点的调度目前是直接的gRPC调用,未来可以引入一个消息队列(如NATS或RabbitMQ)来实现更灵活的负载均衡和工作节点动态扩缩容。最后,自己实现Saga管理器虽然能满足当前需求,但随着业务逻辑变得更复杂,引入一个成熟的工作流引擎(如Temporal或Cadence)可能是更明智的选择,它们提供了更强大的状态管理、重试、定时任务等功能。


  目录