在 CockroachDB 上整合 MapReduce 批处理与实时 Rollup 构建混合特征存储


我们面临的第一个真正棘手的问题,是机器学习模型对特征的矛盾需求。风控模型需要用户过去一年的交易行为聚合特征,这是一个典型的批处理场景;但同时,它又需要捕捉到用户最近30秒内的交易频率,这是一个纯粹的流式计算需求。传统的 Lambda 架构将这两者分开处理,但在生产环境中,我们为数据不一致、维护两套独立系统以及最终特征拼接的复杂性付出了沉重代价。目标必须是构建一个统一的、低延迟的混合特征存储,它能同时服务于这两种需求,并且保证数据视图的最终一致性。

最初的构想是寻找一个能够同时处理事务性写入和大规模分析的数据库。经过一番评估,CockroachDB 进入了我们的视野。它的分布式架构、对 PostgreSQL 协议的兼容性以及可串行化的隔离级别,使其成为在线特征存储的有力竞争者。它能水平扩展,提供高可用性,这正是生产级系统所必需的。

对于批处理,我们无需重新发明轮子。我们已经有数TB的历史事件数据存储在对象存储中。一个遵循 MapReduce 模式的分布式计算任务(例如用 Spark 或 Dask,甚至是一个定制的 Go 应用)是处理这些数据的最佳方式。

真正的挑战在于实时部分。我们需要一个“Rollup”引擎,持续地对实时事件流进行聚合,并将结果更新到 CockroachDB 中。这必须是低延迟且容错的。

最后,所有这些特征都需要通过一个稳定、安全的接口暴露给上游的多个模型服务。直接将数据库暴露出去是不可想象的。一个 API 网关是必需的,Tyk 因其开源、高性能和易于扩展的特性被选中,负责认证、限流和路由。

整个架构的蓝图如下:

graph TD
    subgraph "Batch Processing (Daily)"
        A[Data Lake: S3/GCS] --> B{MapReduce Job};
        B -->|Batch Upsert| C[CockroachDB Cluster];
    end

    subgraph "Real-time Processing"
        D[Event Stream: Kafka] --> E[Stateful Rollup Service];
        E -->|Transactional Update| C;
    end

    subgraph "Feature Serving"
        F[ML Model Service] --> G[Tyk API Gateway];
        G --> H[Feature Serving API];
        H -->|Low-latency Read| C;
    end

第一步:CockroachDB 的表结构设计

设计正确的表结构是成败的关键。我们需要一个能同时高效支持点查(服务单个用户的特征)和批量写入(MapReduce 作业)的结构。

我们决定使用一张宽表来存储所有用户的特征。这简化了查询,因为模型服务通常需要一次性获取一个用户的所有特征。

-- user_features 表存储了所有聚合后的特征值
-- 主键是 user_id,便于快速点查
CREATE TABLE user_features (
    user_id UUID PRIMARY KEY,
    
    -- 批处理计算的特征 (Batch Features)
    -- 例如:用户历史总交易额,最近365天的活跃天数等
    total_lifetime_value DECIMAL(18, 4) DEFAULT 0.0,
    active_days_last_365d INT DEFAULT 0,

    -- 实时计算的特征 (Real-time Features)
    -- 例如:最近1分钟/5分钟/1小时的交易次数
    -- 我们使用 JSONB 来存储这些时窗特征,以获得灵活性
    -- 键是窗口大小(如 "1m", "5m"),值是计数值
    transaction_counts_short_term JSONB,

    -- 用于并发控制的版本号
    version BIGINT DEFAULT 1,

    -- 特征最后更新时间戳
    updated_at TIMESTAMPTZ DEFAULT now(),

    -- 索引优化点查性能
    INDEX (updated_at)
) LOCALITY REGIONAL BY ROW;

