Solid.js 与 Django 应用中基于上下文传递的 ELK 结构化日志系统构建实践


生产环境的日志总是散乱的。前端应用在用户的浏览器里安静地报错,而后端服务在自己的世界里记录着业务流程。当用户报告一个模糊不清的“功能失灵”时,我们面对的是两套毫无关联的时间戳和日志流。前端开发者在 console.log 的海洋里挣扎,后端工程师在 grep 数G的文本文件。定位一个由前端交互触发的、在后端深处才显现的 bug,成了一场昂贵的跨团队“猜谜游戏”。

这个问题的根源在于上下文的丢失。一次完整的用户交互,从点击按钮到后台数据落库,本应是一个完整的“事务”。但它的执行轨迹却被网络边界无情地切分成了两半。我们的目标,就是用一个唯一的标识符,像一根线,将这些散落的珠子(日志)串起来。

初步构想是引入一个 correlation_id (或 trace_id) 的概念。这个ID在用户交互的起点(前端)被创建,然后通过API请求头传递到后端。后端的所有日志,以及前端的所有日志,都必须附带这个ID。当所有日志最终汇集到ELK这样的中央日志系统时,我们只需用这个ID进行筛选,就能重构出一次操作的全链路视图。

这套方案的技术选型并不复杂,但魔鬼藏在细节里。我们需要一个轻量级的前端框架(Solid.js),一个稳健的后端框架(Django),以及一套日志聚合方案(ELK Stack)。而将它们无缝粘合起来的关键,在于对框架生命周期和设计模式的恰当运用,尤其是在Django中,我们需要一个无侵入的方式来注入这个跨请求的上下文。

技术栈与架构概览

在深入代码之前,我们先明确整个数据流。

sequenceDiagram
    participant User as 用户
    participant Solid as Solid.js (Frontend)
    participant Django as Django (Backend)
    participant Logstash
    participant Elasticsearch
    participant Kibana

    User->>Solid: 触发操作 (例如: 点击按钮)
    Solid->>Solid: 生成 Correlation ID (e.g., `uuidv4()`)
    Solid->>Django: 发起API请求 (Header: `X-Correlation-ID`)
    Solid->>Django: (异步)发送前端日志 (Payload包含`correlation_id`)

    Django->>Django: Middleware捕获`X-Correlation-ID`
    Django->>Django: 将ID存入请求上下文 (threading.local)
    Django->>Django: View处理业务逻辑,生成日志
    Note right of Django: 所有日志自动
附加`correlation_id` Django->>Logstash: 发送结构化日志 (JSON) Logstash->>Elasticsearch: 清洗、转换、索引日志 User->>Kibana: 查询日志 Kibana->>Elasticsearch: 按`correlation_id`聚合查询 Elasticsearch-->>Kibana: 返回关联的前后端日志 Kibana-->>User: 展示全链路日志视图

这个流程的核心在于两点:

  1. 上下文传递: X-Correlation-ID HTTP头的生成与解析。
  2. 上下文注入: Django中间件如何将ID注入到Python的logging模块中,使其对整个请求的生命周期可见。

环境准备:本地ELK Stack

在真实项目中,ELK集群是独立运维的。为了本地开发和验证,我们使用 docker-compose 快速启动一个精简版ELK环境。

创建一个 docker-compose.yml 文件:

# docker-compose.yml
version: '3.7'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.9.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false # 仅用于开发环境
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.9.0
    container_name: logstash
    ports:
      - "5044:5044"
      - "5000:5000/tcp" # 我们将使用TCP输入
      - "9600:9600"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.9.0
    container_name: kibana
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  esdata:
    driver: local

同时,提供一个基础的 logstash.conf 配置文件,它监听5000端口,接收JSON格式的日志,并将其发送到Elasticsearch。

# logstash.conf
input {
  tcp {
    port => 5000
    codec => json_lines
  }
}

filter {
  # 在这里可以进行数据清洗、字段转换等操作
  # 例如,解析时间戳
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "django-solid-logs-%{+YYYY.MM.dd}"
  }
  # 开发时在控制台也输出一份,方便调试
  stdout { codec => rubydebug }
}

