基于etcd实现OpenTelemetry采样策略的动态无重启更新


在生产环境中,静态采样率很快就会成为可观测性系统的一个瓶颈。当系统平稳运行时,我们可能只需要1%甚至更低的采样率来节省成本和资源。然而,一旦发生故障或进行性能压测,我们迫切需要将特定服务、甚至特定接口的采样率瞬间提升到100%,以便捕获所有关键的追踪数据。传统方式需要修改配置、重新构建和部署服务,这个过程的延迟足以让故障现场的关键信息丢失。

我们需要的是一个控制平面,能够实时、动态地调整分布式系统中所有服务的采样策略,而无需任何服务重启。这套机制必须是高可用的、低延迟的,并且对业务代码的侵入性要降到最低。

最初的构想是利用一个高可用的分布式键值存储来作为这个控制平面的核心。当配置变更时,服务实例能够近乎实时地感知到并更新其内部的 OpenTelemetry Sampler。技术选型上,etcd 因其在 Kubernetes 生态中的广泛应用、基于 Raft 的强一致性保证以及高效的 Watch 机制,成为了理想选择。相比于使用数据库轮询或消息队列,etcd 的 Watch 机制提供了一种更优雅、资源消耗更低的服务端推送模型。

整个架构的核心在于设计一个能够动态替换其内部逻辑的 OpenTelemetry Sampler。标准库中的 Sampler 一旦被 TracerProvider 初始化后,通常是不可变的。因此,我们需要构建一个自定义的 Sampler 作为代理,它持有一个内部 Sampler 的引用,并通过一个线程安全的机制来动态替换这个内部引用。

sequenceDiagram
    participant Operator as 运维/SRE
    participant etcd
    participant GoService as 微服务实例
    participant CustomSampler as 动态采样器
    participant OTelSDK as OpenTelemetry SDK

    Operator->>etcd: etcdctl put /otel/sampling/order-service '{"type":"ratio","param":1.0}'
    etcd-->>GoService: Watcher 接收到配置变更事件
    GoService->>CustomSampler: UpdateSampler(newConfig)
    CustomSampler->>CustomSampler: (内部)替换底层采样器实例 (线程安全)

    loop 接收请求
        GoService->>OTelSDK: Start Span
        OTelSDK->>CustomSampler: ShouldSample(...)
        CustomSampler->>OTelSDK: 返回最新的采样决策
    end

实现动态采样器

我们的核心是 DynamicSampler,它需要实现 go.opentelemetry.io/otel/sdk/trace.Sampler 接口。其内部包含一个读写锁 sync.RWMutex 来保护对底层实际 Sampler 的访问和更新,确保并发安全。

sampler.go:

package main

import (
	"context"
	"encoding/json"
	"log"
	"sync"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/trace"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

// SamplingConfig 定义了我们存储在 etcd 中的配置结构
type SamplingConfig struct {
	// type 可以是 "ratio", "always_on", "always_off"
	Type  string  `json:"type"`
	// param 对于 "ratio" 类型是采样率 (0.0 to 1.0)
	Param float64 `json:"param"`
}

// DynamicSampler 是一个可以动态改变其采样策略的 Sampler
type DynamicSampler struct {
	mu          sync.RWMutex
	current     tracesdk.Sampler
	description string
}

// NewDynamicSampler 创建一个默认采样策略的动态采样器
func NewDynamicSampler() *DynamicSampler {
	// 初始默认采样率为 10%
	initialSampler := tracesdk.TraceIDRatioBased(0.1)
	return &DynamicSampler{
		current:     initialSampler,
		description: initialSampler.Description(),
	}
}

// ShouldSample 实现了 Sampler 接口.
// 它使用读锁来安全地调用底层的采样器
func (s *DynamicSampler) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.current.ShouldSample(p)
}

// Description 实现了 Sampler 接口.
func (s *DynamicSampler) Description() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.description
}

// UpdateFromConfig 根据 JSON 配置更新底层的采样器
// 这是一个关键方法,它将被 etcd 的 watcher 调用
func (s *DynamicSampler) UpdateFromConfig(configJSON []byte) {
	var config SamplingConfig
	if err := json.Unmarshal(configJSON, &config); err != nil {
		log.Printf("Error: Failed to unmarshal sampling config: %v", err)
		return
	}

	var newSampler tracesdk.Sampler
	switch config.Type {
	case "always_on":
		newSampler = tracesdk.AlwaysSample()
		log.Println("Info: Updating sampler to AlwaysSample")
	case "always_off":
		newSampler = tracesdk.NeverSample()
		log.Println("Info: Updating sampler to NeverSample")
	case "ratio":
		if config.Param < 0 || config.Param > 1 {
			log.Printf("Warning: Invalid sampling ratio %f, keeping previous sampler.", config.Param)
			return
		}
		newSampler = tracesdk.TraceIDRatioBased(config.Param)
		log.Printf("Info: Updating sampler to TraceIDRatioBased with ratio %f", config.Param)
	default:
		log.Printf("Warning: Unknown sampler type '%s', keeping previous sampler.", config.Type)
		return
	}

	// 使用写锁来安全地替换采样器实例
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current = newSampler
	s.description = newSampler.Description()
}

