Tags: LLM, AI, Observability, Tracing, Monitoring, MLOps, Python, Production

LLM Observability — Tracing, Evaluation, and Cost Monitoring for Production AI Systems

A practical guide to LLM observability in production: the four pillars (tracing, automated evaluation, cost monitoring, quality drift detection), instrumenting Python LLM applications with Langfuse SDK for trace/span hierarchies and session tracking, building a token cost monitoring class with per-model pricing tables and budget alerts, LLM-as-judge evaluation pipelines with Prometheus pass-rate metrics, defining LLM SLOs (P95 latency, error rate, hallucination rate) with Prometheus histograms and Grafana dashboards, Prometheus alerting rules for budget burn and latency SLO violations, OpenTelemetry GenAI semantic conventions (gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens) with OTLP exporter, and a decision framework for choosing between Langfuse, LangSmith, and Helicone.

2026-05-28

Why LLM Observability Matters

Running LLMs in production introduces a class of failure modes that traditional APM tools were never designed to detect. Latency spikes in a microservice show up immediately in your P99 histograms. A cost blowout from an LLM prompt that suddenly generates 4x more tokens might not surface for a week — until your cloud bill arrives.

  • Silent cost blowouts. Token usage is billed per request but aggregated monthly. A single prompt change that increases average completion length by 200 tokens is invisible to your error rate dashboards but can double your inference spend in days.
  • Hallucination regressions. Model updates and prompt drift degrade output quality without raising HTTP error rates. A RAG pipeline that was 92% faithful last week may silently drop to 74% after a model provider upgrades their weights.
  • Latency degradation. LLM inference latency varies wildly with prompt length, output token count, and provider load. Medians and averages hide the long tail: a P99 of 18 seconds is invisible if you only watch P50 or the mean.
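
To make the long-tail point concrete, here is a minimal sketch using a made-up latency sample (97 fast requests and 3 slow ones; the numbers are illustrative, not measurements). The `percentile` helper is a hypothetical nearest-rank implementation written for this example.

```python
import statistics


def percentile(values: list[float], pct: int) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("values must be non-empty")
    ordered = sorted(values)
    # Ceiling division on integers avoids float rounding surprises.
    rank = max(1, -(-pct * len(ordered) // 100))
    return ordered[rank - 1]


# Hypothetical sample: 97 requests at 1.2s and 3 long-context calls at 18s.
latencies_s = [1.2] * 97 + [18.0] * 3

mean_s = statistics.mean(latencies_s)
p50_s = percentile(latencies_s, 50)
p99_s = percentile(latencies_s, 99)

print(f"mean={mean_s:.2f}s p50={p50_s:.1f}s p99={p99_s:.1f}s")
```

The mean and the median both stay under two seconds here, while one request in thirty-odd takes 18 seconds. Only the high percentile exposes it.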

Traditional APM captures request/response times, error rates, and resource utilisation. It misses the semantic layer entirely: what was the prompt? How many tokens did it use? Was the answer faithful to the retrieved context? Did the user session succeed end-to-end? LLM observability fills this gap.

Note

This article covers the open-source and vendor-neutral observability stack: Langfuse for tracing, Prometheus + Grafana for metrics and SLOs, and OpenTelemetry GenAI semantic conventions for portable instrumentation. All code examples use Python 3.11+.

The LLM Observability Stack

Complete LLM observability rests on four pillars. Missing any one of them leaves a blind spot that will eventually cost you money, user trust, or both.

The Four Pillars

# Pillar 1 — TRACING (spans, chains, sessions)
#   What: Capture every LLM call as a structured span with inputs, outputs,
#         model, token counts, latency, and custom metadata.
#   Why:  Debug multi-step chains. Reproduce failures. Correlate cost to features.
#   Tool: Langfuse SDK (open-source, self-hostable) or LangSmith
#
# Pillar 2 — EVALUATION (automated scoring)
#   What: Run quality scorers (LLM-as-judge, faithfulness, relevance)
#         on production traces automatically and on every deploy.
#   Why:  Catch hallucination regressions before users notice.
#   Tool: Langfuse evaluators, RAGAS, DeepEval, custom Prometheus metrics
#
# Pillar 3 — COST MONITORING (token budget tracking)
#   What: Track prompt + completion tokens per model, per feature, per user.
#         Calculate USD cost. Alert on budget burn rate anomalies.
#   Why:  LLM costs scale non-linearly with usage. Catch waste early.
#   Tool: Custom Python class + Prometheus counters + Grafana
#
# Pillar 4 — QUALITY MONITORING (drift detection)
#   What: Track pass rate, faithfulness, and semantic similarity over time.
#         Alert when rolling quality metrics drop below SLO thresholds.
#   Why:  Model provider updates and prompt drift degrade quality silently.
#   Tool: Prometheus + Alertmanager + Langfuse dataset evaluations

Distributed Tracing with Langfuse

Langfuse is an open-source LLM engineering platform that provides trace/span hierarchies, evaluation pipelines, and dataset management. It can be self-hosted on Docker or used as a managed cloud service.

Basic Instrumentation with the Python SDK

# pip install langfuse openai anthropic
# Set environment variables:
#   LANGFUSE_SECRET_KEY=sk-lf-...
#   LANGFUSE_PUBLIC_KEY=pk-lf-...
#   LANGFUSE_HOST=https://cloud.langfuse.com  # or your self-hosted URL

import os
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import anthropic

langfuse = Langfuse()
client = anthropic.Anthropic()

@observe()  # automatically creates a trace for each call
def classify_support_ticket(ticket_text: str, user_id: str) -> dict:
    # Attach metadata to the current span
    langfuse_context.update_current_trace(
        user_id=user_id,
        tags=["support", "classification"],
        metadata={"ticket_length": len(ticket_text)},
    )

    response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"Classify this support ticket into one of: billing, technical, account, other.\n\nTicket: {ticket_text}",
        }],
    )

    result = response.content[0].text
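    # Plain @observe() records the function's input, output, and latency.
    # Model name and token counts from the Anthropic client are not captured
    # automatically, so they are returned here. To attach them to the observation,
    # use @observe(as_type="generation") and
    # langfuse_context.update_current_observation(model=..., usage=...).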
    return {"category": result, "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens}


# Multi-step chain: each nested @observe() call becomes a child span
@observe()
def rag_pipeline(query: str, user_id: str, session_id: str) -> str:
    langfuse_context.update_current_trace(
        session_id=session_id,
        user_id=user_id,
        metadata={"pipeline": "rag_v2", "prompt_version": "2026-05-28"},
    )

    # Step 1: retrieval (child span)
    context_docs = retrieve_context(query)

    # Step 2: generation (child span)
    answer = generate_answer(query, context_docs)

    return answer

@observe(name="retrieve_context")
def retrieve_context(query: str) -> list[str]:
    # vector search — captured as a child span with its own latency
    return ["doc1 content...", "doc2 content..."]

@observe(name="generate_answer")
def generate_answer(query: str, context_docs: list[str]) -> str:
    context = "\n".join(context_docs)
    response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Answer based on context.\n\nContext:\n{context}\n\nQuestion: {query}",
        }],
    )
    return response.content[0].text

Manual Span Creation for Fine-Grained Control

from langfuse import Langfuse

langfuse = Langfuse()

def process_document_batch(documents: list[dict], job_id: str) -> list[dict]:
    # Create a top-level trace for the batch job
    trace = langfuse.trace(
        name="document_batch_processing",
        id=job_id,
        metadata={"batch_size": len(documents), "job_id": job_id},
    )

    results = []
    total_input_tokens = 0
    total_output_tokens = 0

    for doc in documents:
        # Each document gets its own generation span
        generation = trace.generation(
            name="extract_entities",
            model="claude-haiku-4-5",
            model_parameters={"max_tokens": 512, "temperature": 0},
            input=[{"role": "user", "content": f"Extract named entities from: {doc['text']}"}],
            metadata={"doc_id": doc["id"]},
        )

        response = call_llm(doc["text"])  # your LLM call

        # Close the generation span with output and token usage
        generation.end(
            output=response.text,
            usage={"input": response.input_tokens, "output": response.output_tokens},
        )

        total_input_tokens += response.input_tokens
        total_output_tokens += response.output_tokens
        results.append({"doc_id": doc["id"], "entities": response.text})

    # Update the trace with aggregated stats
    trace.update(
        output={"processed": len(results)},
        metadata={
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
        },
    )
    return results

Note

Langfuse sends traces asynchronously from a background thread. The SDK batches events and flushes them on a timer or when the batch reaches a size threshold; both settings are configurable and the defaults differ between SDK versions, so check the values for the version you run. In short-lived processes (AWS Lambda, Cloud Run), call langfuse.flush() before the process exits to ensure all spans are sent.

Token Cost Monitoring

Token costs compound quickly. A pipeline processing 10,000 documents per day at 2,000 input tokens and 500 output tokens per document reaches 20M input + 5M output tokens daily. At the Claude Sonnet prices used in the table below ($3 per 1M input tokens, $15 per 1M output tokens), that is approximately $135/day, or about $4,050 over a 30-day month, for a single pipeline. Monitoring token usage per feature and per model is not optional at production scale.
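
The arithmetic is easy to check in a few lines. This is a small sketch; `daily_cost_usd` is a hypothetical helper, and the prices are the Sonnet figures from the price table in this article ($3 input, $15 output per 1M tokens), which you should verify against current provider pricing.

```python
def daily_cost_usd(
    docs_per_day: int,
    input_tokens_per_doc: int,
    output_tokens_per_doc: int,
    input_price_per_m: float,
    output_price_per_m: float,
) -> float:
    """Daily USD cost for a pipeline, given per-document token counts and per-1M-token prices."""
    input_tokens = docs_per_day * input_tokens_per_doc
    output_tokens = docs_per_day * output_tokens_per_doc
    return (
        input_tokens / 1_000_000 * input_price_per_m
        + output_tokens / 1_000_000 * output_price_per_m
    )


per_day = daily_cost_usd(10_000, 2_000, 500, input_price_per_m=3.00, output_price_per_m=15.00)
per_month = per_day * 30  # assumes a 30-day month

print(f"${per_day:,.2f}/day, ${per_month:,.2f}/month")
```

Input tokens contribute $60 and output tokens $75, so the output side dominates even though it is a quarter of the volume.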

Token Cost Tracker with Prometheus

# cost_tracker.py
# pip install prometheus-client

from dataclasses import dataclass, field
from prometheus_client import Counter, Histogram, Gauge
import threading
import time

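# Model price table (USD per 1M tokens, as of 2026-05).
# Treat these values as illustrative: provider prices change, so verify each
# entry against the provider's current pricing page before relying on it.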
MODEL_PRICES: dict[str, dict[str, float]] = {
    "claude-opus-4-5":        {"input": 15.00, "output": 75.00},
    "claude-sonnet-4-5":      {"input":  3.00, "output": 15.00},
    "claude-haiku-4-5":       {"input":  0.25, "output":  1.25},
    "gpt-4o":                 {"input":  2.50, "output": 10.00},
    "gpt-4o-mini":            {"input":  0.15, "output":  0.60},
    "gemini-2.0-flash":       {"input":  0.10, "output":  0.40},
    "gemini-2.5-pro":         {"input":  1.25, "output": 10.00},
}

# Prometheus metrics
TOKEN_COUNTER = Counter(
    "llm_tokens_total",
    "Total LLM tokens consumed",
    ["model", "token_type", "feature"],  # label dimensions
)
COST_COUNTER = Counter(
    "llm_cost_usd_total",
    "Total LLM cost in USD",
    ["model", "feature"],
)
COST_PER_REQUEST = Histogram(
    "llm_cost_per_request_usd",
    "Cost per individual LLM request in USD",
    ["model", "feature"],
    buckets=[0.0001, 0.001, 0.01, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, 5.00],
)
DAILY_BUDGET_REMAINING = Gauge(
    "llm_daily_budget_remaining_usd",
    "Remaining daily budget in USD",
    ["feature"],
)


@dataclass
class TokenUsage:
    model: str
    feature: str
    input_tokens: int
    output_tokens: int

    @property
    def cost_usd(self) -> float:
        prices = MODEL_PRICES.get(self.model, {"input": 0.0, "output": 0.0})
        return (
            self.input_tokens  / 1_000_000 * prices["input"] +
            self.output_tokens / 1_000_000 * prices["output"]
        )


class CostTracker:
    def __init__(self, daily_budgets: dict[str, float] | None = None):
        self._daily_budgets = daily_budgets or {}
        self._daily_spend: dict[str, float] = {}
        self._lock = threading.Lock()
        self._reset_at = time.time() + 86400  # reset in 24h

    def record(self, usage: TokenUsage) -> None:
        # Prometheus counters
        TOKEN_COUNTER.labels(model=usage.model, token_type="input",  feature=usage.feature).inc(usage.input_tokens)
        TOKEN_COUNTER.labels(model=usage.model, token_type="output", feature=usage.feature).inc(usage.output_tokens)
        COST_COUNTER.labels(model=usage.model, feature=usage.feature).inc(usage.cost_usd)
        COST_PER_REQUEST.labels(model=usage.model, feature=usage.feature).observe(usage.cost_usd)

        # Daily budget tracking
        with self._lock:
            if time.time() > self._reset_at:
                self._daily_spend = {}
                self._reset_at = time.time() + 86400

            self._daily_spend[usage.feature] = (
                self._daily_spend.get(usage.feature, 0.0) + usage.cost_usd
            )
            if usage.feature in self._daily_budgets:
                remaining = max(0.0, self._daily_budgets[usage.feature] - self._daily_spend[usage.feature])
                DAILY_BUDGET_REMAINING.labels(feature=usage.feature).set(remaining)

    def check_budget(self, feature: str, threshold_pct: float = 0.90) -> bool:
        """Returns True if the feature has consumed >= threshold_pct of its daily budget."""
        if feature not in self._daily_budgets:
            return False
        with self._lock:
            spent = self._daily_spend.get(feature, 0.0)
            return spent / self._daily_budgets[feature] >= threshold_pct


# Global tracker — configure daily budgets per feature
tracker = CostTracker(daily_budgets={
    "rag_pipeline":       50.00,
    "ticket_classifier":  10.00,
    "document_extractor": 75.00,
})


# Decorator for automatic cost tracking
import functools
import logging


def track_cost(feature: str):
    def decorator(fn):
        @functools.wraps(fn)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            result, usage = fn(*args, **kwargs)  # fn must return (result, TokenUsage)
            tracker.record(usage)
            if tracker.check_budget(feature):
                logging.warning("Feature '%s' has consumed 90%%+ of its daily LLM budget", feature)
            return result
        return wrapper
    return decorator
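
A threshold on spend so far only fires late in the day. As a complement, here is a minimal sketch of a burn-rate projection that extrapolates current spend to the end of the 24-hour window. The function names are hypothetical, and linear extrapolation is an assumption that ignores daily traffic patterns.

```python
def projected_daily_spend(
    spent_usd: float,
    seconds_elapsed: float,
    window_seconds: float = 86_400,
) -> float:
    """Linearly extrapolate spend so far to the full budget window."""
    if seconds_elapsed <= 0:
        raise ValueError("seconds_elapsed must be positive")
    return spent_usd * (window_seconds / seconds_elapsed)


def will_exceed_budget(spent_usd: float, seconds_elapsed: float, daily_budget_usd: float) -> bool:
    """True if the current burn rate would overshoot the daily budget."""
    return projected_daily_spend(spent_usd, seconds_elapsed) > daily_budget_usd


# Example: $30 spent six hours into the window against a $50 daily budget.
projection = projected_daily_spend(30.0, 6 * 3600)
print(f"projected: ${projection:.2f}, over budget: {will_exceed_budget(30.0, 6 * 3600, 50.0)}")
```

In this example only 60% of the budget is gone, so a 90% threshold stays silent, yet the projection already shows the feature heading for more than double its budget.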

Automated LLM Evaluation in Production

Manual evaluation does not scale. The only way to catch quality regressions before users do is to run automated scorers continuously against production traces and on every deployment against a golden dataset.

LLM-as-Judge with Prometheus Pass Rate

# eval_pipeline.py
# Runs every 15 minutes via cron or Airflow.
# Scores a random sample of recent traces and exports pass rate to Prometheus.

import random
import anthropic
from langfuse import Langfuse
from prometheus_client import Gauge, Counter

EVAL_PASS_RATE = Gauge(
    "llm_eval_pass_rate",
    "Rolling pass rate from LLM-as-judge evaluation",
    ["feature", "metric"],
)
EVAL_RUNS_TOTAL = Counter(
    "llm_eval_runs_total",
    "Total evaluation runs",
    ["feature", "metric", "result"],
)

langfuse_client = Langfuse()
judge_client = anthropic.Anthropic()

JUDGE_PROMPT = """You are an evaluation judge. Rate the following LLM response on a scale of 0 to 1.

Criteria: {criteria}

User Question: {question}
LLM Response: {response}
{context_block}

Respond with ONLY a JSON object: {{"score": <float 0-1>, "reasoning": "<one sentence>"}}"""


def judge_response(
    question: str,
    response: str,
    criteria: str,
    context: str | None = None,
) -> float:
    context_block = f"Retrieved Context:\n{context}" if context else ""
    prompt = JUDGE_PROMPT.format(
        criteria=criteria,
        question=question,
        response=response,
        context_block=context_block,
    )
    result = judge_client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}],
    )
    import json
    parsed = json.loads(result.content[0].text)
    return float(parsed["score"])


def run_production_eval(feature: str, sample_size: int = 50) -> None:
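    # Fetch recent traces from Langfuse, newest first
    traces = langfuse_client.fetch_traces(
        name=feature,
        limit=100,  # page size; request further pages with page=... for a larger pool
        order_by="timestamp.desc",  # single "[field].[asc|desc]" string
    ).data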
    sample = random.sample(traces, min(sample_size, len(traces)))

    scores = []
    for trace in sample:
        if not trace.input or not trace.output:
            continue
        try:
            score = judge_response(
                question=str(trace.input),
                response=str(trace.output),
                criteria="Is the response accurate, helpful, and free of hallucinations?",
            )
            scores.append(score)
            result_label = "pass" if score >= 0.7 else "fail"
            EVAL_RUNS_TOTAL.labels(feature=feature, metric="accuracy", result=result_label).inc()

            # Write score back to Langfuse for the UI
            langfuse_client.score(
                trace_id=trace.id,
                name="llm_judge_accuracy",
                value=score,
                data_type="NUMERIC",
            )
        except Exception:
            EVAL_RUNS_TOTAL.labels(feature=feature, metric="accuracy", result="error").inc()

    if scores:
        pass_rate = sum(1 for s in scores if s >= 0.7) / len(scores)
        EVAL_PASS_RATE.labels(feature=feature, metric="accuracy").set(pass_rate)
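
Judge models do not always return bare JSON; they sometimes wrap it in prose or a code fence, which makes a direct json.loads call fail and inflates the "error" counter. The following is a minimal sketch of a more tolerant parser. `parse_judge_score` is a hypothetical helper, and the greedy brace match assumes the output contains a single JSON object.

```python
import json
import re


def parse_judge_score(raw: str, key: str = "score") -> float:
    """Extract a 0-1 score from judge output, tolerating surrounding text."""
    # Grab everything from the first "{" to the last "}".
    match = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in judge output")
    parsed = json.loads(match.group(0))  # JSONDecodeError is a ValueError subclass
    score = float(parsed[key])
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score out of range: {score}")
    return score


raw_output = 'Here is my rating: {"score": 0.8, "reasoning": "Accurate and concise."}'
print(parse_judge_score(raw_output))
```

Rejecting out-of-range scores keeps a misbehaving judge from silently skewing the pass rate; the caller's existing try/except can count those as errors.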

Faithfulness Scoring for RAG Pipelines

# faithfulness_eval.py — measures whether the LLM answer is grounded in the retrieved context
# Uses the RAGAS faithfulness metric concept implemented with an LLM judge

FAITHFULNESS_PROMPT = """Given the following retrieved context and LLM answer, assess whether
every factual claim in the answer is supported by the context.

Context:
{context}

Answer:
{answer}

Score from 0 to 1 where:
1.0 = every claim is fully supported by the context
0.5 = some claims are supported, some are not
0.0 = the answer contains claims not found in the context (hallucinations)

Respond with ONLY JSON: {{"faithfulness": <float 0-1>, "unsupported_claims": [<list of strings>]}}"""


def score_faithfulness(context: str, answer: str) -> dict:
    result = judge_client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": FAITHFULNESS_PROMPT.format(context=context, answer=answer),
        }],
    )
    import json
    return json.loads(result.content[0].text)


FAITHFULNESS_GAUGE = Gauge(
    "llm_rag_faithfulness",
    "Rolling faithfulness score for RAG pipeline responses",
    ["pipeline_version"],
)


def run_faithfulness_eval(pipeline_version: str, sample_size: int = 30) -> None:
    traces = langfuse_client.fetch_traces(
        name="rag_pipeline",
        limit=100,
        order_by="timestamp.desc",  # single "[field].[asc|desc]" string
    ).data
    sample = random.sample(traces, min(sample_size, len(traces)))

    scores = []
    for trace in sample:
        if not trace.input or not trace.output or not trace.metadata:
            continue
        context = trace.metadata.get("retrieved_context", "")
        if not context:
            continue
        result = score_faithfulness(context=context, answer=str(trace.output))
        scores.append(result["faithfulness"])
        langfuse_client.score(
            trace_id=trace.id,
            name="faithfulness",
            value=result["faithfulness"],
            data_type="NUMERIC",
        )

    if scores:
        FAITHFULNESS_GAUGE.labels(pipeline_version=pipeline_version).set(
            sum(scores) / len(scores)
        )

Note

For offline evaluation against golden datasets on every deploy, see the Langfuse datasets documentation. Datasets let you version golden Q&A pairs, run your pipeline against them in CI, and compare scores across runs — the same workflow as unit tests but for LLM quality.
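
The comparison step of such a CI check can be sketched without any SDK. This is an illustrative outline only: `EvalRun`, `regression_gate`, and the 5-point tolerance are assumptions, the scores are invented, and the 0.7 pass threshold mirrors the one used in the judge pipeline above.

```python
from dataclasses import dataclass


@dataclass
class EvalRun:
    label: str
    scores: list[float]  # one judge score per golden-dataset item

    def pass_rate(self, threshold: float = 0.7) -> float:
        if not self.scores:
            raise ValueError(f"run {self.label!r} has no scores")
        return sum(1 for s in self.scores if s >= threshold) / len(self.scores)


def regression_gate(baseline: EvalRun, candidate: EvalRun, max_drop: float = 0.05) -> bool:
    """Return True if the candidate's pass rate has not dropped more than max_drop."""
    return baseline.pass_rate() - candidate.pass_rate() <= max_drop


baseline = EvalRun("main", [0.9, 0.8, 0.75, 0.6])
candidate = EvalRun("pr-branch", [0.9, 0.5, 0.75, 0.6])

if not regression_gate(baseline, candidate):
    print(f"FAIL: pass rate {baseline.pass_rate():.0%} -> {candidate.pass_rate():.0%}")
```

In CI, a failing gate would exit non-zero so the pull request is blocked, the same way a failing unit test would.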

Production Metrics and SLOs

Define explicit SLOs for your LLM services before they go to production. Implicit expectations ("it should be fast and accurate") cannot be monitored or alerted on. Explicit SLOs turn vague quality bars into measurable Prometheus queries.

LLM SLOs and Prometheus Metrics

# metrics.py — LLM-specific Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# SLO targets (document these next to the metric definitions):
# - P95 latency < 2000ms (end-to-end request duration; time to first token is tracked separately)
# - Error rate < 1% over any 5-minute window
# - Hallucination rate < 5% (from LLM-as-judge eval, rolling 1h)
# - Token budget burn < 90% of daily limit by 20:00 UTC

# Latency histogram: buckets roughly double at each step to cover the long tail
LLM_REQUEST_LATENCY = Histogram(
    "llm_request_duration_seconds",
    "End-to-end LLM request latency (seconds)",
    ["model", "feature", "provider"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0],
)

# Time to first token (TTFT) — measures streaming responsiveness
LLM_TTFT = Histogram(
    "llm_time_to_first_token_seconds",
    "Time from request start to first token in streaming response",
    ["model", "feature"],
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 4.0],
)

# Request and error counters
LLM_REQUESTS_TOTAL = Counter(
    "llm_requests_total",
    "Total LLM requests",
    ["model", "feature", "status"],  # status: success | error | timeout | rate_limited
)

# Context window utilisation — alert when approaching context limits
LLM_CONTEXT_UTILISATION = Histogram(
    "llm_context_utilisation_ratio",
    "Fraction of context window used (input_tokens / context_window_size)",
    ["model", "feature"],
    buckets=[0.1, 0.25, 0.5, 0.7, 0.8, 0.9, 0.95, 0.99],
)

# Concurrent in-flight requests
LLM_INFLIGHT_REQUESTS = Gauge(
    "llm_inflight_requests",
    "Number of LLM requests currently in flight",
    ["model", "feature"],
)

# Example: wrapping an LLM call with full metric instrumentation
import time
import contextlib

@contextlib.contextmanager
def observe_llm_call(model: str, feature: str, provider: str = "anthropic"):
    LLM_INFLIGHT_REQUESTS.labels(model=model, feature=feature).inc()
    start = time.perf_counter()
    status = "success"
    try:
        yield
    except Exception as exc:
        status = "error" if "timeout" not in str(exc).lower() else "timeout"
        raise
    finally:
        duration = time.perf_counter() - start
        LLM_REQUEST_LATENCY.labels(model=model, feature=feature, provider=provider).observe(duration)
        LLM_REQUESTS_TOTAL.labels(model=model, feature=feature, status=status).inc()
        LLM_INFLIGHT_REQUESTS.labels(model=model, feature=feature).dec()


# Usage:
# with observe_llm_call(model="claude-sonnet-4-5", feature="rag_pipeline"):
#     response = client.messages.create(...)
#     TOKEN_COUNTER.labels(...)  # record tokens separately

Grafana Dashboard Queries

# Grafana PromQL queries for an LLM observability dashboard
# Assumes Prometheus scrapes your application metrics every 15s

# Panel 1: P95 latency (5-minute rolling window); repeat with 0.50 and 0.99 for P50 / P99
histogram_quantile(0.95,
  sum(rate(llm_request_duration_seconds_bucket{feature="rag_pipeline"}[5m]))
  by (le, model)
)

# Panel 2: Request rate (requests per second)
sum(rate(llm_requests_total[5m])) by (feature, status)

# Panel 3: Error rate (percentage)
100 * sum(rate(llm_requests_total{status=~"error|timeout"}[5m]))
  / sum(rate(llm_requests_total[5m]))

# Panel 4: Token consumption rate (tokens/hour by model)
sum(rate(llm_tokens_total[1h])) by (model, token_type) * 3600

# Panel 5: Daily cost by feature (approximation from counter)
sum(increase(llm_cost_usd_total[24h])) by (feature)

# Panel 6: Eval pass rate (from eval pipeline Gauge)
llm_eval_pass_rate{feature="rag_pipeline", metric="accuracy"}

# Panel 7: Context utilisation (P95 over a 15-minute window)
histogram_quantile(0.95,
  sum(rate(llm_context_utilisation_ratio_bucket[15m])) by (le, model)
)
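
It helps to know that histogram_quantile does not see individual requests: it estimates the quantile by linear interpolation inside the bucket that contains the target rank. The sketch below re-implements that idea in Python for intuition. It is a simplified model, not Prometheus's actual code, and the bucket counts are invented.

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate a quantile from cumulative buckets.

    buckets: (upper_bound, cumulative_count) pairs sorted by upper_bound,
    ending with the (+Inf, total) bucket.
    """
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be in [0, 1]")
    if len(buckets) < 2 or not math.isinf(buckets[-1][0]):
        raise ValueError("need at least two buckets, the last one being +Inf")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Quantile falls in the +Inf bucket: report the highest finite bound.
                return buckets[-2][0]
            if count == prev_count:
                return prev_bound
            # Linear interpolation within the bucket.
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return buckets[-2][0]


# Invented cumulative counts for some of the latency bucket bounds used above.
example = [(1.0, 60), (2.0, 90), (4.0, 98), (8.0, 100), (math.inf, 100)]
p95 = histogram_quantile(0.95, example)
print(f"estimated P95: {p95:.2f}s")
```

Because the estimate is interpolated between the 2s and 4s bounds, its accuracy depends on bucket placement. If a threshold such as 2s matters to you, keeping a bucket boundary exactly at that value makes comparisons against it more trustworthy.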

Alerting Patterns

Good LLM alerting requires different thresholds and timescales than standard API alerting. A 5-second response time is an outage for a REST endpoint but normal for a long-context LLM call. Alert on relative changes and SLO burn rates, not on absolute values.
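
A burn rate expresses how fast the error budget is being consumed relative to what the SLO allows. Here is a minimal sketch, assuming the 1% error-rate SLO from the metrics section; `burn_rate` is a hypothetical helper and the request counts are invented.

```python
def burn_rate(errors: int, requests: int, error_budget: float = 0.01) -> float:
    """Observed error rate divided by the allowed error rate.

    1.0 means errors arrive exactly as fast as the SLO permits;
    3.0 means the budget is being consumed three times too fast.
    """
    if requests <= 0:
        raise ValueError("requests must be positive")
    if not 0.0 < error_budget < 1.0:
        raise ValueError("error_budget must be between 0 and 1")
    return (errors / requests) / error_budget


# Example: 30 failed calls out of 1,000 in the evaluation window.
rate = burn_rate(errors=30, requests=1_000)
print(f"burn rate: {rate:.1f}x")
```

Alerting on a burn rate rather than a raw error percentage lets the same rule shape work for features with different SLO targets.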

Prometheus Alerting Rules

# prometheus/alerts/llm.yaml
groups:
  - name: llm_slo_alerts
    rules:

      # Budget burn alert: fires when less than 20% of a feature's daily budget
      # remains (80%+ consumed). Cost is summed per feature so its labels match the gauge.
      - alert: LlmDailyBudgetBurning
        expr: |
          sum by (feature) (llm_daily_budget_remaining_usd)
          /
          (
            sum by (feature) (llm_daily_budget_remaining_usd)
            + sum by (feature) (increase(llm_cost_usd_total[24h]))
          ) < 0.20
        for: 5m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "LLM feature {{ $labels.feature }} has consumed 80%+ of its daily budget"
          description: 'Fraction of daily budget remaining: {{ $value | humanizePercentage }}'

      # P95 latency SLO burn — 2s SLO, alert when P95 exceeds 3s for 10 minutes
      - alert: LlmLatencySLOBurn
        expr: |
          histogram_quantile(0.95,
            sum(rate(llm_request_duration_seconds_bucket[5m])) by (le, feature)
          ) > 3.0
        for: 10m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "LLM P95 latency SLO burn for {{ $labels.feature }}"
          description: 'P95 latency is {{ $value | printf "%.1f" }}s, SLO target is 2s'

      # Error rate spike — alert when error rate exceeds 2% over 5 minutes
      - alert: LlmErrorRateHigh
        expr: |
          sum(rate(llm_requests_total{status=~"error|timeout"}[5m])) by (feature)
          / sum(rate(llm_requests_total[5m])) by (feature)
          > 0.02
        for: 5m
        labels:
          severity: critical
          team: platform
        annotations:
          summary: "LLM error rate spike for {{ $labels.feature }}"
          description: 'Error rate: {{ $value | humanizePercentage }}'

      # Eval pass rate regression — alert when quality drops below 80%
      - alert: LlmEvalPassRateLow
        expr: llm_eval_pass_rate < 0.80
        for: 15m
        labels:
          severity: warning
          team: ml
        annotations:
          summary: "LLM eval pass rate dropped below 80% for {{ $labels.feature }}"
          description: 'Current pass rate: {{ $value | humanizePercentage }}'

      # RAG faithfulness regression
      - alert: LlmRagFaithfulnessLow
        expr: llm_rag_faithfulness < 0.75
        for: 15m
        labels:
          severity: warning
          team: ml
        annotations:
          summary: "RAG faithfulness score dropped below 75% for {{ $labels.pipeline_version }}"

  - name: llm_cost_alerts
    rules:
      # Token usage anomaly — 3x spike over hourly average
      - alert: LlmTokenUsageAnomaly
        expr: |
          sum(rate(llm_tokens_total[10m])) by (model)
          > 3 * sum(rate(llm_tokens_total[1h] offset 1h)) by (model)
        for: 10m
        labels:
          severity: warning
          team: platform
        annotations:
          summary: "LLM token usage anomaly detected for model {{ $labels.model }}"
          description: "Current rate is 3x the hourly average from 1 hour ago"

Alertmanager Routing for LLM Alerts

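# alertmanager/config.yaml
# Note: Alertmanager does not expand ${VAR} placeholders itself. Render this
# file with a tool such as envsubst before loading it.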
route:
  group_by: ["alertname", "feature", "team"]
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: "default-slack"

  routes:
    # ML quality alerts → ML team Slack channel
    - matchers:
        - team = "ml"
      receiver: "ml-team-slack"
      group_wait: 1m        # wait longer to batch quality alerts
      repeat_interval: 2h

    # Budget alerts → platform + finance channels
    - matchers:
        - alertname =~ "LlmDailyBudgetBurning|LlmTokenUsageAnomaly"
      receiver: "llm-cost-alerts"
      group_wait: 0s        # fire immediately for cost alerts
      repeat_interval: 1h

    # Critical errors → PagerDuty
    - matchers:
        - severity = "critical"
      receiver: "pagerduty-platform"

receivers:
  - name: "default-slack"
    slack_configs:
      - api_url: "${SLACK_WEBHOOK_URL}"
        channel: "#platform-alerts"
        title: "{{ .GroupLabels.alertname }}"
        text: "{{ range .Alerts }}{{ .Annotations.summary }}\n{{ .Annotations.description }}{{ end }}"

  - name: "ml-team-slack"
    slack_configs:
      - api_url: "${SLACK_ML_WEBHOOK_URL}"
        channel: "#ml-quality-alerts"
        title: "LLM Quality Alert: {{ .GroupLabels.alertname }}"

  - name: "llm-cost-alerts"
    slack_configs:
      - api_url: "${SLACK_WEBHOOK_URL}"
        channel: "#llm-costs"
        title: "LLM Cost Alert"
    pagerduty_configs:
      - service_key: "${PAGERDUTY_COST_KEY}"

  - name: "pagerduty-platform"
    pagerduty_configs:
      - service_key: "${PAGERDUTY_PLATFORM_KEY}"
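
Route order matters: Alertmanager walks the child routes top to bottom and, unless a route sets continue: true, stops at the first match. The sketch below simulates that first-match behaviour for the routing tree above so you can check where a given alert lands. The `Route` class and `resolve_receiver` are hypothetical helpers for illustration, not part of Alertmanager.

```python
import re
from dataclasses import dataclass, field


@dataclass
class Route:
    receiver: str
    equals: dict[str, str] = field(default_factory=dict)  # label = "value"
    regex: dict[str, str] = field(default_factory=dict)   # label =~ "pattern"

    def matches(self, labels: dict[str, str]) -> bool:
        exact_ok = all(labels.get(k) == v for k, v in self.equals.items())
        # Alertmanager regex matchers are anchored, hence fullmatch.
        regex_ok = all(
            re.fullmatch(pattern, labels.get(k, "")) is not None
            for k, pattern in self.regex.items()
        )
        return exact_ok and regex_ok


# Same order as the routes in the config above.
ROUTES = [
    Route("ml-team-slack", equals={"team": "ml"}),
    Route("llm-cost-alerts", regex={"alertname": "LlmDailyBudgetBurning|LlmTokenUsageAnomaly"}),
    Route("pagerduty-platform", equals={"severity": "critical"}),
]


def resolve_receiver(labels: dict[str, str], default: str = "default-slack") -> str:
    """Return the receiver of the first matching route, or the default."""
    for route in ROUTES:
        if route.matches(labels):
            return route.receiver
    return default


print(resolve_receiver({"alertname": "LlmErrorRateHigh", "severity": "critical", "team": "platform"}))
print(resolve_receiver({"alertname": "LlmLatencySLOBurn", "severity": "warning", "team": "platform"}))
```

One consequence worth noticing: a critical alert labelled team="ml" would stop at the first route and go to Slack rather than PagerDuty. If that is not what you want, reorder the routes or use continue: true.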

OpenTelemetry for LLM Services

The OpenTelemetry GenAI semantic conventions define a standardised set of span attributes for LLM operations. Using them makes your traces portable across observability backends (Langfuse, Jaeger, Honeycomb, Grafana Tempo) without changing your instrumentation code.

OTLP Exporter and GenAI Span Attributes

# otel_llm.py
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
import anthropic
import time

from cost_tracker import MODEL_PRICES  # price table defined in cost_tracker.py above

# Configure OTLP exporter — works with any OTel-compatible backend
resource = Resource.create({
    "service.name": "llm-api-service",
    "service.version": "1.4.2",
    "deployment.environment": "production",
})

provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(
    endpoint="http://otel-collector:4317",  # your collector endpoint
    insecure=True,  # use TLS in production
)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("llm-service", "1.0.0")


def call_with_otel_tracing(
    prompt: str,
    model: str = "claude-sonnet-4-5",
    feature: str = "default",
) -> str:
    """Wraps an LLM call with full OTel GenAI semantic conventions."""
    with tracer.start_as_current_span(
        name=f"chat {model}",  # semconv span name: "{gen_ai.operation.name} {gen_ai.request.model}"
        kind=trace.SpanKind.CLIENT,
    ) as span:
        # GenAI semantic convention attributes
        # https://opentelemetry.io/docs/specs/semconv/gen-ai/
        span.set_attribute("gen_ai.system", "anthropic")
        span.set_attribute("gen_ai.request.model", model)
        span.set_attribute("gen_ai.request.max_tokens", 1024)
        span.set_attribute("gen_ai.request.temperature", 0.0)
        span.set_attribute("gen_ai.operation.name", "chat")

        # Custom attributes for your application context
        span.set_attribute("llm.feature", feature)
        span.set_attribute("llm.prompt_version", "v2")

        start = time.perf_counter()
        client = anthropic.Anthropic()

        try:
            response = client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )

            # Token usage — GenAI semantic conventions
            span.set_attribute("gen_ai.usage.input_tokens",  response.usage.input_tokens)
            span.set_attribute("gen_ai.usage.output_tokens", response.usage.output_tokens)
            span.set_attribute("gen_ai.response.model",      response.model)
            span.set_attribute("gen_ai.response.finish_reasons", [response.stop_reason or "end_turn"])

            # Cost (not in spec yet, but useful)
            prices = MODEL_PRICES.get(model, {"input": 0, "output": 0})
            cost = (response.usage.input_tokens / 1e6 * prices["input"] +
                    response.usage.output_tokens / 1e6 * prices["output"])
            span.set_attribute("llm.cost_usd", round(cost, 6))

            span.set_status(trace.StatusCode.OK)
            return response.content[0].text

        except anthropic.APIStatusError as exc:
            span.set_status(trace.StatusCode.ERROR, description=str(exc))
            span.record_exception(exc)
            raise
        finally:
            span.set_attribute("llm.duration_ms", int((time.perf_counter() - start) * 1000))

Note

The OpenTelemetry GenAI semantic conventions are still stabilising in 2025-2026, so attribute names can change between releases; pin the semconv version you target. Key attributes: gen_ai.system (provider name; newer revisions of the spec rename it to gen_ai.provider.name), gen_ai.request.model (requested model), gen_ai.response.model (actual model used), gen_ai.usage.input_tokens, and gen_ai.usage.output_tokens. See the full spec at opentelemetry.io/docs/specs/semconv/gen-ai/.

Langfuse vs LangSmith vs Helicone

Three tools dominate the LLM observability space. They solve overlapping problems but have different strengths and pricing models. Choose based on your stack, budget, and data residency requirements.

  • Langfuse — Open-source, self-hostable. Best for teams that need full data sovereignty or are operating in regulated environments. Strong dataset and evaluation management. Python, TypeScript, and OpenAI-compatible SDKs. Free self-hosted; managed cloud has a generous free tier.
  • LangSmith — Managed SaaS from LangChain; self-hosting is offered only on enterprise plans. Best for teams already using LangChain or LangGraph. Deep integration with the LangChain ecosystem. Excellent playground and prompt hub. Less suitable for non-LangChain stacks or teams that need to self-host without an enterprise contract.
  • Helicone — Proxy-based, minimal code changes. Best for teams that want immediate visibility without SDK instrumentation. Routes all LLM requests through Helicone's proxy, capturing traces transparently. Supports caching and rate limiting at the proxy layer. Limited evaluation capabilities compared to Langfuse.
  • When to use Langfuse: Self-hosted deployment required; need dataset-based evals on every deploy; complex multi-step chains with custom span metadata; Python or TypeScript without LangChain.
  • When to use LangSmith: Already using LangChain/LangGraph; want tight prompt hub integration; team prefers managed SaaS with no ops overhead.
  • When to use Helicone: Need tracing in <30 minutes without code changes; using multiple LLM providers through a unified proxy; want built-in caching to reduce costs.

Production Observability Checklist

Before shipping an LLM-powered feature to production, verify each item in this checklist. Missing items are technical debt that will surface at the worst possible time.

  • Every LLM call emits a Langfuse (or equivalent) trace with model, input, output, and token counts.
  • Traces are tagged with user_id, session_id, and prompt_version.
  • Token counters by model and feature are exported to Prometheus.
  • Cost-per-request histogram is defined with appropriate buckets for your model tier.
  • Daily budget Gauge is configured and populated for every billable feature.
  • P95 latency Prometheus histogram is scraped and visible in Grafana.
  • LLM SLOs are documented: P95 latency target, error rate budget, hallucination rate threshold.
  • Prometheus alerting rules exist for budget burn (≥80%), P95 latency SLO breach, and error rate spike, and Alertmanager routes them to the right team.
  • Automated LLM-as-judge eval runs on a schedule (at minimum every hour) against recent production traces.
  • Golden dataset evaluation runs in CI on every PR that touches prompt templates or model configuration.
  • RAG pipelines have faithfulness scoring enabled and alerted.
  • OpenTelemetry GenAI span attributes (gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens) are set on every LLM span.
  • langfuse.flush() is called in Lambda/Cloud Run shutdown hooks.
  • Prompt templates are versioned and the version is recorded in trace metadata.
  • A runbook exists for each alert: what to check, how to mitigate, who to page.
