# Pipeline
Pairs a Dataset with a list of Systems, builds a field-level DAG, sorts into parallel stages, and runs with per-system retry.
## Anatomy
A `Pipeline` is a self-contained workload:

- A `name: Arc<str>` for logging and identification
- A `Dataset` — the columnar data the systems operate on
- A list of `System`s, registered in the order they were added
- An (optional) DAG: stages computed from `SystemMeta` on first run
- (With the `io` feature) Per-component `Source`s and `Sink`s
The pipeline is the unit of single-workload execution. To run several independent workloads from one process, see Scheduler.
## Building a pipeline
```rust
use pcs_core::prelude::*;

let pipeline = Pipeline::builder("etl")
    .with::<Transaction>()             // register Component
    .with_resource(FxRates::default()) // register Resource
    .with_system(IngestSystem)         // stage 0: writes_component("Transaction")
    .with_system(ValidateSystem)       // stage 1: writes "valid"
    .with_system(EnrichSystem)         // stage 1: writes "usd_amount" (parallel-safe)
    .with_system(ReportSystem)         // stage 2: reads "valid" + "usd_amount"
    .build();
```
The imperative form works too, and is useful when you need to mutate the pipeline after construction:
```rust
let mut pipeline = Pipeline::new("etl");
pipeline.register_component::<Transaction>()?;
pipeline.append::<Transaction>(&seed_data)?;
pipeline.add_system(ValidateSystem);
pipeline.add_system(EnrichSystem);
pipeline.add_parallel_system(ScoreSystem); // slice-parallel
```
## Running
`run()` validates declared fields against the dataset's `SchemaRegistry`, builds the conflict graph, topologically sorts the systems into stages, and runs each stage with per-system retry. Systems within a stage run concurrently via `tokio::join!` when the runtime is enabled.
```rust
pipeline.run().await?;

let stats: RunStats = pipeline.last_stats();
println!(
    "{} systems in {}ms ({} retries this batch)",
    stats.systems_run, stats.duration_millis, stats.retries_this_batch,
);
```
Inspect the computed stage layout with `pipeline.stages()`:
```text
Stage 0: ["ingest"]
Stage 1: ["validate", "enrich"]
Stage 2: ["report"]
```
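The staging rule itself is simple: a system's stage is one more than the maximum stage of everything it must follow. A standalone sketch of that layering in plain Rust (this mirrors the described behavior, not the crate's actual implementation):

```rust
/// Assign each system a stage: 1 + the max stage of its dependencies.
/// `deps[i]` lists indices of systems that system `i` must follow.
/// Systems are registered in dependency order, so every dep index is
/// smaller than `i` and a single forward pass suffices.
fn stages(deps: &[Vec<usize>]) -> Vec<usize> {
    let mut stage = vec![0usize; deps.len()];
    for i in 0..deps.len() {
        for &d in &deps[i] {
            stage[i] = stage[i].max(stage[d] + 1);
        }
    }
    stage
}

fn main() {
    // ingest=0, validate=1, enrich=2, report=3
    let deps = vec![
        vec![],     // ingest: no dependencies
        vec![0],    // validate follows ingest
        vec![0],    // enrich follows ingest
        vec![1, 2], // report follows validate and enrich
    ];
    println!("{:?}", stages(&deps)); // [0, 1, 1, 2]
}
```

Systems that land on the same stage number (here, validate and enrich) have no edge between them and can run concurrently.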
## `run_on` — the distributed escape hatch
`Pipeline::run_on(&self, &mut Dataset)` executes the system DAG against an external dataset without touching the pipeline's own data, sources, or sinks. The Distributed Runner uses this to apply a pipeline template to per-batch datasets reconstructed from a shared store.
```rust
// Template pipeline carries only the systems and component schemas.
let template = Pipeline::builder("processor")
    .with::<Transaction>()
    .with_system(ValidateSystem)
    .with_system(EnrichSystem)
    .build();

// Apply the same DAG to many independent datasets.
for mut batch in incoming_batches {
    template.run_on(&mut batch).await?;
}
```
## Sources & sinks (feature = "io")
With the `io` feature, you can attach per-component `Source`s and `Sink`s directly to a pipeline. `run_with_io()` drains all sources into the dataset, runs the systems, then drains the dataset to all sinks.
```rust
use pcs_core::io::{ParquetSource, ParquetSink};

let mut pipeline = Pipeline::builder("etl")
    .with::<Transaction>()
    .with_source("Transaction", ParquetSource::open("input.parquet")?)
    .with_sink("Transaction", ParquetSink::create("output.parquet", Transaction::schema())?)
    .with_system(ValidateSystem)
    .with_system(EnrichSystem)
    .build();

pipeline.run_with_io().await?;
```
See Sources & Sinks for built-in implementations and the underlying traits.
## Conflict rules in detail
For systems registered in order, A then B, an edge B → A is added when:
| Case | A declares | B declares | Result |
|---|---|---|---|
| Read-after-write | writes F | reads F | B follows A |
| Write-after-read | reads F | writes F | B follows A |
| Write-write | writes F | writes F | B follows A |
| Disjoint writes | writes F | writes G | same stage |
| Resource conflict | writes R | reads R | B follows A (TypeId-level) |
`read_component(C)` and `write_component(C)` expand to all current fields of `C` at validation time, so they are coarser than per-field declarations.
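The whole table reduces to one rule: B must follow A when the two systems touch a common field and at least one of the conflicting accesses is a write. A standalone sketch of that check (the `Access` struct and `must_follow` function here are illustrative, not the crate's API):

```rust
use std::collections::HashSet;

/// Field-level access declarations for a system (illustrative only).
struct Access {
    reads: HashSet<&'static str>,
    writes: HashSet<&'static str>,
}

/// True when B must be scheduled after A: read-after-write,
/// write-after-read, or write-write on a shared field. Disjoint writes
/// and read-read overlaps produce no edge, so those systems can share
/// a stage.
fn must_follow(a: &Access, b: &Access) -> bool {
    b.reads.iter().any(|f| a.writes.contains(f))
        || b.writes.iter().any(|f| a.reads.contains(f))
        || b.writes.iter().any(|f| a.writes.contains(f))
}

fn main() {
    let ingest = Access {
        reads: HashSet::new(),
        writes: ["amount"].into_iter().collect(),
    };
    let validate = Access {
        reads: ["amount"].into_iter().collect(),
        writes: ["valid"].into_iter().collect(),
    };
    let enrich = Access {
        reads: ["amount"].into_iter().collect(),
        writes: ["usd_amount"].into_iter().collect(),
    };

    assert!(must_follow(&ingest, &validate));  // read-after-write: edge
    assert!(!must_follow(&validate, &enrich)); // disjoint writes: same stage
}
```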
## Retry
Each system carries its own `SystemConfig`. The pipeline runs the system, catches errors, and retries according to the system's `RetryMode`:
- `RetryMode::None` — fail-fast.
- `RetryMode::Fixed { max_retries, delay_ms }` — constant delay between attempts.
- `RetryMode::ExponentialBackoff { max_retries, base_delay_ms, multiplier, max_delay_ms, jitter }` — the default for systems that don't override `config()` (3 retries, 100 ms base, 2× multiplier, 30 s cap, 10% jitter).
Failures that exhaust the retry budget surface as `PcsError::RetryExhausted`.
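With the documented defaults, the n-th retry waits `base_delay_ms × multiplier^n`, capped at `max_delay_ms`, and jitter then perturbs the result. A sketch of that delay schedule, with jitter omitted and an integer multiplier assumed for simplicity (not the crate's internals):

```rust
/// Delay before retry attempt `n` (0-based), ignoring jitter.
/// Saturating arithmetic avoids overflow for large `n` before the cap
/// is applied.
fn backoff_delay_ms(n: u32, base_ms: u64, multiplier: u64, max_ms: u64) -> u64 {
    base_ms
        .saturating_mul(multiplier.saturating_pow(n))
        .min(max_ms)
}

fn main() {
    // Documented defaults: 100 ms base, 2x multiplier, 30 s cap.
    let delays: Vec<u64> = (0..3).map(|n| backoff_delay_ms(n, 100, 2, 30_000)).collect();
    println!("{:?}", delays); // [100, 200, 400]

    // Deep into the retry budget the cap takes over.
    println!("{}", backoff_delay_ms(10, 100, 2, 30_000)); // 30000
}
```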