这里的几个设计决策至关重要:

  1. user_id 作为主键:保证了按用户ID查询特征的性能是 O(log N) 级别。
  2. 混合列类型:稳定的、批处理计算的特征使用固定列,这性能更好,类型也安全。而多变的、需要快速迭代的实时窗口特征则放入 JSONB 字段,这样增加一个新的时间窗口(比如从1分钟、5分钟,增加一个10分钟窗口)就不需要 ALTER TABLE 这种重操作。
  3. version 字段:这是实现乐观并发控制的关键,在实时更新时,可以避免脏写。虽然 CockroachDB 的事务能保证原子性,但在高并发的读-改-写模式下,使用显式版本号是一种很好的实践。
  4. **LOCALITY REGIONAL BY ROW**:如果我们的系统需要多区域部署,这个设置能将一个用户的所有数据固定在同一个区域,减少跨区域读写的延迟,这对于低延迟特征服务至关重要。

第二步:批处理层 - MapReduce 回填历史特征

我们的历史数据以 Parquet 格式存储在 S3 上。我们需要一个作业,每天运行一次,计算如 total_lifetime_value 这样的长期特征,并将其写入 user_features 表。

下面是一个简化的 Go 程序,模拟了这个 MapReduce 作业的核心写入逻辑。在真实项目中,这部分可能由 Spark 执行,但这里的重点是与 CockroachDB 的交互。我们必须使用高效的批量写入方式,而不是逐条 INSERT。CockroachDB 的 UPSERT 和事务批处理是这里的关键。

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/cockroachdb/cockroach-go/v2/crdbpgx"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v4"
)

// FeatureRecord 代表从MapReduce作业计算出的一个用户的聚合特征
type FeatureRecord struct {
	UserID              uuid.UUID
	TotalLifetimeValue  float64
	ActiveDaysLast365d int
}

// batchUpsertFeatures 负责将一批特征记录高效地写入 CockroachDB
func batchUpsertFeatures(ctx context.Context, conn *pgx.Conn, records []FeatureRecord) error {
	// 在一个事务中执行所有 UPSERT 操作,保证原子性
	err := crdbpgx.ExecuteTx(ctx, conn, pgx.TxOptions{}, func(tx pgx.Tx) error {
		// 准备 UPSERT 语句
		// 使用 ON CONFLICT 子句,如果 user_id 已存在,则更新批处理相关的字段
		// 注意:我们只更新批处理字段,不会触碰实时字段 transaction_counts_short_term
		stmt, err := tx.Prepare(ctx, "upsert_features", `
			UPSERT INTO user_features (user_id, total_lifetime_value, active_days_last_365d, updated_at)
			VALUES ($1, $2, $3, now())
		`)
		if err != nil {
			return fmt.Errorf("failed to prepare statement: %w", err)
		}

		for _, rec := range records {
			if _, err := tx.Exec(ctx, stmt.Name, rec.UserID, rec.TotalLifetimeValue, rec.ActiveDaysLast365d); err != nil {
				// 在真实项目中,这里应该有更复杂的错误处理和重试逻辑
				log.Printf("failed to execute upsert for user %s: %v", rec.UserID, err)
				// 根据策略决定是继续还是中断整个事务
				// return err
			}
		}
		return nil
	})

	if err != nil {
		return fmt.Errorf("transaction failed: %w", err)
	}

	return nil
}

