Agent Skills: I/O Connectors in Apache Beam


Install this agent skill locally:

pnpm dlx add-skill https://github.com/Kilo-Org/kilo-marketplace/tree/HEAD/skills/io-connectors

I/O Connectors in Apache Beam

Overview

I/O connectors enable reading from and writing to external data sources. Beam provides 51+ Java I/O connectors and several Python connectors.

Java I/O Connectors Location

sdks/java/io/

Available Connectors

| Category | Connectors |
|----------|------------|
| Cloud Storage | google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos |
| Databases | jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch |
| Messaging | kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace |
| File Formats | parquet, csv, json, xml, thrift, iceberg |
| Other | snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika |

Testing I/O Connectors

Unit Tests

./gradlew :sdks:java:io:kafka:test
./gradlew :sdks:java:io:jdbc:test

Integration Tests

On Direct Runner

./gradlew :sdks:java:io:google-cloud-platform:integrationTest

With Custom GCP Settings

./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
  -PgcpProject=<project> \
  -PgcpTempRoot=gs://<bucket>/path

With Explicit Pipeline Options

./gradlew :sdks:java:io:jdbc:integrationTest \
  -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'
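
The value passed to -DbeamTestPipelineOptions above is a JSON array of strings, one string per pipeline option. As a minimal sketch, the array can be assembled programmatically rather than by hand. BeamTestOptionsFormatter is a hypothetical helper and not part of Beam, and the --tempRoot value is only an illustrative second option.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Beam): formats options as a JSON array of strings. */
final class BeamTestOptionsFormatter {

  /** Escapes backslashes and double quotes so the option is a valid JSON string literal. */
  static String quote(String option) {
    return "\"" + option.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Joins options such as "--runner=TestDirectRunner" into ["...","..."]. */
  static String toJsonArray(List<String> options) {
    return options.stream()
        .map(BeamTestOptionsFormatter::quote)
        .collect(Collectors.joining(",", "[", "]"));
  }

  public static void main(String[] args) {
    // The second option is illustrative only; substitute the options your test needs.
    String value = toJsonArray(
        List.of("--runner=TestDirectRunner", "--tempRoot=gs://<bucket>/path"));
    // Print the flag in the form expected on the Gradle command line.
    System.out.println("-DbeamTestPipelineOptions='" + value + "'");
  }
}
```

Generating the array this way avoids quoting mistakes when the option list grows or is built by a script.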

Integration Test Framework

The framework lives in the it/ directory:

  • it/common/ - Common test utilities
  • it/google-cloud-platform/ - GCP-specific test infrastructure
  • it/jdbc/ - JDBC test infrastructure
  • it/kafka/ - Kafka test infrastructure
  • it/testcontainers/ - Testcontainers support

Writing Integration Tests

Basic Structure

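import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

// MyIO, testData, expectedData, and destination are placeholders for your connector and fixtures.
@RunWith(JUnit4.class)
public class MyIOIT {
  @Rule public TestPipeline readPipeline = TestPipeline.create();
  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Test
  public void testWriteAndRead() {
    // Write data and wait for the write pipeline to finish before reading
    writePipeline.apply(Create.of(testData))
                 .apply(MyIO.write().to(destination));
    writePipeline.run().waitUntilFinish();

    // Read and verify
    PCollection<String> results = readPipeline.apply(MyIO.read().from(destination));
    PAssert.that(results).containsInAnyOrder(expectedData);
    readPipeline.run().waitUntilFinish();
  }
}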

Using TestPipeline

@Rule public TestPipeline pipeline = TestPipeline.create();

TestPipeline:

  • Blocks on run by default (on TestDataflowRunner)
  • Has 15-minute default timeout
  • Reads options from beamTestPipelineOptions system property
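
To illustrate the last point, the sketch below reads the beamTestPipelineOptions system property and turns its JSON array into a list of command-line style arguments. It is a simplified stand-in that uses only the standard library; BeamTestOptionsReader is a hypothetical name, it handles only a flat array of strings, and Beam's own parsing may differ in details.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified, hypothetical stand-in for reading beamTestPipelineOptions. */
final class BeamTestOptionsReader {
  // Matches one JSON string literal; group 1 is the content between the quotes.
  private static final Pattern JSON_STRING =
      Pattern.compile("\"((?:[^\"\\\\]|\\\\.)*)\"");

  /** Extracts the string elements of a flat JSON array such as ["--a=1","--b=2"]. */
  static List<String> parseArgs(String json) {
    List<String> args = new ArrayList<>();
    if (json == null || json.isBlank()) {
      return args; // property not set: no extra options
    }
    Matcher matcher = JSON_STRING.matcher(json);
    while (matcher.find()) {
      // Undo the two escapes this sketch supports: \" and \\
      args.add(matcher.group(1).replace("\\\"", "\"").replace("\\\\", "\\"));
    }
    return args;
  }

  public static void main(String[] argv) {
    // Run with -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]' to see the parsed list.
    List<String> args = parseArgs(System.getProperty("beamTestPipelineOptions"));
    System.out.println(args);
  }
}
```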

GCP I/O Connectors

BigQuery

// Read
pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));

// Write
data.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Pub/Sub

// Read
pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));

// Write
data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));

Cloud Storage (TextIO)

// Read
pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));

// Write
data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));

Kafka Connector

// Read
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));

// Write
data.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));

JDBC Connector

// Read (readRows() infers a Beam schema from the query result, so no RowMapper is needed)
pipeline.apply(JdbcIO.readRows()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
    .withQuery("SELECT * FROM my_table"));

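// Write (config is a JdbcIO.DataSourceConfiguration built as in the read example)
data.apply(JdbcIO.<Row>write()
    .withDataSourceConfiguration(config)
    .withStatement("INSERT INTO my_table VALUES (?, ?)")
    // Bind Row fields to the statement parameters; two string columns are assumed here
    .withPreparedStatementSetter((row, statement) -> {
      statement.setString(1, row.getString(0));
      statement.setString(2, row.getString(1));
    }));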

Python I/O Location

sdks/python/apache_beam/io/

Common Python I/Os

  • textio - Text files
  • fileio - General file operations
  • avroio - Avro files
  • parquetio - Parquet files
  • gcp/ - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)

Cross-language I/O

Beam supports using I/O connectors from one SDK in another via the expansion service.

# Start Java expansion service
./gradlew :sdks:java:io:expansion-service:runExpansionService
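
A pipeline in another SDK can only use the expansion service once it is accepting connections. As a rough sketch, the following standard-library check polls a local port until something is listening. ExpansionServiceProbe is a hypothetical helper, and port 8097 is an assumption; use whichever port your expansion service was started on.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether something is accepting TCP connections. */
final class ExpansionServiceProbe {

  /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
  static boolean isListening(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      return false; // refused, unreachable, or timed out
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int port = 8097; // assumed port, not taken from the Gradle command above
    for (int attempt = 1; attempt <= 10; attempt++) {
      if (isListening("localhost", port, 500)) {
        System.out.println("Accepting connections on port " + port);
        return;
      }
      Thread.sleep(500); // wait before retrying
    }
    System.out.println("No listener on port " + port + " after 10 attempts");
  }
}
```

A successful connection only shows that a process is listening on the port; it does not confirm that the process is a Beam expansion service.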

Creating New Connectors

Key components:

  1. Source - Reads data (bounded or unbounded)
  2. Sink - Writes data
  3. Read/Write transforms - User-facing API
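
The division of labor between these three components can be sketched as a toy model in plain Java. None of the types below are Beam classes: RangeSource, ListSink, and readAll are hypothetical names chosen for illustration, and a real connector would build on the Beam SDK instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the three connector roles; none of these types are Beam classes. */
final class ConnectorSketch {

  /** Source role: a bounded range [start, end) of integers that can be split. */
  static final class RangeSource {
    final int start;
    final int end;

    RangeSource(int start, int end) {
      this.start = start;
      this.end = end;
    }

    /** Splits the range into sub-ranges of at most desiredSize elements. */
    List<RangeSource> split(int desiredSize) {
      if (desiredSize <= 0) {
        throw new IllegalArgumentException("desiredSize must be positive");
      }
      List<RangeSource> parts = new ArrayList<>();
      for (int s = start; s < end; s += desiredSize) {
        parts.add(new RangeSource(s, Math.min(s + desiredSize, end)));
      }
      return parts;
    }

    /** Reads every element of this (sub-)range. */
    List<Integer> read() {
      List<Integer> out = new ArrayList<>();
      for (int i = start; i < end; i++) {
        out.add(i);
      }
      return out;
    }
  }

  /** Sink role: collects written elements in memory. */
  static final class ListSink {
    final List<Integer> written = new ArrayList<>();

    void write(List<Integer> bundle) {
      written.addAll(bundle);
    }
  }

  /** User-facing read entry point: hides splitting and per-split reads from the caller. */
  static List<Integer> readAll(RangeSource source, int desiredSize) {
    List<Integer> all = new ArrayList<>();
    for (RangeSource part : source.split(desiredSize)) {
      all.addAll(part.read());
    }
    return all;
  }

  public static void main(String[] args) {
    ListSink sink = new ListSink();
    sink.write(readAll(new RangeSource(0, 10), 4));
    System.out.println(sink.written);
  }
}
```

The point of the sketch is the separation: callers only touch the read and write entry points, while splitting and per-split reading stay internal to the source.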

For more detailed information on developing new I/O connectors see the Developing new I/O connectors SKILL.
