构建基于 Sanic 和 Prettier 的 Azure Data Lake 规范化数据摄入网关


我们的数据湖最近出现了一个棘手的“熵增”问题。团队的多个微服务和数据源持续不断地向 Azure Data Lake Storage (ADLS Gen2) 推送 JSON 格式的配置快照、事件载荷和元数据。功能上,这些数据都完全正确。但问题在于格式。同一个逻辑对象,来自不同服务或不同开发者之手,其 JSON 字符串的格式千差万别:缩进、空格、键顺序、换行符,完全是随机的。

这在生产环境中造成了切实的麻烦。当需要追溯一次配置变更时,在 Git 历史或是 Data Lake 的文件版本中进行 diff,会看到海量的无用格式差异,淹没了真正有价值的一行代码改动。人工审计和问题排查变得异常痛苦。

最初的构想是在每个产生数据的服务内部各自引入格式化逻辑。但这很快被否决了。这不仅增加了十几个微服务的维护成本,而且无法保证所有团队都使用完全相同的格式化规则,问题无法根除。我们需要的是一个中央集权的解决方案:一个所有数据写入 Data Lake 都必须经过的“强制关卡”,它不仅负责数据传输,更核心的职责是实现数据的规范化(Canonicalization)

这个关卡,我们决定将其构建为一个高性能的异步 API 网关。技术选型是这个项目的第一步,也是最关键的一步。

  1. Web 框架: Sanic。选择 Sanic 的理由非常明确。这是一个 I/O 密集型任务,主要瓶颈在于网络传输(接收请求、上传到 Azure)。Sanic 基于 async/await,其性能在 Python 异步框架中名列前茅,足以应对预期的并发摄入请求。相比于 Flask 或 Django,它的异步原生特性让我们能用更简洁的代码写出更高吞吐的应用。

  2. 云存储: Azure Data Lake Storage Gen2。我们已经是 Azure 用户,ADLS Gen2 是自然之选。它提供了分层命名空间,让我们可以像操作本地文件系统一样用 directory/file 的结构组织数据,这对我们按日期和来源服务组织数据至关重要。同时,其成本效益和可扩展性也完全满足数据湖的需求。

  3. 格式化工具: Prettier。这是整个方案中最不寻常,但事后证明最正确的选择。我们为什么不用 Python 内置的 json.dumps(indent=2)?因为它只能解决缩进问题,无法统一键顺序或处理更细微的风格差异。我们可以用 sort_keys=True,但这又过于武断。Prettier 是前端生态中无可争议的代码格式化之王,它的算法经过深思熟虑,能产生最具可读性的输出。与其在 Python 中重新发明一个不完美的轮子,不如直接利用这个最佳工具。在后端服务中调用一个 Node.js 工具链,这在真实项目中是一种务实的、关注最终效果的工程权衡。

整个架构的请求流程因此变得清晰:

sequenceDiagram
    participant Client as 数据源服务
    participant Gateway as Sanic 摄入网关
    participant Prettier as Prettier (Node.js 进程)
    participant ADLS as Azure Data Lake Storage

    Client->>Gateway: POST /ingest (原始 JSON, X-Request-ID)
    Gateway->>Gateway: 验证请求体与头部
    Gateway->>Prettier: 调用子进程格式化 JSON
    Prettier-->>Gateway: 返回格式化后的 JSON
    Gateway->>ADLS: 检查目标文件是否存在 (实现幂等)
    alt 文件不存在
        Gateway->>ADLS: 异步上传格式化后的 JSON
        ADLS-->>Gateway: 上传成功
        Gateway-->>Client: 201 Created
    else 文件已存在
        Gateway-->>Client: 200 OK (幂等返回)
    end

第一步:环境搭建与 Prettier 封装

在项目中同时管理 Python 和 Node.js 依赖是第一个要解决的问题。一个干净的方案是使用 Docker,但在开发阶段,我们先在本地把环境配置好。

项目结构如下:

