一个生产级的实时风控系统必须在毫秒间回答两个本质上完全不同的问题:一是“最近发生了什么?”,二是“这些参与者之间有什么关系?”。前者是时序问题,关心事件的顺序、频率和时间窗口内的聚合特征;后者是图谱问题,关心实体间的连接、路径和社区结构。试图用单一数据库模型来同时高效解决这两个问题,往往会在系统复杂性或性能上做出难以接受的妥协。
最初的架构构想是,能否用单一数据库解决。
方案A,仅使用TimescaleDB。我们可以将交易事件、登录日志等高频数据高效地存入其Hypertable。TimescaleDB在时间窗口查询、连续聚合等方面表现卓越。但当我们尝试分析欺诈团伙时,问题就暴露了。例如,要找出“与A账户通过两次交易以内关联,且共享同一设备ID的所有账户”,在SQL中需要复杂的WITH RECURSIVE公用表表达式(CTE)。在数据量巨大、关系深度不定的情况下,这种查询的性能会急剧下降,无法满足实时风控对延迟的要求。
方案B,仅使用ArangoDB。ArangoDB作为多模型数据库,理论上可以同时存储事件(作为文档)和实体关系(作为图)。我们可以将每笔交易建模成一个文档,并创建用户、设备、银行卡之间的边。这在分析关系网络时非常强大。但ArangoDB并非为海量时序数据的高吞吐写入和高效时间范围查询而设计的。其存储引擎、分片策略和索引机制不像TimescaleDB那样针对时间维度做了极致优化。面对每日数十亿级别的交易事件,单纯依赖ArangoDB进行时序分析,很快会遇到写入瓶颈和查询性能问题,尤其是在高基数场景下。
最终的决策是放弃“银弹”思维,采用混合数据模型架构。让专业的技术做专业的事:
- TimescaleDB: 作为事件流的权威记录系统。所有不可变的原子事件(交易、登录、改密等)以极高的吞吞吐量写入。它专注于回答“最近发生了什么”的问题。
- ArangoDB: 作为实体关系图谱的动态视图。它维护一个关于用户、设备、IP地址、银行账户等实体及其关系的最新快照。它专注于回答“他们之间有什么关系”的问题。
- Kotlin 服务层: 作为整个系统的大脑和粘合剂。它负责消费上游事件流(例如来自Kafka),将数据双写到TimescaleDB和ArangoDB,并提供统一的查询API,在内部编排对两个数据库的查询,最后融合数据执行风控规则。
这种架构的挑战从数据持久层转移到了应用服务层。服务层必须处理好双写的数据一致性、查询融合的性能以及整体系统的可维护性。
graph TD
subgraph "事件源 (e.g., Kafka)"
A[Transaction Event Topic]
end
subgraph "Kotlin Risk Engine Service"
B(Event Consumer)
C{Data Orchestrator}
D[Risk Rule Engine]
E(Unified Query API)
B --> C
C -- Immutable Event --> F[TimescaleDB Client]
C -- Entity/Edge Update --> G[ArangoDB Client]
E -->|Time-window Query| F
E -->|Graph Traversal| G
E --> D
end
subgraph "持久层"
F_DB[(TimescaleDB
Event Log)]
G_DB[(ArangoDB
Entity Graph)]
end
A --> B
F --> F_DB
G --> G_DB
style F_DB fill:#bde0fe,stroke:#333,stroke-width:2px
style G_DB fill:#ffc8dd,stroke:#333,stroke-width:2px
核心实现:Kotlin服务层
我们的核心是一个基于Ktor或Spring Boot的Kotlin应用。这里以Ktor为例,展示其轻量级和协程友好的特性。
1. 项目依赖与配置
首先是build.gradle.kts中的关键依赖。我们需要TimescaleDB的JDBC驱动、HikariCP连接池、ArangoDB的官方Java驱动以及Kotlin协程库。
// build.gradle.kts
plugins {
kotlin("jvm") version "1.9.20"
id("io.ktor.plugin") version "2.3.6"
// ... other plugins
}
dependencies {
// Ktor Core
implementation("io.ktor:ktor-server-core-jvm")
implementation("io.ktor:ktor-server-netty-jvm")
// Serialization
implementation("io.ktor:ktor-server-content-negotiation-jvm")
implementation("io.ktor:ktor-serialization-kotlinx-json-jvm")
// Database Clients
implementation("org.postgresql:postgresql:42.7.0") // TimescaleDB is compatible with PostgreSQL driver
implementation("com.zaxxer:HikariCP:5.1.0")
implementation("com.arangodb:arangodb-java-driver:7.4.0")
// Logging
implementation("ch.qos.logback:logback-classic:1.4.11")
// Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
}
配置文件 application.conf (HOCON格式) 负责管理数据库连接信息。在真实项目中,这些应该通过环境变量或配置中心注入。
# application.conf
ktor {
deployment {
port = 8080
}
application {
modules = [ com.risk.engine.ApplicationKt.module ]
}
}
database {
timescaledb {
jdbcUrl = "jdbc:postgresql://localhost:5432/risk_db"
username = "user"
password = "password"
poolSize = 10
}
arangodb {
host = "localhost"
port = 8529
user = "root"
password = "password"
database = "risk_graph"
}
}
2. 数据模型定义
清晰的数据模型是第一步。我们使用Kotlin的data class,并利用kotlinx.serialization进行序列化。
// com/risk/engine/models/DataModels.kt
package com.risk.engine.models
import kotlinx.serialization.Serializable
import java.time.Instant
// 事件模型,用于写入TimescaleDB
@Serializable
data class TransactionEvent(
val transactionId: String,
val userId: String,
val sourceAccountId: String,
val targetAccountId: String,
val amount: Double,
val currency: String,
val deviceId: String,
val ipAddress: String,
val timestamp: Instant
)
// 图谱中的实体模型 (Vertex),用于ArangoDB
@Serializable
data class AccountNode(
val _key: String, // accountId
val type: String = "account",
var totalAmountSent: Double = 0.0,
var lastSeen: Instant? = null
)
@Serializable
data class DeviceNode(
val _key: String, // deviceId
val type: String = "device"
)
// 图谱中的关系模型 (Edge),用于ArangoDB
@Serializable
data class UsedDeviceEdge(
val _from: String, // "accounts/{userId}"
val _to: String, // "devices/{deviceId}"
var transactionCount: Int = 1,
val lastTransactionAt: Instant
)
注意_key, _from, _to是ArangoDB的保留字段,用于定义文档的键和边的起点/终点。
3. 数据库客户端封装
为了代码整洁和可维护性,我们将数据库客户端的初始化和操作封装起来。
TimescaleDB Client:
// com/risk/engine/db/TimescaleDBClient.kt
package com.risk.engine.db
import com.risk.engine.models.TransactionEvent
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import io.ktor.server.config.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.sql.Timestamp
class TimescaleDBClient(config: ApplicationConfig) {
private val dataSource: HikariDataSource
init {
val dbConfig = config.config("database.timescaledb")
val hikariConfig = HikariConfig().apply {
jdbcUrl = dbConfig.property("jdbcUrl").getString()
username = dbConfig.property("username").getString()
password = dbConfig.property("password").getString()
maximumPoolSize = dbConfig.property("poolSize").getString().toInt()
driverClassName = "org.postgresql.Driver"
addDataSourceProperty("cachePrepStmts", "true")
addDataSourceProperty("prepStmtCacheSize", "250")
addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
}
dataSource = HikariDataSource(hikariConfig)
}
// 关键写入操作:将事件持久化
suspend fun recordTransaction(event: TransactionEvent) = withContext(Dispatchers.IO) {
val sql = """
INSERT INTO transaction_events (transaction_id, user_id, source_account_id, target_account_id, amount, currency, device_id, ip_address, event_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
""".trimIndent()
try {
dataSource.connection.use { conn ->
conn.prepareStatement(sql).use { stmt ->
stmt.setString(1, event.transactionId)
stmt.setString(2, event.userId)
stmt.setString(3, event.sourceAccountId)
stmt.setString(4, event.targetAccountId)
stmt.setDouble(5, event.amount)
stmt.setString(6, event.currency)
stmt.setString(7, event.deviceId)
stmt.setString(8, event.ipAddress)
stmt.setTimestamp(9, Timestamp.from(event.timestamp))
stmt.executeUpdate()
}
}
} catch (e: Exception) {
// 在生产环境中,这里应该有更健壮的日志和可能的重试/死信队列逻辑
println("Error recording transaction to TimescaleDB: ${e.message}")
throw e
}
}
// 关键查询操作:获取时间窗口内的事件
suspend fun getTransactionsInLastHour(accountId: String): Int = withContext(Dispatchers.IO) {
val sql = "SELECT count(*) FROM transaction_events WHERE (source_account_id = ? OR target_account_id = ?) AND event_time > NOW() - INTERVAL '1 hour';"
dataSource.connection.use { conn ->
conn.prepareStatement(sql).use { stmt ->
stmt.setString(1, accountId)
stmt.setString(2, accountId)
stmt.executeQuery().use { rs ->
if (rs.next()) rs.getInt(1) else 0
}
}
}
}
}
// SQL for table creation
/*
CREATE TABLE transaction_events (
transaction_id VARCHAR(255) PRIMARY KEY,
user_id VARCHAR(255),
source_account_id VARCHAR(255),
target_account_id VARCHAR(255),
amount DOUBLE PRECISION,
currency VARCHAR(10),
device_id VARCHAR(255),
ip_address VARCHAR(45),
event_time TIMESTAMPTZ NOT NULL
);
SELECT create_hypertable('transaction_events', 'event_time');
*/
这里的withContext(Dispatchers.IO)是关键,它确保了阻塞的JDBC调用不会阻塞主应用线程,充分利用了Kotlin协程的优势。
ArangoDB Client:
// com/risk/engine/db/ArangoDBClient.kt
package com.risk.engine.db
import com.arangodb.ArangoDB
import com.arangodb.async.ArangoDBAsync
import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.model.AqlQueryOptions
import com.arangodb.model.DocumentCreateOptions
import com.arangodb.model.DocumentUpdateOptions
import com.arangodb.model.UpsertOptions
import com.risk.engine.models.AccountNode
import com.risk.engine.models.DeviceNode
import com.risk.engine.models.UsedDeviceEdge
import io.ktor.server.config.*
import kotlinx.coroutines.future.await
import java.time.Instant
class ArangoDBClient(config: ApplicationConfig) {
private val db: ArangoDatabaseAsync
init {
val dbConfig = config.config("database.arangodb")
val arangoDB: ArangoDBAsync = ArangoDB.Builder()
.host(dbConfig.property("host").getString(), dbConfig.property("port").getString().toInt())
.user(dbConfig.property("user").getString())
.password(dbConfig.property("password").getString())
.build().async()
db = arangoDB.db(dbConfig.property("database").getString())
}
// 关键写入操作:更新图谱
suspend fun updateGraphWithTransaction(event: com.risk.engine.models.TransactionEvent) {
// 使用UPSERT确保实体(账户,设备)存在
val sourceAccount = AccountNode(_key = event.sourceAccountId)
val device = DeviceNode(_key = event.deviceId)
// UPSERT操作是原子的,非常适合这种“如果不存在则创建,否则更新”的场景
val accounts = db.collection("accounts")
val devices = db.collection("devices")
accounts.upsertDocument(sourceAccount, UpsertOptions().insert(sourceAccount).update(mapOf("lastSeen" to event.timestamp))).await()
devices.upsertDocument(device, UpsertOptions().insert(device).update(mapOf())).await() // Device node has no dynamic fields to update here
// 创建或更新账户和设备之间的关系
val edge = UsedDeviceEdge(
_from = "accounts/${event.sourceAccountId}",
_to = "devices/${event.deviceId}",
lastTransactionAt = event.timestamp
)
// 这里的逻辑稍微复杂:如果边存在,则增加计数器;如果不存在,则创建。
// 这需要一个AQL查询来保证原子性。
val aql = """
LET edgeKey = CONCAT_WS(':', @from, @to)
UPSERT { _key: edgeKey }
INSERT {
_key: edgeKey,
_from: @from,
_to: @to,
transactionCount: 1,
lastTransactionAt: @lastTransactionAt
}
UPDATE {
transactionCount: OLD.transactionCount + 1,
lastTransactionAt: @lastTransactionAt
}
IN used_device
""".trimIndent()
val bindVars = mapOf(
"from" to "accounts/${event.sourceAccountId}",
"to" to "devices/${event.deviceId}",
"lastTransactionAt" to event.timestamp.toString()
)
db.query(aql, bindVars, AqlQueryOptions(), Void::class.java).await()
}
// 关键查询操作:进行图遍历
suspend fun findFraudRing(accountId: String, maxDepth: Int = 3): List<String> {
val aql = """
WITH accounts, devices
FOR v, e, p IN 1..@maxDepth
ANY @startNode
GRAPH 'FinancialGraph'
FILTER IS_SAME_COLLECTION('accounts', v)
RETURN DISTINCT v._key
""".trimIndent()
// 'FinancialGraph' 是预先在ArangoDB中定义的图,包含了accounts和devices作为顶点集合,used_device作为边集合
val bindVars = mapOf(
"maxDepth" to maxDepth,
"startNode" to "accounts/$accountId"
)
val cursor = db.query(aql, bindVars, AqlQueryOptions(), String::class.java).await()
return cursor.asListRemaining()
}
}
ArangoDB的异步驱动与Kotlin协程能很好地结合,通过.await()扩展函数将CompletableFuture转换为挂起函数。这里的updateGraphWithTransaction方法展示了一个常见的坑:简单的upsertDocument无法处理像transactionCount这样的累加字段,必须使用AQL查询来实现原子性的“创建或更新并累加”逻辑。
4. 事件处理与风险分析服务
现在我们可以将两者结合起来,构建核心业务逻辑。
// com/risk/engine/service/RiskAnalysisService.kt
package com.risk.engine.service
import com.risk.engine.db.ArangoDBClient
import com.risk.engine.db.TimescaleDBClient
import com.risk.engine.models.TransactionEvent
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
class RiskAnalysisService(
private val timescale: TimescaleDBClient,
private val arango: ArangoDBClient
) {
// 1. 事件摄取入口
suspend fun processTransactionEvent(event: TransactionEvent) {
// 双写操作。在生产环境中,这里需要一个更健壮的机制来处理失败。
// 例如,先写入Kafka的另一个topic,由独立的worker来保证最终一致性。
// 为简化,这里假设同步写入。
try {
timescale.recordTransaction(event)
arango.updateGraphWithTransaction(event)
} catch (e: Exception) {
// 关键的错误处理点
// 如果arango写入失败,timescale的记录是否回滚?
// 我们的架构选择不回滚。TimescaleDB是事实记录,ArangoDB是派生视图。
// 应该有一个后台修复任务(reconciliation job)来保证最终一致性。
println("Failed to process event ${event.transactionId}, data may be inconsistent: ${e.message}")
}
}
// 2. 风险评估入口
suspend fun assessRisk(event: TransactionEvent): RiskResult = coroutineScope {
// 并行查询TimescaleDB和ArangoDB以降低延迟
val recentTxCountDeferred = async { timescale.getTransactionsInLastHour(event.sourceAccountId) }
val relatedFraudAccountsDeferred = async { arango.findFraudRing(event.sourceAccountId) }
val recentTxCount = recentTxCountDeferred.await()
val relatedFraudAccounts = relatedFraudAccountsDeferred.await()
// 3. 应用风控规则
val score = evaluateRules(recentTxCount, relatedFraudAccounts, event)
when {
score > 90 -> RiskResult(RiskLevel.HIGH, "High transaction frequency and connection to known fraud ring.")
score > 60 -> RiskResult(RiskLevel.MEDIUM, "Unusual transaction frequency.")
else -> RiskResult(RiskLevel.LOW, "OK")
}
}
private fun evaluateRules(txCount: Int, fraudRing: List<String>, event: TransactionEvent): Int {
var score = 0
// 规则1: 短时间内交易频率过高
if (txCount > 10) {
score += 50
}
// 规则2: 交易金额巨大
if (event.amount > 10000) {
score += 20
}
// 规则3: 与已知欺诈账户有关联
// (假设我们有一个已知的欺诈账户列表)
val knownFraudsters = setOf("fraud_account_1", "fraud_account_2")
if (fraudRing.any { it in knownFraudsters }) {
score += 50
}
return score
}
}
data class RiskResult(val level: RiskLevel, val reason: String)
enum class RiskLevel { LOW, MEDIUM, HIGH }
assessRisk函数是这个架构的核心价值体现。它使用coroutineScope和async来并行执行对两个数据库的查询,这是性能优化的关键。获取时序特征(recentTxCount)和图谱特征(relatedFraudAccounts)后,在内存中进行规则评估。
架构的局限性与未来展望
此架构的优势在于发挥了每个数据库的长处,实现了低延迟的复杂风控查询。然而,它并非没有成本。
首要的局限性在于数据一致性。在processTransactionEvent中,对TimescaleDB和ArangoDB的写入并非原子操作。如果一个成功一个失败,数据就会出现短暂不一致。对于风控场景,这种最终一致性通常可以接受,因为ArangoDB的图谱本身就是一个不断演化的近似视图。但解决这个问题需要引入更复杂的机制,比如使用Debezium进行CDC(Change Data Capture)从TimescaleDB的WAL日志中捕获事件来更新ArangoDB,从而实现更可靠的数据同步。
其次是运维复杂性。团队需要同时维护PostgreSQL/TimescaleDB和ArangoDB两个集群,这带来了额外的监控、备份、升级和专业知识成本。
未来的优化路径可能包括:
- 规则引擎外置: 将
evaluateRules中的硬编码逻辑替换为一个可动态配置的外部规则引擎(如Drools),使风控策略的调整无需重新部署服务。 - 异步化数据流: 将服务内的双写逻辑改为“写入TimescaleDB -> 发送消息到Kafka”,由一个独立的消费者服务负责将消息异步更新到ArangoDB。这能提高主服务的写入性能和韧性。
- 查询联邦: 探索如Presto/Trino这样的查询联邦引擎,看是否能进一步简化Kotlin服务层中的数据融合逻辑,尽管这可能会引入新的性能开销。
这个架构不是通用的解决方案,而是针对特定问题——即同时需要高性能时序分析和复杂图谱遍历——的一种务实且高效的权衡。