这里的关键在于 UpdateFromConfig 方法。它接收从 etcd 获取的原始 JSON 数据,解析后根据 type 字段创建不同的标准采样器实例,然后通过写锁安全地替换 DynamicSampler 内部的 current 字段。这种代理模式解耦了配置变更逻辑和采样逻辑。

集成 etcd Watcher

接下来,我们需要在主应用程序中集成 etcd 客户端,并设置一个 watcher 来监听配置变更。我们将使用 go.etcd.io/etcd/client/v3 这个官方库。

main.go:

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"go.etcd.io/etcd/client/v3"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)

const (
	serviceName     = "order-service"
	etcdEndpoints   = "etcd:2379"
	samplingConfigKey = "/otel/sampling/order-service"
	jaegerEndpoint  = "http://jaeger:14268/api/traces"
)

var tracer trace.Tracer

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 1. 初始化动态采样器
	dynamicSampler := NewDynamicSampler()

	// 2. 初始化 TracerProvider,并使用我们的动态采样器
	tp, err := tracerProvider(dynamicSampler)
	if err != nil {
		log.Fatalf("failed to create tracer provider: %v", err)
	}
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
	tracer = otel.Tracer(serviceName)

	// 3. 在一个单独的 Goroutine 中启动 etcd watcher
	go watchEtcdConfig(ctx, dynamicSampler)

	// 4. 设置并启动 HTTP 服务器
	http.HandleFunc("/order", handleOrder)
	server := &http.Server{Addr: ":8080"}

	go func() {
		log.Println("Server starting on port 8080...")
		if err := server.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatalf("HTTP server ListenAndServe: %v", err)
		}
	}()
	
	// 优雅停机
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	log.Println("Shutting down server...")
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()

	if err := server.Shutdown(shutdownCtx); err != nil {
		log.Fatalf("Server Shutdown Failed:%+v", err)
	}
	
	if err := tp.Shutdown(shutdownCtx); err != nil {
		log.Fatalf("Tracer Provider Shutdown Failed: %+v", err)
	}
	log.Println("Server gracefully stopped")
}

// tracerProvider 设置并返回一个 OpenTelemetry TracerProvider
func tracerProvider(sampler tracesdk.Sampler) (*tracesdk.TracerProvider, error) {
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerEndpoint)))
	if err != nil {
		return nil, err
	}
	
	tp := tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exporter),
		// 关键点:在这里使用我们的自定义采样器
		tracesdk.WithSampler(sampler),
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName(serviceName),
		)),
	)
	return tp, nil
}

// watchEtcdConfig 连接到 etcd 并监听配置键的变化
func watchEtcdConfig(ctx context.Context, sampler *DynamicSampler) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{etcdEndpoints},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer cli.Close()

	// 首次启动时,尝试获取一次当前配置
	resp, err := cli.Get(ctx, samplingConfigKey)
	if err != nil {
		log.Printf("Warning: Failed to get initial sampling config from etcd: %v", err)
	} else {
		if len(resp.Kvs) > 0 {
			log.Println("Info: Applying initial sampling config from etcd.")
			sampler.UpdateFromConfig(resp.Kvs[0].Value)
		} else {
			// 如果 etcd 中没有配置,可以写入一个默认值,以方便管理
			defaultConfig := `{"type":"ratio","param":0.1}`
			log.Printf("Info: No initial config found. Setting default in etcd: %s", defaultConfig)
			_, err := cli.Put(ctx, samplingConfigKey, defaultConfig)
			if err != nil {
				log.Printf("Warning: Failed to put default config to etcd: %v", err)
			}
		}
	}
	
	// 设置 Watcher
	watchChan := cli.Watch(ctx, samplingConfigKey)
	log.Println("Info: Watching for sampling config changes on key:", samplingConfigKey)
	for watchResp := range watchChan {
		for _, event := range watchResp.Events {
			// 我们只关心 PUT 事件
			if event.Type == clientv3.EventTypePut {
				log.Printf("Info: Detected config change on key '%s'", string(event.Kv.Key))
				sampler.UpdateFromConfig(event.Kv.Value)
			}
		}
	}
}

// handleOrder 是一个简单的 HTTP handler,用于生成 trace
func handleOrder(w http.ResponseWriter, r *http.Request) {
	// 创建一个 span
	ctx, span := tracer.Start(r.Context(), "handleOrder")
	defer span.End()

	// 模拟一些工作
	time.Sleep(50 * time.Millisecond)
	processPayment(ctx)
	time.Sleep(30 * time.Millisecond)
	
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Order processed!"))
}

func processPayment(ctx context.Context) {
	_, span := tracer.Start(ctx, "processPayment")
	defer span.End()
	
	// 模拟数据库或第三方服务调用
	time.Sleep(100 * time.Millisecond)
	span.SetAttributes(semconv.DBSystemKey.String("mysql"))
}