ingestion-gateway/
├── app/
│   ├── __init__.py
│   ├── main.py             # Sanic 应用主文件
│   ├── prettier.py         # Prettier 异步调用封装
│   ├── storage.py          # Azure Datalake 异步客户端
│   └── config.py           # 配置管理
├── nodejs/
│   ├── package.json        # Node.js 依赖 (prettier)
│   └── .prettierrc         # Prettier 配置文件
├── tests/
│   └── ...
├── .env
├── requirements.txt
└── Dockerfile

首先,在 nodejs/ 目录下初始化 Node.js 环境并安装 Prettier。

# 进入 nodejs 目录
cd nodejs

# 初始化 package.json
npm init -y

# 安装 prettier
npm install --save-exact prettier

# 创建配置文件 .prettierrc
echo '{ "printWidth": 100, "tabWidth": 2, "singleQuote": true, "trailingComma": "es5" }' > .prettierrc

这里的 .prettierrc 文件至关重要,它固化了团队对代码风格的统一约定,是实现规范化的基础。

接下来是在 Python 中异步调用 Prettier。直接使用 subprocess 模块是可行的,但为了在 Sanic 的事件循环中正确运行,我们必须使用 asyncio.create_subprocess_exec。我们将这个逻辑封装在 app/prettier.py 中。

# app/prettier.py

import asyncio
import logging
from typing import Tuple, Optional

# Prettier 可执行文件的路径,在生产环境中可以通过配置传入
# 这里假设它在项目的一个固定位置
PRETTIER_EXECUTABLE = "./nodejs/node_modules/.bin/prettier"
PRETTIER_CONFIG_PATH = "./nodejs/.prettierrc"

logger = logging.getLogger(__name__)

class PrettierError(Exception):
    """Custom exception for Prettier formatting failures."""
    def __init__(self, message, stderr):
        super().__init__(message)
        self.stderr = stderr

async def format_json_payload(payload: str) -> str:
    """
    Asynchronously formats a JSON string using an external Prettier process.

    This function is a crucial part of the canonicalization gateway. It shells
    out to the Prettier CLI to ensure every piece of ingested data conforms to
    a single, consistent format defined in .prettierrc.

    Args:
        payload: The raw JSON string to be formatted.

    Returns:
        The formatted JSON string.

    Raises:
        PrettierError: If the prettier process fails (e.g., invalid JSON).
        asyncio.TimeoutError: If the process takes too long to complete.
    """
    # Prettier 通过 --parser json 明确指定解析器
    # --config 指定我们的统一配置文件
    # --stdin-filepath dummy.json 让 Prettier 认为在处理一个文件,这有助于某些插件的正确工作
    command = [
        PRETTIER_EXECUTABLE,
        "--config", PRETTIER_CONFIG_PATH,
        "--parser", "json",
        "--stdin-filepath", "dummy.json",
    ]

    try:
        # 创建一个异步子进程
        process = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # 将 payload 写入子进程的 stdin,然后关闭它
        # 必须是 bytes 类型
        stdout, stderr = await asyncio.wait_for(
            process.communicate(input=payload.encode('utf-8')),
            timeout=5.0  # 设置一个合理的超时,防止子进程卡死
        )

        if process.returncode != 0:
            error_message = stderr.decode('utf-8').strip()
            logger.error(
                f"Prettier process failed with exit code {process.returncode}. "
                f"Stderr: {error_message}"
            )
            raise PrettierError(
                "Failed to format JSON payload.",
                stderr=error_message
            )

        formatted_payload = stdout.decode('utf-8')
        logger.debug("Successfully formatted payload with Prettier.")
        return formatted_payload

    except FileNotFoundError:
        logger.critical(f"Prettier executable not found at {PRETTIER_EXECUTABLE}")
        raise PrettierError("Prettier executable not found.", stderr="")
    except asyncio.TimeoutError:
        logger.error("Prettier process timed out after 5 seconds.")
        # 在超时后,确保进程被终止
        if process.returncode is None:
            process.kill()
            await process.wait()
        raise

