Agent Skills: Databricks Core Workflow B: MLflow Training & Serving

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/databricks-core-workflow-b

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/databricks-pack/skills/databricks-core-workflow-b

Skill Files

Browse the full folder contents for databricks-core-workflow-b.

Download Skill

Loading file tree…

plugins/saas-packs/databricks-pack/skills/databricks-core-workflow-b/SKILL.md

Skill Metadata

Name
databricks-core-workflow-b
Description
|

Databricks Core Workflow B: MLflow Training & Serving

Overview

Full ML lifecycle on Databricks: Feature Engineering Client for discoverable features, MLflow experiment tracking with auto-logging, Unity Catalog model registry with aliases (champion/challenger), and Mosaic AI Model Serving endpoints for real-time inference via REST API.

Prerequisites

  • Completed databricks-install-auth and databricks-core-workflow-a
  • databricks-sdk, mlflow, scikit-learn installed
  • Unity Catalog enabled (required for model registry)

Instructions

Step 1: Feature Engineering with Feature Store

Create a feature table in Unity Catalog so features are discoverable and reusable.

from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()

# Build features from gold layer tables
user_features = (
    spark.table("prod_catalog.gold.user_events")
    .groupBy("user_id")
    .agg(
        F.count("event_id").alias("total_events"),
        F.avg("session_duration_sec").alias("avg_session_sec"),
        F.max("event_timestamp").alias("last_active"),
        F.countDistinct("event_type").alias("unique_event_types"),
        F.datediff(F.current_date(), F.max("event_timestamp")).alias("days_since_last_active"),
    )
)

# Register as a feature table (creates or updates)
fe.create_table(
    name="prod_catalog.ml_features.user_behavior",
    primary_keys=["user_id"],
    df=user_features,
    description="User behavioral features for churn prediction",
)

Step 2: MLflow Experiment Tracking

import mlflow
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Point MLflow to Databricks tracking server
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Users/team@company.com/churn-prediction")

# Load features
features_df = spark.table("prod_catalog.ml_features.user_behavior").toPandas()
X = features_df.drop(columns=["user_id", "churned"])
y = features_df["churned"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train with experiment tracking
with mlflow.start_run(run_name="gbm-baseline") as run:
    params = {"n_estimators": 200, "max_depth": 5, "learning_rate": 0.1}
    mlflow.log_params(params)

    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred),
    }
    mlflow.log_metrics(metrics)

    # Log model with signature for serving validation
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        input_example=X_test.iloc[:5],
        registered_model_name="prod_catalog.ml_models.churn_predictor",
    )
    print(f"Run {run.info.run_id}: accuracy={metrics['accuracy']:.3f}")

Step 3: Model Registry with Aliases

Unity Catalog model registry replaces legacy stages with aliases (champion, challenger).

from mlflow import MlflowClient

client = MlflowClient()
model_name = "prod_catalog.ml_models.churn_predictor"

# List versions
for mv in client.search_model_versions(f"name='{model_name}'"):
    print(f"v{mv.version}: status={mv.status}, aliases={mv.aliases}")

# Promote best version to champion
client.set_registered_model_alias(model_name, alias="champion", version="3")

# Load model by alias in downstream code
champion = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
predictions = champion.predict(X_test)

Step 4: Deploy Model Serving Endpoint

Mosaic AI Model Serving creates a REST API endpoint with auto-scaling.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput, ServedEntityInput,
)

w = WorkspaceClient()

# Create or update a serving endpoint
endpoint = w.serving_endpoints.create_and_wait(
    name="churn-predictor-prod",
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name="prod_catalog.ml_models.churn_predictor",
                entity_version="3",
                workload_size="Small",
                scale_to_zero_enabled=True,
            )
        ]
    ),
)
print(f"Endpoint ready: {endpoint.name} ({endpoint.state.ready})")

Step 5: Query the Serving Endpoint

import requests

# Score via REST API
url = f"{w.config.host}/serving-endpoints/churn-predictor-prod/invocations"
headers = {
    "Authorization": f"Bearer {w.config.token}",
    "Content-Type": "application/json",
}
payload = {
    "dataframe_records": [
        {"total_events": 42, "avg_session_sec": 120.5,
         "unique_event_types": 7, "days_since_last_active": 3},
    ]
}
response = requests.post(url, headers=headers, json=payload)
print(response.json())  # {"predictions": [0]}

# Or use the SDK
result = w.serving_endpoints.query(
    name="churn-predictor-prod",
    dataframe_records=[
        {"total_events": 42, "avg_session_sec": 120.5,
         "unique_event_types": 7, "days_since_last_active": 3},
    ],
)
print(result.predictions)

Step 6: Batch Inference Job

# Scheduled Databricks job for daily batch scoring
model_name = "prod_catalog.ml_models.churn_predictor"
champion = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")

# Score all active users
active_users = spark.table("prod_catalog.gold.active_users").toPandas()
feature_cols = ["total_events", "avg_session_sec", "unique_event_types", "days_since_last_active"]
active_users["churn_probability"] = champion.predict_proba(active_users[feature_cols])[:, 1]

# Write scores back to Delta
(spark.createDataFrame(active_users[["user_id", "churn_probability"]])
    .write.mode("overwrite")
    .saveAsTable("prod_catalog.gold.churn_scores"))

Output

  • Feature table in Unity Catalog (prod_catalog.ml_features.user_behavior)
  • MLflow experiment with logged runs, metrics, and artifacts
  • Model versions in registry with champion alias
  • Live serving endpoint at /serving-endpoints/churn-predictor-prod/invocations
  • Batch scoring pipeline writing to prod_catalog.gold.churn_scores

Error Handling

| Error | Cause | Solution | |-------|-------|----------| | RESOURCE_DOES_NOT_EXIST | Wrong experiment path | Verify with mlflow.search_experiments() | | INVALID_PARAMETER_VALUE on log_model | Missing signature | Pass input_example= to auto-infer signature | | Model not found in registry | Wrong three-level name | Use catalog.schema.model_name format | | Endpoint FAILED | Model loading error | Check endpoint events: w.serving_endpoints.get("name").pending_config | | 429 on serving endpoint | Rate limit exceeded | Increase workload_size or add traffic splitting | | FEATURE_TABLE_NOT_FOUND | Table not created | Run fe.create_table() first |

Examples

Hyperparameter Sweep

from sklearn.model_selection import ParameterGrid

grid = {"n_estimators": [100, 200], "max_depth": [3, 5, 7], "learning_rate": [0.05, 0.1]}
for params in ParameterGrid(grid):
    with mlflow.start_run(run_name=f"gbm-d{params['max_depth']}-n{params['n_estimators']}"):
        mlflow.log_params(params)
        model = GradientBoostingClassifier(**params)
        model.fit(X_train, y_train)
        mlflow.log_metric("accuracy", accuracy_score(y_test, model.predict(X_test)))
        mlflow.sklearn.log_model(model, "model")

Resources

Next Steps

For common errors, see databricks-common-errors.