在包含这两个文件的目录下运行 docker-compose up -d,稍等片刻,一个本地的ELK环境就绪了。

后端实现:Django中间件与结构化日志

后端的改造是整个方案的基石。我们需要完成三件事:

  1. 安装必要的Python库。
  2. 编写一个中间件来处理 X-Correlation-ID
  3. 配置Django的LOGGING系统,使其输出JSON并发送到Logstash。

1. 安装依赖

pip install django djangorestframework python-json-logger python-logstash

2. 设计CorrelationIDMiddleware

在Django中,中间件是处理请求-响应生命周期的完美钩子。但请求上下文如何在不同函数、类之间传递?一个常见的错误是尝试将其附加到 request 对象上,但这在很多地方(如后台任务、模型信号)都无法访问。

这里的关键是使用 threading.local()。它提供了一个线程隔离的存储空间,我们可以在中间件的开头存入ID,在请求处理的任何地方都能安全地读出,而不必担心多线程环境下的数据污染。

在你的Django App(例如 core)下创建 middleware.py

# core/middleware.py
import uuid
import threading
from typing import Callable

from django.http import HttpRequest, HttpResponse

# 创建一个线程安全的本地存储对象
_request_storage = threading.local()

# 常量定义
CORRELATION_ID_HEADER = "HTTP_X_CORRELATION_ID"
CORRELATION_ID_ATTR = "correlation_id"

def get_correlation_id() -> str | None:
    """
    一个全局可用的函数,用于获取当前请求的correlation_id。
    这是解耦的关键,业务代码不需要知道ID来自何处,只需调用此函数。
    """
    return getattr(_request_storage, CORRELATION_ID_ATTR, None)

class CorrelationIDMiddleware:
    """
    该中间件负责从请求头中提取X-Correlation-ID,
    如果不存在,则生成一个新的ID。
    然后将其存储在线程本地存储中,供后续的日志记录器等使用。
    """
    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
        self.get_response = get_response

    def __call__(self, request: HttpRequest) -> HttpResponse:
        # 从请求头获取ID,注意Django会将 X-Correlation-ID 转换为 HTTP_X_CORRELATION_ID
        correlation_id = request.META.get(CORRELATION_ID_HEADER)
        if not correlation_id:
            correlation_id = str(uuid.uuid4())

        # 将ID设置到线程本地存储中
        setattr(_request_storage, CORRELATION_ID_ATTR, correlation_id)

        response = self.get_response(request)

        # 在响应中也附加这个ID,方便前端调试或客户端链路追踪
        response['X-Correlation-ID'] = correlation_id
        
        # 请求结束时,清理存储,虽然对于WSGI来说不是必须的,但这是个好习惯
        delattr(_request_storage, CORRELATION_ID_ATTR)

        return response

3. 配置Django LOGGING

这是最复杂也最容易出错的部分。Django的日志配置非常灵活,但也相当繁琐。我们需要定义formatters, filters, handlers, 和 loggers

# settings.py
from .middleware import get_correlation_id

# ...

# 将中间件添加到MIDDLEWARE列表的顶部或靠前位置,确保尽早处理ID
MIDDLEWARE = [
    'core.middleware.CorrelationIDMiddleware',
    # ... 其他中间件
]

# ...

class CorrelationIDFilter:
    """
    这是一个日志过滤器,用于将correlation_id注入到每条日志记录中。
    """
    def filter(self, record):
        record.correlation_id = get_correlation_id()
        return True

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        # 用于发送到Logstash的JSON格式
        'json': {
            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(levelname)s %(name)s %(module)s %(funcName)s %(lineno)d %(correlation_id)s %(message)s'
        },
        # 用于本地开发时在控制台输出的易读格式
        'verbose': {
            'format': '{levelname} {asctime} {module} {correlation_id} {message}',
            'style': '{',
        },
    },
    'filters': {
        'correlation_id': {
            '()': 'your_project.settings.CorrelationIDFilter',  # 确保路径正确
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
            'filters': ['correlation_id'],
        },
        'logstash': {
            'level': 'INFO',
            'class': 'logstash.TCPLogstashHandler',
            'host': 'localhost', # 指向你的Logstash服务
            'port': 5000,
            'version': 1,
            'message_type': 'django_log',
            'fqdn': False,
            'tags': ['django', 'production'],
            'formatter': 'json',
            'filters': ['correlation_id'],
        },
    },
    'loggers': {
        # Django框架本身的日志
        'django': {
            'handlers': ['console', 'logstash'],
            'level': 'INFO',
            'propagate': True,
        },
        # 我们自己应用代码的日志
        '': { # 空字符串''表示root logger
            'handlers': ['console', 'logstash'],
            'level': 'DEBUG', # 开发时可以设为DEBUG
            'propagate': True,
        }
    },
}

