Pipeline

Pairs a Dataset with a list of Systems, builds a field-level DAG, sorts into parallel stages, and runs with per-system retry.

Anatomy

A Pipeline is a self-contained workload: it owns its own Dataset, an ordered list of Systems, and, with the io feature, any attached Sources and Sinks.
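
A conceptual sketch of that ownership (field names are illustrative only, not the crate's real layout):

// Illustrative only: what a Pipeline owns, not pcs_core's actual definition.
struct Pipeline {
    name: String,
    dataset: Dataset,                // the pipeline's own data (see run_on below)
    systems: Vec<Box<dyn System>>,   // registration order seeds the conflict graph
    // With feature = "io", per-component endpoints:
    // sources: Vec<(String, Box<dyn Source>)>,
    // sinks: Vec<(String, Box<dyn Sink>)>,
}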

The pipeline is the unit of single-workload execution. To run several independent workloads from one process, see Scheduler.

Building a pipeline

use pcs_core::prelude::*;

let pipeline = Pipeline::builder("etl")
    .with::<Transaction>()                  // register Component
    .with_resource(FxRates::default())      // register Resource
    .with_system(IngestSystem)              // stage 0: write_component("Transaction")
    .with_system(ValidateSystem)            // stage 1: writes "valid"
    .with_system(EnrichSystem)              // stage 1: writes "usd_amount" (parallel-safe)
    .with_system(ReportSystem)              // stage 2: reads "valid" + "usd_amount"
    .build();

The imperative form works just as well when you need to mutate the pipeline after construction:

let mut pipeline = Pipeline::new("etl");
pipeline.register_component::<Transaction>()?;
pipeline.append::<Transaction>(&seed_data)?;
pipeline.add_system(ValidateSystem);
pipeline.add_system(EnrichSystem);
pipeline.add_parallel_system(ScoreSystem);   // slice-parallel

Running

run() validates declared fields against the dataset’s SchemaRegistry, builds the conflict graph, topologically sorts the systems into stages, and runs each stage with per-system retry. Systems within a stage run concurrently via tokio::join! when the runtime is enabled.

pipeline.run().await?;

let stats: RunStats = pipeline.last_stats();
println!(
    "{} systems in {}ms ({} retries this batch)",
    stats.systems_run, stats.duration_millis, stats.retries_this_batch,
);

Inspect the computed stage layout with pipeline.stages():

Stage 0: ["ingest"]
Stage 1: ["validate", "enrich"]
Stage 2: ["report"]
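
The same layout is available programmatically; a minimal sketch, assuming stages() returns per-stage lists of system names:

for (i, stage) in pipeline.stages().iter().enumerate() {
    // Systems within one stage have no field or resource conflicts.
    println!("Stage {i}: {stage:?}");
}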

run_on — the distributed escape hatch

Pipeline::run_on(&self, &mut Dataset) executes the system DAG against an external dataset without touching the pipeline’s own data, sources, or sinks. The Distributed Runner uses this to apply a pipeline template to per-batch datasets reconstructed from a shared store.

// Template pipeline carries only the systems and component schemas.
let template = Pipeline::builder("processor")
    .with::<Transaction>()
    .with_system(ValidateSystem)
    .with_system(EnrichSystem)
    .build();

// Apply the same DAG to many independent datasets.
for mut batch in incoming_batches {
    template.run_on(&mut batch).await?;
}
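
Because run_on borrows the pipeline immutably, one template can plausibly fan out across tasks. A sketch assuming Pipeline is Send + Sync and each batch is fully independent (neither is confirmed here):

use std::sync::Arc;

let template = Arc::new(template);
let mut handles = Vec::new();
for mut batch in incoming_batches {
    let template = Arc::clone(&template);
    handles.push(tokio::spawn(async move {
        template.run_on(&mut batch).await
    }));
}
for handle in handles {
    handle.await??; // propagate both task-join and pipeline errors
}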

Sources & sinks (feature = "io")

With the io feature, you can attach per-component Sources and Sinks directly to a pipeline. run_with_io() drains all sources into the dataset, runs the systems, then drains the dataset to all sinks.

use pcs_core::io::{ParquetSource, ParquetSink};

let mut pipeline = Pipeline::builder("etl")
    .with::<Transaction>()
    .with_source("Transaction", ParquetSource::open("input.parquet")?)
    .with_sink("Transaction",   ParquetSink::create("output.parquet", Transaction::schema())?)
    .with_system(ValidateSystem)
    .with_system(EnrichSystem)
    .build();

pipeline.run_with_io().await?;

See Sources & Sinks for built-in implementations and the underlying traits.

Conflict rules in detail

For systems registered in order, A then B, an edge B → A is added when:

Case                A declares   B declares   Result
Read-after-write    writes F     reads F      B follows A
Write-after-read    reads F      writes F     B follows A
Write-write         writes F     writes F     B follows A
Disjoint writes     writes F     writes G     same stage
Resource conflict   writes R     reads R      B follows A (TypeId-level)

read_component(C) and write_component(C) expand to all current fields of C at validation time, so they are coarser than per-field declarations.
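
One practical consequence, sketched with hypothetical systems (their declarations are stated in the comments): two systems that both declare write_component("Transaction") hit the write-write rule on every field and serialize, even if they actually touch disjoint columns.

let coarse = Pipeline::builder("coarse")
    .with::<Transaction>()
    .with_system(FlagAll)       // hypothetical: declares write_component("Transaction")
    .with_system(ConvertAll)    // hypothetical: declares write_component("Transaction")
    .build();
// Expected layout: Stage 0: ["flag_all"], Stage 1: ["convert_all"].
// Per-field writes ("valid" vs "usd_amount") would let them share a stage,
// as in the etl example above.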

Retry

Each system carries its own SystemConfig. The pipeline runs the system, catches errors, and retries according to the system’s RetryMode.

Failures that exhaust the retry budget surface as PcsError::RetryExhausted.
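
At the call site this is ordinary error handling; a minimal sketch (the variant’s payload is not documented here, so the match stays coarse):

match pipeline.run().await {
    Ok(()) => { /* every system succeeded, possibly after retries */ }
    Err(err) => {
        // Retrying happens inside run(); an error here means a system
        // exhausted its RetryMode budget (PcsError::RetryExhausted) or
        // failed in a non-retryable way.
        eprintln!("pipeline failed: {err}");
    }
}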

Next steps

Scheduler (Core)
Run multiple independent pipelines from one process with sequential or concurrent ticks.

Sources & Sinks (I/O)
Get Arrow data into and out of pipelines via Parquet, CSV, JSON, or in-memory channels.

Distributed Runner (Distributed)
Run a pipeline template across nodes with at-least-once semantics and Arrow IPC checkpoints.