多租户SaaS平台对数据检索的核心要求是双重的:必须提供精准的关键词匹配能力,同时也要能理解用户意图,进行语义层面的相似度搜索。这一切都构建在严格、不可逾越的租户数据隔离之上。当技术选型落到实处,一个直接的问题摆在面前:是选择一个全能选手,还是组合多个专家系统?
架构决策的十字路口
在一个为企业提供私有知识库服务的项目中,我们面临这个典型的技术权衡。用户上传的文档需要支持两种查询模式:
- 关键词精确查询:例如,查找所有包含“2023年第三季度财务报告”的文档。
- 语义模糊查询:例如,查询“关于公司近期盈利能力的分析”,这可能涉及到财报、会议纪要等多种文档。
同时,系统的多租户特性要求tenant_A的任何查询,无论如何都不能触碰到tenant_B的数据。
方案A:Elasticsearch独撑全局
Elasticsearch自8.x版本后引入了dense_vector字段类型和对HNSW(Hierarchical Navigable Small World)算法的支持,使其具备了向量检索的能力。
- 优势:
- 技术栈统一: 运维成本相对较低,只需维护一个Elasticsearch集群。
- 生态成熟: 强大的布尔查询、聚合分析能力可以与向量检索无缝结合。数据隔离可以通过在文档中添加
tenant_id字段,并在所有查询中强制使用filter子句来实现。
- 劣势:
- 资源争抢: 在同一个集群中,重量级的全文索引和向量索引构建过程会相互竞争CPU、内存和I/O资源。在真实项目中,这可能导致查询性能不稳定。
- 向量检索的专业性: 尽管ES支持向量检索,但其在调优选项、索引构建效率、内存控制等方面,与专门的向量数据库相比,仍有差距。在高并发、低延迟的语义检索场景下,可能会成为性能瓶颈。
- 成本考量: 为了满足高性能的向量检索,ES节点需要大量内存,这会直接推高基础设施成本。
方案B:ChromaDB专注向量,Elasticsearch负责元数据
这个方案将职责进行拆分:
ChromaDB: 专门负责向量的存储和近似最近邻(ANN)搜索。它的设计初衷就是为了高效处理Embeddings。
Elasticsearch: 回归其最擅长的领域——全文检索、结构化数据过滤和聚合。所有文档的元数据(如
doc_id,tenant_id,title,upload_date等)都存储于此。优势:
- 专业分工: 每个组件都做自己最擅长的事,性能更可预测。ChromaDB的HNSW实现经过深度优化,能提供更低的查询延迟。
- 资源隔离: 两个系统可以独立部署和扩缩容。文本索引的峰值负载不会影响向量检索的稳定性,反之亦然。
- 架构清晰: 职责分离使得系统边界明确,便于维护和未来替换其中任一组件。
劣势:
- 架构复杂性增加: 引入了两个存储系统,意味着需要处理双写、数据同步、以及分布式系统固有的最终一致性问题。
- 查询流程复杂化: 一次混合查询需要与两个系统交互,这对服务端的查询逻辑设计提出了更高要求。
最终决策
我们选择了方案B。务实的工程经验告诉我们,试图用一个“万金油”工具解决所有问题,往往会在系统规模扩大后,陷入各种性能和维护的泥潭。长期的稳定性和可扩展性,比短期的运维便利性更重要。该架构的核心在于设计一个健壮的服务层,以优雅地编排对两个底层存储的调用,并对上层应用(包括我们的PWA客户端)屏蔽其复杂性。
核心架构与实现
我们的混合检索服务被设计为一个独立的微服务,它暴露统一的API接口给前端。整体架构如下:
graph TD
subgraph PWA Client
A[Service Worker] -- Caches API Responses --> B[React App]
end
B -- HTTPS Request --> C[API Gateway]
subgraph Backend Infrastructure
C -- gRPC --> D[Hybrid Search Service]
D -- Indexing --> E[Elasticsearch Cluster]
D -- Indexing --> F[ChromaDB Instance]
D -- Querying --> E
D -- Querying --> F
end
subgraph Data Processing
G[Document Processor] -- Generates Embeddings --> D
end
style PWA Client fill:#dae8fc,stroke:#333,stroke-width:2px
style Backend Infrastructure fill:#d5e8d4,stroke:#333,stroke-width:2px
1. 数据模型与租户隔离策略
数据隔离是架构的基石。
Elasticsearch索引设计:
我们创建一个名为documents_metadata的索引。每个文档代表一条元数据记录。
// Elasticsearch Mapping
{
"mappings": {
"properties": {
"doc_id": { "type": "keyword" },
"tenant_id": { "type": "keyword" },
"title": { "type": "text" },
"content_preview": { "type": "text" },
"created_at": { "type": "date" },
"tags": { "type": "keyword" }
}
}
}
这里的tenant_id是实现数据隔离的关键。所有对ES的查询都必须强制包含一个term filter,形如{"term": {"tenant_id": "CURRENT_TENANT_ID"}}。
ChromaDB集合设计:
我们选择使用一个全局的Collection,而不是为每个租户创建一个Collection。这是因为成千上万个租户会导致Collection数量爆炸,难以管理。我们在向量的metadata中嵌入tenant_id。
# ChromaDB Client Initialization
import chromadb
from chromadb.config import Settings
# 在生产环境中,应该使用持久化存储和远程服务器
client = chromadb.HttpClient(host='chromadb.server.host', port=8000, settings=Settings(anonymized_telemetry=False))
# 创建一个全局集合
# hnsw:space指定距离度量,l2是欧氏距离,适用于大多数Sentence Transformer模型
collection = client.get_or_create_collection(
name="global_document_vectors",
metadata={"hnsw:space": "l2"}
)
2. 索引服务实现
索引流程必须保证对两个系统的写入是事务性的,或至少是最终一致的。在我们的场景中,我们采用先写ES,再写ChromaDB的策略,并加入重试和失败日志记录机制。
# In hybrid_search_service/indexing.py
import logging
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch, helpers
import uuid
# --- 配置 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 生产级代码中,这些应从配置文件或环境变量中加载
ES_HOST = "http://elasticsearch.server.host:9200"
CHROMA_HOST = "chromadb.server.host"
CHROMA_PORT = 8000
EMBEDDING_MODEL = 'all-MiniLM-L6-v2' # 示例模型
# --- 客户端初始化 ---
try:
es_client = Elasticsearch(ES_HOST)
chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=CHROMA_PORT)
collection = chroma_client.get_collection(name="global_document_vectors")
model = SentenceTransformer(EMBEDDING_MODEL)
logger.info("Clients and model initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize clients or model: {e}")
# 服务启动失败,此处应有退出或告警逻辑
raise
def index_document(tenant_id: str, document_content: str, metadata: dict):
"""
索引一个新文档,包含元数据和向量
一个常见的错误是忽略部分失败的情况。
"""
if not all([tenant_id, document_content, metadata.get('title')]):
raise ValueError("tenant_id, document_content, and title are required.")
doc_id = str(uuid.uuid4())
metadata['doc_id'] = doc_id
metadata['tenant_id'] = tenant_id
try:
# 1. 生成向量
logger.info(f"Generating embedding for doc_id: {doc_id}")
embedding = model.encode(document_content).tolist()
# 2. 索引元数据到Elasticsearch
logger.info(f"Indexing metadata to ES for doc_id: {doc_id}")
es_client.index(index="documents_metadata", id=doc_id, document=metadata)
# 3. 索引向量到ChromaDB
# 这里的ID必须与ES中的doc_id一致,以便后续关联
logger.info(f"Indexing vector to ChromaDB for doc_id: {doc_id}")
collection.add(
embeddings=[embedding],
metadatas=[{"tenant_id": tenant_id, "doc_id": doc_id}],
ids=[doc_id]
)
logger.info(f"Successfully indexed document {doc_id} for tenant {tenant_id}")
return doc_id
except Exception as e:
logger.error(f"Indexing failed for a document of tenant {tenant_id}. Error: {e}")
# --- 生产级错误处理 ---
# 这里的坑在于,可能ES写入成功但ChromaDB失败。
# 策略1: 回滚。尝试从ES中删除该文档。
# 策略2: 记录到死信队列。由一个后台任务来补偿/清理。
# 我们选择策略2,因为它对主流程影响更小。
log_failed_indexing(doc_id, tenant_id, "ES_SUCCESS_CHROMA_FAIL")
# 清理已写入的ES数据
try:
es_client.delete(index="documents_metadata", id=doc_id, ignore=[404])
except Exception as del_e:
logger.error(f"Failed to rollback ES data for doc_id {doc_id}: {del_e}")
raise RuntimeError(f"Failed to index document for tenant {tenant_id}") from e
def log_failed_indexing(doc_id, tenant_id, reason):
# 实际项目中,这里会写入到Kafka, RabbitMQ或Redis Stream
with open("failed_indexing_log.txt", "a") as f:
f.write(f"{doc_id},{tenant_id},{reason}\n")
3. 混合检索服务实现
这是架构的核心。查询逻辑必须先通过Elasticsearch进行精确的元数据和租户过滤,然后用得到的候选文档ID集合去ChromaDB中进行更精准的向量搜索。这个两阶段查询是性能和准确性的关键。
# In hybrid_search_service/query.py
import logging
# (复用indexing.py中的客户端和模型)
logger = logging.getLogger(__name__)
def hybrid_search(tenant_id: str, keyword_query: str = None, semantic_query: str = None, top_k: int = 10):
"""
执行混合检索。
- 如果只有keyword_query, 只查ES。
- 如果只有semantic_query, 先查ChromaDB,再从ES获取元数据。
- 如果两者都有,执行两阶段查询。
"""
if not tenant_id:
raise ValueError("tenant_id is mandatory for all searches.")
if not keyword_query and not semantic_query:
return []
# --- 场景1: 仅关键词查询 ---
if keyword_query and not semantic_query:
logger.info(f"Executing keyword-only search for tenant: {tenant_id}")
es_query = {
"bool": {
"must": [
{"multi_match": {"query": keyword_query, "fields": ["title", "content_preview"]}}
],
"filter": [
{"term": {"tenant_id": tenant_id}}
]
}
}
response = es_client.search(index="documents_metadata", query=es_query, size=top_k)
return [hit['_source'] for hit in response['hits']['hits']]
candidate_doc_ids = None
# --- 预处理阶段: 如果有关键词,先从ES筛选候选集 ---
if keyword_query:
logger.info(f"Phase 1: Pre-filtering with Elasticsearch for tenant: {tenant_id}")
es_filter_query = {
"bool": {
"must": [
{"multi_match": {"query": keyword_query, "fields": ["title", "content_preview"]}}
],
"filter": [
{"term": {"tenant_id": tenant_id}}
]
}
}
# _source: false - 我们只需要ID,减小网络开销
response = es_client.search(index="documents_metadata", query=es_filter_query, size=100, _source=False)
candidate_doc_ids = [hit['_id'] for hit in response['hits']['hits']]
if not candidate_doc_ids:
logger.info("No candidates found from ES pre-filtering. Returning empty.")
return []
# --- 场景2 & 3: 向量检索 ---
if semantic_query:
logger.info(f"Phase 2: Vector search with ChromaDB for tenant: {tenant_id}")
query_embedding = model.encode(semantic_query).tolist()
# 核心:构建ChromaDB的where子句
where_filter = {"tenant_id": tenant_id}
# 如果有ES预筛选的结果,将其加入where子句。这是性能优化的关键。
if candidate_doc_ids:
# 一个常见的错误是在`where`中直接使用`"doc_id": {"$in": candidate_doc_ids}`
# 如果`candidate_doc_ids`列表过大,可能会导致性能问题或查询失败。
# 这里需要分批次查询或限制列表大小。
if len(candidate_doc_ids) > 1024:
logger.warning(f"Candidate list size is {len(candidate_doc_ids)}, truncating to 1024.")
candidate_doc_ids = candidate_doc_ids[:1024]
where_filter["doc_id"] = {"$in": candidate_doc_ids}
logger.info(f"Filtering ChromaDB with {len(candidate_doc_ids)} candidate IDs.")
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=where_filter
)
vector_search_ids = results['ids'][0]
if not vector_search_ids:
return []
# --- 最终阶段: 从ES获取完整的元数据 ---
logger.info(f"Phase 3: Fetching full metadata from ES for {len(vector_search_ids)} results.")
# 使用mget批量获取,性能远高于单次get
response = es_client.mget(index="documents_metadata", ids=vector_search_ids)
# 保持ChromaDB返回的顺序
id_to_doc_map = {doc['_id']: doc['_source'] for doc in response['docs'] if doc['found']}
final_results = [id_to_doc_map[doc_id] for doc_id in vector_search_ids if doc_id in id_to_doc_map]
return final_results
# 如果只有keyword_query但进入了预处理流程,返回预处理结果的元数据
return fetch_metadata_by_ids(candidate_doc_ids)
def fetch_metadata_by_ids(doc_ids: list):
if not doc_ids:
return []
response = es_client.mget(index="documents_metadata", ids=doc_ids)
return [doc['_source'] for doc in response['docs'] if doc['found']]
4. PWA前端的离线支持
PWA的核心在于Service Worker。我们将为API请求实现一个“网络优先,缓存备用”(Network Falling Back to Cache)的策略。对于经常访问的查询,即使用户网络断开,也能看到上次的结果。
// public/service-worker.js
const CACHE_NAME = 'hybrid-search-cache-v1';
const API_ENDPOINT = '/api/search'; // 假设我们的搜索API端点
self.addEventListener('install', (event) => {
event.waitUntil(
caches.open(CACHE_NAME).then((cache) => {
// 可以在这里预缓存应用外壳(App Shell)
console.log('Service Worker: Cache opened');
})
);
});
self.addEventListener('fetch', (event) => {
const { request } = event;
// 只缓存我们的API GET请求
if (request.method === 'GET' && request.url.includes(API_ENDPOINT)) {
event.respondWith(
// 网络优先
fetch(request)
.then((response) => {
// 如果请求成功,克隆响应并存入缓存
// 一个请求的body只能被读取一次,所以需要克隆
if (response.ok) {
const responseToCache = response.clone();
caches.open(CACHE_NAME).then((cache) => {
cache.put(request, responseToCache);
});
}
return response;
})
.catch(() => {
// 网络请求失败,从缓存中查找
console.log('Network request failed. Trying to serve from cache.');
return caches.match(request).then((cachedResponse) => {
if (cachedResponse) {
return cachedResponse;
}
// 如果缓存也没有,返回一个标准的错误响应
// 在真实项目中,可以返回一个表示“离线且无缓存”的JSON结构
return new Response(JSON.stringify({ error: 'offline' }), {
status: 503,
headers: { 'Content-Type': 'application/json' },
});
});
})
);
} else {
// 对于非API请求,可以采用其他策略,或直接走网络
event.respondWith(fetch(request));
}
});
// 清理旧缓存
self.addEventListener('activate', (event) => {
const cacheWhitelist = [CACHE_NAME];
event.waitUntil(
caches.keys().then((cacheNames) => {
return Promise.all(
cacheNames.map((cacheName) => {
if (cacheWhitelist.indexOf(cacheName) === -1) {
return caches.delete(cacheName);
}
})
);
})
);
});
在React应用中注册这个Service Worker,即可启用PWA的离线缓存能力。前端应用在发起fetch请求时无需做任何特殊处理,Service Worker会透明地拦截和处理请求。
架构的局限性与未来迭代
此架构虽然实现了专业分工和高性能,但并非没有代价。
数据一致性: 索引服务中的双写操作是潜在的脆弱点。虽然我们设计了补偿机制,但在高写入负载下,ES和ChromaDB之间可能出现短暂的数据不一致。未来的优化方向是引入一个可靠的消息队列(如Kafka)作为写入管道,通过CDC(Change Data Capture)或事件溯源模式来保证最终一致性。
查询融合 (Result Fusion): 当前的混合查询在存在关键词和语义查询时,本质上是“过滤后检索”。对于需要将两路召回结果进行智能排序(Reranking)的场景,目前的实现是缺失的。可以引入一个排序层,使用Reciprocal Rank Fusion (RRF)等算法来融合ES和ChromaDB的得分,提供更相关的排序结果。
PWA缓存策略: 当前的缓存策略相对简单。对于一个知识库应用,更智能的策略可能包括:预取用户可能感兴趣的文档、在后台同步更新常用查询的缓存,以及提供更精细的缓存管理界面让用户控制离线数据。
运维成本: 维护两个独立的、有状态的存储系统,需要更成熟的监控、告警和备份恢复方案。这是选择该架构时必须接受的权衡。