配置完成后,在任何视图或服务代码中,我们只需要像平常一样使用 logging 模块即可:

# core/views.py
import logging
from rest_framework.views import APIView
from rest_framework.response import Response

logger = logging.getLogger(__name__)

class MyTestView(APIView):
    def get(self, request, *args, **kwargs):
        logger.info("开始处理请求...", extra={'user_id': request.user.id if request.user.is_authenticated else 'anonymous'})
        
        try:
            # 模拟一些业务逻辑
            if request.query_params.get('error'):
                raise ValueError("一个模拟的业务错误")
            
            logger.debug("业务逻辑执行成功。")
            
            return Response({"status": "ok"})
        
        except Exception as e:
            # 使用 exc_info=True 会自动附加堆栈信息
            logger.error("处理请求时发生错误", exc_info=True, extra={'error_detail': str(e)})
            return Response({"status": "error", "message": str(e)}, status=500)

当这个视图被调用时,产生的每条日志都会通过CorrelationIDFilter自动带上从中间件设置的ID,然后被JsonFormatter格式化并由TCPLogstashHandler发送出去。

前端实现:Solid.js的日志服务与上下文注入

前端的挑战在于如何优雅地管理correlation_id并将其应用到所有日志和API调用中。我们将使用Solid.js的createContext API,这是一种依赖注入模式的实现。

1. 创建LoggerService

这个服务将是单例的,负责ID的生命周期管理、日志的格式化、批处理和发送。

// src/services/LoggerService.ts
import { v4 as uuidv4 } from 'uuid';

interface LogPayload {
  level: 'info' | 'warn' | 'error';
  message: string;
  context?: Record<string, any>;
  timestamp: string;
  correlation_id: string;
}

class LoggerService {
  private correlationId: string;
  private logQueue: LogPayload[] = [];
  private batchSize = 10;
  private flushInterval = 5000; // 5 seconds
  private flushTimer: number | null = null;

  constructor() {
    this.correlationId = this.generateCorrelationId();
    // 页面刷新或关闭时,尝试发送剩余日志
    window.addEventListener('beforeunload', this.flushLogs.bind(this));
  }

  private generateCorrelationId(): string {
    // 我们可以将会话ID存储在sessionStorage中,以在页面刷新时保持一致
    let sessionId = sessionStorage.getItem('correlation_id');
    if (!sessionId) {
      sessionId = uuidv4();
      sessionStorage.setItem('correlation_id', sessionId);
    }
    return sessionId;
  }
  
  public getCorrelationId(): string {
    return this.correlationId;
  }

  public info(message: string, context: Record<string, any> = {}) {
    this.log('info', message, context);
  }

  public warn(message: string, context: Record<string, any> = {}) {
    this.log('warn', message, context);
  }
  
  public error(message: string, error?: Error, context: Record<string, any> = {}) {
    const errorContext = { ...context };
    if (error) {
        errorContext.error_message = error.message;
        errorContext.error_stack = error.stack;
    }
    this.log('error', message, errorContext);
  }