func main() {
	// --- 数据库连接配置 (生产环境中应从配置管理中获取) ---
	// 注意: cockroachdb-0.cockroachdb.default.svc.cluster.local 是k8s内部地址
	connStr := "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
	config, err := pgx.ParseConfig(connStr)
	if err != nil {
		log.Fatalf("failed to parse config: %v", err)
	}

	conn, err := pgx.ConnectConfig(context.Background(), config)
	if err != nil {
		log.Fatalf("failed to connect to database: %v", err)
	}
	defer conn.Close(context.Background())
	
	log.Println("Successfully connected to CockroachDB")

	// --- 模拟 MapReduce 作业的输出 ---
	// 在真实场景中,这些记录是从分布式计算引擎的 Reducer 阶段产生的
	var records []FeatureRecord
	for i := 0; i < 1000; i++ {
		records = append(records, FeatureRecord{
			UserID:              uuid.New(),
			TotalLifetimeValue:  rand.Float64() * 10000,
			ActiveDaysLast365d: rand.Intn(365),
		})
	}
	
	log.Printf("Generated %d feature records to upsert", len(records))

	// --- 执行批量写入 ---
	// 将记录分片处理,避免单个事务过大
	batchSize := 100
	for i := 0; i < len(records); i += batchSize {
		end := i + batchSize
		if end > len(records) {
			end = len(records)
		}
		
		batch := records[i:end]
		log.Printf("Upserting batch of %d records...", len(batch))
		
		start := time.Now()
		if err := batchUpsertFeatures(context.Background(), conn, batch); err != nil {
			log.Fatalf("Failed to upsert batch: %v", err)
		}
		log.Printf("Batch upsert took %s", time.Since(start))
	}
	
	log.Println("Batch backfill process completed.")
}

这段代码的关键在于 batchUpsertFeatures 函数。它没有使用简单的循环 INSERT,而是将一组操作包裹在一个事务中。UPSERT 语句是核心,它原子地执行“如果不存在则插入,如果存在则更新”的逻辑。这对于回填作业至关重要,因为作业可能需要重复运行,并且必须能够正确处理新用户和老用户。

第三步:实时层 - 高并发 Rollup 服务

这是整个系统中最具挑战性的部分。我们需要一个服务,它消费来自 Kafka 的交易事件流,并实时更新 user_features 表中的 transaction_counts_short_term 字段。

事件格式可能如下:

{
  "event_id": "evt_...",
  "user_id": "a1b2c3d4-...",
  "amount": 123.45,
  "timestamp": "2023-10-27T10:30:00Z"
}

我们的 Rollup 服务必须做到:

  1. 高吞吐:能跟上 Kafka 的消息速率。
  2. 状态化:它需要为每个用户维护多个时间窗口的计数器。
  3. 一致性:更新 CockroachDB 必须是原子操作,以防数据损坏。
  4. 容错:服务崩溃重启后,能从上次的位置继续处理,不丢失数据。

下面是一个 Go 实现的核心逻辑,它演示了如何在一个事务中安全地执行读-修改-写操作来更新 JSONB 字段。

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/cockroachdb/cockroach-go/v2/crdbpgx"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v4"
)

// TransactionEvent 代表从 Kafka 消费的单个事件
type TransactionEvent struct {
	UserID    uuid.UUID `json:"user_id"`
	Timestamp time.Time `json:"timestamp"`
	// ... 其他字段
}

// TimeWindowCounts 定义了存储在 JSONB 字段中的结构
type TimeWindowCounts map[string]int64

