基于ChromaDB与Elasticsearch构建多租户隔离的混合检索服务


多租户SaaS平台对数据检索的核心要求是双重的:必须提供精准的关键词匹配能力,同时也要能理解用户意图,进行语义层面的相似度搜索。这一切都构建在严格、不可逾越的租户数据隔离之上。当技术选型落到实处,一个直接的问题摆在面前:是选择一个全能选手,还是组合多个专家系统?

架构决策的十字路口

在一个为企业提供私有知识库服务的项目中,我们面临这个典型的技术权衡。用户上传的文档需要支持两种查询模式:

  1. 关键词精确查询:例如,查找所有包含“2023年第三季度财务报告”的文档。
  2. 语义模糊查询:例如,查询“关于公司近期盈利能力的分析”,这可能涉及到财报、会议纪要等多种文档。

同时,系统的多租户特性要求tenant_A的任何查询,无论如何都不能触碰到tenant_B的数据。

方案A:Elasticsearch独撑全局

Elasticsearch自8.x版本后引入了dense_vector字段类型和对HNSW(Hierarchical Navigable Small World)算法的支持,使其具备了向量检索的能力。

  • 优势:
    • 技术栈统一: 运维成本相对较低,只需维护一个Elasticsearch集群。
    • 生态成熟: 强大的布尔查询、聚合分析能力可以与向量检索无缝结合。数据隔离可以通过在文档中添加tenant_id字段,并在所有查询中强制使用filter子句来实现。
  • 劣势:
    • 资源争抢: 在同一个集群中,重量级的全文索引和向量索引构建过程会相互竞争CPU、内存和I/O资源。在真实项目中,这可能导致查询性能不稳定。
    • 向量检索的专业性: 尽管ES支持向量检索,但其在调优选项、索引构建效率、内存控制等方面,与专门的向量数据库相比,仍有差距。在高并发、低延迟的语义检索场景下,可能会成为性能瓶颈。
    • 成本考量: 为了满足高性能的向量检索,ES节点需要大量内存,这会直接推高基础设施成本。

方案B:ChromaDB专注向量,Elasticsearch负责元数据

这个方案将职责进行拆分:

  • ChromaDB: 专门负责向量的存储和近似最近邻(ANN)搜索。它的设计初衷就是为了高效处理Embeddings。

  • Elasticsearch: 回归其最擅长的领域——全文检索、结构化数据过滤和聚合。所有文档的元数据(如doc_id, tenant_id, title, upload_date等)都存储于此。

  • 优势:

    • 专业分工: 每个组件都做自己最擅长的事,性能更可预测。ChromaDB的HNSW实现经过深度优化,能提供更低的查询延迟。
    • 资源隔离: 两个系统可以独立部署和扩缩容。文本索引的峰值负载不会影响向量检索的稳定性,反之亦然。
    • 架构清晰: 职责分离使得系统边界明确,便于维护和未来替换其中任一组件。
  • 劣势:

    • 架构复杂性增加: 引入了两个存储系统,意味着需要处理双写、数据同步、以及分布式系统固有的最终一致性问题。
    • 查询流程复杂化: 一次混合查询需要与两个系统交互,这对服务端的查询逻辑设计提出了更高要求。

最终决策

我们选择了方案B。务实的工程经验告诉我们,试图用一个“万金油”工具解决所有问题,往往会在系统规模扩大后,陷入各种性能和维护的泥潭。长期的稳定性和可扩展性,比短期的运维便利性更重要。该架构的核心在于设计一个健壮的服务层,以优雅地编排对两个底层存储的调用,并对上层应用(包括我们的PWA客户端)屏蔽其复杂性。

核心架构与实现

我们的混合检索服务被设计为一个独立的微服务,它暴露统一的API接口给前端。整体架构如下:

graph TD
    subgraph PWA Client
        A[Service Worker] -- Caches API Responses --> B[React App]
    end

    B -- HTTPS Request --> C[API Gateway]

    subgraph Backend Infrastructure
        C -- gRPC --> D[Hybrid Search Service]
        D -- Indexing --> E[Elasticsearch Cluster]
        D -- Indexing --> F[ChromaDB Instance]
        D -- Querying --> E
        D -- Querying --> F
    end

    subgraph Data Processing
        G[Document Processor] -- Generates Embeddings --> D
    end

    style PWA Client fill:#dae8fc,stroke:#333,stroke-width:2px
    style Backend Infrastructure fill:#d5e8d4,stroke:#333,stroke-width:2px

