Agent Skills: Palantir Migration Deep Dive

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/palantir-migration-deep-dive

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/palantir-pack/skills/palantir-migration-deep-dive

Skill Files

Browse the full folder contents for palantir-migration-deep-dive.

Download Skill

Loading file tree…

plugins/saas-packs/palantir-pack/skills/palantir-migration-deep-dive/SKILL.md

Skill Metadata

Name
palantir-migration-deep-dive
Description
|

Palantir Migration Deep Dive

Overview

Comprehensive guide for migrating data into Foundry, migrating from legacy systems to Foundry-backed architectures, and upgrading between Foundry API versions using the strangler fig pattern.

Prerequisites

  • Source system access and schema documentation
  • Foundry enrollment with write access
  • Understanding of Foundry data pipeline architecture (palantir-reference-architecture)

Instructions

Step 1: Migration Assessment

## Migration Checklist
- [ ] Source system inventory (tables, volumes, refresh rates)
- [ ] Data classification (PII, confidential, public)
- [ ] Schema mapping: source columns → Foundry dataset columns
- [ ] Volume estimate: rows, GB, growth rate
- [ ] Dependencies: downstream consumers of source data
- [ ] Timeline: parallel run period, cutover date

Step 2: Data Migration — Bulk Import

import foundry, pandas as pd

client = get_foundry_client()

# Read source data (example: PostgreSQL)
df = pd.read_sql("SELECT * FROM orders WHERE year >= 2024", source_conn)

# Upload to Foundry dataset
client.datasets.Dataset.upload(
    dataset_rid="ri.foundry.main.dataset.xxxxx",
    branch_id="master",
    file_path="orders.parquet",
    data=df.to_parquet(),
    content_type="application/x-parquet",
)
print(f"Uploaded {len(df)} rows to Foundry")

Step 3: Incremental Sync (Ongoing)

from datetime import datetime, timedelta

def incremental_sync(client, source_conn, dataset_rid, last_sync):
    """Sync only new/changed rows since last sync."""
    query = f"""
        SELECT * FROM orders 
        WHERE updated_at > '{last_sync.isoformat()}'
        ORDER BY updated_at
    """
    df = pd.read_sql(query, source_conn)
    if df.empty:
        print("No new rows to sync")
        return last_sync

    client.datasets.Dataset.upload(
        dataset_rid=dataset_rid,
        branch_id="master",
        file_path=f"sync_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.parquet",
        data=df.to_parquet(),
    )
    print(f"Synced {len(df)} rows")
    return df["updated_at"].max()

Step 4: Strangler Fig Pattern for API Migration

class DualWriteClient:
    """Write to both legacy and Foundry during migration period."""
    def __init__(self, legacy_client, foundry_client):
        self.legacy = legacy_client
        self.foundry = foundry_client
        self.foundry_enabled = os.environ.get("FOUNDRY_WRITES_ENABLED", "false") == "true"

    def create_order(self, order_data):
        # Always write to legacy (source of truth during migration)
        result = self.legacy.create_order(order_data)

        # Shadow write to Foundry (non-blocking)
        if self.foundry_enabled:
            try:
                self.foundry.ontologies.Action.apply(
                    ontology="my-company",
                    action_type="createOrder",
                    parameters=order_data,
                )
            except Exception as e:
                print(f"Foundry shadow write failed (non-fatal): {e}")

        return result

Step 5: Validation and Cutover

def validate_migration(legacy_conn, foundry_client, ontology, object_type):
    """Compare row counts and checksums between source and Foundry."""
    # Legacy count
    legacy_count = pd.read_sql("SELECT COUNT(*) as c FROM orders", legacy_conn).iloc[0]["c"]

    # Foundry count
    foundry_result = foundry_client.ontologies.OntologyObject.aggregate(
        ontology=ontology, object_type=object_type,
        aggregation=[{"type": "count", "name": "total"}],
    )
    foundry_count = foundry_result.data[0].metrics["total"]

    match = legacy_count == foundry_count
    print(f"Legacy: {legacy_count}, Foundry: {foundry_count}, Match: {match}")
    return match

Output

  • Migration assessment checklist completed
  • Bulk data import to Foundry datasets
  • Incremental sync for ongoing changes
  • Dual-write pattern for safe cutover
  • Validation comparing source and Foundry counts

Error Handling

| Migration Risk | Detection | Mitigation | |---------------|-----------|------------| | Data loss | Row count mismatch | Run validation before cutover | | Schema mismatch | Transform errors | Map schemas explicitly | | Dual-write divergence | Checksum differences | Reconciliation job | | Rollback needed | Production issues | Keep legacy running during parallel period |

Resources

Next Steps

For SDK version upgrades, see palantir-upgrade-migration.