这个封装考虑了几个生产环境中的关键点:

  1. 明确的路径:硬编码路径在开发时可行,但生产部署时应通过环境变量配置。
  2. 错误处理:如果传入的 JSON 无效,Prettier 会返回非零退出码,stderr 中会包含错误信息。我们捕获这个情况并抛出自定义异常,以便上层逻辑能返回 400 Bad Request
  3. 超时机制asyncio.wait_for 防止了 Prettier 进程因某些原因卡死而耗尽服务器资源。
  4. 日志:详尽的日志记录了成功和失败的场景,便于问题排查。

第二步:集成 Azure Data Lake 异步客户端

与 Azure Storage 的交互是 I/O 密集操作,必须使用异步 SDK (azure-storage-file-datalake-aio) 以免阻塞 Sanic 的事件循环。我们将所有与 ADLS 的交互逻辑封装在 app/storage.py 中。

# app/storage.py

import logging
from azure.storage.filedatalake.aio import DataLakeServiceClient
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from .config import settings

logger = logging.getLogger(__name__)

class AsyncDataLakeClient:
    """
    An asynchronous client for interacting with Azure Data Lake Storage Gen2.
    It encapsulates the logic for uploading and checking file existence,
    handling credentials and connection management.
    """
    _client_instance = None

    def __init__(self, connection_string: str, file_system_name: str):
        self.connection_string = connection_string
        self.file_system_name = file_system_name
        if AsyncDataLakeClient._client_instance is None:
            try:
                # DataLakeServiceClient 可以在应用生命周期内复用
                AsyncDataLakeClient._client_instance = DataLakeServiceClient.from_connection_string(
                    self.connection_string
                )
                logger.info("DataLakeServiceClient initialized.")
            except Exception as e:
                logger.critical(f"Failed to initialize DataLakeServiceClient: {e}")
                raise

    async def close(self):
        """Closes the underlying client connection."""
        if AsyncDataLakeClient._client_instance:
            await AsyncDataLakeClient._client_instance.close()
            AsyncDataLakeClient._client_instance = None
            logger.info("DataLakeServiceClient connection closed.")

    async def file_exists(self, path: str) -> bool:
        """

        Asynchronously checks if a file exists in the data lake.
        This is a key component for implementing idempotency.
        """
        try:
            file_system_client = self._client_instance.get_file_system_client(self.file_system_name)
            file_client = file_system_client.get_file_client(path)
            await file_client.get_file_properties()
            return True
        except ResourceNotFoundError:
            return False
        except Exception as e:
            logger.error(f"Error checking existence for file '{path}': {e}")
            # 在不确定状态下,保守地认为它可能存在或操作失败,向上抛出异常
            raise

    async def upload_data(self, data: bytes, destination_path: str, overwrite: bool = False):
        """
        Asynchronously uploads data to a specific path in the data lake.
        """
        file_system_client = self._client_instance.get_file_system_client(self.file_system_name)
        file_client = file_system_client.get_file_client(destination_path)
        
        try:
            # Azure SDK 的 upload_data 方法已经处理了分块上传等复杂性
            await file_client.upload_data(data, overwrite=overwrite, length=len(data))
            logger.info(f"Successfully uploaded {len(data)} bytes to '{destination_path}'.")
        except ResourceExistsError as e:
            # 如果 overwrite=False 且文件已存在,SDK会抛出此异常。
            # 我们的幂等逻辑应在调用此方法前就处理这种情况,但这里依然做好防御
            logger.warning(f"File '{destination_path}' already exists and overwrite is False. "
                           f"This should be handled by idempotency check. Error: {e}")
            raise
        except Exception as e:
            logger.error(f"Failed to upload data to '{destination_path}': {e}")
            raise


# 使用单例模式或依赖注入框架来管理客户端实例
# 这里用一个简单的全局变量来演示
storage_client: AsyncDataLakeClient = None

def get_storage_client() -> AsyncDataLakeClient:
    global storage_client
    if storage_client is None:
        storage_client = AsyncDataLakeClient(
            connection_string=settings.AZURE_CONNECTION_STRING,
            file_system_name=settings.AZURE_FILESYSTEM_NAME
        )
    return storage_client