1. 数据模型与租户隔离策略

数据隔离是架构的基石。

Elasticsearch索引设计:
我们创建一个名为documents_metadata的索引。每个文档代表一条元数据记录。

// Elasticsearch Mapping
{
  "mappings": {
    "properties": {
      "doc_id": { "type": "keyword" },
      "tenant_id": { "type": "keyword" },
      "title": { "type": "text" },
      "content_preview": { "type": "text" },
      "created_at": { "type": "date" },
      "tags": { "type": "keyword" }
    }
  }
}

这里的tenant_id是实现数据隔离的关键。所有对ES的查询都必须强制包含一个term filter,形如{"term": {"tenant_id": "CURRENT_TENANT_ID"}}

ChromaDB集合设计:
我们选择使用一个全局的Collection,而不是为每个租户创建一个Collection。这是因为成千上万个租户会导致Collection数量爆炸,难以管理。我们在向量的metadata中嵌入tenant_id

# ChromaDB Client Initialization
import chromadb
from chromadb.config import Settings

# 在生产环境中,应该使用持久化存储和远程服务器
client = chromadb.HttpClient(host='chromadb.server.host', port=8000, settings=Settings(anonymized_telemetry=False))

# 创建一个全局集合
# hnsw:space指定距离度量,l2是欧氏距离,适用于大多数Sentence Transformer模型
collection = client.get_or_create_collection(
    name="global_document_vectors",
    metadata={"hnsw:space": "l2"}
)

2. 索引服务实现

索引流程必须保证对两个系统的写入是事务性的,或至少是最终一致的。在我们的场景中,我们采用先写ES,再写ChromaDB的策略,并加入重试和失败日志记录机制。

# In hybrid_search_service/indexing.py
import logging
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch, helpers
import uuid

# --- 配置 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 生产级代码中,这些应从配置文件或环境变量中加载
ES_HOST = "http://elasticsearch.server.host:9200"
CHROMA_HOST = "chromadb.server.host"
CHROMA_PORT = 8000
EMBEDDING_MODEL = 'all-MiniLM-L6-v2' # 示例模型

# --- 客户端初始化 ---
try:
    es_client = Elasticsearch(ES_HOST)
    chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=CHROMA_PORT)
    collection = chroma_client.get_collection(name="global_document_vectors")
    model = SentenceTransformer(EMBEDDING_MODEL)
    logger.info("Clients and model initialized successfully.")
except Exception as e:
    logger.error(f"Failed to initialize clients or model: {e}")
    # 服务启动失败,此处应有退出或告警逻辑
    raise

def index_document(tenant_id: str, document_content: str, metadata: dict):
    """
    索引一个新文档,包含元数据和向量
    一个常见的错误是忽略部分失败的情况。
    """
    if not all([tenant_id, document_content, metadata.get('title')]):
        raise ValueError("tenant_id, document_content, and title are required.")

    doc_id = str(uuid.uuid4())
    metadata['doc_id'] = doc_id
    metadata['tenant_id'] = tenant_id

    try:
        # 1. 生成向量
        logger.info(f"Generating embedding for doc_id: {doc_id}")
        embedding = model.encode(document_content).tolist()

        # 2. 索引元数据到Elasticsearch
        logger.info(f"Indexing metadata to ES for doc_id: {doc_id}")
        es_client.index(index="documents_metadata", id=doc_id, document=metadata)

        # 3. 索引向量到ChromaDB
        # 这里的ID必须与ES中的doc_id一致,以便后续关联
        logger.info(f"Indexing vector to ChromaDB for doc_id: {doc_id}")
        collection.add(
            embeddings=[embedding],
            metadatas=[{"tenant_id": tenant_id, "doc_id": doc_id}],
            ids=[doc_id]
        )
        
        logger.info(f"Successfully indexed document {doc_id} for tenant {tenant_id}")
        return doc_id

    except Exception as e:
        logger.error(f"Indexing failed for a document of tenant {tenant_id}. Error: {e}")
        # --- 生产级错误处理 ---
        # 这里的坑在于,可能ES写入成功但ChromaDB失败。
        # 策略1: 回滚。尝试从ES中删除该文档。
        # 策略2: 记录到死信队列。由一个后台任务来补偿/清理。
        # 我们选择策略2,因为它对主流程影响更小。
        log_failed_indexing(doc_id, tenant_id, "ES_SUCCESS_CHROMA_FAIL")
        # 清理已写入的ES数据
        try:
            es_client.delete(index="documents_metadata", id=doc_id, ignore=[404])
        except Exception as del_e:
            logger.error(f"Failed to rollback ES data for doc_id {doc_id}: {del_e}")
        
        raise RuntimeError(f"Failed to index document for tenant {tenant_id}") from e

def log_failed_indexing(doc_id, tenant_id, reason):
    # 实际项目中,这里会写入到Kafka, RabbitMQ或Redis Stream
    with open("failed_indexing_log.txt", "a") as f:
        f.write(f"{doc_id},{tenant_id},{reason}\n")

3. 混合检索服务实现

这是架构的核心。查询逻辑必须先通过Elasticsearch进行精确的元数据和租户过滤,然后用得到的候选文档ID集合去ChromaDB中进行更精准的向量搜索。这个两阶段查询是性能和准确性的关键。

# In hybrid_search_service/query.py
import logging
# (复用indexing.py中的客户端和模型)

logger = logging.getLogger(__name__)

def hybrid_search(tenant_id: str, keyword_query: str = None, semantic_query: str = None, top_k: int = 10):
    """
    执行混合检索。
    - 如果只有keyword_query, 只查ES。
    - 如果只有semantic_query, 先查ChromaDB,再从ES获取元数据。
    - 如果两者都有,执行两阶段查询。
    """
    if not tenant_id:
        raise ValueError("tenant_id is mandatory for all searches.")
    
    if not keyword_query and not semantic_query:
        return []

    # --- 场景1: 仅关键词查询 ---
    if keyword_query and not semantic_query:
        logger.info(f"Executing keyword-only search for tenant: {tenant_id}")
        es_query = {
            "bool": {
                "must": [
                    {"multi_match": {"query": keyword_query, "fields": ["title", "content_preview"]}}
                ],
                "filter": [
                    {"term": {"tenant_id": tenant_id}}
                ]
            }
        }
        response = es_client.search(index="documents_metadata", query=es_query, size=top_k)
        return [hit['_source'] for hit in response['hits']['hits']]

    candidate_doc_ids = None
    # --- 预处理阶段: 如果有关键词,先从ES筛选候选集 ---
    if keyword_query:
        logger.info(f"Phase 1: Pre-filtering with Elasticsearch for tenant: {tenant_id}")
        es_filter_query = {
            "bool": {
                "must": [
                    {"multi_match": {"query": keyword_query, "fields": ["title", "content_preview"]}}
                ],
                "filter": [
                    {"term": {"tenant_id": tenant_id}}
                ]
            }
        }
        # _source: false - 我们只需要ID,减小网络开销
        response = es_client.search(index="documents_metadata", query=es_filter_query, size=100, _source=False) 
        candidate_doc_ids = [hit['_id'] for hit in response['hits']['hits']]

        if not candidate_doc_ids:
            logger.info("No candidates found from ES pre-filtering. Returning empty.")
            return []

    # --- 场景2 & 3: 向量检索 ---
    if semantic_query:
        logger.info(f"Phase 2: Vector search with ChromaDB for tenant: {tenant_id}")
        query_embedding = model.encode(semantic_query).tolist()
        
        # 核心:构建ChromaDB的where子句
        where_filter = {"tenant_id": tenant_id}
        
        # 如果有ES预筛选的结果,将其加入where子句。这是性能优化的关键。
        if candidate_doc_ids:
            # 一个常见的错误是在`where`中直接使用`"doc_id": {"$in": candidate_doc_ids}`
            # 如果`candidate_doc_ids`列表过大,可能会导致性能问题或查询失败。
            # 这里需要分批次查询或限制列表大小。
            if len(candidate_doc_ids) > 1024:
                logger.warning(f"Candidate list size is {len(candidate_doc_ids)}, truncating to 1024.")
                candidate_doc_ids = candidate_doc_ids[:1024]

            where_filter["doc_id"] = {"$in": candidate_doc_ids}
            logger.info(f"Filtering ChromaDB with {len(candidate_doc_ids)} candidate IDs.")

        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=where_filter
        )
        
        vector_search_ids = results['ids'][0]
        if not vector_search_ids:
            return []

        # --- 最终阶段: 从ES获取完整的元数据 ---
        logger.info(f"Phase 3: Fetching full metadata from ES for {len(vector_search_ids)} results.")
        # 使用mget批量获取,性能远高于单次get
        response = es_client.mget(index="documents_metadata", ids=vector_search_ids)
        # 保持ChromaDB返回的顺序
        id_to_doc_map = {doc['_id']: doc['_source'] for doc in response['docs'] if doc['found']}
        final_results = [id_to_doc_map[doc_id] for doc_id in vector_search_ids if doc_id in id_to_doc_map]
        
        return final_results
        
    # 如果只有keyword_query但进入了预处理流程,返回预处理结果的元数据
    return fetch_metadata_by_ids(candidate_doc_ids)

