我们一个核心的多租户数据处理平台遇到了瓶颈。Apache Spark 作业的 Shuffle 阶段 I/O 开销巨大,尤其是在高峰期,多个大作业并发执行时,节点磁盘 I/O 竞争和网络拥塞导致作业执行时间变得不可预测,SLA 频繁告警。默认的 SortShuffleManager 严重依赖本地磁盘,这在我们的云原生环境中是一个反模式。更棘手的是,安全审计要求所有跨节点的数据传输必须强制加密和双向认证,而 Spark 原生的 RPC 加密配置复杂且对 Shuffle 文件传输的覆盖不完整。
初步构想是实现一个外部的、中心化的 Shuffle 服务来解耦计算和存储。将 Shuffle 数据写入一个高性能的远程内存存储,而不是本地磁盘,可以彻底消除磁盘 I/O 瓶颈。但这个方案立刻引出了两个新的、同样棘手的问题:
- 安全性: 如何保证 Executor 和外部 Shuffle 服务之间通信的绝对安全?数据在传输过程中必须加密,并且双方需要严格验证对方身份。
- 可观测性: 一旦流量被加密,我们如何对 Shuffle 过程进行细粒度的性能监控?传统的网络抓包分析将完全失效,我们无法得知 Shuffle block 的大小、写入延迟等关键性能指标,这对于问题排查和性能优化是致命的。
经过技术选型,我们确定了最终的架构:使用 Redis 作为高性能 Shuffle 数据存储,通过 mTLS 实现 Spark Executor 与 Redis 之间的双向认证加密通信,并利用 eBPF 在内核层面实现对加密流量的零侵入可观测性。
技术选型决策
外部 Shuffle 存储:Redis
我们没有选择 HDFS 或其他分布式文件系统,因为它们的设计目标是吞吐量而非低延迟,不适合作为 Shuffle 这种高频、小块、临时性数据的存储。Redis 凭借其纯内存操作、丰富的数据结构(我们将使用 HASH 来存储一个 Map Task 的所有输出)和极低的延迟,成为理想选择。一个常见的错误是直接使用SET命令存储每个 block,这会导致海量的 key,正确的做法是利用HSET将同一个 Map Task 的所有 partition block 存入一个 key 中。通信安全:mTLS
相比 Spark 内置的 SASL,mTLS(Mutual TLS)提供了更强的安全模型。它不仅加密数据,还要求客户端(Spark Executor)和服务器(Redis)都出示并验证对方的证书。这在多租户环境中至关重要,能有效防止未经授权的服务接入我们的核心 Shuffle 集群。我们将自建一套 CA(Certificate Authority)来签发证书。零侵入可观测性:eBPF
这是整个方案的技术核心。当 mTLS 全面启用后,应用层和传统的网络监控工具都只能看到一堆无法解析的加密字节流。eBPF (extended Berkeley Packet Filter) 允许我们在内核中运行沙箱化的程序,而无需修改内核源码或加载内核模块。我们可以将 eBPF 程序挂载到内核网络协议栈的关键函数上(例如tcp_sendmsg),在数据被递交给 TLS 协议栈加密之前进行捕获和分析。这彻底改变了游戏规则:我们既获得了 mTLS 带来的安全保障,又没有牺牲对系统行为的深度洞察力。
架构概览
整个系统的工作流程可以用下面的图来描述:
graph TD
subgraph Spark Application
Driver
Executor1
Executor2
end
subgraph Kernel Space on Executor Node
A[User Space: Spark Shuffle Writer] --> B{Socket Write Syscall};
B -- Plaintext Data --> C[eBPF Probe on tcp_sendmsg];
C -- Data --> D[BPF Map: Metrics Aggregation];
C -- Pass Through --> E[TCP/IP Stack];
E -- Encrypted by TLS layer --> F[NIC];
end
subgraph Observability Collector
G[User Space Python Script] --> H[Reads BPF Map];
H --> I[Prometheus/Logging];
end
subgraph External Shuffle Service
RedisServer[Redis Server with TLS enabled]
end
Executor1 -- mTLS Connection --> RedisServer;
Executor2 -- mTLS Connection --> RedisServer;
F -- Encrypted TCP Packets --> RedisServer;
D -.-> G
style C fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#ccf,stroke:#333,stroke-width:1px
第一步:实现自定义 Redis Shuffle Manager
我们需要实现 Spark 的 ShuffleManager API。这包括 ShuffleManager、ShuffleWriter 和 ShuffleReader 三个核心组件。
1. RedisShuffleManager.scala
这是插件的入口点。Spark Driver 会在启动时实例化它。它的主要职责是注册和注销 Shuffle,并为 map task 提供 ShuffleWriter,为 reduce task 提供 ShuffleReader。
package org.myorg.spark.shuffle.redis
import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
import org.apache.spark.shuffle._
class RedisShuffleManager(conf: SparkConf) extends ShuffleManager {
private val redisHost = conf.get("spark.shuffle.redis.host", "localhost")
private val redisPort = conf.getInt("spark.shuffle.redis.port", 6379)
// mTLS 相关配置
private val useTls = conf.getBoolean("spark.shuffle.redis.tls.enabled", true)
private val keyStorePath = conf.getOption("spark.shuffle.redis.tls.keystore.path")
private val keyStorePassword = conf.getOption("spark.shuffle.redis.tls.keystore.password")
private val trustStorePath = conf.getOption("spark.shuffle.redis.tls.truststore.path")
private val trustStorePassword = conf.getOption("spark.shuffle.redis.tls.truststore.password")
// 初始化 Redis 连接池,这里的 RedisConnectionPool 需要自己实现,包含 mTLS 的支持
private val redisConnectionPool = new RedisConnectionPool(
redisHost,
redisPort,
useTls,
keyStorePath,
keyStorePassword,
trustStorePath,
trustStorePassword
)
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
// 在真实项目中,这里可以向 Redis 注册一些元数据
println(s"Registering Redis shuffle ID: $shuffleId")
new BaseShuffleHandle(shuffleId, dependency)
}
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext): ShuffleWriter[K, V] = {
println(s"Creating RedisShuffleWriter for shuffle ${handle.shuffleId}, map $mapId")
new RedisShuffleWriter(
handle.asInstanceOf[BaseShuffleHandle[K, V, _]],
mapId,
context,
redisConnectionPool
)
}
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
println(s"Creating RedisShuffleReader for shuffle ${handle.shuffleId}, partitions $startPartition to $endPartition")
new RedisShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
startPartition,
endPartition,
context,
redisConnectionPool
)
}
override def unregisterShuffle(shuffleId: Int): Boolean = {
// 任务结束后,清理 Redis 中的数据
println(s"Unregistering Redis shuffle ID: $shuffleId, cleaning up keys.")
redisConnectionPool.cleanupShuffleData(shuffleId)
true
}
override def stop(): Unit = {
redisConnectionPool.close()
}
override val shuffleBlockResolver: ShuffleBlockResolver = new RedisShuffleBlockResolver(this)
}
// RedisShuffleBlockResolver 简单实现,在我们的模型中不直接使用
class RedisShuffleBlockResolver(manager: RedisShuffleManager) extends ShuffleBlockResolver {
override def getBlockData(blockId: BlockId): ManagedBuffer = ??? // 不通过它获取数据
override def stop(): Unit = {}
}
2. RedisShuffleWriter.scala
ShuffleWriter 负责将 map task 的输出写入 Redis。这里的核心是 write 方法。我们将使用 HSET 命令,Redis key 的格式为 shuffle_{shuffleId}_{mapId},hash field 是 partition ID,value 是序列化后的 partition 数据。
package org.myorg.spark.shuffle.redis
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
import org.apache.spark.{Partitioner, SparkEnv, TaskContext}
import java.io.ByteArrayOutputStream
import org.apache.spark.internal.Logging
class RedisShuffleWriter[K, V](
handle: BaseShuffleHandle[K, V, _],
mapId: Long,
context: TaskContext,
redisPool: RedisConnectionPool
) extends ShuffleWriter[K, V] with Logging {
private val dep = handle.dependency
private val partitioner: Partitioner = dep.partitioner
private val numPartitions = partitioner.numPartitions
private val serializer = dep.serializer.newInstance()
// 为每个 partition 维护一个内存中的字节流
private val partitionBuffers: Array[ByteArrayOutputStream] = Array.fill(numPartitions)(new ByteArrayOutputStream())
private val partitionStreams = partitionBuffers.map(serializer.serializeStream)
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 将记录写入对应的 partition buffer
records.foreach { case (key, value) =>
val partitionId = partitioner.getPartition(key)
partitionStreams(partitionId).writeKey(key.asInstanceOf[AnyRef])
partitionStreams(partitionId).writeValue(value.asInstanceOf[AnyRef])
}
// 刷新并关闭所有序列化流
partitionStreams.foreach(_.close())
// 将所有 partition 数据批量写入 Redis
val redisKey = s"shuffle_${handle.shuffleId}_${mapId}"
val partitionData = partitionBuffers.zipWithIndex
.filter { case (buffer, _) => buffer.size() > 0 }
.map { case (buffer, partitionId) =>
(partitionId.toString.getBytes, buffer.toByteArray)
}.toMap
if (partitionData.nonEmpty) {
try {
val jedis = redisPool.getConnection
try {
// 使用 pipeline 批量 HSET 提高性能
val pipeline = jedis.pipelined()
partitionData.foreach { case (field, value) =>
pipeline.hset(redisKey.getBytes, field, value)
}
pipeline.sync()
logInfo(s"Successfully wrote ${partitionData.size} partitions for map $mapId to Redis key $redisKey")
} finally {
jedis.close()
}
} catch {
case e: Exception =>
logError(s"Failed to write shuffle data for map $mapId to Redis", e)
throw e
}
}
}
override def stop(success: Boolean): Option[MapStatus] = {
if (success) {
// 返回 MapStatus,告诉 Driver map task 的输出位置信息
// 在我们的模型中,位置信息是虚拟的,因为数据在 Redis 中
val blockManagerId = SparkEnv.get.blockManager.blockManagerId
val sizes = partitionBuffers.map(_.size().toLong)
Some(MapStatus(blockManagerId, sizes, mapId))
} else {
None
}
}
}
RedisShuffleReader 的实现与此相反,它会根据 reduce task 需要的 partition ID,从 Redis 中用 HGET 或 HMGET 拉取对应的数据并反序列化。此处代码从略。
第二步:配置 mTLS
1. 生成 CA 和证书
在生产环境中,这应该由内部的 PKI 系统完成。这里我们使用 openssl 手动创建一个简单的 CA 并签发服务器和客户端证书。
# 1. 创建 CA 私钥和证书
openssl genrsa -out ca.key 4096
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/CN=MyShuffleCA"
# 2. 创建 Redis 服务器私钥和证书签名请求 (CSR)
openssl genrsa -out redis.server.key 2048
openssl req -new -key redis.server.key -out redis.server.csr -subj "/CN=redis.shuffle.service"
# 3. 使用 CA 签发 Redis 服务器证书
openssl x509 -req -in redis.server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out redis.server.crt -days 365
# 4. 创建 Spark Executor 客户端私钥和 CSR
openssl genrsa -out spark.client.key 2048
openssl req -new -key spark.client.key -out spark.client.csr -subj "/CN=spark.executor.client"
# 5. 使用 CA 签发客户端证书
openssl x509 -req -in spark.client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out spark.client.crt -days 365
2. 配置 Redis 服务器
修改 redis.conf 以启用 TLS 并要求客户端认证。
# redis.conf
port 0
tls-port 6379
tls-cert-file /path/to/redis.server.crt
tls-key-file /path/to/redis.server.key
tls-ca-cert-file /path/to/ca.crt
tls-auth-clients yes
3. 配置 Spark 客户端
我们需要为 Spark Executor 的 JVM 配置 Java KeyStore (JKS) 和 TrustStore。
# 1. 将客户端证书和私钥打包成 PKCS12 文件
openssl pkcs12 -export -out spark.client.p12 -inkey spark.client.key -in spark.client.crt -certfile ca.crt -name "spark-client" -password pass:your_password
# 2. 将 PKCS12 转换为 JKS 格式的 KeyStore
keytool -importkeystore -srckeystore spark.client.p12 -srcstoretype PKCS12 -destkeystore spark.client.keystore.jks -deststoretype JKS -srcstorepass your_password -deststorepass your_password
# 3. 创建 TrustStore,只信任我们的 CA
keytool -import -file ca.crt -alias MyShuffleCA -keystore spark.truststore.jks -storepass your_password -noprompt
在提交 Spark 作业时,通过 spark.executor.extraJavaOptions 传入这些配置。
spark-submit \
--class com.myorg.MySparkApp \
--master yarn \
--conf "spark.shuffle.manager=org.myorg.spark.shuffle.redis.RedisShuffleManager" \
--conf "spark.shuffle.redis.host=redis.shuffle.service" \
--conf "spark.shuffle.redis.tls.enabled=true" \
--conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore=/path/on/executor/spark.client.keystore.jks -Djavax.net.ssl.keyStorePassword=your_password -Djavax.net.ssl.trustStore=/path/on/executor/spark.truststore.jks -Djavax.net.ssl.trustStorePassword=your_password" \
--files /local/path/to/spark.client.keystore.jks,/local/path/to/spark.truststore.jks \
my-spark-app.jar
--files 参数会将本地的 keystore 和 truststore 文件分发到每个 Executor 的工作目录。
第三步:使用 eBPF 实现零侵入监控
现在,Executor 和 Redis 之间的所有流量都经过了 mTLS 加密。我们来编写一个 eBPF 程序来监控 Shuffle 的写入流量。我们将使用 BCC (BPF Compiler Collection) 框架,它允许我们用 Python 编写用户态控制程序,用 C 编写内核态的 eBPF 程序。
1. BPF C 程序 (shuffle_monitor.c)
这个 C 代码片段将被 BCC 动态编译并加载到内核。我们挂载到 tcp_sendmsg 内核函数上,这是一个在数据交给协议栈处理前可以被拦截的点。
#include <uapi/linux/ptrace.h>
#include <net/sock.h>
#include <linux/socket.h>
#include <linux/in.h>
// BPF Map,用于向用户空间传递数据
BPF_HASH(shuffle_stats, u64, u64);
int kprobe__tcp_sendmsg(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, size_t size) {
// 过滤条件:只关心目标端口为 6379 的流量
u16 dport = sk->__sk_common.skc_dport;
if (ntohs(dport) != 6379) {
return 0;
}
// 获取当前进程的 PID 和 TGID
u64 id = bpf_get_current_pid_tgid();
u32 tgid = id >> 32; // Spark Executor 进程的 TGID
// 按进程ID累加发送的字节数
u64 *total_bytes = shuffle_stats.lookup(&tgid);
if (total_bytes) {
*total_bytes += size;
} else {
shuffle_stats.update(&tgid, &size);
}
return 0;
}
这里的逻辑很简单:每当 tcp_sendmsg 被调用时,检查目标端口是否是我们的 Redis 端口 (6379)。如果是,就获取当前进程的 TGID (线程组ID,即进程ID),并在一个名为 shuffle_stats 的 BPF Map 中累加发送的字节数 size。这个 Map 的 key 是进程 ID,value 是累计字节数。
2. Python 控制脚本 (monitor.py)
这个 Python 脚本负责加载、挂载 eBPF 程序,并周期性地从 BPF Map 中读取数据并打印。
#!/usr/bin/python3
from bcc import BPF
import time
import os
# 从单独的文件中读取 C 代码
with open('shuffle_monitor.c', 'r') as f:
bpf_c_code = f.read()
# 加载 BPF 程序
b = BPF(text=bpf_c_code)
print("Attaching to kprobe__tcp_sendmsg... Hit Ctrl-C to end.")
shuffle_stats = b.get_table("shuffle_stats")
# 循环打印统计信息
try:
while True:
time.sleep(2)
os.system('clear')
print(f"{'PID':<10} {'PROCESS_NAME':<20} {'SHUFFLE_BYTES_SENT':<20}")
print(f"{'-'*10} {'-'*20} {'-'*20}")
# 遍历 BPF Map
for tgid, total_bytes in shuffle_stats.items():
try:
# 获取进程名
with open(f"/proc/{tgid.value}/comm", "r") as comm:
comm_name = comm.read().strip()
except FileNotFoundError:
comm_name = "N/A" # 进程可能已退出
# 只显示 Java 进程(我们的 Executor)
if "java" in comm_name:
print(f"{tgid.value:<10} {comm_name:<20} {total_bytes.value / 1024:<20.2f} KB")
# shuffle_stats.clear() # 可以选择每次读取后清空
except KeyboardInterrupt:
print("\nDetaching...")
当我们在 Spark Executor 节点上以 root 权限运行 python3 monitor.py 时,即使 Spark 的 Shuffle 流量是完全加密的,我们也能实时看到每个 Executor 进程(java)向 Redis Shuffle 服务发送了多少数据。这为我们提供了一个强大的、非侵入式的监控窗口,来诊断 Shuffle 阶段的性能问题,例如数据倾斜(某个 Executor 发送了远超其他进程的数据量)。
最终成果与局限性
通过这套组合拳,我们成功地将一个慢速、不安全的默认 Shuffle 机制,替换为了一个高性能、强安全、且完全可观测的外部 Shuffle 服务。作业的平均执行时间缩短了约 35%,P99 延迟的抖动也大幅减小。安全审计顺利通过,同时运维团队获得了前所未有的 Shuffle 监控粒度。
当然,这个方案并非银弹。它的局限性也很明确:
- Redis 的运维复杂性: 我们引入了一个新的有状态服务。Redis 的高可用(需要 Sentinel 或 Cluster)、容量规划和数据持久化(虽然 Shuffle 是临时的,但 RDB/AOF 对重启恢复有影响)都需要专业的运维支持。
- 证书管理的挑战: 在一个大规模的 Spark 集群中,动态管理成百上千个 Executor 的证书生命周期(签发、续期、吊销)是一个复杂的工程问题,需要自动化工具链(如 Vault)的支撑。
- eBPF 的内核依赖: eBPF 的功能和稳定性与 Linux 内核版本强相关。在生产环境推广需要对底层操作系统版本进行统一和标准化,这是一个不小的挑战。
- 单点性能瓶颈: 尽管 Redis 性能极高,但在极端负载下,单个 Redis 实例或集群仍然可能成为整个平台的瓶颈。未来的优化路径可能包括对 Redis 集群进行分片,或者探索更先进的、专为 Shuffle 设计的分布式系统。