// processEvent 是处理单个事件的核心函数
func processEvent(ctx context.Context, conn *pgx.Conn, event TransactionEvent) error {
	// 这是一个关键操作:在一个可重试的事务中完成读-修改-写
	err := crdbpgx.ExecuteTx(ctx, conn, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
		// 1. 读取当前用户的特征,并使用 SELECT FOR UPDATE 锁定该行
		// 这可以防止其他并发事务修改该用户的特征,避免竞争条件
		var currentCountsJSON []byte
		var currentVersion int64
		err := tx.QueryRow(ctx, `
			SELECT transaction_counts_short_term, version
			FROM user_features
			WHERE user_id = $1
			FOR UPDATE
		`, event.UserID).Scan(&currentCountsJSON, &currentVersion)

		var counts TimeWindowCounts
		if err == pgx.ErrNoRows {
			// 如果用户是新用户,在特征表中不存在记录,则初始化
			counts = make(TimeWindowCounts)
		} else if err != nil {
			return fmt.Errorf("failed to select for update: %w", err)
		} else {
			// 反序列化 JSONB 数据
			if err := json.Unmarshal(currentCountsJSON, &counts); err != nil {
				// 在生产中,需要处理 JSON 解析失败的毒丸消息
				log.Printf("WARNING: failed to unmarshal JSONB for user %s: %v. Re-initializing.", event.UserID, err)
				counts = make(TimeWindowCounts)
			}
		}

		// 2. 修改状态(Rollup 逻辑)
		// 在真实场景中,这里会有一个复杂的窗口管理器
		// 这里简化为只更新一个 "1m" 的计数器
		// 注意:这里的窗口逻辑是简化的,实际需要滑动窗口或滚动窗口的实现
		counts["1m"] = counts["1m"] + 1

		// 3. 将新状态写回数据库
		newCountsJSON, err := json.Marshal(counts)
		if err != nil {
			return fmt.Errorf("failed to marshal new counts: %w", err)
		}

		// 使用 UPSERT,并带上版本号检查(乐观锁),尽管 SELECT FOR UPDATE 已经是悲观锁了
		// 这里的 version + 1 是一种双重保险,并可用于调试
		_, err = tx.Exec(ctx, `
			UPSERT INTO user_features (user_id, transaction_counts_short_term, version, updated_at)
			VALUES ($1, $2, $3, now())
		`, event.UserID, newCountsJSON, currentVersion+1)
		
		return err
	})

	if err != nil {
		// crdbpgx.ExecuteTx 会自动处理事务重试(针对 40001 错误码)
		log.Printf("failed to process event for user %s after retries: %v", event.UserID, err)
		return err
	}
	
	return nil
}

// main 函数模拟一个消费循环
func main() {
    // ... (数据库连接代码与批处理示例相同)
    connStr := "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
    config, err := pgx.ParseConfig(connStr)
    if err != nil { log.Fatal(err) }
    conn, err := pgx.ConnectConfig(context.Background(), config)
    if err != nil { log.Fatal(err) }
    defer conn.Close(context.Background())
    log.Println("Rollup service connected to CockroachDB")

    // 模拟从 Kafka 消费事件
    mockEventStream := make(chan TransactionEvent, 100)
    go func() {
        // 模拟一个高频交易用户
        highFreqUserID := uuid.New()
        for {
            mockEventStream <- TransactionEvent{UserID: highFreqUserID, Timestamp: time.Now()}
            time.Sleep(100 * time.Millisecond) // 10 events/sec
        }
    }()

    for event := range mockEventStream {
        start := time.Now()
        if err := processEvent(context.Background(), conn, event); err != nil {
            // 生产环境中,如果处理失败,消息应该被送往死信队列
            log.Printf("ERROR: could not process event: %v", err)
        } else {
            log.Printf("Processed event for user %s in %s", event.UserID, time.Since(start))
        }
    }
}

这段代码的核心是 processEventcrdbpgx.ExecuteTx 包装了 CockroachDB 的客户端事务重试逻辑。在高并发下,两个事务可能同时尝试更新同一个用户的特征,其中一个会失败并收到 40001 重启错误。这个辅助函数会自动捕获该错误并重试事务。SELECT ... FOR UPDATE 是悲观锁,它确保在当前事务完成之前,没有其他事务可以修改这一行,这是保证读-修改-写操作正确性的关键。

第四步:服务层 - 通过 Tyk 网关提供安全的特征访问

现在数据已经存入 CockroachDB,我们需要一个低延迟的 API 来服务这些特征。这是一个简单的 gRPC 或 HTTP 服务。

// Feature Serving API (简化示例)
func (s *FeatureServer) GetUserFeatures(ctx context.Context, req *pb.FeatureRequest) (*pb.FeatureResponse, error) {
    // 伪代码:
    // 1. 从 context 获取认证信息(由 Tyk 注入)
    // 2. 查询 CockroachDB 获取 user_features 行
    // 3. 将行数据转换为 protobuf 响应
    // 4. 实现一层内存缓存 (e.g. Caffeine/Ristretto) 来减少数据库负载
    // 5. 返回响应
    
    // ...
}