def fetch_metadata_by_ids(doc_ids: list):
    if not doc_ids:
        return []
    response = es_client.mget(index="documents_metadata", ids=doc_ids)
    return [doc['_source'] for doc in response['docs'] if doc['found']]

4. PWA前端的离线支持

PWA的核心在于Service Worker。我们将为API请求实现一个“网络优先,缓存备用”(Network Falling Back to Cache)的策略。对于经常访问的查询,即使用户网络断开,也能看到上次的结果。

// public/service-worker.js

const CACHE_NAME = 'hybrid-search-cache-v1';
const API_ENDPOINT = '/api/search'; // 假设我们的搜索API端点

self.addEventListener('install', (event) => {
  event.waitUntil(
    caches.open(CACHE_NAME).then((cache) => {
      // 可以在这里预缓存应用外壳(App Shell)
      console.log('Service Worker: Cache opened');
    })
  );
});

self.addEventListener('fetch', (event) => {
  const { request } = event;

  // 只缓存我们的API GET请求
  if (request.method === 'GET' && request.url.includes(API_ENDPOINT)) {
    event.respondWith(
      // 网络优先
      fetch(request)
        .then((response) => {
          // 如果请求成功,克隆响应并存入缓存
          // 一个请求的body只能被读取一次,所以需要克隆
          if (response.ok) {
            const responseToCache = response.clone();
            caches.open(CACHE_NAME).then((cache) => {
              cache.put(request, responseToCache);
            });
          }
          return response;
        })
        .catch(() => {
          // 网络请求失败,从缓存中查找
          console.log('Network request failed. Trying to serve from cache.');
          return caches.match(request).then((cachedResponse) => {
            if (cachedResponse) {
              return cachedResponse;
            }
            // 如果缓存也没有,返回一个标准的错误响应
            // 在真实项目中,可以返回一个表示“离线且无缓存”的JSON结构
            return new Response(JSON.stringify({ error: 'offline' }), {
              status: 503,
              headers: { 'Content-Type': 'application/json' },
            });
          });
        })
    );
  } else {
    // 对于非API请求,可以采用其他策略,或直接走网络
    event.respondWith(fetch(request));
  }
});

// 清理旧缓存
self.addEventListener('activate', (event) => {
  const cacheWhitelist = [CACHE_NAME];
  event.waitUntil(
    caches.keys().then((cacheNames) => {
      return Promise.all(
        cacheNames.map((cacheName) => {
          if (cacheWhitelist.indexOf(cacheName) === -1) {
            return caches.delete(cacheName);
          }
        })
      );
    })
  );
});

在React应用中注册这个Service Worker,即可启用PWA的离线缓存能力。前端应用在发起fetch请求时无需做任何特殊处理,Service Worker会透明地拦截和处理请求。

架构的局限性与未来迭代

此架构虽然实现了专业分工和高性能,但并非没有代价。

  1. 数据一致性: 索引服务中的双写操作是潜在的脆弱点。虽然我们设计了补偿机制,但在高写入负载下,ES和ChromaDB之间可能出现短暂的数据不一致。未来的优化方向是引入一个可靠的消息队列(如Kafka)作为写入管道,通过CDC(Change Data Capture)或事件溯源模式来保证最终一致性。

  2. 查询融合 (Result Fusion): 当前的混合查询在存在关键词和语义查询时,本质上是“过滤后检索”。对于需要将两路召回结果进行智能排序(Reranking)的场景,目前的实现是缺失的。可以引入一个排序层,使用Reciprocal Rank Fusion (RRF)等算法来融合ES和ChromaDB的得分,提供更相关的排序结果。

  3. PWA缓存策略: 当前的缓存策略相对简单。对于一个知识库应用,更智能的策略可能包括:预取用户可能感兴趣的文档、在后台同步更新常用查询的缓存,以及提供更精细的缓存管理界面让用户控制离线数据。

  4. 运维成本: 维护两个独立的、有状态的存储系统,需要更成熟的监控、告警和备份恢复方案。这是选择该架构时必须接受的权衡。


  目录