  private log(level: LogPayload['level'], message: string, context: Record<string, any> = {}) {
    const payload: LogPayload = {
      level,
      message,
      context: {
        ...context,
        user_agent: navigator.userAgent,
        location: window.location.pathname,
      },
      timestamp: new Date().toISOString(),
      correlation_id: this.correlationId,
    };
    
    // 开发环境下直接打印到控制台
    if (process.env.NODE_ENV === 'development') {
      console[level](`[${level.toUpperCase()}] ${message}`, payload);
    }

    this.logQueue.push(payload);

    if (this.logQueue.length >= this.batchSize) {
      this.flushLogs();
    } else if (!this.flushTimer) {
      this.flushTimer = window.setTimeout(() => {
        this.flushLogs();
        this.flushTimer = null;
      }, this.flushInterval);
    }
  }

  private async flushLogs() {
    if (this.logQueue.length === 0) {
      return;
    }

    const logsToSend = [...this.logQueue];
    this.logQueue = [];
    if (this.flushTimer) {
        clearTimeout(this.flushTimer);
        this.flushTimer = null;
    }

    try {
      // 在Django中需要创建一个专门接收前端日志的API endpoint
      // 注意:真实项目中,这个endpoint需要做严格的速率限制和安全防护
      await fetch('/api/frontend-logs/', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          // 也可以在这里传递ID,但日志体中已经有了
        },
        body: JSON.stringify(logsToSend),
        keepalive: true, // 确保页面卸载时请求也能发出
      });
    } catch (e) {
      console.error("Failed to send logs to server:", e);
      // 发送失败,可以将日志重新放回队列
      this.logQueue.unshift(...logsToSend);
    }
  }
}

export const logger = new LoggerService();

为了接收前端日志,Django需要一个简单的视图:

# core/views.py
import logging
# ... other imports

frontend_logger = logging.getLogger('frontend_logger')

class FrontendLogReceiverView(APIView):
    # 这个视图不需要认证,但需要严格的限流
    permission_classes = [] 
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        logs = request.data
        if not isinstance(logs, list):
            logs = [logs]
        
        for log_entry in logs:
            # 我们相信前端发来的数据结构,但生产环境需要校验
            level = log_entry.get('level', 'info').lower()
            message = log_entry.get('message', 'No message')
            
            # 使用extra字段将前端的上下文完整地传递给后端日志系统
            # 这样在Kibana中可以方便地展开和搜索
            extra_context = {
                "frontend_context": log_entry,
            }

            log_method = getattr(frontend_logger, level, frontend_logger.info)
            log_method(f"[Frontend Log] {message}", extra=extra_context)
            
        return Response(status=202) # 接受请求,异步处理

别忘了在urls.py中配置路由。

2. 使用createContext提供服务

// src/contexts/LoggerContext.tsx
import { createContext, useContext, JSX } from 'solid-js';
import { logger as loggerInstance } from '../services/LoggerService';

type LoggerServiceType = typeof loggerInstance;

const LoggerContext = createContext<LoggerServiceType>();

export function LoggerProvider(props: { children: JSX.Element }) {
  return (
    <LoggerContext.Provider value={loggerInstance}>
      {props.children}
    </LoggerContext.Provider>
  );
}

export function useLogger() {
  const context = useContext(LoggerContext);
  if (!context) {
    throw new Error('useLogger must be used within a LoggerProvider');
  }
  return context;
}

在应用的根组件 App.tsx 中包裹 LoggerProvider

3. 封装fetch API

为了让所有API请求自动带上 X-Correlation-ID,我们创建一个 fetch 的包装器。

// src/lib/api.ts
import { logger } from '../services/LoggerService';

export async function instrumentedFetch(
    input: RequestInfo | URL,
    init?: RequestInit
): Promise<Response> {
    const correlationId = logger.getCorrelationId();

    const headers = new Headers(init?.headers);
    headers.set('X-Correlation-ID', correlationId);
    
    // 如果有CSRF token等,也在这里统一添加
    // headers.set('X-CSRFToken', getCookie('csrftoken'));

    const newInit: RequestInit = {
        ...init,
        headers,
    };

    logger.info(`API Request: ${input.toString()}`, { method: init?.method || 'GET' });

    try {
        const response = await fetch(input, newInit);
        if (!response.ok) {
            logger.error(`API Response Error: ${response.status} ${response.statusText}`, undefined, {
                url: input.toString(),
                status: response.status
            });
        }
        return response;
    } catch (error) {
        if (error instanceof Error) {
            logger.error(`API Fetch Failed: ${error.message}`, error, { url: input.toString() });
        }
        throw error;
    }
}

