MLflow · Machine Learning · Feature Stores · Model Serving · MLOps · Python

ML Pipeline in Production — MLflow, Feature Stores, and Model Serving Patterns

A practical guide to building production ML pipelines: MLflow experiment tracking, the Model Registry workflow, Feast feature stores for training-serving consistency, batch and online model serving with FastAPI and Triton, and production monitoring patterns for data drift and model performance degradation.

2026-05-05

Why Production ML Is Different

The gap between a model that works in a Jupyter notebook and a model that delivers value in production is wider than most teams expect. Research environments are optimised for exploration: fast iteration, loose coupling, minimal operational overhead. Production is the opposite: reproducibility, versioning, data consistency, latency SLAs, monitoring, and safe model updates all become hard requirements.

Three failure modes account for the majority of ML production incidents. The first is experiment chaos: when no one knows which model version, hyperparameters, or training dataset produced the deployed artifact. The second is training-serving skew: the features computed at training time differ from the features computed at inference time, causing silent accuracy degradation. The third is model drift: input data distribution shifts over time, and the model's performance silently degrades until a business metric breaks. Each failure has a well-understood engineering solution.

Experiment Chaos

Fix with an experiment tracker like MLflow Tracking or Weights & Biases. Every run records hyperparameters, metrics, and the model artifact — making any past experiment fully reproducible and comparable.

Training-Serving Skew

Fix with a feature store. The same feature computation logic runs both at training time (reading historical data) and at serving time (reading real-time state), eliminating divergence between the two environments.

Model Drift

Fix with production monitoring. Tools like Evidently or WhyLabs compare the statistical distribution of inference inputs against the training distribution, alerting when drift exceeds a threshold before business metrics degrade.

MLflow Experiment Tracking

MLflow is the de facto open-source ML lifecycle platform. Its tracking component wraps your training loop with a context manager that automatically records every run: parameters, metrics per epoch, model artifacts, dependency versions, and the Git commit that produced the code. The tracking server stores this data in a SQL backend (SQLite for local, PostgreSQL for production) and exposes it via a REST API and a web UI.

The core objects are experiments (logical groupings of related runs) and runs (individual training executions). Each run gets a unique ID and can be queried, compared, and promoted to the model registry. The mlflow.sklearn, mlflow.xgboost, and mlflow.pytorch flavors provide autologging — a single call instruments your framework and records everything automatically.

# MLflow experiment tracking with sklearn — production pattern
# pip install mlflow scikit-learn pandas xgboost

import mlflow
import mlflow.sklearn
import mlflow.xgboost
import xgboost as xgb
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    roc_auc_score, average_precision_score,
    log_loss, f1_score
)
from sklearn.preprocessing import LabelEncoder

# ── Configure tracking server ─────────────────────────────────────────────────
# In production: point to your hosted MLflow server
# MLFLOW_TRACKING_URI env var is read automatically if set
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("churn-prediction-v2")

# ── Load data ─────────────────────────────────────────────────────────────────
df = pd.read_parquet("gs://my-data/features/churn_training_2026-05.parquet")

FEATURE_COLS = [
    "days_since_last_order", "order_count_90d", "avg_order_value_usd",
    "support_tickets_30d", "product_category_diversity",
    "days_since_signup", "email_open_rate_30d",
]
TARGET_COL = "churned_within_30d"

X = df[FEATURE_COLS]
y = df[TARGET_COL]
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# ── Hyperparameters ───────────────────────────────────────────────────────────
params = {
    "n_estimators":     500,
    "max_depth":        6,
    "learning_rate":    0.05,
    "subsample":        0.8,
    "colsample_bytree": 0.8,
    "min_child_weight": 5,
    "scale_pos_weight": (y_train == 0).sum() / (y_train == 1).sum(),
    "eval_metric":      "auc",
    "early_stopping_rounds": 30,
    "random_state":     42,
}

# ── Training run ──────────────────────────────────────────────────────────────
with mlflow.start_run(run_name="xgb-churn-v2.3") as run:
    # Log all hyperparameters
    mlflow.log_params(params)
    # Log dataset metadata (not the data itself)
    mlflow.log_param("train_rows",     len(X_train))
    mlflow.log_param("val_rows",       len(X_val))
    mlflow.log_param("positive_rate",  float(y.mean()))
    mlflow.log_param("feature_count",  len(FEATURE_COLS))
    mlflow.log_param("training_date",  "2026-05-05")

    # Train XGBoost with eval set for early stopping
    model = xgb.XGBClassifier(**params)
    model.fit(
        X_train, y_train,
        eval_set=[(X_val, y_val)],
        verbose=50,
    )

    # ── Evaluate ──────────────────────────────────────────────────────────────
    y_prob = model.predict_proba(X_val)[:, 1]
    y_pred = (y_prob >= 0.5).astype(int)

    metrics = {
        "val_roc_auc":        roc_auc_score(y_val, y_prob),
        "val_avg_precision":  average_precision_score(y_val, y_prob),
        "val_log_loss":       log_loss(y_val, y_prob),
        "val_f1":             f1_score(y_val, y_pred),
        "best_iteration":     model.best_iteration,
    }
    mlflow.log_metrics(metrics)

    # Log feature importances as a JSON artifact
    importances = dict(zip(FEATURE_COLS, model.feature_importances_.tolist()))
    mlflow.log_dict(importances, "feature_importances.json")

    # Log the model with its signature (input/output schema)
    from mlflow.models.signature import infer_signature
    signature = infer_signature(X_train, y_prob)

    mlflow.xgboost.log_model(
        xgb_model=model,
        artifact_path="model",
        signature=signature,
        input_example=X_train.iloc[:5],
        # Conda env / pip requirements are captured automatically
        registered_model_name="churn-prediction",
    )

    print(f"Run ID: {run.info.run_id}")
    print(f"ROC-AUC: {metrics['val_roc_auc']:.4f}")
    print(f"Model URI: runs:/{run.info.run_id}/model")

Note

MLflow autologging (mlflow.xgboost.autolog()) can replace most manual log_params and log_metrics calls, but explicit logging gives you control over what gets recorded and how it's named. In production, consistent metric names across experiments are essential for automated comparison and promotion workflows. Always log the training date, dataset version, and feature list as parameters — these are the first things you'll need during a production incident.
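A minimal autologging sketch, reusing params, X_train, X_val, y_train, and y_val from the training script above:

# Autologging variant of the training run: a minimal sketch
import mlflow
import mlflow.xgboost
import xgboost as xgb
from sklearn.metrics import roc_auc_score

mlflow.set_experiment("churn-prediction-v2")
mlflow.xgboost.autolog(log_input_examples=True, log_model_signatures=True)

with mlflow.start_run(run_name="xgb-churn-autolog"):
    # Hyperparameters, per-iteration eval metrics, and the model artifact
    # are captured automatically by the autolog patch
    model = xgb.XGBClassifier(**params)
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
    # Explicit logging still works alongside autologging for custom metric names
    mlflow.log_metric("val_roc_auc", roc_auc_score(y_val, model.predict_proba(X_val)[:, 1]))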

MLflow Model Registry: From Experiment to Production

The MLflow Model Registry is a centralised store that tracks the lifecycle of each model version. Every registered model has named versions (automatically assigned integers) and lifecycle stages: None (just registered), Staging (candidate for promotion), Production (currently serving traffic), and Archived (retired). Promotions are gated by CI checks and human approval.

The production deployment workflow is: train → register → evaluate in Staging → promote to Production → rollback or archive. The serving code always loads from a registry stage URI (models:/churn-prediction/Production), never from a run URI. This decouples the deployment decision from the training artifact — promoting a model doesn't require a code deploy.

# MLflow Model Registry: register, evaluate, promote, load
# pip install mlflow

import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus

mlflow.set_tracking_uri("http://mlflow.internal:5000")
client = MlflowClient()

MODEL_NAME = "churn-prediction"
RUN_ID     = "a3f7c9d2e4b1..."  # from the training run above

# ── Step 1: Register the model from a completed run ──────────────────────────
model_uri = f"runs:/{RUN_ID}/model"
mv = mlflow.register_model(model_uri=model_uri, name=MODEL_NAME)
print(f"Registered model version: {mv.version}")

# Wait for registration to complete (async in background)
import time
while mv.status == ModelVersionStatus.PENDING_REGISTRATION.name:
    time.sleep(1)
    mv = client.get_model_version(MODEL_NAME, mv.version)

# ── Step 2: Add a description and tag before promotion ────────────────────────
client.update_model_version(
    name=MODEL_NAME,
    version=mv.version,
    description=(
        "XGBoost churn model trained on 2026-05 cohort. "
        "ROC-AUC=0.8812 on held-out validation set."
    ),
)
client.set_model_version_tag(
    name=MODEL_NAME,
    version=mv.version,
    key="training_date",
    value="2026-05-05",
)
client.set_model_version_tag(
    name=MODEL_NAME,
    version=mv.version,
    key="validated_by",
    value="ml-platform-ci",
)

# ── Step 3: Transition to Staging ─────────────────────────────────────────────
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=mv.version,
    stage="Staging",
    archive_existing_versions=False,
)
print(f"Moved version {mv.version} to Staging")

# ── Step 4: Run validation gate (CI would do this) ───────────────────────────
import mlflow.pyfunc
import pandas as pd

staging_model = mlflow.pyfunc.load_model(
    model_uri=f"models:/{MODEL_NAME}/Staging"
)

# Load a reference dataset for shadow scoring
FEATURE_COLS = [
    "days_since_last_order", "order_count_90d", "avg_order_value_usd",
    "support_tickets_30d", "product_category_diversity",
    "days_since_signup", "email_open_rate_30d",
]
X_shadow = pd.read_parquet("gs://my-data/features/shadow_eval_2026-05.parquet")
shadow_preds = staging_model.predict(X_shadow[FEATURE_COLS])

# Validate: predictions are in [0,1] range and not all identical
assert shadow_preds.min() >= 0.0 and shadow_preds.max() <= 1.0, \
    "Predictions out of range"
assert shadow_preds.std() > 0.01, \
    "Model predicts nearly identical scores — likely degenerate"

print("Validation gate passed")

# ── Step 5: Promote to Production (archive existing Production version) ───────
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=mv.version,
    stage="Production",
    archive_existing_versions=True,  # auto-archive the previous Production version
)
print(f"Version {mv.version} is now Production")

# ── Step 6: Load from registry in serving code ────────────────────────────────
# Serving code never references a run_id — always uses the stage alias
production_model = mlflow.pyfunc.load_model(
    model_uri=f"models:/{MODEL_NAME}/Production"
)

# Example inference
X_new = pd.DataFrame([{
    "days_since_last_order": 45,
    "order_count_90d": 2,
    "avg_order_value_usd": 38.5,
    "support_tickets_30d": 3,
    "product_category_diversity": 1,
    "days_since_signup": 720,
    "email_open_rate_30d": 0.05,
}])
churn_prob = production_model.predict(X_new)
print(f"Churn probability: {churn_prob[0]:.4f}")

Querying and Comparing Runs Programmatically

The MLflow client API makes it possible to build automated promotion workflows that compare runs before deploying. A CI job can query all runs in an experiment, find the one with the best validation metric, compare it against the current Production baseline, and promote or reject automatically.

# Query runs and compare against production baseline
from mlflow.tracking import MlflowClient
import mlflow

client = MlflowClient()
MODEL_NAME = "churn-prediction"

# ── Find the current Production model's training run ──────────────────────────
prod_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
prod_run_id = prod_versions[0].run_id if prod_versions else None

prod_auc = 0.0
if prod_run_id:
    prod_run = client.get_run(prod_run_id)
    prod_auc = prod_run.data.metrics.get("val_roc_auc", 0.0)
    print(f"Production baseline ROC-AUC: {prod_auc:.4f}")

# ── Query top runs from the latest experiment ─────────────────────────────────
experiment = client.get_experiment_by_name("churn-prediction-v2")
runs = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="metrics.val_roc_auc > 0.85",
    order_by=["metrics.val_roc_auc DESC"],
    max_results=5,
)

for run in runs:
    auc     = run.data.metrics["val_roc_auc"]
    ap      = run.data.metrics.get("val_avg_precision", 0)
    n_trees = run.data.params.get("n_estimators", "?")
    print(f"Run {run.info.run_id[:8]}  AUC={auc:.4f}  AP={ap:.4f}  trees={n_trees}")

# ── Promote best run if it beats production by > 0.5% AUC ───────────────────
best_run = runs[0] if runs else None
if best_run and (best_run.data.metrics["val_roc_auc"] - prod_auc) > 0.005:
    new_version = mlflow.register_model(
        model_uri=f"runs:/{best_run.info.run_id}/model",
        name=MODEL_NAME,
    )
    client.transition_model_version_stage(
        name=MODEL_NAME,
        version=new_version.version,
        stage="Production",
        archive_existing_versions=True,
    )
    print(f"Promoted version {new_version.version} with AUC improvement")
else:
    print("No candidate beats production — keeping current model")

Feature Stores: Eliminating Training-Serving Skew

Training-serving skew is the most insidious ML production failure. It happens when the features computed at training time differ from those computed at inference time — different aggregation windows, different null handling, different join logic. The model is evaluated on features it will never see in production. The result is a model that performs well offline and silently underperforms in production.

A feature store solves this by centralising feature computation logic. You define features once, and the same definitions are used both for generating training datasets (reading from the offline store — a data warehouse or object storage) and for serving real-time inference (reading from the online store — Redis, DynamoDB, or BigTable). The feature store handles point-in-time correctness for training: when you generate a training dataset with a label timestamp of T, you only see feature values that were known at time T — no future data leaks.
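To make point-in-time correctness concrete, here is a minimal sketch of the guarantee using a pandas merge_asof as a stand-in for the feature store's join logic; the two small dataframes are illustrative:

# Point-in-time join sketch: what get_historical_features guards against
import pandas as pd

# Label rows: one per customer, stamped when the label was generated
labels = pd.DataFrame({
    "customer_id":        ["cust-001", "cust-002"],
    "event_timestamp":    pd.to_datetime(["2026-04-01", "2026-04-15"]),
    "churned_within_30d": [1, 0],
})

# Feature snapshots: multiple rows per customer over time
features = pd.DataFrame({
    "customer_id":     ["cust-001", "cust-001", "cust-002"],
    "event_timestamp": pd.to_datetime(["2026-03-20", "2026-04-10", "2026-04-01"]),
    "order_count_90d": [8, 3, 2],
})

# For each label, take the latest feature row at or before the label timestamp.
# The 2026-04-10 snapshot for cust-001 is excluded: it lies after that label
# was generated, so joining it in would leak future information.
training_df = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="customer_id",
    direction="backward",
)
print(training_df)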

Defining Features with Feast

Feast is the leading open-source feature store. Feature definitions are Python objects committed to a Git repository and applied to the registry with feast apply. The registry is the source of truth for what features exist and how they are computed — both the training job and the serving layer read from it.

# feature_repo/features.py — Feast feature definitions
# pip install feast[redis]  (or feast[gcp], feast[aws])

from datetime import timedelta
from feast import (
    Entity, FeatureView, Field,
    FileSource, PushSource, ValueType,
)
from feast.types import Float32, Float64, Int64, String

# ── Entities ──────────────────────────────────────────────────────────────────
# An entity is the primary key that links features to predictions
customer = Entity(
    name="customer_id",
    value_type=ValueType.STRING,
    description="Unique customer identifier",
)

# ── Offline data source (Parquet on GCS / S3) ─────────────────────────────────
customer_stats_source = FileSource(
    path="gs://my-data/features/customer_stats/",
    timestamp_field="event_timestamp",  # required for point-in-time joins
    created_timestamp_column="created",
)

# ── Feature View ──────────────────────────────────────────────────────────────
# Defines which features belong to which entity and their freshness TTL
customer_stats_fv = FeatureView(
    name="customer_stats",
    entities=[customer],
    ttl=timedelta(days=90),
    schema=[
        Field(name="days_since_last_order",        dtype=Int64),
        Field(name="order_count_90d",              dtype=Int64),
        Field(name="avg_order_value_usd",          dtype=Float64),
        Field(name="support_tickets_30d",          dtype=Int64),
        Field(name="product_category_diversity",   dtype=Int64),
        Field(name="days_since_signup",            dtype=Int64),
        Field(name="email_open_rate_30d",          dtype=Float32),
    ],
    online=True,           # materialise to online store for low-latency serving
    source=customer_stats_source,
    tags={"owner": "ml-platform", "domain": "customer"},
)

# ── Push source for streaming updates ─────────────────────────────────────────
# A PushSource allows Kafka/Flink pipelines to push real-time feature updates
# into both the offline log and online store simultaneously
customer_activity_push = PushSource(
    name="customer_activity_push",
    batch_source=customer_stats_source,
)

customer_realtime_fv = FeatureView(
    name="customer_realtime",
    entities=[customer],
    ttl=timedelta(hours=1),   # short TTL for high-frequency features
    schema=[
        Field(name="session_count_last_1h",  dtype=Int64),
        Field(name="cart_value_usd",         dtype=Float64),
        Field(name="last_page_viewed",       dtype=String),
    ],
    online=True,
    source=customer_activity_push,
    tags={"owner": "ml-platform", "domain": "customer", "tier": "realtime"},
)
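The definitions above are registered by running feast apply from the repository root. The same registration can be done programmatically, which is convenient in a CI job; a sketch, assuming feature_repo/features.py is importable and the repository lives at ./feature_repo:

# Registering definitions: the CLI route is `cd feature_repo && feast apply`.
# Programmatic equivalent, e.g. from a CI job
from feast import FeatureStore
from features import customer, customer_stats_fv, customer_realtime_fv

store = FeatureStore(repo_path="./feature_repo")
store.apply([customer, customer_stats_fv, customer_realtime_fv])
print([fv.name for fv in store.list_feature_views()])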

Materialization, Training Data, and Online Retrieval

Once features are defined, feast materialize copies them from the offline store into the online store (Redis) for low-latency serving. Training jobs use get_historical_features with point-in-time correct joins. The serving layer uses get_online_features for sub-millisecond retrieval.

# Training: generate dataset with point-in-time correct feature retrieval
# Serving: sub-millisecond online feature retrieval from Redis

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path="./feature_repo")

# ── Materialize to online store ───────────────────────────────────────────────
# Run this on a schedule (e.g. every 15 minutes via Airflow)
# It reads from offline store and writes to Redis for each feature view
from datetime import datetime, timedelta

store.materialize(
    start_date=datetime.utcnow() - timedelta(days=90),
    end_date=datetime.utcnow(),
)
# Or incremental (only new data since last materialisation):
# store.materialize_incremental(end_date=datetime.utcnow())

# ── Training: point-in-time correct historical retrieval ──────────────────────
# The entity_df contains labels with the timestamp at which the label was generated.
# Feast joins features as of that timestamp — no future feature values leak in.
entity_df = pd.read_parquet("gs://my-data/labels/churn_labels_2026-05.parquet")
# entity_df must have: customer_id, event_timestamp, churned_within_30d

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_stats:days_since_last_order",
        "customer_stats:order_count_90d",
        "customer_stats:avg_order_value_usd",
        "customer_stats:support_tickets_30d",
        "customer_stats:product_category_diversity",
        "customer_stats:days_since_signup",
        "customer_stats:email_open_rate_30d",
    ],
).to_df()

# training_df now has features + label, ready for model training
print(training_df.head())
print(f"Training rows: {len(training_df)}, null rate: {training_df.isnull().mean().max():.2%}")

# ── Serving: online feature retrieval (used inside the prediction API) ────────
# Returns current feature values for a list of entity keys
online_features = store.get_online_features(
    features=[
        "customer_stats:days_since_last_order",
        "customer_stats:order_count_90d",
        "customer_stats:avg_order_value_usd",
        "customer_stats:support_tickets_30d",
        "customer_stats:product_category_diversity",
        "customer_stats:days_since_signup",
        "customer_stats:email_open_rate_30d",
    ],
    entity_rows=[
        {"customer_id": "cust-001"},
        {"customer_id": "cust-002"},
        {"customer_id": "cust-003"},
    ],
).to_dict()

print(online_features)
# {
#   "customer_id": ["cust-001", "cust-002", "cust-003"],
#   "days_since_last_order": [12, 45, 3],
#   "order_count_90d": [8, 2, 15],
#   ...
# }

Note

The major feature store options differ on where they store features and how they handle real-time updates. Feast is open-source and self-hosted, using Redis for online storage and BigQuery/Redshift/Parquet for offline. Tecton is managed and handles streaming feature pipelines natively. Hopsworks is open-source and includes a full ML platform. Start with Feast unless you need managed streaming feature computation — its operational simplicity is a significant advantage for teams without a dedicated ML platform team.

Model Serving Patterns

There is no single model serving pattern — the right choice depends on latency requirements, throughput, and the cost of incorrect predictions. Three patterns cover the majority of production ML use cases.

Batch Inference

Run predictions on the entire population overnight or on a schedule. Results are written to a database table that downstream systems read at query time. Latency: hours. Best for: email targeting, churn scores, product recommendations, risk scoring — any use case where the prediction doesn't need to react to a user action in real time. Simplest operationally: no always-on server, no SLA for inference latency, easy to test and debug.

Online Serving (REST API)

A prediction service exposes a REST endpoint that accepts a feature vector and returns a score within an SLA (typically 50–200ms p99). The model is loaded once at startup and kept in memory. Best for: real-time personalisation, fraud detection at transaction time, dynamic pricing, search ranking. Requires careful resource sizing, load testing, and a rollback plan for bad model versions.

Streaming Inference

A Flink or Spark Streaming job applies the model to every event in a Kafka topic, writing predictions back to another topic. Best for: anomaly detection on event streams, real-time content classification, IoT sensor scoring. Combines the low latency of online serving with the throughput of batch — at the cost of the highest operational complexity.
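A rough sketch of the streaming pattern with kafka-python follows; the topic names, broker address, and event schema are illustrative, and a production deployment would typically use Flink or Spark Structured Streaming for checkpointing and scaling rather than a bare consumer loop.

# Streaming inference sketch: score each event on a Kafka topic
# pip install kafka-python mlflow pandas   (topic names and event schema are illustrative)
import json
import pandas as pd
import mlflow.pyfunc
from kafka import KafkaConsumer, KafkaProducer

FEATURE_COLS = [
    "days_since_last_order", "order_count_90d", "avg_order_value_usd",
    "support_tickets_30d", "product_category_diversity",
    "days_since_signup", "email_open_rate_30d",
]

model = mlflow.pyfunc.load_model("models:/churn-prediction/Production")

consumer = KafkaConsumer(
    "customer-events",                       # input topic (illustrative)
    bootstrap_servers="kafka.internal:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    group_id="churn-scorer",
)
producer = KafkaProducer(
    bootstrap_servers="kafka.internal:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    event = message.value                    # assumed to carry the feature values
    X = pd.DataFrame([{col: event[col] for col in FEATURE_COLS}])
    score = float(model.predict(X)[0])
    producer.send("churn-scores", {"customer_id": event["customer_id"], "churn_score": score})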

Batch Inference with MLflow

Batch inference is the right default for most ML use cases. It runs on a schedule, scores every entity in the population, and writes results to a table. The consuming application reads predictions from the table at query time — no inference latency in the critical path.

# Batch scoring job — runs nightly via Airflow or similar
# Loads production model from MLflow registry, scores all customers

import mlflow.pyfunc
import pandas as pd
from google.cloud import bigquery
from datetime import date

MODEL_NAME   = "churn-prediction"
OUTPUT_TABLE = "analytics.ml_predictions.churn_scores_daily"

# ── Load production model ─────────────────────────────────────────────────────
model = mlflow.pyfunc.load_model(f"models:/{MODEL_NAME}/Production")
model_version = mlflow.MlflowClient().get_latest_versions(
    MODEL_NAME, stages=["Production"]
)[0].version

# ── Load features for all active customers ────────────────────────────────────
bq = bigquery.Client()
features_df = bq.query("""
    SELECT
        customer_id,
        days_since_last_order,
        order_count_90d,
        avg_order_value_usd,
        support_tickets_30d,
        product_category_diversity,
        days_since_signup,
        email_open_rate_30d
    FROM analytics.features.customer_stats_latest
    WHERE is_active = TRUE
""").to_dataframe()

FEATURE_COLS = [
    "days_since_last_order", "order_count_90d", "avg_order_value_usd",
    "support_tickets_30d", "product_category_diversity",
    "days_since_signup", "email_open_rate_30d",
]

print(f"Scoring {len(features_df):,} customers...")

# ── Score in chunks (memory-efficient for large populations) ──────────────────
CHUNK_SIZE = 50_000
all_scores = []

for start in range(0, len(features_df), CHUNK_SIZE):
    chunk = features_df.iloc[start : start + CHUNK_SIZE]
    scores = model.predict(chunk[FEATURE_COLS])
    all_scores.extend(scores.tolist())

# ── Build output DataFrame ────────────────────────────────────────────────────
output_df = pd.DataFrame({
    "customer_id":    features_df["customer_id"],
    "churn_score":    all_scores,
    "score_date":     date.today().isoformat(),
    "model_name":     MODEL_NAME,
    "model_version":  model_version,
    "score_decile":   pd.qcut(all_scores, q=10, labels=False, duplicates="drop") + 1,
})

# ── Write to BigQuery (append today's rows to the date-partitioned table) ─────
job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_APPEND",
    time_partitioning=bigquery.TimePartitioning(field="score_date"),
)
bq.load_table_from_dataframe(
    output_df, OUTPUT_TABLE, job_config=job_config
).result()

print(f"Wrote {len(output_df):,} predictions to {OUTPUT_TABLE}")
print(f"Score distribution: mean={output_df['churn_score'].mean():.4f}, "
      f"p90={output_df['churn_score'].quantile(0.9):.4f}")

Online Serving with FastAPI

Online serving requires an always-on prediction service. The model is loaded once at startup from the MLflow registry, held in memory, and reused for every request. The serving layer fetches features from the Feast online store (Redis) and passes them to the model — the calling application only needs to send entity IDs, not raw features.

# FastAPI prediction service with MLflow + Feast integration
# pip install fastapi uvicorn mlflow feast[redis] pydantic prometheus-fastapi-instrumentator

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.pyfunc
from feast import FeatureStore
import pandas as pd
import time
import logging
from prometheus_fastapi_instrumentator import Instrumentator

logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API", version="1.0.0")

# ── Load model and feature store at startup ───────────────────────────────────
MODEL_NAME = "churn-prediction"
FEATURES   = [
    "customer_stats:days_since_last_order",
    "customer_stats:order_count_90d",
    "customer_stats:avg_order_value_usd",
    "customer_stats:support_tickets_30d",
    "customer_stats:product_category_diversity",
    "customer_stats:days_since_signup",
    "customer_stats:email_open_rate_30d",
]
FEATURE_COLS = [f.split(":")[1] for f in FEATURES]

model: mlflow.pyfunc.PyFuncModel = None
store: FeatureStore = None

@app.on_event("startup")
async def load_resources():
    global model, store
    logger.info("Loading production model from MLflow registry...")
    model = mlflow.pyfunc.load_model(f"models:/{MODEL_NAME}/Production")
    store = FeatureStore(repo_path="/app/feature_repo")
    logger.info("Model and feature store loaded")

# ── Prometheus metrics ─────────────────────────────────────────────────────────
Instrumentator().instrument(app).expose(app)

# ── Request / Response schemas ─────────────────────────────────────────────────
class PredictRequest(BaseModel):
    customer_id: str

class PredictResponse(BaseModel):
    customer_id:  str
    churn_score:  float
    risk_tier:    str      # "high" / "medium" / "low"
    latency_ms:   float

# ── Prediction endpoint ────────────────────────────────────────────────────────
@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
    t0 = time.monotonic()

    # 1. Retrieve features from online store (Redis) — typically < 5ms
    try:
        feature_dict = store.get_online_features(
            features=FEATURES,
            entity_rows=[{"customer_id": req.customer_id}],
        ).to_dict()
    except Exception as e:
        logger.error(f"Feature retrieval failed for {req.customer_id}: {e}")
        raise HTTPException(status_code=503, detail="Feature store unavailable")

    # 2. Check for missing features (cold-start: new customer with no history)
    row = {col: feature_dict[col][0] for col in FEATURE_COLS}
    if any(v is None for v in row.values()):
        logger.warning(f"Missing features for {req.customer_id} — using fallback")
        # Return neutral score rather than crashing on null features
        return PredictResponse(
            customer_id=req.customer_id,
            churn_score=0.15,   # population base rate
            risk_tier="low",
            latency_ms=(time.monotonic() - t0) * 1000,
        )

    # 3. Run inference
    X = pd.DataFrame([row])
    score = float(model.predict(X[FEATURE_COLS])[0])

    # 4. Map score to risk tier
    risk_tier = "high" if score >= 0.7 else ("medium" if score >= 0.35 else "low")

    return PredictResponse(
        customer_id=req.customer_id,
        churn_score=round(score, 6),
        risk_tier=risk_tier,
        latency_ms=round((time.monotonic() - t0) * 1000, 2),
    )

@app.get("/healthz")
async def health():
    return {"status": "ok", "model": MODEL_NAME}

# Run with:
# uvicorn serving:app --host 0.0.0.0 --port 8080 --workers 4

Note

For models that need to handle thousands of requests per second, consider NVIDIA Triton Inference Server or Ray Serve. Triton handles batching, GPU inference, and model ensembles natively. Ray Serve supports Python-based models with autoscaling and fractional GPU allocation. FastAPI is the right choice for teams that want full control and don't yet need GPU inference or hardware-level batching — it handles 500–5,000 req/s per worker for CPU-bound sklearn and XGBoost models.
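For comparison, a single-row prediction against a Triton-hosted model looks roughly like this; the model name, input/output tensor names, and server URL are deployment-specific assumptions rather than values from this article's setup.

# Scoring against Triton over HTTP: a sketch (tensor names are illustrative)
# pip install tritonclient[http] numpy
import numpy as np
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="triton.internal:8000")

# One row of the seven churn features as float32
features = np.array([[45, 2, 38.5, 3, 1, 720, 0.05]], dtype=np.float32)

inputs = [httpclient.InferInput("input__0", list(features.shape), "FP32")]
inputs[0].set_data_from_numpy(features)
outputs = [httpclient.InferRequestedOutput("output__0")]

result = client.infer(model_name="churn_prediction", inputs=inputs, outputs=outputs)
churn_prob = result.as_numpy("output__0")
print(f"Churn probability: {float(churn_prob.ravel()[0]):.4f}")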

Production Monitoring: Drift Detection

A deployed model is a snapshot of the relationship between features and labels at training time. As the world changes — seasonal patterns, product changes, user behaviour shifts — both the input distribution and the label distribution can drift. Without monitoring, model degradation is invisible until a downstream business metric breaks.

Two types of drift matter: data drift (the distribution of input features changes) and concept drift (the relationship between features and labels changes — the model's assumptions about the world become wrong). Data drift is detectable without ground-truth labels by comparing feature distributions between a reference window (training data) and a current window (recent inference inputs). Evidently computes a suite of statistical tests (Jensen-Shannon divergence, KL divergence, Wasserstein distance, chi-squared) and generates HTML reports and JSON summaries suitable for alerting.
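To see what a single univariate drift test does before layering on a framework, here is a minimal sketch using a two-sample Kolmogorov-Smirnov test from scipy on synthetic data; the significance threshold is illustrative, and Evidently runs an equivalent battery of tests per column and aggregates the result, as shown below.

# Univariate drift check sketch: two-sample KS test on one feature (synthetic data)
# pip install scipy numpy pandas
import numpy as np
import pandas as pd
from scipy import stats

rng = np.random.default_rng(42)
# Reference window (training distribution) vs current window (recent inference inputs)
reference = pd.Series(rng.gamma(shape=2.0, scale=10.0, size=10_000), name="days_since_last_order")
current   = pd.Series(rng.gamma(shape=2.0, scale=14.0, size=10_000), name="days_since_last_order")

statistic, p_value = stats.ks_2samp(reference, current)
drifted = p_value < 0.05                     # illustrative significance threshold

print(f"KS statistic={statistic:.3f}  p-value={p_value:.2e}  drifted={drifted}")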

# Drift detection with Evidently — runs as a scheduled job
# pip install evidently pandas pyarrow

import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric
import json
import datetime

FEATURE_COLS = [
    "days_since_last_order", "order_count_90d", "avg_order_value_usd",
    "support_tickets_30d", "product_category_diversity",
    "days_since_signup", "email_open_rate_30d",
]

# ── Reference dataset (training data distribution) ────────────────────────────
reference_df = pd.read_parquet(
    "gs://my-data/features/churn_training_2026-05.parquet"
)[FEATURE_COLS].sample(n=10_000, random_state=42)

# ── Current window (last 7 days of inference requests) ───────────────────────
current_df = pd.read_parquet(
    "gs://my-data/inference_logs/2026-05-05_7d_window.parquet"
)[FEATURE_COLS]

# ── Column mapping ────────────────────────────────────────────────────────────
column_mapping = ColumnMapping(
    numerical_features=FEATURE_COLS,
)

# ── Build drift report ────────────────────────────────────────────────────────
report = Report(metrics=[
    DataDriftPreset(drift_share=0.3),  # alert if > 30% of features drift
    DataQualityPreset(),
    ColumnDriftMetric(column_name="days_since_last_order"),
    ColumnDriftMetric(column_name="avg_order_value_usd"),
])

report.run(
    reference_data=reference_df,
    current_data=current_df,
    column_mapping=column_mapping,
)

# ── Extract summary as JSON ────────────────────────────────────────────────────
result = report.as_dict()
drift_summary = result["metrics"][0]["result"]

dataset_drifted = drift_summary["dataset_drift"]
drift_share     = drift_summary["share_of_drifted_columns"]
drifted_cols    = [
    col for col, stats in drift_summary["drift_by_columns"].items()
    if stats["drift_detected"]
]

print(f"Dataset drifted: {dataset_drifted}")
print(f"Drifted columns ({drift_share:.0%}): {drifted_cols}")

# ── Alert on significant drift ────────────────────────────────────────────────
if dataset_drifted and drift_share >= 0.3:
    alert_payload = {
        "timestamp": datetime.datetime.utcnow().isoformat(),
        "model": "churn-prediction",
        "alert": "data_drift_detected",
        "drift_share": drift_share,
        "drifted_columns": drifted_cols,
        "reference_period": "2026-05 training set",
        "current_period": "2026-05-05 7-day window",
        "action": "Trigger model retraining pipeline",
    }
    # In production: post to PagerDuty, Slack, or your alerting system
    print("ALERT:", json.dumps(alert_payload, indent=2))

# ── Save HTML report for review ───────────────────────────────────────────────
report.save_html("drift_report_2026-05-05.html")
print("Drift report saved")

Prediction Distribution Monitoring

Beyond input feature drift, monitor the distribution of model outputs. A healthy churn model produces a score distribution that is relatively stable over time. If the mean score suddenly shifts up or the distribution becomes bimodal, something is wrong — either data drift upstream, a feature pipeline failure, or a model issue. Log every prediction with its input features to a data warehouse and run distribution checks daily.

# Prometheus metrics for prediction monitoring in FastAPI serving
# Record score distribution and feature statistics per request

from prometheus_client import Histogram, Counter, Gauge
import numpy as np

# ── Prediction score histogram ─────────────────────────────────────────────────
prediction_score_histogram = Histogram(
    "ml_prediction_score",
    "Distribution of model prediction scores",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
    labelnames=["model_name", "risk_tier"],
)

# ── Feature value gauges (sample 1% of requests for cardinality control) ──────
feature_sample_gauge = Gauge(
    "ml_feature_value",
    "Sampled feature values for drift proxy monitoring",
    labelnames=["model_name", "feature_name", "statistic"],
)

# ── Null feature counter ───────────────────────────────────────────────────────
feature_null_counter = Counter(
    "ml_feature_null_total",
    "Count of null feature values by feature name",
    labelnames=["model_name", "feature_name"],
)

# Add to the predict() endpoint after scoring:
#   prediction_score_histogram.labels(
#       model_name=MODEL_NAME, risk_tier=risk_tier
#   ).observe(score)
#
#   if random.random() < 0.01:  # 1% sampling
#       feature_sample_gauge.labels(
#           model_name=MODEL_NAME,
#           feature_name="days_since_last_order",
#           statistic="value",
#       ).set(row["days_since_last_order"])

# ── Grafana alert rule: prediction score median shift ─────────────────────────
# In Grafana / Prometheus rule file (recording + alerting):

# record: ml:churn_prediction:score_p50_1h
# expr: |
#   histogram_quantile(0.50,
#     sum by (le) (
#       rate(ml_prediction_score_bucket{model_name="churn-prediction"}[1h])
#     )
#   )
#
# alert: ChurnScoreMedianShift
# expr: |
#   abs(
#     ml:churn_prediction:score_p50_1h
#     - ml:churn_prediction:score_p50_1h offset 7d
#   ) > 0.08
# for: 30m
# labels:
#   severity: warning
# annotations:
#   summary: "Churn prediction score median shifted by more than 0.08 vs last week"
#   description: "Check for upstream data issues or model degradation"

End-to-End Pipeline Architecture

The complete production ML pipeline connects all the components described above into a repeatable, observable workflow. The pipeline runs continuously: features are materialised on a schedule, models are retrained when drift is detected or on a time trigger, new versions go through validation before promotion, and serving is always backed by a monitored model with a tested rollback path.

# End-to-end ML pipeline — Airflow DAG skeleton
# Connects feature materialization → training → validation → promotion → monitoring

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# DAG runs nightly — retrains model if significant drift is detected
with DAG(
    dag_id="churn_model_pipeline",
    schedule_interval="0 2 * * *",   # 2 AM UTC daily
    start_date=datetime(2026, 5, 1),
    catchup=False,
    default_args={
        "owner": "ml-platform",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "email_on_failure": True,
    },
    tags=["ml", "churn", "production"],
) as dag:

    # ── Step 1: Materialise features into online store ────────────────────────
    materialise_features = PythonOperator(
        task_id="materialise_features",
        python_callable=lambda: __import__("feast_utils").materialize_all(),
        doc="Sync offline feature store to Redis online store",
    )

    # ── Step 2: Check for data drift on the last 7 days ───────────────────────
    check_drift = PythonOperator(
        task_id="check_data_drift",
        python_callable=lambda: __import__("drift_utils").run_drift_check(),
        doc="Run Evidently drift report; set XCom drift_detected=True if significant",
    )

    # ── Step 3: Retrain model (only if drift or scheduled weekly) ─────────────
    retrain_model = PythonOperator(
        task_id="retrain_model",
        python_callable=lambda **ctx: __import__("train_utils").run_training(
            force=ctx["dag_run"].conf.get("force_retrain", False),
            drift_detected=ctx["ti"].xcom_pull("check_data_drift", key="drift_detected"),
        ),
        doc="Train XGBoost churn model, log to MLflow",
    )

    # ── Step 4: Validate the new model version in Staging ────────────────────
    validate_model = PythonOperator(
        task_id="validate_model",
        python_callable=lambda **ctx: __import__("registry_utils").validate_staging(
            run_id=ctx["ti"].xcom_pull("retrain_model", key="run_id"),
        ),
        doc="Shadow-score validation dataset, check for degenerate predictions",
    )

    # ── Step 5: Promote to Production if validation passes ───────────────────
    promote_model = PythonOperator(
        task_id="promote_model",
        python_callable=lambda **ctx: __import__("registry_utils").promote_to_production(
            version=ctx["ti"].xcom_pull("validate_model", key="version"),
        ),
        doc="Transition model version from Staging to Production in MLflow registry",
    )

    # ── Step 6: Run batch scoring on new model ────────────────────────────────
    batch_score = PythonOperator(
        task_id="batch_score",
        python_callable=lambda: __import__("scoring_utils").score_all_customers(),
        doc="Score all active customers with new Production model, write to BQ",
    )

    # ── Step 7: Post-scoring drift check on predictions ──────────────────────
    check_score_distribution = PythonOperator(
        task_id="check_score_distribution",
        python_callable=lambda: __import__("drift_utils").check_score_distribution(),
        doc="Verify prediction score distribution is within expected bounds",
    )

    # Pipeline dependency chain
    (
        materialise_features
        >> check_drift
        >> retrain_model
        >> validate_model
        >> promote_model
        >> batch_score
        >> check_score_distribution
    )

Note

Start simpler than this. If you're building your first production ML pipeline, begin with: MLflow tracking + a nightly batch scoring job. Add the model registry workflow when you have multiple model versions competing. Add Feast when you actually hit training-serving skew. Add drift detection when you have a model in production long enough to see distribution shifts. The full pipeline above is the target state — not the starting point. Premature MLOps infrastructure is one of the most common ways to slow down a team that should be shipping models.

Production Checklist

Experiment tracking

Every training run logs parameters, metrics, model artifact, and dataset version to MLflow.

Model registry

All deployed models are registered and promoted through Staging → Production with validation gates.

Feature store

Training and serving use identical feature computation logic — no ad-hoc feature pipelines in either path.

Serving rollback

The serving layer can roll back to the previous Production version by transitioning a registry stage — no code deploy required.

Input validation

The prediction API validates feature ranges and null rates before calling the model. Cold-start fallbacks are tested.

Drift monitoring

Weekly Evidently reports compare current feature distributions against training data. Alerts fire when drift share exceeds 30%.

Prediction logging

Every inference request is logged with customer ID, feature values, score, and model version for auditability and retrospective evaluation.

Retraining trigger

Both scheduled (weekly) and drift-triggered retraining are tested and run end-to-end in CI before going to production.

Work with us

Building ML pipelines and struggling with experiment tracking, feature consistency, or model serving at scale?

We design and implement production ML pipelines — from MLflow experiment tracking and model registry workflows to feature stores, online serving, and monitoring for model drift. Let’s talk.

Get in touch