我们将客户端封装成一个类,并通过 get_storage_client 函数进行实例化管理,确保在整个应用生命周期中只有一个 DataLakeServiceClient 实例。这对于管理连接池和资源非常重要。file_exists 方法是实现幂等性的核心,它通过查询文件属性来判断文件是否存在,ResourceNotFoundError 明确告诉我们文件不存在。

第三步:构建 Sanic Ingestion Endpoint

现在,我们将 Prettier 封装和 Azure 客户端组合到 Sanic 的主应用逻辑 app/main.py 中。

# app/main.py

import logging
import uuid
from sanic import Sanic, response
from sanic.exceptions import SanicException

from .config import settings
from .prettier import format_json_payload, PrettierError
from .storage import get_storage_client, AsyncDataLakeClient

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

app = Sanic("IngestionGateway")

# Sanic 应用生命周期事件,用于初始化和关闭资源
@app.listener('before_server_start')
async def setup_dependencies(app, loop):
    """Initialize dependencies like the storage client."""
    app.ctx.storage_client = get_storage_client()

@app.listener('after_server_stop')
async def cleanup_dependencies(app, loop):
    """Gracefully close connections."""
    client: AsyncDataLakeClient = app.ctx.storage_client
    if client:
        await client.close()

@app.post("/ingest/<source_system:str>/<data_type:str>")
async def ingest_data(request, source_system: str, data_type: str):
    """
    The main ingestion endpoint.
    - Validates request
    - Formats the payload using Prettier
    - Uploads the formatted data to Azure Data Lake with idempotency
    """
    # 1. 获取幂等性关键信息
    # 在真实项目中,这个 ID 应该由客户端保证唯一性
    request_id = request.headers.get("X-Request-ID")
    if not request_id:
        raise SanicException("Header 'X-Request-ID' is required for idempotency.", status_code=400)

    # 2. 验证请求体
    raw_payload = request.body.decode('utf-8')
    if not raw_payload:
        raise SanicException("Request body cannot be empty.", status_code=400)

    # 3. 规范化步骤:调用 Prettier
    try:
        formatted_payload = await format_json_payload(raw_payload)
    except PrettierError as e:
        # 如果 Prettier 失败,通常意味着输入不是有效的 JSON
        raise SanicException(f"Invalid JSON payload. Prettier failed: {e.stderr}", status_code=400)
    except Exception:
        # 其他未知错误,比如 Prettier 进程超时
        raise SanicException("Internal error during payload formatting.", status_code=500)

    # 4. 与 Azure Data Lake 交互
    storage_client: AsyncDataLakeClient = request.app.ctx.storage_client
    
    # 构建一个结构化的存储路径
    # 例如:/raw/transactions/2023/10/27/{request_id}.json
    # 这里的路径策略需要根据业务需求精心设计
    destination_path = f"raw/{source_system}/{data_type}/{request_id}.json"

    try:
        # 5. 幂等性检查
        exists = await storage_client.file_exists(destination_path)
        if exists:
            logging.info(f"Idempotency key hit. File '{destination_path}' already exists.")
            # 文件已存在,直接返回成功,表示该请求已处理
            return response.json(
                {"status": "already_exists", "path": destination_path},
                status=200
            )

        # 6. 上传数据
        await storage_client.upload_data(
            data=formatted_payload.encode('utf-8'),
            destination_path=destination_path
        )

        return response.json(
            {"status": "created", "path": destination_path},
            status=201
        )

    except Exception as e:
        logging.error(f"Failed to process request {request_id} due to storage error: {e}")
        # 返回 503 Service Unavailable 表示后端存储暂时不可用,客户端可以重试
        raise SanicException("Service is temporarily unavailable due to storage issues.", status_code=503)

