Systems
Typed processing units that declare exactly which Arrow fields they read and write.
The System trait
A System is the unit of processing logic. It carries metadata, optional config, and an async run method that operates on a mutable Dataset:
#[async_trait]
pub trait System: Send + Sync {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new(std::any::type_name::<Self>())
    }

    fn config(&self) -> SystemConfig { SystemConfig::default() }

    async fn run(&self, data: &mut Dataset) -> PcsResult<()>;

    /// Optional sync fast-path; default returns None.
    fn run_sync(&self, _data: &mut Dataset) -> Option<PcsResult<()>> { None }
}
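When a system's work has no await points, the scheduler can take the run_sync fast-path and skip the async machinery entirely. A stdlib-only sketch of that dispatch decision, with a bool standing in for the real Option<PcsResult<()>> (the trait and dispatcher here are illustrative stand-ins, not the pcs_core types):

```rust
// Illustrative stand-in for the real trait: a system may offer a
// synchronous fast-path, and the dispatcher prefers it when present.
trait ToySystem {
    fn run(&self, data: &mut Vec<f64>); // async path, simplified to sync here
    // Returns true if the sync fast-path handled the call (stand-in for Some(..)).
    fn run_sync(&self, _data: &mut Vec<f64>) -> bool { false }
}

struct Doubler;

impl ToySystem for Doubler {
    fn run(&self, data: &mut Vec<f64>) {
        for v in data.iter_mut() { *v *= 2.0; }
    }
    // Pure CPU work: take the fast-path and report the call as handled.
    fn run_sync(&self, data: &mut Vec<f64>) -> bool {
        self.run(data);
        true
    }
}

// The dispatcher tries the fast-path first, falling back to the async run.
fn dispatch(sys: &dyn ToySystem, data: &mut Vec<f64>) -> &'static str {
    if sys.run_sync(data) { "sync" } else { sys.run(data); "async" }
}

fn main() {
    let mut data = vec![1.0, 2.0];
    let path = dispatch(&Doubler, &mut data);
    println!("{path} {data:?}");
}
```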
SystemMeta — field-level declarations
SystemMeta declares what a system reads and writes at (component, field) granularity. The pipeline uses these declarations to build a DAG and place independent systems into the same parallel stage.
SystemMeta::new("enrich")
    .reads(Transaction::AMOUNT)      // typed FieldRef variant
    .reads(Transaction::CURRENCY)
    .writes(Transaction::USD_AMOUNT)
    .read_resource::<FxRates>()
    .write_resource::<Report>();
The string-based variants are useful when the field name is dynamic or when you don’t want to commit to FieldRef constants:
SystemMeta::new("rebalance")
    .read("Order", "amount")
    .write("Order", "total")
    .read_component("Inventory") // expands to all fields
    .write_component("Audit");
Conflict rules & stage placement
For two systems A (registered first) and B, PCS adds a dependency edge B → A (B waits for A) whenever:
- A writes field F, B reads F — read-after-write.
- A reads F, B writes F — write-after-read.
- A writes F, B writes F — write-write.
- Resource conflicts: same as above, but at TypeId granularity (the whole resource).
Disjoint writes pay nothing. Two systems that write different fields of the same component end up in the same stage with no edge between them.
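These rules reduce to set intersections over the declared (component, field) pairs. A stdlib-only toy of the edge test (not the pcs_core scheduler; the Decl struct and field names are illustrative):

```rust
use std::collections::HashSet;

// A system's declarations at (component, field) granularity.
struct Decl {
    reads: HashSet<(&'static str, &'static str)>,
    writes: HashSet<(&'static str, &'static str)>,
}

// B was registered after A; true means B must be ordered after A.
fn conflicts(a: &Decl, b: &Decl) -> bool {
    // read-after-write, write-after-read, write-write
    !a.writes.is_disjoint(&b.reads)
        || !a.reads.is_disjoint(&b.writes)
        || !a.writes.is_disjoint(&b.writes)
}

fn main() {
    let validate = Decl {
        reads: [("Transaction", "amount")].into(),
        writes: [("Transaction", "valid")].into(),
    };
    let enrich = Decl {
        reads: [("Transaction", "amount"), ("Transaction", "currency")].into(),
        writes: [("Transaction", "usd_amount")].into(),
    };
    let report = Decl {
        reads: [("Transaction", "valid"), ("Transaction", "usd_amount")].into(),
        writes: HashSet::new(),
    };
    // Shared reads and disjoint writes: no edge, same stage.
    assert!(!conflicts(&validate, &enrich));
    // report reads what validate writes: edge, later stage.
    assert!(conflicts(&validate, &report));
    println!("ok");
}
```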
Stage 0: IngestSystem    writes_component("Transaction")
Stage 1: ValidateSystem  reads "amount", writes "valid"
         EnrichSystem    reads "amount" + "currency", writes "usd_amount"
         (disjoint writes → same stage, runs in parallel)
Stage 2: ReportSystem    reads "valid" and "usd_amount"
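Stage numbers fall out of the dependency edges: a system's stage is one past the latest stage among the earlier-registered systems it conflicts with. A minimal sketch of that placement rule (illustrative, not the real scheduler):

```rust
// Toy stage placement. conflict[i] lists the earlier-registered systems
// that system i must run after; stage[i] is one past the latest of those.
fn assign_stages(conflict: &[Vec<usize>]) -> Vec<usize> {
    let mut stage = vec![0usize; conflict.len()];
    for i in 0..conflict.len() {
        let s = conflict[i].iter().map(|&p| stage[p] + 1).max().unwrap_or(0);
        stage[i] = s;
    }
    stage
}

fn main() {
    // 0: Ingest, 1: Validate (after 0), 2: Enrich (after 0), 3: Report (after 1 and 2)
    let stages = assign_stages(&[vec![], vec![0], vec![0], vec![1, 2]]);
    // Validate and Enrich share stage 1, mirroring the diagram above.
    assert_eq!(stages, vec![0, 1, 1, 2]);
    println!("{stages:?}");
}
```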
Implementing a System
use std::sync::Arc;

use arrow_array::BooleanArray;
use async_trait::async_trait;
use pcs_core::prelude::*;

struct ValidateSystem;

#[async_trait]
impl System for ValidateSystem {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new("validate")
            .reads(Transaction::AMOUNT)
            .writes(Transaction::VALID)
    }

    async fn run(&self, data: &mut Dataset) -> PcsResult<()> {
        let batch = data
            .columns::<Transaction>()
            .ok_or_else(|| PcsError::generic("Transaction not registered"))?
            .clone();

        let flags: Vec<bool> = {
            let txns = data.view::<Transaction>()?;
            let amount = txns.f64(Transaction::AMOUNT)?;
            (0..txns.len()).map(|i| amount.value(i) > 0.0).collect()
        };

        // Replace the `valid` column with the freshly computed array.
        let new_valid = Arc::new(BooleanArray::from(flags));
        let schema = batch.schema();
        let valid_idx = schema.index_of("valid").unwrap();
        let cols: Vec<_> = (0..schema.fields().len())
            .map(|i| if i == valid_idx { new_valid.clone() as _ } else { batch.column(i).clone() })
            .collect();
        let new_batch = arrow_array::RecordBatch::try_new(schema, cols)
            .map_err(|e| PcsError::generic(format!("rebuild: {e}")))?;
        data.replace_batch::<Transaction>(new_batch)?;
        Ok(())
    }
}
The system_fn closure helper
For one-off systems — data ingestion, reporting, ad-hoc transforms — system_fn wraps a closure into a System with explicit metadata:
let load = system_fn(
    SystemMeta::new("load").write_component("Transaction"),
    |data: &mut Dataset| {
        let rows = vec![
            Transaction { id: 1, amount: 100.0, ..Default::default() },
            Transaction { id: 2, amount: 250.0, ..Default::default() },
        ];
        data.append::<Transaction>(&rows)?;
        Ok(())
    },
);
pipeline.add_system(load);
Retry configuration
Each system can return its own SystemConfig from config(). The pipeline wraps every run call in the system’s configured retry strategy.
impl System for FlakyApiCall {
    fn config(&self) -> SystemConfig {
        SystemConfig::new(RetryMode::ExponentialBackoff {
            max_retries: 5,
            base_delay_ms: 100,
            multiplier: 2.0,
            max_delay_ms: 30_000,
            jitter: 0.2,
        })
    }

    // ... meta() and run()
}
Other modes: RetryMode::None (one-shot) and RetryMode::Fixed { max_retries, delay_ms }. The default for systems that don’t override config() is exponential backoff with 3 retries.
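Ignoring jitter, exponential backoff scales base_delay_ms by multiplier on each attempt and caps the result at max_delay_ms. A quick stdlib sketch of that schedule (an assumption about the exact formula; the scheduler's internals may differ, and jitter would perturb each delay):

```rust
// Delay before retry `attempt` (0-based): base * multiplier^attempt,
// capped at max. Jitter is deliberately left out of this sketch.
fn backoff_delay_ms(attempt: u32, base: f64, multiplier: f64, max: f64) -> u64 {
    (base * multiplier.powi(attempt as i32)).min(max) as u64
}

fn main() {
    // The schedule from the FlakyApiCall config above, minus jitter.
    let delays: Vec<u64> = (0..5)
        .map(|a| backoff_delay_ms(a, 100.0, 2.0, 30_000.0))
        .collect();
    assert_eq!(delays, vec![100, 200, 400, 800, 1600]);
    // Far-out attempts saturate at max_delay_ms.
    assert_eq!(backoff_delay_ms(20, 100.0, 2.0, 30_000.0), 30_000);
    println!("{delays:?}");
}
```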
ParallelSystem — slice-level parallelism
For systems that scan a dataset and produce per-row outputs, ParallelSystem exposes a read-only run that returns a WriteSet. The pipeline can split the dataset into slices, run multiple parallel systems concurrently across them, and merge their WriteSets back into the dataset.
#[async_trait]
impl ParallelSystem for ScoreSystem {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new("score")
            .reads(Transaction::AMOUNT)
            .writes(Transaction::SCORE)
    }

    async fn run(&self, data: &Dataset) -> PcsResult<WriteSet> {
        let txns = data.view::<Transaction>()?;
        let amount = txns.f64(Transaction::AMOUNT)?;
        let scores: Vec<f64> = (0..txns.len())
            .map(|i| amount.value(i).log10())
            .collect();
        let mut ws = WriteSet::new();
        ws.set_column::<Transaction>("score", Arc::new(Float64Array::from(scores)));
        Ok(ws)
    }
}

pipeline.add_parallel_system(ScoreSystem);
See cargo run --example scheduler_etl_parallel for a working slice-parallel pipeline.
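The slice/merge flow can be pictured with a stdlib-only toy: each slice computes its rows independently, and the merge concatenates the outputs in slice order (the real merge operates on Arrow arrays inside a WriteSet; this shows only the shape of the operation):

```rust
// Per-slice work: a read-only scan producing one output value per row,
// mirroring ScoreSystem's log10 scoring above.
fn score_slice(amounts: &[f64]) -> Vec<f64> {
    amounts.iter().map(|a| a.log10()).collect()
}

fn main() {
    let amounts = vec![10.0, 100.0, 1000.0, 10_000.0];
    // Split into two slices, "run" each independently, then merge in order.
    let (left, right) = amounts.split_at(2);
    let mut merged = score_slice(left);
    merged.extend(score_slice(right));
    // Row order is preserved across the slice boundary.
    assert_eq!(merged.len(), amounts.len());
    assert!((merged[0] - 1.0).abs() < 1e-9);
    assert!((merged[3] - 4.0).abs() < 1e-9);
    println!("{merged:?}");
}
```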