指标系统¶
TeleFuser 提供了一个与 Prometheus 兼容的综合指标收集系统,专为生产监控和可观测性设计。
特性¶
- Prometheus 兼容指标 - Counter、Gauge、Histogram、Summary 类型
- 服务级指标 - 任务、队列、GPU 监控
- 阶段级指标 - 流水线阶段自动追踪
- GPU 指标 - 通过 pynvml 进行 NVIDIA GPU 监控
- 线程安全注册表 - 支持并发访问
- Prometheus 导出 - 标准文本展示格式
- 可配置桶 - 可自定义直方图桶
快速开始¶
基础用法¶
from telefuser.metrics import (
get_metrics_registry,
Counter,
Gauge,
Histogram,
)
# 获取全局注册表
registry = get_metrics_registry()
# 创建计数器
requests_total = registry.counter(
"requests_total",
"Total number of requests",
)
requests_total.inc()
# 创建仪表
queue_size = registry.gauge(
"queue_size",
"Current queue size",
)
queue_size.set(10)
# 创建直方图
latency = registry.histogram(
"request_latency_seconds",
"Request latency in seconds",
)
latency.observe(0.5)
# 导出 Prometheus 格式
print(registry.get_prometheus_format())
配置¶
from telefuser.metrics import MetricsConfig, create_metrics_config
# 使用默认配置
config = MetricsConfig()
# 自定义配置
config = create_metrics_config(
enabled=True,
enable_stage_metrics=True,
enable_gpu_metrics=True,
gpu_metrics_interval=5.0,
namespace="telefuser",
)
指标类型¶
Counter(计数器)¶
单调递增的计数器,用于累计值:
from telefuser.metrics import get_metrics_registry
registry = get_metrics_registry()
counter = registry.counter("tasks_completed", "Tasks completed")
counter.inc() # 增加 1
counter.inc(5) # 增加 5
# counter.dec() # 错误!计数器不能减少
print(counter.value) # 当前值
适用场景: - 总请求数 - 完成的任务数 - 遇到的错误数 - 处理的字节数
Gauge(仪表)¶
可增可减的即时值:
gauge = registry.gauge("memory_used_bytes", "Memory used in bytes")
gauge.set(1024) # 设置为特定值
gauge.inc() # 增加 1
gauge.dec() # 减少 1
gauge.set_to_current_time() # 设置为 Unix 时间戳
适用场景: - 当前队列大小 - 内存使用量 - 活跃连接数 - 温度
Histogram(直方图)¶
具有可配置桶的值分布:
# 默认桶: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
histogram = registry.histogram(
"request_duration_seconds",
"Request duration in seconds",
)
# 自定义桶
histogram = registry.histogram(
"file_size_bytes",
"File size in bytes",
buckets=[100, 1000, 10000, 100000, 1000000],
)
histogram.observe(0.5)
histogram.observe(1.2)
histogram.observe(0.05)
print(histogram.count) # 总观察次数
print(histogram.sum) # 所有值的总和
print(histogram.average) # 平均值
适用场景: - 请求延迟 - 响应大小 - 处理时间
Summary(摘要)¶
具有可配置分位数的摘要指标:
# 默认分位数: [0.5, 0.9, 0.95, 0.99]
summary = registry.summary(
"response_time_seconds",
"Response time in seconds",
)
# 自定义分位数
summary = registry.summary(
"latency_seconds",
"Latency in seconds",
quantiles=[0.5, 0.75, 0.9, 0.95, 0.99],
)
summary.observe(0.1)
summary.observe(0.5)
summary.observe(1.0)
Labels(标签)¶
为指标添加维度:
# 使用字典
counter = registry.counter(
"http_requests_total",
"Total HTTP requests",
labels={"method": "GET", "status": "200"},
)
# 使用 MetricLabel 列表
from telefuser.metrics import MetricLabel
counter = registry.counter(
"http_requests_total",
"Total HTTP requests",
labels=[
MetricLabel("method", "GET"),
MetricLabel("status", "200"),
],
)
阶段指标¶
启用阶段指标¶
from telefuser.metrics import enable_stage_metrics, disable_stage_metrics
from telefuser.core import BaseStage
class MyStage(BaseStage):
def __init__(self, name, config):
super().__init__(name, config)
# 为此阶段启用指标
enable_stage_metrics(self)
def process(self, data):
# 如果使用 @with_metrics,指标会自动追踪
return result
# 禁用指标
disable_stage_metrics(my_stage)
使用装饰器¶
from telefuser.metrics import with_metrics, with_metrics_async
class MyStage(BaseStage):
@with_metrics
def process(self, data):
# 执行时间、成功/失败自动追踪
return self._do_work(data)
@with_metrics_async
async def process_async(self, data):
# 也支持异步方法
return await self._do_work_async(data)
阶段指标上下文¶
每个阶段自动获得以下指标:
| 指标 | 类型 | 描述 |
|---|---|---|
stage_{name}_duration_seconds | Histogram | 执行时长 |
stage_{name}_total | Counter | 总执行次数 |
stage_{name}_errors_total | Counter | 总错误数 |
stage_{name}_active | Gauge | 活跃执行数 |
stage_{name}_input_size_bytes | Histogram | 输入大小 |
stage_{name}_output_size_bytes | Histogram | 输出大小 |
from telefuser.metrics import enable_stage_metrics
context = enable_stage_metrics(my_stage)
# 访问单个指标
context.duration.observe(0.5)
context.total.inc()
context.errors.inc()
context.active.inc()
# 记录完整执行
context.record_execution(
duration=0.5,
success=True,
input_size=1024,
output_size=2048,
)
服务指标¶
概述¶
用于监控 TeleFuser 服务的服务级指标:
from telefuser.metrics import get_service_metrics, ServiceMetrics
# 获取全局服务指标
service_metrics = get_service_metrics()
# 记录任务事件
service_metrics.record_task_created()
service_metrics.record_task_completed(duration=1.5)
service_metrics.record_task_failed()
service_metrics.record_task_cancelled()
# 更新队列指标
service_metrics.update_queue_metrics(
size=10,
pending=5,
processing=2,
)
# 收集 GPU 指标
service_metrics.collect_gpu_metrics()
# 获取 Prometheus 格式
output = service_metrics.get_prometheus_format()
任务指标¶
| 指标 | 类型 | 描述 |
|---|---|---|
tasks_created_total | Counter | 创建的任务总数 |
tasks_completed_total | Counter | 成功完成的任务 |
tasks_failed_total | Counter | 失败的任务 |
tasks_cancelled_total | Counter | 取消的任务 |
task_duration_seconds | Histogram | 任务执行时长 |
task_queue_wait_seconds | Histogram | 队列等待时间 |
队列指标¶
| 指标 | 类型 | 描述 |
|---|---|---|
queue_size | Gauge | 队列总大小 |
queue_pending | Gauge | 待处理任务数 |
queue_processing | Gauge | 处理中任务数 |
GPU 指标¶
| 指标 | 类型 | 描述 |
|---|---|---|
gpu_{id}_memory_used_bytes | Gauge | GPU 已用显存 |
gpu_{id}_memory_total_bytes | Gauge | GPU 总显存 |
gpu_{id}_utilization_ratio | Gauge | GPU 利用率 (0-1) |
gpu_{id}_temperature_celsius | Gauge | GPU 温度 |
周期性收集¶
import asyncio
from telefuser.metrics import get_service_metrics
async def main():
service_metrics = get_service_metrics()
# 启动周期性 GPU 指标收集
asyncio.create_task(service_metrics.start_periodic_collection())
# 你的应用逻辑
await run_application()
Prometheus 集成¶
暴露指标端点¶
from fastapi import FastAPI, Response
from telefuser.metrics import get_metrics_registry
app = FastAPI()
@app.get("/metrics")
async def metrics():
"""Prometheus 指标端点。"""
registry = get_metrics_registry()
return Response(
content=registry.get_prometheus_format(),
media_type="text/plain",
)
Prometheus 配置¶
# prometheus.yml
scrape_configs:
- job_name: 'telefuser'
static_configs:
- targets: ['localhost:8000']
metrics_path: /metrics
示例输出¶
# HELP telefuser_tasks_created_total Total number of tasks created
# TYPE telefuser_tasks_created_total counter
telefuser_tasks_created_total 100
# HELP telefuser_task_duration_seconds Duration of task execution in seconds
# TYPE telefuser_task_duration_seconds histogram
telefuser_task_duration_seconds_bucket{le="0.005"} 10
telefuser_task_duration_seconds_bucket{le="0.01"} 25
telefuser_task_duration_seconds_bucket{le="0.025"} 45
telefuser_task_duration_seconds_bucket{le="0.05"} 60
telefuser_task_duration_seconds_bucket{le="0.1"} 75
telefuser_task_duration_seconds_bucket{le="0.25"} 85
telefuser_task_duration_seconds_bucket{le="0.5"} 92
telefuser_task_duration_seconds_bucket{le="1.0"} 97
telefuser_task_duration_seconds_bucket{le="2.5"} 99
telefuser_task_duration_seconds_bucket{le="5.0"} 100
telefuser_task_duration_seconds_bucket{le="10.0"} 100
telefuser_task_duration_seconds_bucket{le="+Inf"} 100
telefuser_task_duration_seconds_sum 45.5
telefuser_task_duration_seconds_count 100
# HELP telefuser_gpu_0_memory_used_bytes GPU 0 memory used in bytes
# TYPE telefuser_gpu_0_memory_used_bytes gauge
telefuser_gpu_0_memory_used_bytes 8589934592
配置选项¶
| 选项 | 类型 | 默认值 | 描述 |
|---|---|---|---|
enabled | bool | True | 启用指标收集 |
enable_stage_metrics | bool | True | 启用阶段级指标 |
enable_gpu_metrics | bool | True | 启用 GPU 指标 |
enable_http_metrics | bool | True | 启用 HTTP 请求指标 |
enable_queue_metrics | bool | True | 启用队列指标 |
gpu_metrics_interval | float | 5.0 | GPU 收集间隔(秒) |
histogram_buckets | list[float] | [0.005, 0.01, ...] | 默认直方图桶 |
metrics_path | str | "/metrics" | HTTP 端点路径 |
namespace | str | "telefuser" | 指标名前缀 |
gpu_platform | str | "auto" | GPU 平台 (nvidia/amd/auto) |
高级用法¶
自定义收集器¶
从外部源添加自定义指标:
from telefuser.metrics import get_metrics_registry
def custom_collector():
"""收集自定义指标。"""
return [
"# HELP custom_metric My custom metric",
"# TYPE custom_metric gauge",
f"custom_metric {get_custom_value()}",
]
registry = get_metrics_registry()
registry.add_custom_collector(custom_collector)
多注册表¶
from telefuser.metrics import MetricRegistry
# 为不同目的创建独立注册表
app_registry = MetricRegistry(namespace="app")
system_registry = MetricRegistry(namespace="system")
# 每个注册表独立
app_registry.counter("requests", "App requests")
system_registry.gauge("cpu_usage", "CPU usage")
重置指标¶
最佳实践¶
1. 命名约定¶
遵循 Prometheus 命名约定:
# 推荐
registry.counter("http_requests_total", "...")
registry.histogram("request_duration_seconds", "...")
registry.gauge("memory_used_bytes", "...")
# 避免
registry.counter("httpRequests", "...") # 使用 snake_case
registry.gauge("memory", "...") # 包含单位
2. 使用适当的类型¶
- Counter: 累计值(请求、错误、字节)
- Gauge: 当前状态(队列大小、内存、温度)
- Histogram: 分布(延迟、大小)
- Summary: 分位数(p50、p95、p99)
3. 有意义的标签¶
# 推荐 - 用于聚合的维度
registry.counter(
"http_requests_total",
"Total HTTP requests",
labels={"method": "GET", "endpoint": "/api/users"},
)
# 避免 - 高基数
registry.counter(
"http_requests_total",
"...",
labels={"user_id": "12345"}, # 唯一值太多!
)
4. 直方图桶¶
选择适合数据的桶: