PCS

Distributed Batch Processing for Rust, on Apache Arrow.

Typed columnar pipelines with field-granular DAG scheduling, zero-copy IPC checkpoints, and Raft-backed distribution — assembled from plain Rust System impls.

Requires Rust 1.89 or newer.

What is PCS?

PCS is a distributed batch processing engine for Rust, built on Apache Arrow. You write typed System impls that declare which Arrow fields they read and write; PCS derives the execution order, runs independent stages in parallel, retries with exponential backoff, and distributes work across nodes with at-least-once semantics backed by Raft consensus.

The columnar layout pays off where it matters most: IPC serialization runs 4–19× faster than row-oriented equivalents, wide-schema projection runs 2–3× faster from cache behavior alone, and distributed checkpoint recovery inherits those gains directly.

PCS is
  • A columnar transform engine for 100k–100M row batches
  • A place for imperative Rust that SQL cannot express cleanly
  • Optimized for wide schemas (tens to hundreds of columns)
  • Built for workloads where distributed recovery time is a first-class constraint
PCS is not
  • A SQL engine — no query planner; DataFusion will beat it on SQL
  • A sub-millisecond stream processor — batch-first by design
  • A game ECS — despite the columnar vocabulary
  • A replacement for row-oriented workflows under ~10k rows

The mental model

Five concepts compose every PCS workload:

1. Dataset: An Arrow-backed columnar container. One RecordBatch per registered Component; all batches share the same row count. Append, soft-delete, compact, IPC round-trip.

2. System: Processing logic. SystemMeta declares which (component, field) pairs the system reads and writes. PCS uses these declarations to schedule.

3. Pipeline: One Dataset + one set of Systems. Builds a field-level DAG, sorts into stages, runs with per-system retry. The unit of single-workload execution.

4. Scheduler: Multi-pipeline orchestrator. Drives several independent Pipelines, sequentially or concurrently, with optional dependency edges and backpressure.

5. Distributed Runner: Claims row-range partitions from a shared store, executes a Pipeline template against each batch, checkpoints via Arrow IPC, replicates state via Raft.

Features

Zero-copy Arrow IPC

Checkpoint writes and recovery use Arrow IPC — a contiguous buffer copy, not per-field dispatch. Benchmarked at 4.4× faster encode and 19× faster decode than a row-oriented postcard baseline at 1M rows.
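The checkpoint format is plain Arrow IPC, so any Arrow reader can open it. A minimal round-trip with the stock arrow-ipc crate (an extra dependency, pinned to the same version as arrow-array) illustrates the encode/decode path Arrow IPC provides:

use std::io::Cursor;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Schema};

fn ipc_round_trip() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Encode: column buffers are written contiguously, no per-row dispatch.
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Decode: buffers are handed straight back to Arrow arrays.
    for maybe_batch in StreamReader::try_new(Cursor::new(buf), None)? {
        assert_eq!(maybe_batch?.num_rows(), 3);
    }
    Ok(())
}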

Wide-schema cache efficiency

Columnar layout reads only the columns a system touches. A pipeline reading 3 of 50 columns loads 24 MB instead of 400 MB per 1M rows — a 2.7× throughput advantage from cache behavior alone.
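The arithmetic behind that figure: 3 Float64 columns at 1M rows is roughly 3 × 1,000,000 × 8 bytes ≈ 24 MB, versus 50 × 1,000,000 × 8 ≈ 400 MB if every column had to be scanned. A hedged sketch of the same narrow view using stock arrow-array (the column indices are made up; PCS selects columns from SystemMeta declarations rather than hard-coding them):

use arrow_array::RecordBatch;

fn narrow_view(batch: &RecordBatch) -> RecordBatch {
    // `project` keeps Arc'd references to three columns; the other 47
    // are never pulled through the cache.
    batch
        .project(&[0, 3, 7]) // hypothetical column indices
        .expect("indices within schema bounds")
}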

Field-granular DAG scheduling

Systems declare reads and writes per Arrow field. PCS builds a dependency graph, places systems with disjoint writes into the same parallel stage, and topologically sorts execution automatically.

Composable System trait

Each transform is an independent, testable Rust struct — or a closure via system_fn. Pipelines are assembled from systems, not configured via a query DSL.

Distributed with Raft

The distributed and distributed-raft features add a RedbSharedStore that coordinates partition assignment and checkpointing across nodes using an embedded openraft state machine.
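Turning it on is a feature flag; assuming the flags live on pcs-core as in the Getting Started snippet below:

[dependencies]
pcs-core = { version = "0.1.0", features = ["io", "distributed-raft"] }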

Cross-stage zero-copy handoff

Passing a RecordBatch between pipeline stages is an Arc clone — one atomic increment, no buffer copy.
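In plain Rust terms, assuming stages exchange Arc<RecordBatch> handles as that claim implies, the handoff is nothing more than:

use std::sync::Arc;
use arrow_array::RecordBatch;

// One refcount bump; both handles view the same Arrow buffers.
fn hand_off(batch: &Arc<RecordBatch>) -> Arc<RecordBatch> {
    Arc::clone(batch)
}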

Configurable retry

Per-system RetryMode::ExponentialBackoff with max retries, base delay, multiplier, cap, and jitter — all set at pipeline construction time.
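As a rough sketch only: the field names and the builder hook below are assumptions, not the verbatim API; check the RetryMode docs for the real shape.

use std::time::Duration;

// Hypothetical field and method names.
let retry = RetryMode::ExponentialBackoff {
    max_retries: 5,
    base_delay: Duration::from_millis(100),
    multiplier: 2.0,
    max_delay: Duration::from_secs(10), // cap
    jitter: true,
};

let pipeline = Pipeline::builder("etl")
    .with::<Transaction>()
    .with_system_retry(ValidateSystem, retry) // attachment point assumed
    .build();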

Integrated tracing

Span-level instrumentation throughout the pipeline execution path under the tracing feature. Drops into any existing tracing subscriber.
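Nothing PCS-specific is required to see the spans; with the tracing and tracing-subscriber crates added, a stock subscriber at the top of main is enough:

// Route PCS pipeline spans to stdout; any tracing subscriber works.
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();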

Production-ready service

The service feature ships a pcs-service binary with TOML config, factory registry, axum HTTP control plane, and built-in standalone & Raft cluster runners.
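Sketched as illustrative TOML only; the real key names come from the pcs-service documentation, not from this snippet:

# Illustrative keys, not the actual pcs-service schema.
[service]
listen_addr = "0.0.0.0:8080"   # axum control plane

[runner]
mode = "raft-cluster"          # or "standalone"

[pipeline]
factory = "etl"                # name registered in the factory registry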

Getting Started

Add PCS to your Cargo.toml:

[dependencies]
pcs-core = { version = "0.1.0", features = ["io"] }
arrow-array = "58.1"
arrow-schema = "58.1"
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }

A complete example

Two systems over a Transaction component: ValidateSystem writes only the valid field; EnrichSystem writes only usd_amount. Their writes are disjoint, so PCS places them in the same parallel stage automatically.

use std::sync::Arc;
use arrow_array::{BooleanArray, Float64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use pcs_core::prelude::*;

#[derive(Serialize, Deserialize, Clone)]
struct Transaction {
    id: u64,
    amount: f64,
    currency: String,
    valid: bool,
    usd_amount: f64,
}

impl Component for Transaction {
    fn name() -> &'static str { "Transaction" }
    fn schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id",         DataType::UInt64,  false),
            Field::new("amount",     DataType::Float64, false),
            Field::new("currency",   DataType::Utf8,    false),
            Field::new("valid",      DataType::Boolean, false),
            Field::new("usd_amount", DataType::Float64, false),
        ]))
    }
}

impl Transaction {
    const AMOUNT:     FieldRef<Self> = FieldRef::new("amount");
    const CURRENCY:   FieldRef<Self> = FieldRef::new("currency");
    const VALID:      FieldRef<Self> = FieldRef::new("valid");
    const USD_AMOUNT: FieldRef<Self> = FieldRef::new("usd_amount");
}

struct ValidateSystem;

#[async_trait]
impl System for ValidateSystem {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new("validate")
            .reads(Transaction::AMOUNT)
            .writes(Transaction::VALID)
    }

    async fn run(&self, data: &mut Dataset) -> PcsResult<()> {
        let txns = data.view::<Transaction>()?;
        let amount = txns.f64(Transaction::AMOUNT)?;
        let flags: Vec<bool> = (0..txns.len())
            .map(|i| amount.value(i) > 0.0)
            .collect();
        // ... rebuild the RecordBatch with the new `valid` column
        Ok(())
    }
}

#[tokio::main]
async fn main() -> PcsResult<()> {
    let mut pipeline = Pipeline::builder("etl")
        .with::<Transaction>()
        .with_system(ValidateSystem)
        // .with_system(EnrichSystem)
        // .with_system(ReportSystem)
        .build();

    pipeline.append::<Transaction>(&seed_data())?;
    pipeline.run().await?;

    println!("processed {} rows", pipeline.data().rows());
    Ok(())
}

fn seed_data() -> Vec<Transaction> { /* ... */ vec![] }
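For completeness, the EnrichSystem referenced above follows the same shape. A sketch reusing the imports and the Transaction component from the listing, with the batch rebuild elided the same way:

struct EnrichSystem;

#[async_trait]
impl System for EnrichSystem {
    fn meta(&self) -> SystemMeta {
        SystemMeta::new("enrich")
            .reads(Transaction::AMOUNT)
            .writes(Transaction::USD_AMOUNT) // disjoint from ValidateSystem's `valid`
    }

    async fn run(&self, data: &mut Dataset) -> PcsResult<()> {
        let txns = data.view::<Transaction>()?;
        let amount = txns.f64(Transaction::AMOUNT)?;
        // Hypothetical flat rate; a real system would read `currency` per row.
        let usd: Vec<f64> = (0..txns.len()).map(|i| amount.value(i) * 1.08).collect();
        // ... rebuild the RecordBatch with the new `usd_amount` column
        Ok(())
    }
}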

Run an example

cargo run --example scheduler_etl                 # field-granular ETL
cargo run --example scheduler_etl_parallel        # parallel-system slices
cargo run --example scheduler_parquet_etl         # Parquet source/sink
cargo run --example distributed_scheduler --features distributed
cargo run --example datafusion_interop --features datafusion

Next steps

Dive deeper into each part of PCS:

  • Dataset & Components (Core): The columnar container, the Component trait, and how Arrow RecordBatches are stored, appended, and soft-deleted.
  • Systems (Core): Implementing the System trait, declaring field-level access in SystemMeta, the sync fast-path, and parallel slice systems.
  • Pipeline (Core): DAG construction, conflict rules, stage parallelism, per-system retry, and IO integration.
  • Scheduler (Core): Driving multiple independent Pipelines with sequential or concurrent ticks, dependency edges, and backpressure.
  • Sources & Sinks (I/O): Built-in Parquet, CSV, JSON Lines, and in-memory channel transports, plus the Source and Sink traits.
  • Distributed Runner (Distributed): PartitionSource, CheckpointStore, the Raft-backed RedbSharedStore, and the at-least-once runner loop.
  • Service (Distributed): The production pcs-service binary: TOML config, factory registry, HTTP control plane, standalone and cluster runners.
  • Tracing (Observability): Span-level instrumentation of pipeline execution under the tracing feature flag.