这个端点是整个网关的核心,它体现了几个重要的设计原则:

  1. RESTful 路径设计:路径 /ingest/<source_system:str>/<data_type:str> 清晰地表达了数据的来源和类型,便于路由和后续的数据治理。
  2. 强制幂等性:通过 X-Request-ID 请求头,我们强制调用方必须考虑请求的唯一性。服务端利用这个ID来构建文件名或查找记录,并通过 file_exists 检查来避免重复处理。这是一个简单而有效的幂等性实现。
  3. 清晰的职责分离:端点逻辑本身只负责编排,具体的 Prettier调用 和 Azure 上传都委托给了专门的模块。
  4. 精确的 HTTP 状态码
    • 201 Created:首次成功创建资源。
    • 200 OK:因幂等性检查发现资源已存在,表示请求被接受但未重复操作。
    • 400 Bad Request:客户端错误,如缺少 X-Request-ID 或提供了无效的 JSON。
    • 503 Service Unavailable:服务端暂时性错误(如连接 Azure 失败),暗示客户端可以稍后重试。

打包与部署

为了在生产环境中可靠地运行,我们需要将这个混合应用打包成一个 Docker 镜像。Dockerfile 需要同时安装 Python 和 Node.js 环境。

# 使用一个包含 Python 和 Node.js 的基础镜像,或者分阶段构建
FROM python:3.10-slim

# 安装 Node.js 和 npm
RUN apt-get update && \
    apt-get install -y curl && \
    curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \
    apt-get install -y nodejs && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# 拷贝 Node.js 相关文件并安装依赖
COPY nodejs/package*.json ./nodejs/
RUN cd nodejs && npm install --production

# 拷贝 Python 应用代码和依赖文件
COPY requirements.txt .
COPY app/ ./app/
COPY nodejs/.prettierrc ./nodejs/.prettierrc

# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt

# 暴露端口
EXPOSE 8000

# 设置环境变量 (在实际部署中通过 k8s secret 或 app service 配置)
# ENV AZURE_CONNECTION_STRING="your_connection_string"
# ENV AZURE_FILESYSTEM_NAME="your_filesystem"

# 运行 Sanic 应用
CMD ["sanic", "app.main.app", "--host=0.0.0.0", "--port=8000", "--workers=4"]

这个 Dockerfile 创建了一个自包含的、可移植的运行环境。部署到 Azure App Service, Azure Kubernetes Service (AKS) 或任何容器平台都变得非常直接。

遗留问题与未来迭代方向

这个方案有效地解决了我们最初面临的数据格式不一致问题,但在实践中,我们也识别出了一些局限性和可优化的方向。

首先,性能瓶颈在于每次请求都需要启动一个新的 prettier 子进程。尽管是异步执行,进程创建和销毁本身依然有不可忽视的开销。在高并发场景下,这可能成为 CPU 的瓶颈。一个可行的优化路径是,将 Prettier 封装成一个常驻的 Node.js 微服务,Sanic 网关通过本地 RPC(如 gRPC 或简单的 HTTP 请求)与其通信,从而省去进程创建的开销。更前沿的探索方向是,利用 WebAssembly。如果能将 Prettier 编译成 WASM 模块,就可以在 Python 的 WASM 运行时(如 wasmtime)中直接执行,彻底消除跨语言调用的开销。

其次,幂等性实现的健壮性。当前基于“检查文件是否存在”的方案在单实例部署时是可靠的。但当网关水平扩展到多个实例时,可能会出现竞态条件:两个实例几乎同时检查文件,都发现不存在,然后都尝试写入,其中一个会因 overwrite=False 而失败。虽然最终数据只有一份,但这个过程不够优雅。更稳健的方案是引入一个外部的分布式锁服务(如 Redis),在写入前获取基于 request_id 的锁。或者,将 request_id 的处理记录写入一个支持原子操作的数据库或日志中。

最后,可扩展性。目前网关只处理 JSON。如果未来需要支持 YAML 或其他 Prettier 支持的格式,当前的端点需要改造。可以引入 Content-Type 头部来动态选择 Prettier 的 --parser,让网关变得更加通用。


  目录