Metrics System¶
TeleFuser provides a comprehensive metrics collection system compatible with Prometheus, designed for production monitoring and observability.
Features¶
- Prometheus-compatible metrics - Counter, Gauge, Histogram, Summary types
- Service-level metrics - Tasks, queue, GPU monitoring
- Stage-level metrics - Automatic tracking for pipeline stages
- GPU metrics - NVIDIA GPU monitoring via pynvml
- Thread-safe registry - Concurrent access support
- Prometheus export - Standard text exposition format
- Configurable buckets - Customizable histogram buckets
Quick Start¶
Basic Usage¶
from telefuser.metrics import (
    get_metrics_registry,
    Counter,
    Gauge,
    Histogram,
)

# Get the global registry
registry = get_metrics_registry()

# Create a counter
requests_total = registry.counter(
    "requests_total",
    "Total number of requests",
)
requests_total.inc()

# Create a gauge
queue_size = registry.gauge(
    "queue_size",
    "Current queue size",
)
queue_size.set(10)

# Create a histogram
latency = registry.histogram(
    "request_latency_seconds",
    "Request latency in seconds",
)
latency.observe(0.5)

# Export in Prometheus format
print(registry.get_prometheus_format())
Configuration¶
from telefuser.metrics import MetricsConfig, create_metrics_config

# Use default configuration
config = MetricsConfig()

# Custom configuration
config = create_metrics_config(
    enabled=True,
    enable_stage_metrics=True,
    enable_gpu_metrics=True,
    gpu_metrics_interval=5.0,
    namespace="telefuser",
)
Metric Types¶
Counter¶
Monotonically increasing counter for cumulative values:
from telefuser.metrics import get_metrics_registry
registry = get_metrics_registry()
counter = registry.counter("tasks_completed", "Tasks completed")
counter.inc() # Increment by 1
counter.inc(5) # Increment by 5
# counter.dec() # Error! Counters cannot be decremented
print(counter.value) # Current value
Use cases:
- Total requests
- Tasks completed
- Errors encountered
- Bytes processed
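The monotonic rule can be illustrated without TeleFuser. The following is a minimal standard-library sketch, not TeleFuser's implementation; the class name `SketchCounter` is hypothetical. It shows why a counter rejects negative increments and how a lock keeps concurrent increments consistent.

```python
import threading


class SketchCounter:
    """Hypothetical stand-in for a counter; not TeleFuser's actual class."""

    def __init__(self) -> None:
        self._value = 0.0
        self._lock = threading.Lock()

    def inc(self, amount: float = 1.0) -> None:
        # A counter only ever goes up, so negative increments are rejected.
        if amount < 0:
            raise ValueError("counters cannot be decremented")
        with self._lock:
            self._value += amount

    @property
    def value(self) -> float:
        with self._lock:
            return self._value


counter = SketchCounter()
workers = [
    threading.Thread(target=lambda: [counter.inc() for _ in range(1000)])
    for _ in range(4)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
print(counter.value)  # 4000.0
```

Values that need to go down as well as up belong in a gauge instead.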
Gauge¶
Point-in-time value that can go up or down:
gauge = registry.gauge("memory_used_bytes", "Memory used in bytes")
gauge.set(1024) # Set to specific value
gauge.inc() # Increment by 1
gauge.dec() # Decrement by 1
gauge.set_to_current_time() # Set to Unix timestamp
Use cases:
- Current queue size
- Memory usage
- Active connections
- Temperature
Histogram¶
Distribution of values with configurable buckets:
# Default buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
histogram = registry.histogram(
    "request_duration_seconds",
    "Request duration in seconds",
)

# Custom buckets
histogram = registry.histogram(
    "file_size_bytes",
    "File size in bytes",
    buckets=[100, 1000, 10000, 100000, 1000000],
)
histogram.observe(0.5)
histogram.observe(1.2)
histogram.observe(0.05)
print(histogram.count) # Total observations
print(histogram.sum) # Sum of all values
print(histogram.average) # Average value
Use cases:
- Request latency
- Response sizes
- Processing times
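Prometheus histogram buckets are cumulative: each bucket counts every observation less than or equal to its upper bound, and a final `+Inf` bucket counts everything. The sketch below shows that bookkeeping with the standard library only. The function name `cumulative_bucket_counts` is hypothetical, and TeleFuser's internal implementation may differ.

```python
from bisect import bisect_left

DEFAULT_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]


def cumulative_bucket_counts(observations, buckets=DEFAULT_BUCKETS):
    """Return (upper_bound, cumulative_count) pairs, ending with +Inf."""
    per_bucket = [0] * (len(buckets) + 1)  # last slot is the +Inf bucket
    for value in observations:
        # bisect_left finds the first bucket whose upper bound is >= value.
        per_bucket[bisect_left(buckets, value)] += 1
    bounds = [*buckets, float("inf")]
    result, running = [], 0
    for bound, count in zip(bounds, per_bucket):
        running += count
        result.append((bound, running))
    return result


# Same three observations as in the example above.
counts = cumulative_bucket_counts([0.5, 1.2, 0.05])
for bound, count in counts:
    print(f'le="{bound}" {count}')
```

With these observations the `le="0.05"` bucket holds 1, `le="0.5"` holds 2, and every bucket from `le="2.5"` upward holds all 3.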
Summary¶
Summary metric with configurable quantiles:
# Default quantiles: [0.5, 0.9, 0.95, 0.99]
summary = registry.summary(
    "response_time_seconds",
    "Response time in seconds",
)

# Custom quantiles
summary = registry.summary(
    "latency_seconds",
    "Latency in seconds",
    quantiles=[0.5, 0.75, 0.9, 0.95, 0.99],
)
summary.observe(0.1)
summary.observe(0.5)
summary.observe(1.0)
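To make the meaning of a quantile concrete, here is a small sketch that computes quantiles over a list of samples with the nearest-rank method. This is an assumption chosen for illustration: the source does not say which estimator TeleFuser's summary uses, and production summaries often rely on streaming or sliding-window approximations instead. The helper name `nearest_rank_quantile` is hypothetical.

```python
import math


def nearest_rank_quantile(samples, q):
    """Return the q-quantile (0 < q <= 1) using the nearest-rank method."""
    if not samples:
        raise ValueError("no samples observed")
    ordered = sorted(samples)
    rank = max(1, math.ceil(q * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


# Same three observations as in the example above.
samples = [0.1, 0.5, 1.0]
quantiles = {q: nearest_rank_quantile(samples, q) for q in (0.5, 0.9, 0.95, 0.99)}
print(quantiles)
```

With only three samples, the median is 0.5 and every higher quantile collapses to the maximum, 1.0, which is a reminder that tail quantiles are only informative once enough observations have been recorded.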
Labels¶
Add dimensions to metrics:
# Using dictionary
counter = registry.counter(
    "http_requests_total",
    "Total HTTP requests",
    labels={"method": "GET", "status": "200"},
)

# Using list of MetricLabel
from telefuser.metrics import MetricLabel

counter = registry.counter(
    "http_requests_total",
    "Total HTTP requests",
    labels=[
        MetricLabel("method", "GET"),
        MetricLabel("status", "200"),
    ],
)
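In the Prometheus text format, labels appear as `key="value"` pairs inside braces after the metric name. The sketch below renders one sample line that way, including the escaping the format requires for backslashes, double quotes, and newlines in label values. The helper names are hypothetical and this is not TeleFuser's exporter.

```python
def escape_label_value(value: str) -> str:
    # The text format requires escaping backslash, double quote, and newline.
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def format_sample(name: str, labels: dict[str, str], value: float) -> str:
    """Render one sample line in Prometheus text exposition format."""
    if not labels:
        return f"{name} {value}"
    rendered = ",".join(
        f'{key}="{escape_label_value(val)}"' for key, val in labels.items()
    )
    return f"{name}{{{rendered}}} {value}"


line = format_sample("http_requests_total", {"method": "GET", "status": "200"}, 3)
print(line)  # http_requests_total{method="GET",status="200"} 3
```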
Stage Metrics¶
Enable Stage Metrics¶
from telefuser.metrics import enable_stage_metrics, disable_stage_metrics
from telefuser.core import BaseStage

class MyStage(BaseStage):
    def __init__(self, name, config):
        super().__init__(name, config)
        # Enable metrics for this stage
        enable_stage_metrics(self)

    def process(self, data):
        # Metrics are automatically tracked if using @with_metrics
        result = data  # placeholder for the stage's real work
        return result

# Disable metrics for an existing stage instance
disable_stage_metrics(my_stage)
Using Decorators¶
from telefuser.core import BaseStage
from telefuser.metrics import with_metrics, with_metrics_async

class MyStage(BaseStage):
    @with_metrics
    def process(self, data):
        # Execution time, success/failure automatically tracked
        return self._do_work(data)

    @with_metrics_async
    async def process_async(self, data):
        # Also works for async methods
        return await self._do_work_async(data)
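For readers curious about what such a decorator does conceptually, the following standard-library sketch times a call and counts successes and failures. The decorator name `timed` and the `stats` dictionary are hypothetical; this is an illustration of the pattern, not the code behind `with_metrics`.

```python
import functools
import time


def timed(stats: dict):
    """Hypothetical decorator; records calls, errors, and total duration."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            stats["total"] = stats.get("total", 0) + 1
            try:
                return func(*args, **kwargs)
            except Exception:
                stats["errors"] = stats.get("errors", 0) + 1
                raise  # re-raise so callers still see the failure
            finally:
                # Runs on both success and failure, so every call is timed.
                elapsed = time.perf_counter() - start
                stats["duration_seconds"] = stats.get("duration_seconds", 0.0) + elapsed

        return wrapper

    return decorator


stats = {}


@timed(stats)
def process(data):
    if data is None:
        raise ValueError("no data")
    return data * 2


process(21)
try:
    process(None)
except ValueError:
    pass
print(stats["total"], stats["errors"])  # 2 1
```

Recording the duration in a `finally` block is the key design choice: failed executions are timed as well, so slow failures remain visible.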
Stage Metric Context¶
Each stage gets the following metrics automatically:
| Metric | Type | Description |
|---|---|---|
| stage_{name}_duration_seconds | Histogram | Execution duration |
| stage_{name}_total | Counter | Total executions |
| stage_{name}_errors_total | Counter | Total errors |
| stage_{name}_active | Gauge | Active executions |
| stage_{name}_input_size_bytes | Histogram | Input size |
| stage_{name}_output_size_bytes | Histogram | Output size |
from telefuser.metrics import enable_stage_metrics
context = enable_stage_metrics(my_stage)
# Access individual metrics
context.duration.observe(0.5)
context.total.inc()
context.errors.inc()
context.active.inc()
# Record a complete execution
context.record_execution(
    duration=0.5,
    success=True,
    input_size=1024,
    output_size=2048,
)
Service Metrics¶
Overview¶
Service-level metrics for monitoring the TeleFuser service:
from telefuser.metrics import get_service_metrics, ServiceMetrics
# Get global service metrics
service_metrics = get_service_metrics()
# Record task events
service_metrics.record_task_created()
service_metrics.record_task_completed(duration=1.5)
service_metrics.record_task_failed()
service_metrics.record_task_cancelled()
# Update queue metrics
service_metrics.update_queue_metrics(
    size=10,
    pending=5,
    processing=2,
)
# Collect GPU metrics
service_metrics.collect_gpu_metrics()
# Get Prometheus format
output = service_metrics.get_prometheus_format()
Task Metrics¶
| Metric | Type | Description |
|---|---|---|
| tasks_created_total | Counter | Total tasks created |
| tasks_completed_total | Counter | Tasks completed successfully |
| tasks_failed_total | Counter | Tasks that failed |
| tasks_cancelled_total | Counter | Tasks cancelled |
| task_duration_seconds | Histogram | Task execution duration |
| task_queue_wait_seconds | Histogram | Queue wait time |
Queue Metrics¶
| Metric | Type | Description |
|---|---|---|
| queue_size | Gauge | Total queue size |
| queue_pending | Gauge | Pending tasks |
| queue_processing | Gauge | Processing tasks |
GPU Metrics¶
| Metric | Type | Description |
|---|---|---|
| gpu_{id}_memory_used_bytes | Gauge | GPU memory used |
| gpu_{id}_memory_total_bytes | Gauge | GPU total memory |
| gpu_{id}_utilization_ratio | Gauge | GPU utilization (0-1) |
| gpu_{id}_temperature_celsius | Gauge | GPU temperature |
Periodic Collection¶
import asyncio
from telefuser.metrics import get_service_metrics

async def main():
    service_metrics = get_service_metrics()
    # Start periodic GPU metrics collection; keep a reference so the task
    # is not garbage-collected while it runs
    collection_task = asyncio.create_task(
        service_metrics.start_periodic_collection()
    )
    # Your application logic here
    await run_application()
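The general shape of a periodic collector and its shutdown can be sketched with plain `asyncio`. The coroutine `collect_periodically` below is a hypothetical stand-in, not TeleFuser's `start_periodic_collection`; it shows a loop that collects, sleeps, and is cancelled cleanly when the application finishes.

```python
import asyncio


async def collect_periodically(collect, interval: float) -> None:
    """Call `collect` every `interval` seconds until cancelled."""
    while True:
        collect()
        await asyncio.sleep(interval)


async def main() -> int:
    calls = []
    # Keep a reference to the task so it is not garbage-collected mid-run.
    task = asyncio.create_task(collect_periodically(lambda: calls.append(1), 0.01))
    await asyncio.sleep(0.05)  # stand-in for the application's real work
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the collector was stopped on shutdown
    return len(calls)


collected = asyncio.run(main())
print(f"collected {collected} time(s)")
```

Cancelling and then awaiting the task ensures the loop has actually stopped before the event loop closes.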
Prometheus Integration¶
Expose Metrics Endpoint¶
from fastapi import FastAPI, Response
from telefuser.metrics import get_metrics_registry

app = FastAPI()

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    registry = get_metrics_registry()
    return Response(
        content=registry.get_prometheus_format(),
        media_type="text/plain",
    )
Prometheus Configuration¶
# prometheus.yml
scrape_configs:
  - job_name: 'telefuser'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: /metrics
Example Output¶
# HELP telefuser_tasks_created_total Total number of tasks created
# TYPE telefuser_tasks_created_total counter
telefuser_tasks_created_total 100
# HELP telefuser_task_duration_seconds Duration of task execution in seconds
# TYPE telefuser_task_duration_seconds histogram
telefuser_task_duration_seconds_bucket{le="0.005"} 10
telefuser_task_duration_seconds_bucket{le="0.01"} 25
telefuser_task_duration_seconds_bucket{le="0.025"} 45
telefuser_task_duration_seconds_bucket{le="0.05"} 60
telefuser_task_duration_seconds_bucket{le="0.1"} 75
telefuser_task_duration_seconds_bucket{le="0.25"} 85
telefuser_task_duration_seconds_bucket{le="0.5"} 92
telefuser_task_duration_seconds_bucket{le="1.0"} 97
telefuser_task_duration_seconds_bucket{le="2.5"} 99
telefuser_task_duration_seconds_bucket{le="5.0"} 100
telefuser_task_duration_seconds_bucket{le="10.0"} 100
telefuser_task_duration_seconds_bucket{le="+Inf"} 100
telefuser_task_duration_seconds_sum 45.5
telefuser_task_duration_seconds_count 100
# HELP telefuser_gpu_0_memory_used_bytes GPU 0 memory used in bytes
# TYPE telefuser_gpu_0_memory_used_bytes gauge
telefuser_gpu_0_memory_used_bytes 8589934592
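The histogram lines in this output are enough to derive a mean and an approximate quantile. The sketch below does so with the numbers copied from the example, using linear interpolation inside the matching bucket, which is similar in spirit to PromQL's `histogram_quantile`. The function name `estimate_quantile` is hypothetical, and the result is an estimate whose accuracy depends on bucket width.

```python
# Cumulative bucket counts copied from the example output above.
BUCKETS = [
    (0.005, 10), (0.01, 25), (0.025, 45), (0.05, 60), (0.1, 75), (0.25, 85),
    (0.5, 92), (1.0, 97), (2.5, 99), (5.0, 100), (10.0, 100), (float("inf"), 100),
]
TOTAL_SUM, TOTAL_COUNT = 45.5, 100


def estimate_quantile(q, buckets):
    """Estimate the q-quantile (0 < q <= 1) by interpolating within a bucket."""
    total = buckets[-1][1]
    if total == 0:
        raise ValueError("empty histogram")
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for upper_bound, cumulative in buckets:
        if cumulative >= rank:
            if upper_bound == float("inf"):
                return lower_bound  # cannot interpolate into the +Inf bucket
            in_bucket = cumulative - lower_count
            fraction = (rank - lower_count) / in_bucket
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, cumulative
    raise ValueError("q must be in (0, 1]")


mean = TOTAL_SUM / TOTAL_COUNT
p95 = estimate_quantile(0.95, BUCKETS)
print(f"mean={mean:.3f}s p95~{p95:.2f}s")
```

For this data the mean is 0.455 seconds, and the 95th percentile falls in the 0.5 to 1.0 second bucket at roughly 0.8 seconds.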
Configuration Options¶
| Option | Type | Default | Description |
|---|---|---|---|
| enabled | bool | True | Enable metrics collection |
| enable_stage_metrics | bool | True | Enable stage-level metrics |
| enable_gpu_metrics | bool | True | Enable GPU metrics |
| enable_http_metrics | bool | True | Enable HTTP request metrics |
| enable_queue_metrics | bool | True | Enable queue metrics |
| gpu_metrics_interval | float | 5.0 | GPU collection interval (seconds) |
| histogram_buckets | list[float] | [0.005, 0.01, ...] | Default histogram buckets |
| metrics_path | str | "/metrics" | HTTP endpoint path |
| namespace | str | "telefuser" | Metric name prefix |
| gpu_platform | str | "auto" | GPU platform (nvidia/amd/auto) |
Advanced Usage¶
Custom Collectors¶
Add custom metrics from external sources:
from telefuser.metrics import get_metrics_registry

def custom_collector():
    """Collect custom metrics."""
    return [
        "# HELP custom_metric My custom metric",
        "# TYPE custom_metric gauge",
        f"custom_metric {get_custom_value()}",
    ]

registry = get_metrics_registry()
registry.add_custom_collector(custom_collector)
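As a concrete instance of the collector shape shown above (a callable returning exposition-format lines), here is a self-contained sketch that reports the number of live Python threads. The metric name `app_active_threads` and the function name are hypothetical, chosen only for illustration.

```python
import threading


def thread_count_collector() -> list[str]:
    """Return exposition-format lines for a hypothetical thread-count gauge."""
    name = "app_active_threads"  # hypothetical metric name
    return [
        f"# HELP {name} Number of live Python threads",
        f"# TYPE {name} gauge",
        f"{name} {threading.active_count()}",
    ]


lines = thread_count_collector()
print("\n".join(lines))
```

Following the pattern above, such a function would presumably be registered with `registry.add_custom_collector(thread_count_collector)`.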
Multiple Registries¶
from telefuser.metrics import MetricRegistry
# Create separate registries for different purposes
app_registry = MetricRegistry(namespace="app")
system_registry = MetricRegistry(namespace="system")
# Each registry is independent
app_registry.counter("requests", "App requests")
system_registry.gauge("cpu_usage", "CPU usage")
Reset Metrics¶
# Reset specific metric
counter.reset()
# Reset all metrics in registry
registry.reset_all()
# Clear all metrics
registry.clear()
Best Practices¶
1. Naming Conventions¶
Follow Prometheus naming conventions:
# Good
registry.counter("http_requests_total", "...")
registry.histogram("request_duration_seconds", "...")
registry.gauge("memory_used_bytes", "...")
# Avoid
registry.counter("httpRequests", "...") # Use snake_case
registry.gauge("memory", "...") # Include unit
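These conventions can be checked mechanically. The sketch below validates a name against the traditional Prometheus metric-name character set and then against a stricter snake_case pattern. The snake_case rule and the helper name `check_metric_name` are assumptions made for this illustration; TeleFuser itself may not enforce either.

```python
import re

# Traditional character set for Prometheus metric names.
VALID_NAME = re.compile(r"[a-zA-Z_:][a-zA-Z0-9_:]*")
# Stricter house style for this sketch: lowercase snake_case only.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(_[a-z0-9]+)*")


def check_metric_name(name: str) -> list[str]:
    """Return a list of style problems; an empty list means the name passes."""
    problems = []
    if not VALID_NAME.fullmatch(name):
        problems.append("not a valid Prometheus metric name")
    elif not SNAKE_CASE.fullmatch(name):
        problems.append("use snake_case")
    return problems


for candidate in ("http_requests_total", "httpRequests", "2xx_total"):
    print(candidate, check_metric_name(candidate))
```

A check like this cannot tell whether a unit suffix is missing (as in `memory`), so that part of the convention still needs human review.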
2. Use Appropriate Types¶
- Counter: Cumulative values (requests, errors, bytes)
- Gauge: Current state (queue size, memory, temperature)
- Histogram: Distributions (latency, size)
- Summary: Quantiles (p50, p95, p99)
3. Meaningful Labels¶
# Good - dimensions for aggregation
registry.counter(
    "http_requests_total",
    "Total HTTP requests",
    labels={"method": "GET", "endpoint": "/api/users"},
)

# Avoid - high cardinality
registry.counter(
    "http_requests_total",
    "...",
    labels={"user_id": "12345"},  # Too many unique values!
)
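The cost of a high-cardinality label is easy to quantify: every distinct combination of label values is a separate time series, so the worst case is the product of the number of distinct values per label. The sketch below works through that arithmetic with made-up counts (5 methods, 20 endpoints, 100,000 users); the figures and the function name are illustrative assumptions.

```python
from math import prod


def series_count(label_values: dict[str, int]) -> int:
    """Upper bound on time series: product of distinct values per label."""
    return prod(label_values.values())


low = series_count({"method": 5, "endpoint": 20})
high = series_count({"method": 5, "endpoint": 20, "user_id": 100_000})
print(low, high)  # 100 10000000
```

Adding a single per-user label turns 100 series into 10 million for one metric, which is why identifiers such as user IDs are better kept in logs or traces.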
4. Histogram Buckets¶
Choose buckets appropriate for your data: