Palantir Core Workflow A — Data Pipelines with Transforms
Overview
Build Foundry data pipelines using the transforms-python library. Covers the @transform and @transform_df decorators, input/output dataset wiring, incremental transforms, and @configure for Spark tuning. This is the primary workflow for all data processing in Foundry.
Prerequisites
- Completed
palantir-install-authsetup - A Foundry Code Repository (Python Transforms type)
- Understanding of PySpark DataFrames (Foundry runs Spark under the hood)
Instructions
Step 1: Project Structure
my-transforms-repo/
├── src/
│ └── myproject/
│ ├── __init__.py
│ ├── pipeline.py # Main transforms
│ ├── utils.py # Shared logic
│ └── datasets.py # Dataset path constants
├── build.gradle # Foundry build config
├── conda_recipe/meta.yaml # Dependency declarations
└── settings.gradle
Step 2: Basic Transform with @transform_df
# src/myproject/pipeline.py
from transforms.api import transform_df, Input, Output
@transform_df(
Output("/Company/datasets/cleaned_orders"),
orders=Input("/Company/datasets/raw_orders"),
)
def clean_orders(orders):
"""Clean raw orders: drop nulls, normalize dates, filter test data."""
from pyspark.sql import functions as F
return (
orders
.filter(F.col("order_id").isNotNull())
.filter(~F.col("email").like("%@test.com"))
.withColumn("order_date", F.to_date("order_date_str", "yyyy-MM-dd"))
.withColumn("total_cents", (F.col("total") * 100).cast("long"))
.drop("order_date_str", "total")
)
Step 3: Multi-Input Join Transform
@transform_df(
Output("/Company/datasets/order_enriched"),
orders=Input("/Company/datasets/cleaned_orders"),
customers=Input("/Company/datasets/customers"),
)
def enrich_orders(orders, customers):
"""Join orders with customer data for analytics."""
from pyspark.sql import functions as F
return (
orders
.join(customers, orders.customer_id == customers.id, "left")
.select(
orders.order_id,
orders.order_date,
orders.total_cents,
customers.name.alias("customer_name"),
customers.segment,
customers.region,
)
.withColumn("processed_at", F.current_timestamp())
)
Step 4: Low-Level @transform for File I/O
from transforms.api import transform, Input, Output
@transform(
output=Output("/Company/datasets/report_summary"),
orders=Input("/Company/datasets/order_enriched"),
)
def generate_summary(orders, output):
"""Write aggregated summary using low-level FileSystem API."""
df = orders.dataframe()
summary = (
df.groupBy("region", "segment")
.agg(
{"total_cents": "sum", "order_id": "count"}
)
.withColumnRenamed("sum(total_cents)", "revenue_cents")
.withColumnRenamed("count(order_id)", "order_count")
)
output.write_dataframe(summary)
Step 5: Incremental Transforms
from transforms.api import transform_df, Input, Output, incremental
@incremental()
@transform_df(
Output("/Company/datasets/daily_events"),
events=Input("/Company/datasets/raw_events"),
)
def process_events_incrementally(events):
"""Only process new rows since last build — much faster for append-only data."""
from pyspark.sql import functions as F
return events.withColumn("ingested_at", F.current_timestamp())
Step 6: Configure Spark Resources
from transforms.api import transform_df, Input, Output, configure
@configure(profile=["DRIVER_MEMORY_LARGE"]) # 16GB driver
@transform_df(
Output("/Company/datasets/heavy_aggregation"),
data=Input("/Company/datasets/large_dataset"),
)
def heavy_compute(data):
"""Resource-intensive transform needing extra Spark memory."""
from pyspark.sql import functions as F
return (
data
.groupBy("category")
.agg(F.approx_count_distinct("user_id").alias("unique_users"))
)
Output
- Dataset-to-dataset transforms wired with
@transform_df - Multi-input joins connecting datasets across projects
- Incremental processing for append-only sources
- Spark resource tuning with
@configure
Error Handling
| Error | Cause | Solution |
|-------|-------|----------|
| DatasetNotFound | Wrong path string | Check dataset path in Foundry UI (right-click > Copy path) |
| AnalysisException: cannot resolve | Column name mismatch | Print df.columns to debug; Foundry columns are case-sensitive |
| OutOfMemoryError | Insufficient Spark memory | Add @configure(profile=["DRIVER_MEMORY_LARGE"]) |
| Transform is not incremental-compatible | Using non-append operations | Only use filter/select/withColumn in incremental transforms |
| Build hangs | Circular dependency | Check that no two transforms reference each other's output |
Examples
Polars Transform (Lightweight)
from transforms.api import transform_polars, Input, Output
@transform_polars(
Output("/Company/datasets/fast_summary"),
data=Input("/Company/datasets/small_table"),
)
def fast_polars(data):
"""Use Polars for small datasets — faster than Spark, no JVM overhead."""
import polars as pl
return data.group_by("category").agg(pl.col("amount").sum())
Resources
Next Steps
- Query Ontology objects and actions:
palantir-core-workflow-b - Optimize pipeline performance:
palantir-performance-tuning - Deploy across environments:
palantir-multi-env-setup