Agent Skills: Node.js Streams Skill

Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines



Node.js Streams Skill

Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.

Quick Start

Streams come in four types:

  1. Readable - Source of data (file, HTTP request)
  2. Writable - Destination for data (file, HTTP response)
  3. Transform - Modifies data in transit
  4. Duplex - Both readable and writable (see the sketch after this list)
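
Readable, Writable, and Transform each get their own section below; Duplex does not, so here is a minimal sketch of one. It echoes written data back out on its readable side (ignoring readable-side backpressure for brevity); in practice the built-in stream.PassThrough covers this case.

const { Duplex } = require('stream');

// Minimal Duplex sketch: whatever is written becomes readable again
class EchoDuplex extends Duplex {
  _write(chunk, encoding, callback) {
    this.push(chunk); // hand written data to the readable side
    callback();
  }

  _read() {
    // No-op: data is pushed from _write as it arrives
  }

  _final(callback) {
    this.push(null); // end the readable side once writing finishes
    callback();
  }
}

// Usage
const echo = new EchoDuplex();
echo.on('data', (chunk) => console.log(chunk.toString()));
echo.write('ping');
echo.end();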

Core Concepts

Readable Stream

const fs = require('fs');

// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Event-based consumption
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});

Writable Stream

const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal that no more data will be written

// Handle backpressure (standalone snippet: `data` and continueWriting()
// are placeholders). write() returns false once the internal buffer
// exceeds highWaterMark.
const ok = writeStream.write(data);
if (!ok) {
  // Wait for the drain event before writing more
  writeStream.once('drain', () => {
    continueWriting();
  });
}
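
In promise-based code, the same drain logic can be wrapped in a small helper built on Node's events.once; writeAll below is a hypothetical name, not part of Node's API.

const { once } = require('events');

// Hypothetical helper: writes chunks sequentially, pausing on backpressure
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain'); // wait for the buffer to empty
    }
  }
  writable.end();
  await once(writable, 'finish'); // resolves once all data is flushed
}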

Transform Stream

const { Transform } = require('stream');

// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Usage
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));

Learning Path

Beginner (1-2 weeks)

  • ✅ Understand stream types
  • ✅ Read/write file streams
  • ✅ Basic pipe operations
  • ✅ Handle stream events

Intermediate (3-4 weeks)

  • ✅ Transform streams
  • ✅ Backpressure handling
  • ✅ Object mode streams
  • ✅ Pipeline utility

Advanced (5-6 weeks)

  • ✅ Custom stream implementation
  • ✅ Async iterators
  • ✅ Web Streams API (interop sketch below)
  • ✅ Performance optimization
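
The Web Streams API item refers to the WHATWG streams that Node ships alongside its native streams; recent Node versions (17+) can convert between the two via Readable.toWeb() and Readable.fromWeb(). A minimal interop sketch ('input.txt' is an illustrative file name, and the API is still marked experimental):

const fs = require('fs');
const { Readable } = require('stream');

async function readViaWebStream() {
  const nodeReadable = fs.createReadStream('input.txt');
  const webReadable = Readable.toWeb(nodeReadable); // Node -> Web stream

  // Consume with the WHATWG reader API
  const reader = webReadable.getReader();
  let result = await reader.read();
  while (!result.done) {
    console.log(`Read ${result.value.length} bytes`);
    result = await reader.read();
  }

  // The other direction: Readable.fromWeb(webStream) returns a Node readable
}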

Pipeline (Recommended)

const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}

// With transforms (csvParser, transformRow, and jsonStringify are
// placeholder Transform streams)
async function convertCsvToJson() {
  await pipeline(
    fs.createReadStream('data.csv'),
    csvParser(),
    transformRow(),
    jsonStringify(),
    fs.createWriteStream('data.json')
  );
}
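
For a concrete version of those placeholders, here is a minimal sketch using only built-ins: splitLines buffers partial lines across chunks, and toJson turns each comma-separated row into an NDJSON line (no quoting or header handling; file names are illustrative).

const { Transform } = require('stream');

function splitLines() {
  let tail = '';
  return new Transform({
    readableObjectMode: true, // emit one line (string) per chunk
    transform(chunk, encoding, callback) {
      const lines = (tail + chunk.toString()).split('\n');
      tail = lines.pop(); // keep the trailing partial line for the next chunk
      for (const line of lines) this.push(line);
      callback();
    },
    flush(callback) {
      if (tail) this.push(tail); // emit any final partial line
      callback();
    }
  });
}

function toJson() {
  return new Transform({
    writableObjectMode: true, // receives line strings, emits text
    transform(line, encoding, callback) {
      const [id, name] = line.split(',');
      callback(null, JSON.stringify({ id, name }) + '\n');
    }
  });
}

async function csvToNdjson() {
  await pipeline(
    fs.createReadStream('data.csv'),
    splitLines(),
    toJson(),
    fs.createWriteStream('data.ndjson')
  );
}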

Pipeline with Error Handling

const { pipeline } = require('stream');

pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

HTTP Streaming

const http = require('http');
const fs = require('fs');

// Stream file as HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';
  const stat = fs.statSync(filePath);

  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size
  });

  // Stream instead of loading entire file
  fs.createReadStream(filePath).pipe(res);
}).listen(3000);

// Stream an HTTP request body to disk
http.createServer((req, res) => {
  const writeStream = fs.createWriteStream('./upload.bin');
  req.pipe(writeStream);

  // Respond once the data has actually been flushed to disk
  writeStream.on('finish', () => {
    res.end('Upload complete');
  });
}).listen(3001);
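
Note that pipe() on its own does not destroy the source stream if the other end fails or the client disconnects mid-response; pipeline does. A sketch of the same file server using pipeline (port 3002 is illustrative):

const { pipeline } = require('stream');

http.createServer((req, res) => {
  const filePath = './video.mp4';
  const stat = fs.statSync(filePath);

  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size
  });

  // pipeline tears down the file stream if the response errors or closes early
  pipeline(fs.createReadStream(filePath), res, (err) => {
    if (err) console.error('Stream aborted:', err.message);
  });
}).listen(3002);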

Object Mode Streams

const { Transform } = require('stream');

// Parse text into objects (assumes each incoming chunk is one complete
// JSON document, e.g. one NDJSON line from an upstream line splitter)
const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk);
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});
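
A minimal sketch of wiring these together, assuming the input has already been split into one JSON document per chunk (the inline array stands in for that upstream source, and 'processed.ndjson' is an illustrative output path):

const { Readable } = require('stream');
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    Readable.from(['{"id":1}', '{"id":2}']), // pre-split JSON lines
    jsonParser,
    processRecords,
    new Transform({
      objectMode: true,
      transform(record, encoding, callback) {
        callback(null, JSON.stringify(record) + '\n'); // back to text
      }
    }),
    fs.createWriteStream('processed.ndjson')
  );
}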

Async Iterators

const { Readable } = require('stream');

// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}

const stream = Readable.from(generateData(), { objectMode: true });

// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}
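
pipeline also accepts async iterables and generator functions directly, which pairs naturally with the generator above. A minimal sketch ('records.ndjson' is an illustrative output path):

const fs = require('fs');
const { pipeline } = require('stream/promises');

async function saveRecords() {
  await pipeline(
    generateData(),                        // async generator as the source
    async function* (source) {
      for await (const item of source) {
        yield JSON.stringify(item) + '\n'; // serialize each record as an NDJSON line
      }
    },
    fs.createWriteStream('records.ndjson')
  );
}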

Backpressure Handling

const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // Check if the writable can accept more data
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Pause reading until the writable drains
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

readable.on('end', () => {
  writable.end(); // Close the writable once reading finishes
});

// Or use pipeline, which handles backpressure automatically
pipeline(readable, writable, (err) => {
  if (err) console.error('Error:', err);
});

Custom Readable Stream

const { Readable } = require('stream');

class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  async _read() {
    try {
      if (!this.cursor) {
        // `db` is a placeholder for your database client
        this.cursor = await db.collection('items').find(this.query).cursor();
      }

      const doc = await this.cursor.next();
      if (doc) {
        this.push(doc);
      } else {
        this.push(null); // Signal end of stream
      }
    } catch (err) {
      this.destroy(err); // Propagate read errors to consumers
    }
  }
}

// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
  console.log(item);
}

Unit Test Template

const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];

    const upperCase = new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });

    await pipeline(
      input,
      upperCase,
      async function* (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );

    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});

Troubleshooting

| Problem | Cause | Solution |
|---------|-------|----------|
| Memory grows infinitely | No backpressure | Use pipeline or handle drain |
| Data loss | Errors not caught | Use pipeline with error callback |
| Slow processing | Small chunk size | Increase highWaterMark |
| Stream hangs | Missing end() call | Call writable.end() |

When to Use

Use streams when:

  • Processing large files (GB+)
  • Real-time data processing
  • Memory-constrained environments
  • Building data pipelines
  • HTTP request/response handling

Related Skills

  • Async Programming (async patterns)
  • Performance Optimization (memory efficiency)
  • Express REST API (streaming responses)

Resources