Systems

Typed processing units that declare exactly which Arrow fields they read and write.

The System trait

A System is the unit of processing logic. It carries metadata, optional config, and an async run method that operates on a mutable Dataset:

#[async_trait]
pub trait System: Send + Sync {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new(std::any::type_name::<Self>())
    }

    fn config(&self) -> SystemConfig { SystemConfig::default() }

    async fn run(&self, data: &mut Dataset) -> PcsResult<()>;

    /// Optional sync fast-path; default returns None.
    fn run_sync(&self, _data: &mut Dataset) -> Option<PcsResult<()>> { None }
}

SystemMeta — field-level declarations

SystemMeta declares what a system reads and writes at (component, field) granularity. The pipeline uses these declarations to build a DAG and place independent systems into the same parallel stage.

SystemMeta::new("enrich")
    .reads(Transaction::AMOUNT)         // typed FieldRef variant
    .reads(Transaction::CURRENCY)
    .writes(Transaction::USD_AMOUNT)
    .read_resource::<FxRates>()
    .write_resource::<Report>();

The string-based variants are useful when the field name is dynamic or when you don’t want to commit to FieldRef constants:

SystemMeta::new("rebalance")
    .read("Order", "amount")
    .write("Order", "total")
    .read_component("Inventory")        // expands to all fields
    .write_component("Audit");

Conflict rules & stage placement

For two systems A (registered first) and B, PCS adds a dependency edge B → A (B must run after A) whenever:

  1. A writes field F, B reads F (read-after-write).
  2. A reads F, B writes F (write-after-read).
  3. A writes F, B writes F (write-write).
  4. Resource conflicts: same as above but at TypeId granularity (whole resource).

Disjoint writes pay nothing. Two systems that write different fields of the same component end up in the same stage with no edge between them.

Stage 0:  IngestSystem      write_component("Transaction")
Stage 1:  ValidateSystem    reads "amount", writes "valid"
          EnrichSystem      reads "amount" + "currency", writes "usd_amount"
                            (disjoint writes → same stage, runs in parallel)
Stage 2:  ReportSystem      reads "valid" and "usd_amount"
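The placement above can be reproduced with a small standalone sketch. It is not PCS's scheduler: `Access`, `conflicts`, and `assign_stages` are hypothetical names, resources are left out, and the whole-component write of IngestSystem is assumed to expand to the four fields named in the listing. Each system is placed one stage after the latest earlier system it conflicts with.

```rust
use std::collections::HashSet;

/// Field-level access declaration for one system
/// (hypothetical stand-in for SystemMeta, fields only).
struct Access {
    name: &'static str,
    reads: HashSet<&'static str>,
    writes: HashSet<&'static str>,
}

impl Access {
    fn new(name: &'static str, reads: &[&'static str], writes: &[&'static str]) -> Self {
        Access {
            name,
            reads: reads.iter().copied().collect(),
            writes: writes.iter().copied().collect(),
        }
    }
}

/// True when `later` must wait for `earlier` under rules 1-3.
fn conflicts(earlier: &Access, later: &Access) -> bool {
    let raw = !earlier.writes.is_disjoint(&later.reads); // read-after-write
    let war = !earlier.reads.is_disjoint(&later.writes); // write-after-read
    let waw = !earlier.writes.is_disjoint(&later.writes); // write-write
    raw || war || waw
}

/// Give each system the earliest stage that comes after every
/// earlier-registered system it conflicts with.
fn assign_stages(systems: &[Access]) -> Vec<(&'static str, usize)> {
    let mut stages: Vec<(&'static str, usize)> = Vec::with_capacity(systems.len());
    for (i, later) in systems.iter().enumerate() {
        let stage = (0..i)
            .filter(|&j| conflicts(&systems[j], later))
            .map(|j| stages[j].1 + 1)
            .max()
            .unwrap_or(0); // no conflicts: stage 0
        stages.push((later.name, stage));
    }
    stages
}

/// The four systems from the stage listing, in registration order.
fn example_systems() -> Vec<Access> {
    // Assumed expansion of a whole-component write on Transaction.
    let all = ["amount", "currency", "valid", "usd_amount"];
    vec![
        Access::new("IngestSystem", &[], &all),
        Access::new("ValidateSystem", &["amount"], &["valid"]),
        Access::new("EnrichSystem", &["amount", "currency"], &["usd_amount"]),
        Access::new("ReportSystem", &["valid", "usd_amount"], &[]),
    ]
}
```

Calling `assign_stages(&example_systems())` places ValidateSystem and EnrichSystem together because neither writes a field the other touches, while ReportSystem lands one stage later because it reads what both of them write.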

Implementing a System

use std::sync::Arc;
use arrow_array::BooleanArray;
use async_trait::async_trait;
use pcs_core::prelude::*;

struct ValidateSystem;

#[async_trait]
impl System for ValidateSystem {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new("validate")
            .reads(Transaction::AMOUNT)
            .writes(Transaction::VALID)
    }

    async fn run(&self, data: &mut Dataset) -> PcsResult<()> {
        let batch = data
            .columns::<Transaction>()
            .ok_or_else(|| PcsError::generic("Transaction not registered"))?
            .clone();

        let flags: Vec<bool> = {
            let txns = data.view::<Transaction>()?;
            let amount = txns.f64(Transaction::AMOUNT)?;
            (0..txns.len()).map(|i| amount.value(i) > 0.0).collect()
        };

        // Replace the `valid` column with the freshly computed array.
        let new_valid = Arc::new(BooleanArray::from(flags));
        let schema = batch.schema();
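        // Propagate a missing column as an error instead of panicking.
        let valid_idx = schema
            .index_of("valid")
            .map_err(|e| PcsError::generic(format!("missing `valid` column: {e}")))?;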
        let cols: Vec<_> = (0..schema.fields().len())
            .map(|i| if i == valid_idx { new_valid.clone() as _ } else { batch.column(i).clone() })
            .collect();

        let new_batch = arrow_array::RecordBatch::try_new(schema, cols)
            .map_err(|e| PcsError::generic(format!("rebuild: {e}")))?;

        data.replace_batch::<Transaction>(new_batch)?;
        Ok(())
    }
}

The system_fn closure helper

For one-off systems — data ingestion, reporting, ad-hoc transforms — system_fn wraps a closure into a System with explicit metadata:

let load = system_fn(
    SystemMeta::new("load").write_component("Transaction"),
    |data: &mut Dataset| {
        let rows = vec![
            Transaction { id: 1, amount: 100.0, ..Default::default() },
            Transaction { id: 2, amount: 250.0, ..Default::default() },
        ];
        data.append::<Transaction>(&rows)?;
        Ok(())
    },
);

pipeline.add_system(load);
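The general pattern behind a helper like this is an adapter struct that stores the closure and forwards the trait method to it. The sketch below illustrates that pattern only; it is not how PCS implements system_fn. `MiniSystem`, `FnSystem`, and `mini_system_fn` are hypothetical names, the trait is synchronous, and a plain `Vec<f64>` stands in for the Dataset.

```rust
/// Simplified, synchronous stand-in for a system trait
/// (hypothetical; not the PCS `System` trait).
trait MiniSystem {
    fn name(&self) -> &str;
    fn run(&self, data: &mut Vec<f64>) -> Result<(), String>;
}

/// Adapter that stores a name plus a closure and forwards `run` to the closure.
struct FnSystem<F> {
    name: String,
    func: F,
}

impl<F> MiniSystem for FnSystem<F>
where
    F: Fn(&mut Vec<f64>) -> Result<(), String>,
{
    fn name(&self) -> &str {
        &self.name
    }

    fn run(&self, data: &mut Vec<f64>) -> Result<(), String> {
        (self.func)(data)
    }
}

/// Hypothetical helper with the same argument order as the call above:
/// metadata (here just a name) first, closure second.
fn mini_system_fn<F>(name: &str, func: F) -> FnSystem<F>
where
    F: Fn(&mut Vec<f64>) -> Result<(), String>,
{
    FnSystem { name: name.to_string(), func }
}

/// Build a loader from a closure and run it against an empty data set.
fn load_example() -> Result<Vec<f64>, String> {
    let load = mini_system_fn("load", |data: &mut Vec<f64>| {
        data.extend([100.0, 250.0]);
        Ok(())
    });
    let mut data = Vec::new();
    load.run(&mut data)?;
    Ok(data)
}
```

Because the adapter is generic over `F`, each closure gets its own concrete type and the call is statically dispatched; a pipeline that stores many systems would typically box them behind the trait instead.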

Retry configuration

Each system can return its own SystemConfig from config(). The pipeline wraps every run call in the system’s configured retry strategy.

#[async_trait]
impl System for FlakyApiCall {
    fn config(&self) -> SystemConfig {
        SystemConfig::new(RetryMode::ExponentialBackoff {
            max_retries:   5,
            base_delay_ms: 100,
            multiplier:    2.0,
            max_delay_ms:  30_000,
            jitter:        0.2,
        })
    }
    // ... meta() and run()
}

Other modes: RetryMode::None (one-shot) and RetryMode::Fixed { max_retries, delay_ms }. The default for systems that don’t override config() is exponential backoff with 3 retries.
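To get a feel for what the exponential settings mean in wall-clock terms, the sketch below computes a delay schedule. The formula is an assumption, not taken from the PCS source: the delay before retry n (0-based) is taken to be base_delay_ms × multiplier^n capped at max_delay_ms, and jitter is taken to scale the delay by a random factor in [1 - jitter, 1 + jitter]. `backoff_delay_ms`, `apply_jitter`, and `example_schedule` are hypothetical helpers.

```rust
/// Delay before retry number `attempt` (0-based), without jitter.
/// Assumed formula: base * multiplier^attempt, capped at max.
fn backoff_delay_ms(attempt: u32, base_delay_ms: u64, multiplier: f64, max_delay_ms: u64) -> u64 {
    let raw = base_delay_ms as f64 * multiplier.powi(attempt as i32);
    // `min` applies the cap, which also handles an overflow to infinity.
    raw.min(max_delay_ms as f64).round() as u64
}

/// Scale a delay by a factor in [1 - jitter, 1 + jitter].
/// `unit` is a caller-supplied random sample in [-1.0, 1.0];
/// passing it in keeps this function deterministic and testable.
fn apply_jitter(delay_ms: u64, jitter: f64, unit: f64) -> u64 {
    let factor = 1.0 + jitter * unit.clamp(-1.0, 1.0);
    (delay_ms as f64 * factor).round() as u64
}

/// Un-jittered schedule for the five retries configured above
/// (base 100 ms, multiplier 2.0, cap 30 000 ms).
fn example_schedule() -> Vec<u64> {
    (0..5).map(|n| backoff_delay_ms(n, 100, 2.0, 30_000)).collect()
}
```

Under these assumptions the five retries wait 100, 200, 400, 800, and 1600 ms, so the 30-second cap only comes into play with a larger max_retries; with jitter 0.2, a nominal 1000 ms delay would fall somewhere between 800 and 1200 ms.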

ParallelSystem — slice-level parallelism

For systems that scan a dataset and produce per-row outputs, ParallelSystem exposes a read-only run that returns a WriteSet. The pipeline can split the dataset into slices, run multiple parallel systems concurrently across them, and merge their WriteSets back into the dataset.

#[async_trait]
impl ParallelSystem for ScoreSystem {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new("score")
            .reads(Transaction::AMOUNT)
            .writes(Transaction::SCORE)
    }

    async fn run(&self, data: &Dataset) -> PcsResult<WriteSet> {
        let txns = data.view::<Transaction>()?;
        let amount = txns.f64(Transaction::AMOUNT)?;
        let scores: Vec<f64> = (0..txns.len())
            .map(|i| amount.value(i).log10())
            .collect();

        let mut ws = WriteSet::new();
        ws.set_column::<Transaction>("score", Arc::new(Float64Array::from(scores)));
        Ok(ws)
    }
}

pipeline.add_parallel_system(ScoreSystem);

See cargo run --example scheduler_etl_parallel for a working slice-parallel pipeline.
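The split, compute, and merge idea can be shown without PCS at all. The following sketch uses scoped threads from the standard library rather than the async runtime, and a plain `&[f64]` rather than a Dataset; `score` and `score_in_slices` are hypothetical names. Each worker reads its slice only and returns its own output buffer, which plays the role of a write set.

```rust
use std::thread;

/// Per-row computation; it only reads its input.
fn score(amount: f64) -> f64 {
    amount.log10()
}

/// Split `amounts` into up to `slices` contiguous chunks, score each chunk
/// on its own thread, then concatenate the partial outputs in slice order.
fn score_in_slices(amounts: &[f64], slices: usize) -> Vec<f64> {
    if amounts.is_empty() {
        return Vec::new();
    }
    let slices = slices.max(1);
    let chunk_len = (amounts.len() + slices - 1) / slices; // ceiling division

    thread::scope(|scope| {
        // Spawn every worker first; each returns its own output buffer.
        let handles: Vec<_> = amounts
            .chunks(chunk_len)
            .map(|chunk| {
                scope.spawn(move || chunk.iter().map(|&a| score(a)).collect::<Vec<f64>>())
            })
            .collect();

        // Merge in the original slice order so that output row i
        // corresponds to input row i.
        let mut merged = Vec::with_capacity(amounts.len());
        for handle in handles {
            merged.extend(handle.join().expect("scoring thread panicked"));
        }
        merged
    })
}
```

Because the workers never mutate shared state, no locking is needed; the only coordination point is the ordered merge at the end. The result is the same as mapping `score` over the input sequentially, regardless of how many slices are requested.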

Next steps

Pipeline

Bring systems and a dataset together; let PCS build the DAG, sort into stages, and run with retry.

Scheduler

Orchestrate multiple independent pipelines from one process — sequentially or concurrently.