这个服务本身很简单,但直接暴露给外部是危险的。这里 Tyk 发挥作用。我们在 Tyk 中创建一个 API 定义,将外部请求代理到我们的特征服务。

一个简化的 Tyk API 定义 (JSON格式):

{
  "name": "Feature-Store-API",
  "api_id": "feature-store-api-1",
  "org_id": "default",
  "use_keyless": false,
  "auth": {
    "auth_header_name": "Authorization"
  },
  "version_data": {
    "not_versioned": true,
    "versions": {
      "Default": {
        "name": "Default",
        "use_extended_paths": true
      }
    }
  },
  "proxy": {
    "listen_path": "/features/",
    "target_url": "http://feature-serving-service.internal:8080/",
    "strip_listen_path": true
  },
  "rate_limits": {
    "rate": 1000,
    "per": 1
  },
  "enable_ip_whitelisting": true,
  "allowed_ips": [
    "10.0.0.0/8" // 只允许内部服务访问
  ]
}

这份配置为我们的特征服务做了几件重要的事情:

  1. 认证 (auth): 要求所有请求都必须包含一个 Authorization 头,Tyk 会验证这个 key 的有效性。这确保了只有授权的模型服务才能访问特征。
  2. 路由 (proxy): 将公网的 /features/ 路径的请求,转发到内部的 feature-serving-service
  3. 限流 (rate_limits): 保护后端服务和数据库。这里设置为每秒最多1000个请求,任何超出的流量都会被 Tyk 拒绝,防止数据库被打垮。
  4. IP 白名单: 增加一层网络安全,只允许来自可信内部网络的请求。

最终的请求流程图如下:

sequenceDiagram
    participant Model as ML Model Service
    participant Gateway as Tyk API Gateway
    participant API as Feature Serving API
    participant DB as CockroachDB
    
    Model->>+Gateway: GET /features/user/a1b2c3d4
    Note over Gateway: 1. Authenticate API Key
    Note over Gateway: 2. Check Rate Limit
    Gateway->>+API: GET /user/a1b2c3d4
    API-->>DB: SELECT * FROM user_features WHERE user_id = 'a1b2c3d4'
    DB-->>API: Feature Row
    API-->>-Gateway: Feature Response (JSON/Proto)
    Gateway-->>-Model: Feature Response

局限与未来的迭代路径

这套架构解决了我们最初的问题,但它并非没有缺点。在真实项目中,它还有许多需要演进的地方。

首先,我们手写的这个 Stateful Rollup Service 虽然能工作,但状态管理和容错逻辑相对简陋。在更大的规模下,使用一个成熟的流处理框架如 Apache Flink 或 Kafka Streams 会是更稳健的选择。它们提供了更强大的窗口函数、状态后端(state backends)和 exactly-once 处理语义的保证。

其次,成本是一个考量。将所有数据都放在高性能的 CockroachDB 中可能非常昂贵。一个可行的优化是数据分层:将最近的、需要快速访问的特征保留在 CockroachDB,而将数月前的历史数据归档到更廉价的对象存储中。

最后,批处理和流处理之间的数据一致性依然是一个微妙的问题。当批处理作业运行时,它会覆盖流处理在过去24小时内累积的一些特征。这期间可能会有短暂的数据不一致。解决这个问题需要更复杂的逻辑,比如让批处理作业写入到一个新的版本或临时表中,然后原子地切换,但这会显著增加架构的复杂性。未来的方向可能是探索像 Materialize 或 Flink SQL 这样的流式数仓技术,尝试统一批和流的计算模型,从而根本上消除 Lambda 架构的接缝问题。


  目录