Agent Skills: Attio Reference Architecture

|

UncategorizedID: jeremylongshore/claude-code-plugins-plus-skills/attio-reference-architecture

Install this agent skill to your local

pnpm dlx add-skill https://github.com/jeremylongshore/claude-code-plugins-plus-skills/tree/HEAD/plugins/saas-packs/attio-pack/skills/attio-reference-architecture

Skill Files

Browse the full folder contents for attio-reference-architecture.

Download Skill

Loading file tree…

plugins/saas-packs/attio-pack/skills/attio-reference-architecture/SKILL.md

Skill Metadata

Name
attio-reference-architecture
Description
'Production reference architecture for Attio CRM integrations -- layered

Attio Reference Architecture

Overview

Production architecture for CRM integrations with the Attio REST API (https://api.attio.com/v2). Designed for contact enrichment pipelines, deal tracking across custom lists, bi-directional activity sync with external systems, and workspace isolation for multi-tenant deployments. Key design drivers: webhook-driven data freshness, idempotent upserts via PUT assertions, schema-aware caching, and layered separation between API client, business logic, and infrastructure.

Architecture Diagram

Your App ──→ Service Layer ──→ Cache (Redis) ──→ Attio REST API v2
                  ↓                               /objects/people/records
             Queue (p-queue) ──→ Sync Worker      /lists/{slug}/entries
                  ↓                               /notes, /tasks
             Webhook Handler ←── Attio Events     /webhooks
                  ↓
             External CRM Sync ──→ HubSpot/Salesforce

Service Layer

class ContactService {
  constructor(private client: AttioClient, private cache: CacheLayer) {}

  async findByEmail(email: string): Promise<AttioRecord | null> {
    const res = await this.client.post('/objects/people/records/query', { filter: { email_addresses: email }, limit: 1 });
    return res.data[0] || null;
  }

  async upsertPerson(data: { email: string; firstName: string; lastName: string }): Promise<AttioRecord> {
    const res = await this.client.put('/objects/people/records', {
      data: { values: { email_addresses: [data.email], name: [{ first_name: data.firstName, last_name: data.lastName }] } }
    });
    await this.cache.invalidate(`person:${data.email}`);
    return res.data;
  }

  async addToPipeline(recordId: string, listSlug: string, stage: string): Promise<void> {
    await this.client.post(`/lists/${listSlug}/entries`, {
      data: { parent_record_id: recordId, parent_object: 'people', values: { stage: [{ status: stage }] } }
    });
  }
}

Caching Strategy

const CACHE_CONFIG = {
  schema:  { ttl: 1800, prefix: 'schema' },   // 30 min — object/attribute definitions change rarely
  records: { ttl: 300,  prefix: 'record' },    // 5 min — webhook-driven invalidation handles freshness
  lists:   { ttl: 120,  prefix: 'list' },      // 2 min — deal pipeline stages need near-real-time
  notes:   { ttl: 60,   prefix: 'note' },      // 1 min — activity feed freshness
};
// Webhook events (record.updated, list-entry.created) flush matching cache keys immediately

Event Pipeline

class AttioEventPipeline {
  private queue = new Bull('attio-events', { redis: process.env.REDIS_URL });

  async onWebhook(event: AttioWebhookEvent): Promise<void> {
    await this.queue.add(event.event_type, event, { attempts: 3, backoff: { type: 'exponential', delay: 2000 } });
  }

  async processRecordEvent(event: AttioWebhookEvent): Promise<void> {
    if (event.event_type === 'record.created') await this.syncToExternalCRM(event.record!.id.record_id);
    if (event.event_type === 'record.updated') await this.cache.invalidate(`record:${event.record!.id.record_id}`);
    if (event.event_type === 'record.merged') await this.reconcileMergedRecords(event);
  }

  async processListEntryEvent(event: AttioWebhookEvent): Promise<void> {
    if (event.event_type === 'list-entry.created') await this.triggerPipelineAutomation(event);
  }
}

Data Model

interface AttioRecord       { id: { record_id: string; object_id: string }; values: Record<string, AttioValue[]>; created_at: string; }
interface AttioValue        { attribute_type: string; [key: string]: unknown; }
interface AttioWebhookEvent { event_type: string; object?: { api_slug: string }; record?: AttioRecord; list_entry?: { entry_id: string }; }
interface SyncState         { objectSlug: string; lastSyncOffset: number; lastFullSync: string; recordCount: number; }

Scaling Considerations

  • Partition sync workers by Attio object type (people, companies, deals) to isolate rate limits
  • Use webhook-driven invalidation rather than polling — Attio delivers events within seconds
  • Batch record queries with /records/query pagination (500 per page) for full sync
  • Schema cache (30 min TTL) prevents redundant attribute lookups on every record access
  • Rate-limit outbound writes with p-queue to stay within Attio's per-workspace concurrency limits

Error Handling

| Component | Failure Mode | Recovery | |-----------|-------------|----------| | Contact upsert | Attio 429 rate limit | p-queue backoff with jitter, per-object circuit breaker | | Webhook handler | Duplicate event delivery | Idempotency key on record_id + event_type + timestamp | | Bi-directional sync | Both sides updated same record | Last-write-wins with conflict resolution queue | | Schema cache | Stale attribute definitions | Webhook-driven invalidation, fallback to fresh fetch | | External CRM sync | HubSpot API timeout | Queue retry with dead-letter, manual reconciliation flag |

Resources

Next Steps

See attio-deploy-integration.