4. 在组件中使用

现在,在任何组件中,我们都可以轻松地记录日志和发起API请求。

// src/components/MyComponent.tsx
import { createSignal } from 'solid-js';
import { useLogger } from '../contexts/LoggerContext';
import { instrumentedFetch } from '../lib/api';

function MyComponent() {
  const logger = useLogger();
  const [data, setData] = createSignal<any>(null);

  const fetchData = async () => {
    logger.info('User clicked "Fetch Data" button.', { component: 'MyComponent' });
    try {
      const response = await instrumentedFetch('/api/my-test-view/');
      const result = await response.json();
      setData(result);
      logger.info('Data fetched successfully.');
    } catch (e) {
      // instrumentedFetch 内部已经记录了错误,这里可以不记
    }
  };
  
  const fetchWithError = async () => {
    logger.warn('User is attempting to trigger a server error.');
    await instrumentedFetch('/api/my-test-view/?error=true');
  };

  return (
    <div>
      <button onClick={fetchData}>Fetch Data</button>
      <button onClick={fetchWithError}>Trigger Server Error</button>
      {data() && <pre>{JSON.stringify(data(), null, 2)}</pre>}
    </div>
  );
}

在Kibana中验证成果

现在,当我们在前端应用中点击按钮时,会发生以下一系列事件,最终都汇聚到Kibana。

  1. 打开Kibana(http://localhost:5601),进入 Discover 页面。
  2. 创建一个 django-solid-logs-* 的数据视图(Data View)。
  3. 在前端应用中点击 “Fetch Data” 按钮。
  4. 在Kibana的搜索框中,输入前端生成的 correlation_id。例如: correlation_id : "your-uuid-from-session-storage"

你将看到一个完美的事件序列:

  1. 一条来自前端的 info 日志: User clicked "Fetch Data" button.
  2. 一条来自前端的 info 日志: API Request: /api/my-test-view/
  3. 一条来自后端的 info 日志 (来自django logger): [Django Log] "GET /api/my-test-view/ HTTP/1.1" 200 15
  4. 一条来自后端的 info 日志 (来自core.views): 开始处理请求...
  5. 一条来自后端的 debug 日志 (来自core.views): 业务逻辑执行成功。

如果点击 “Trigger Server Error” 按钮,你将看到一条来自后端的 error 日志,包含完整的Python堆栈跟踪,同样带有相同的 correlation_id。你终于可以准确地告诉后端同事:“是ID为 abc-123 的这次操作导致了你们服务在10:15:32的崩溃,这是完整的堆栈。”

局限性与未来展望

我们构建的这套系统解决了从单体前端到单体后端的链路追踪问题,在许多项目中已经足够强大。但它并非银弹,也存在一些局限性:

  1. 微服务扩展性: 如果Django后端需要调用其他微服务,correlation_id 需要被继续传递下去(例如通过HTTP头或消息队列的元数据)。这要求所有下游服务都遵循相同的上下文传递协议。
  2. 性能开销: 日志量巨大时,向Logstash发送同步的TCP请求可能会对Django应用的性能产生影响。在生产环境中,通常会采用更可靠的异步日志管道,例如将日志写入本地文件,由Filebeat等代理来采集,或者使用基于UDP的日志 handler。
  3. 非侵入性: 虽然我们的方案对业务代码的侵入性很低,但前端仍需要主动使用 instrumentedFetch 包装器。更理想的方案可能是通过 Service Worker 或其他代理层来自动拦截和注入头信息。

未来的迭代方向可以考虑引入OpenTelemetry标准。它提供了一套完整的规范和SDK,用于生成和传播 traces, metrics, 和 logs,能够更优雅地处理跨服务上下文传递(traceparent头),并支持更复杂的span概念,从而将我们的“日志链”升级为真正的“分布式追踪系统”。但对于很多中小型项目而言,本文实现的基于correlation_id的方案,以其简洁和高效,提供了一个极具性价比的起点。


  目录