我们生产环境的 Aurora MySQL 读副本 CPU 占用率已经连续数周在 90% 以上的高位徘徊。最初的读写分离设计,本意是隔离在线事务处理(OLTP)和报表查询,但随着业务增长,数据分析团队的临时复杂查询和 BI 工具的仪表盘刷新,正频繁地将读副本拖入性能泥潭。这些查询通常涉及大范围扫描和多表聚合,对 OLTP 优化的数据库结构而言是天生的噩梦。直接扩大读副本规格的方案被迅速否决——这不仅成本高昂,而且治标不治本,下一次查询洪峰到来时,问题会原封不动地重现。我们需要的是一个结构性的解决方案,将分析型(OLAP)负载从事务型系统中彻底剥离。
初步的构想是传统的夜间 ETL 批处理,将数据抽取到数据仓库。然而,业务方要求数据延迟不能超过五分钟,这使得批处理方案在起跑线上就出局了。我们必须转向流式处理,构建一个近乎实时的管道,将生产数据库的变更持续同步到一个为分析而生的存储系统中。
最终技术选型决策如下:
- 数据捕获 - CDC (Change Data Capture): 使用 AWS Database Migration Service (DMS) 直接从 MySQL 的 binlog 中捕获行级变更(INSERT, UPDATE, DELETE)。这对源数据库的性能影响极小,远胜于基于时间戳的轮询查询。
- 数据传输 - Amazon Kinesis Data Streams: 作为连接 DMS 和处理逻辑的缓冲层。它具备高吞吐和持久化能力,能够平滑数据变更的波峰波谷,并为下游消费端提供重放能力。
- 数据处理 - AWS Lambda (Serverless): 采用 Python 运行时。Serverless 的按需计费模型完美契合了数据库变更流量不均的特性。当没有数据变更时,我们不支付任何计算费用。Python 及其丰富的数据处理库(如 Pandas, PyArrow)是处理结构化数据的理想工具。
- 数据存储 - Amazon S3 Data Lake: 将处理后的数据以列式存储格式(Apache Parquet)存放在 S3 中。S3 提供近乎无限的扩展性和极低的存储成本。Parquet 格式则为后续的分析查询(如使用 AWS Athena)提供了极高的性能。
这个架构的核心目标是:将 OLTP 数据库的“写”操作,通过一个低延迟、高弹性的管道,转化为数据湖中的“分析就绪”数据,从而实现一种更彻底的、面向负载特性的“读写分离”。
架构概览
整个数据流的设计非常直接,关键在于解耦和弹性。
graph TD
subgraph "Transactional System (OLTP)"
RDS[Aurora MySQL 主库]
end
subgraph "Change Data Capture & Streaming"
DMS[AWS DMS] --> Kinesis[Kinesis Data Stream]
end
subgraph "Serverless Processing Layer"
Lambda[Python Lambda 函数]
end
subgraph "Analytical Data Lake (OLAP)"
S3[S3 Bucket: Parquet Files] --> Athena[AWS Athena]
end
RDS -- Binlog --> DMS
Kinesis -- Event Trigger --> Lambda
Lambda -- Writes Parquet --> S3
BI[BI Tools / Analysts] -- SQL Query --> Athena
这个流程的瓶颈和复杂性主要集中在 Lambda 函数的实现上。它不仅要正确地解析和转换数据,还必须解决流式处理写入数据湖时一个经典且棘手的问题:小文件问题。
核心实现:Lambda 处理函数
如果每个数据库变更事件都触发 Lambda 写入一个 S3 文件,我们将很快在 S3 上产生数百万个微小的 Parquet 文件。这对于任何基于文件系统的查询引擎(包括 Athena)来说都是一场灾难,因为文件系统元数据操作(list, open)的开销将远超实际数据读取的开销。
因此,Lambda 的核心职责是微批处理(Micro-batching)。它在内存中累积来自 Kinesis 的变更事件,直到达到预设的阈值(例如,累积了 5MB 数据或经过了 60 秒),然后将整个批次合并成一个尺寸合理的 Parquet 文件一次性写入 S3。
以下是该 Lambda 函数的生产级 Python 实现。
项目结构与依赖
cdc-processor/
├── app.py # Lambda handler and core logic
├── requirements.txt # Python dependencies
└── template.yaml # AWS SAM template for deployment
requirements.txt:
boto3
pandas
pyarrow
aws-lambda-powertools # For logging, tracing, and metrics
核心代码 app.py
代码中包含了详尽的注释、配置管理、错误处理和解决小文件问题的策略。
import os
import json
import logging
import base64
import time
from typing import List, Dict, Any
from collections import defaultdict
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from aws_lambda_powertools import Logger, Tracer
# --- Configuration ---
# 从环境变量加载配置,提供默认值以保证健壮性
# The pitfall here is hardcoding values. Always use environment variables for deployment flexibility.
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME")
S3_PREFIX = os.environ.get("S3_PREFIX", "processed-data/")
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
# 微批处理阈值:内存中数据大小(字节)和最大等待时间(秒)
# These thresholds are critical for performance tuning.
# Too small -> small files. Too large -> high memory usage & latency.
BATCH_SIZE_BYTES_THRESHOLD = int(os.environ.get("BATCH_SIZE_BYTES_THRESHOLD", 5 * 1024 * 1024)) # 5MB
BATCH_MAX_RECORDS_THRESHOLD = int(os.environ.get("BATCH_MAX_RECORDS_THRESHOLD", 10000))
# --- Initialization ---
# Best practice: initialize clients and variables outside the handler
# to leverage Lambda's execution context reuse.
logger = Logger(service="cdc-processor")
tracer = Tracer(service="cdc-processor")
s3_client = boto3.client("s3")
# This in-memory buffer is key. It's declared globally to persist across
# multiple invocations within the same warm Lambda container, although this
# approach has limitations we'll discuss later.
# A more robust solution might use an external store like ElastiCache,
# but for many use cases, this is a pragmatic start.
record_buffer = defaultdict(list)
buffer_size_bytes = defaultdict(int)
last_flush_time = time.time()
@tracer.capture_lambda_handler
@logger.inject_lambda_context(log_event=True)
def handler(event: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""
AWS Lambda handler function triggered by Kinesis Data Streams.
It processes CDC records from DMS, batches them in memory,
and writes them as Parquet files to S3.
"""
global last_flush_time
# Kinesis events come in a list of records.
for record in event['Records']:
try:
# Kinesis record data is base64 encoded.
payload_decoded = base64.b64decode(record['kinesis']['data'])
dms_event = json.loads(payload_decoded)
# A common mistake is to not handle control messages from DMS.
# These are metadata, not data changes, and should be skipped.
if dms_event.get("Op") is None:
logger.info("Skipping non-data DMS control message.")
continue
process_dms_event(dms_event)
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Failed to parse Kinesis record: {e}", extra={"record": record})
# Depending on the use case, you might want to send this to a DLQ.
continue
# After processing all records in the current invocation,
# check if any buffer needs to be flushed based on size or record count.
# We don't use a time-based flush here because Lambda might be idle.
# Time-based flushing needs a different mechanism (e.g., EventBridge cron).
flush_all_buffers(force=False)
return {"status": "SUCCESS"}
def process_dms_event(dms_event: Dict[str, Any]):
"""
Parses a single DMS event and adds it to the in-memory buffer.
"""
global buffer_size_bytes, record_buffer
# DMS payload structure contains table name, schema, and operation type.
# Format: data.{schema-name}.{table-name}
table_id = dms_event.get("metadata", {}).get("table-name")
schema_id = dms_event.get("metadata", {}).get("schema-name")
if not table_id or not schema_id:
logger.warning("DMS event is missing table/schema metadata.", extra={"event": dms_event})
return
# Create a unique key for each table to buffer its records separately.
buffer_key = f"{schema_id}.{table_id}"
# The actual row data is in the 'data' key.
data_row = dms_event.get("data")
if not data_row:
logger.warning("DMS event has no data field.", extra={"event": dms_event})
return
# Add metadata for analytical queries: event type and timestamp.
# This is crucial for handling updates/deletes downstream.
data_row['_op'] = dms_event.get("Op") # 'I' for insert, 'U' for update, 'D' for delete
data_row['_ingest_time_ms'] = int(time.time() * 1000)
data_row['_source_commit_time_ms'] = dms_event.get("metadata", {}).get("timestamp")
record_buffer[buffer_key].append(data_row)
# Estimate the size increase to check against the threshold.
# A simple but effective estimation.
buffer_size_bytes[buffer_key] += len(json.dumps(data_row))
def flush_all_buffers(force: bool = False):
"""
Iterates through all table buffers and flushes them to S3 if they meet the threshold.
"""
# Create a copy of keys to avoid issues with modifying dict during iteration
for key in list(record_buffer.keys()):
should_flush = (
force or
buffer_size_bytes[key] >= BATCH_SIZE_BYTES_THRESHOLD or
len(record_buffer[key]) >= BATCH_MAX_RECORDS_THRESHOLD
)
if should_flush and record_buffer[key]:
logger.info(
f"Flushing buffer for {key}",
extra={
"reason": "force" if force else "threshold_met",
"record_count": len(record_buffer[key]),
"size_bytes": buffer_size_bytes[key],
}
)
flush_buffer_to_s3(key)
def flush_buffer_to_s3(buffer_key: str):
"""
Converts a batch of records to a Parquet file and uploads it to S3.
This is the most critical part of the implementation.
"""
global record_buffer, buffer_size_bytes
records_to_flush = record_buffer.pop(buffer_key, [])
buffer_size_bytes.pop(buffer_key, 0)
if not records_to_flush:
return
try:
# Using Pandas is convenient for converting list of dicts to a tabular format.
df = pd.DataFrame(records_to_flush)
# Data types can be tricky. DMS often sends everything as strings.
# In a real project, you'd have a schema registry or casting logic here
# to ensure proper types (e.g., converting '123' to int).
# For this example, we proceed with inferred types.
table = pa.Table.from_pandas(df, preserve_index=False)
# Generate a unique object key for S3.
# Partitioning by date is a fundamental best practice for data lakes.
current_time = time.strftime('%Y/%m/%d/%H', time.gmtime())
file_uuid = f"{int(time.time() * 1000)}-{context.aws_request_id}"
schema_name, table_name = buffer_key.split('.')
s3_key = f"{S3_PREFIX}{schema_name}/{table_name}/{current_time}/{file_uuid}.parquet"
# Write Parquet to an in-memory buffer.
parquet_buffer = pa.BufferOutputStream()
pq.write_table(table, parquet_buffer, compression='snappy')
# Upload the buffer content to S3.
s3_client.put_object(
Bucket=S3_BUCKET_NAME,
Key=s3_key,
Body=parquet_buffer.getvalue().to_pybytes()
)
logger.info(f"Successfully wrote {len(records_to_flush)} records to {s3_key}")
except Exception as e:
logger.error(
f"Failed to write Parquet file for {buffer_key} to S3: {e}",
exc_info=True
)
# Error handling strategy: put the failed batch back? Send to DLQ?
# A simple approach is to log and drop, but a robust system needs a DLQ.
# For now, we are dropping the batch.
部署与配置 (AWS SAM)
使用 AWS SAM (Serverless Application Model) 来定义和部署整个 Serverless 应用是标准实践。它简化了 Lambda、IAM 角色和事件源映射的配置。
template.yaml:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
Serverless CDC processing pipeline from Kinesis to S3 Data Lake.
Resources:
CDCProcessorFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: cdc-processor-py
CodeUri: cdc-processor/
Handler: app.handler
Runtime: python3.9
Architectures:
- x86_64
MemorySize: 512 # Start with 512MB, adjust based on batch size
Timeout: 300 # 5 minutes, should be long enough for batching
Environment:
Variables:
S3_BUCKET_NAME: !Ref DataLakeBucket # Reference the S3 bucket defined below
S3_PREFIX: "cdc-data/"
LOG_LEVEL: "INFO"
BATCH_SIZE_BYTES_THRESHOLD: "5242880" # 5 MB
BATCH_MAX_RECORDS_THRESHOLD: "10000"
POWERTOOLS_SERVICE_NAME: "cdc-processor"
Policies:
- KinesisStreamReadPolicy:
StreamName: !Ref CDCStream # Reference the Kinesis stream
- S3WritePolicy:
BucketName: !Ref DataLakeBucket
Events:
KinesisTrigger:
Type: Kinesis
Properties:
Stream: !GetAtt CDCStream.Arn
StartingPosition: LATEST
BatchSize: 1000 # Max records per Lambda invocation
DataLakeBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: your-company-datalake-bucket-unique-name # Change to a unique name
VersioningConfiguration:
Status: Enabled
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
CDCStream:
Type: AWS::Kinesis::Stream
Properties:
Name: dms-cdc-aurora-stream
ShardCount: 1 # Start with one shard, scale as needed
Outputs:
DataLakeBucketName:
Description: "Name of the S3 bucket for the data lake"
Value: !Ref DataLakeBucket
CDCProcessorFunctionName:
Description: "Name of the CDC processing Lambda function"
Value: !Ref CDCProcessorFunction
测试思路
对这套系统的测试必须分层进行:
- 单元测试: 针对
process_dms_event函数,构造各种 DMS 事件(Insert, Update, Delete, 控制消息),断言record_buffer的状态是否符合预期。可以 Mock 掉boto3客户端。 - 集成测试: 在一个隔离的 AWS 环境中,手动向 Kinesis Stream 推送一批模拟的 DMS 事件。然后轮询检查 S3 Bucket 中是否生成了正确的 Parquet 文件,并使用
awswrangler或类似库下载并验证其内容和 Schema。 - 端到端测试: 在 Staging 环境中,完整部署 DMS、Kinesis、Lambda 和 S3,在 Staging 数据库中执行一系 DML 操作,验证数据最终是否以正确的形态出现在数据湖中,并能被 Athena 查询。
遗留问题与未来迭代
这个架构虽然高效且成本可控,但在生产环境中运行时仍有一些需要注意的局限性和优化方向。
处理更新(Updates)和删除(Deletes): 当前实现只是将
_op字段(’U’, ‘D’)一同写入 Parquet。下游的 Athena 查询需要自行处理这些逻辑,例如,通过窗口函数找出每个主键的最新记录,并过滤掉已删除的记录。这对查询性能和复杂度都是一个挑战。未来的迭代方向是引入 Apache Iceberg、Hudi 或 Delta Lake 这类事务性数据湖表格式。它们原生支持行级更新和删除(Merge on Read / Copy on Write),能极大简化下游的查询逻辑。Lambda 内存缓冲的局限性: 将数据缓存在 Lambda 的全局变量中,依赖于“热”的执行环境。如果流量中断,或者 Lambda 环境被回收,内存中的数据将会丢失。对于要求严格不丢数据的场景,这个方案风险较高。更可靠的方案是,在 Lambda 内部维持一个较小的批次,但将多个 Lambda 实例的输出先写入一个临时的、支持追加写的 S3 位置,再由一个定时的 Fargate 或 Step Functions 任务,将这些小文件合并(Compaction)成大文件。
Schema 演进: 当上游数据库发生
ALTER TABLE等 Schema 变更时,DMS 会传递这些 DDL 事件。当前的 Lambda 代码没有处理这种情况,可能会导致写入 Parquet 文件时因 Schema 不一致而出错。一个完整的解决方案需要集成 Schema 注册中心(如 AWS Glue Schema Registry),在处理数据前验证或更新 Schema,确保数据管道的鲁棒性。无流量时的文件刷新: 当前设计依赖 Kinesis 的持续事件来触发 Lambda 执行和缓冲区检查。如果某个表在很长一段时间内没有变更,它的小批量数据可能永远“卡”在内存里。需要一个辅助的定时触发器(如 EventBridge Scheduler),每隔几分钟强制调用一次 Lambda(带有特定
force_flush=true的负载),以清空所有残留的缓冲区。