main 函数中,我们首先创建了 DynamicSampler 实例,然后用它来初始化 TracerProvider。关键在于 watchEtcdConfig 这个 Goroutine。它会先尝试获取一次初始配置,如果没有则写入一个默认值,然后启动一个无限循环来监听 samplingConfigKey 的变化。一旦有 PUT 事件,就调用 sampler.UpdateFromConfig

本地环境验证

为了在本地运行和验证这套系统,我们可以使用 Docker Compose 来编排所有组件:我们的 Go 应用、etcd、Jaeger 和 Grafana。

docker-compose.yml:

version: '3.8'

services:
  etcd:
    image: bitnami/etcd:3.5
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
    ports:
      - "2379:2379"

  jaeger:
    image: jaegertracing/all-in-one:1.41
    ports:
      - "16686:16686" # UI
      - "14268:14268" # Collector

  order-service:
    build: .
    ports:
      - "8080:8080"
    depends_on:
      - etcd
      - jaeger

  grafana:
    image: grafana/grafana:10.2.0
    ports:
      - "3000:3000"
    volumes:
      - ./grafana-provisioning:/etc/grafana/provisioning

Dockerfile:

FROM golang:1.21-alpine

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN go build -o /app/server .

EXPOSE 8080
CMD ["/app/server"]

还需要准备 Grafana 的配置文件,让它启动时就自动配置好 Jaeger 数据源。

grafana-provisioning/datasources/datasource.yml:

apiVersion: 1

datasources:
  - name: Jaeger
    type: jaeger
    access: proxy
    url: http://jaeger:16686
    isDefault: true

测试流程

  1. 启动所有服务: docker-compose up --build

  2. 初始状态下,采样率是 10%。我们可以用一个简单的脚本来产生流量:

    while true; do curl http://localhost:8080/order; sleep 0.5; done

    在 Jaeger UI (http://localhost:16686) 中,可以看到大约每 10 个请求中只有 1 个被采样。

  3. 现在,动态调整采样率到 100%。在一个新的终端中,使用 etcdctl (需要安装或通过 docker exec 进入 etcd 容器执行) 来更新配置:

    docker-compose exec etcd etcdctl put /otel/sampling/order-service '{"type":"ratio","param":1.0}'

    观察 order-service 的日志,会看到 “Info: Updating sampler to TraceIDRatioBased with ratio 1.000000”。几乎立刻,Jaeger UI 中会开始捕获到每一个请求的 trace。

  4. 模拟故障排查结束,将采样率降为 0%,以完全停止采样:

    docker-compose exec etcd etcdctl put /otel/sampling/order-service '{"type":"always_off"}'

    再次观察,Jaeger UI 中将不再有新的 trace 产生。

在 Grafana 中可视化效果

虽然 Jaeger 可以查看 trace,但 Grafana 更擅长将不同来源的数据关联起来。我们可以创建一个仪表盘来同时展示服务的 QPS 和实际被采样的 Trace 数量。

  1. 添加一个 Prometheus 数据源(如果环境中已有)。
  2. 创建一个仪表盘,添加两个图表:
    • **图表一:请求速率 (QPS)**。使用类似 rate(http_requests_total{service="order-service"}[1m]) 的 PromQL 查询(假设你已经通过 Prometheus 中间件暴露了该指标)。
    • **图表二:采样速率 (Traces Per Second)**。使用 Jaeger 数据源,在 “Explore” 视图中选择 “Search”,然后切换到 “Metrics” 查询类型,可以查询到每秒的 span 数量。查询语句通常很简单,就是选择你的服务 order-service
    • 关键一步:使用 Annotations。在仪表盘设置中,添加一个 Annotation 查询,数据源选择 etcd。虽然 Grafana 没有内置的 etcd 数据源,但我们可以通过插件或一个简单的中间服务将 etcd 的变更事件转换为 Annotation API 调用。一个更简单的办法是,在我们的 Go 应用中,每当配置更新时,就向 Grafana Annotation API 推送一个事件。这样,每次我们通过 etcdctl 修改采样率,Grafana 的图表上就会出现一条垂直线标记,清晰地展示出配置变更与系统行为(采样速率)变化之间的因果关系。

方案的局限性与未来迭代

这套实现虽然解决了动态采样的问题,但在生产环境中还存在一些可以改进的地方。首先,当前的配置结构过于简单,仅支持全局的采样策略。一个更复杂的系统可能需要基于请求路径、HTTP Header (例如,对内部用户或特定租户开启全量采样) 的规则引擎。这可以通过扩展 SamplingConfig 结构和 DynamicSampler 的逻辑来实现,使其能够持有一组规则并按顺序匹配。

其次,直接将所有服务连接到同一个 etcd 集群可能存在性能和安全问题。在大型系统中,可以使用 etcd 的 gRPC-proxy 或构建一个专门的配置分发服务来作为中间层,降低对核心 etcd 集群的压力。

最后,对 etcd 的强依赖意味着 etcd 的可用性直接影响到采样配置的更新能力。虽然在 etcd 不可用时,服务会继续使用上一次的配置,但需要建立完善的监控告警来确保控制平面的健康。


  目录