PCS
Distributed Batch Processing for Rust, on Apache Arrow.
Typed columnar pipelines with field-granular DAG scheduling, zero-copy IPC checkpoints,
and Raft-backed distribution — assembled from plain Rust System impls.
What is PCS?
PCS is a distributed batch processing engine for Rust, built on Apache Arrow.
You write typed System impls that declare which Arrow fields they
read and write; PCS derives the execution order, runs independent stages in
parallel, retries with exponential backoff, and distributes work across nodes
with at-least-once semantics backed by Raft consensus.
The columnar layout pays off where it matters most: IPC serialization runs 4–19× faster than row-oriented equivalents, wide-schema projection runs 2–3× faster from cache behavior alone, and distributed checkpoint recovery inherits those gains directly.
PCS is:
- A columnar transform engine for 100k–100M row batches
- A place for imperative Rust that SQL cannot express cleanly
- Optimized for wide schemas (tens to hundreds of columns)
- Built for workloads where distributed recovery time is a first-class constraint

PCS is not:
- A SQL engine — no query planner; DataFusion will beat it on SQL
- A sub-millisecond stream processor — batch-first by design
- A game ECS — despite the columnar vocabulary
- A replacement for row-oriented workflows under ~10k rows
The mental model
Five concepts compose every PCS workload:
Dataset
An Arrow-backed columnar container. One RecordBatch per registered Component; all batches share the same row count. Append, soft-delete, compact, IPC round-trip.
System
Processing logic. SystemMeta declares which (component, field) pairs the system reads and writes. PCS uses these declarations to schedule.
Pipeline
One Dataset + one set of Systems. Builds a field-level DAG, sorts into stages, runs with per-system retry. The unit of single-workload execution.
Scheduler
Multi-pipeline orchestrator. Drives several independent Pipelines — sequentially or concurrently — with optional dependency edges and backpressure (a sketch follows this list).
Distributed Runner
Claims row-range partitions from a shared store, executes a Pipeline template against each batch, checkpoints via Arrow IPC, replicates state via Raft.
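To make the Scheduler level concrete, a minimal sketch of driving two Pipelines with a dependency edge might look like this. The method names (add_pipeline, after, run_all) are assumptions for illustration, not the documented API:

let ingest = Pipeline::builder("ingest")
    .with::<Transaction>()
    .with_system(ValidateSystem)
    .build();
let report = Pipeline::builder("report")
    .with::<Transaction>()
    .with_system(ReportSystem)
    .build();

let mut scheduler = Scheduler::new();              // hypothetical constructor
scheduler.add_pipeline(ingest);                    // hypothetical method
scheduler.add_pipeline(report).after("ingest");    // hypothetical dependency edge
scheduler.run_all().await?;                        // hypothetical: run both, honoring the edge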
Features
Zero-copy Arrow IPC
Checkpoint writes and recovery use Arrow IPC — a contiguous buffer copy, not per-field dispatch. Benchmarked at 4.4× faster encode and 19× faster decode than postcard row-oriented equivalents at 1M rows.
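Under the hood this is plain Arrow IPC. Here is a minimal round-trip using the arrow-ipc crate directly, as a sketch of the mechanism rather than PCS's checkpoint API (add arrow-ipc as a dependency to reproduce it):

use std::io::Cursor;
use arrow_array::RecordBatch;
use arrow_ipc::{reader::StreamReader, writer::StreamWriter};
use arrow_schema::ArrowError;

fn ipc_round_trip(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Encode: the batch's buffers go out as one contiguous IPC stream.
    let schema = batch.schema();
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
        writer.write(batch)?;
        writer.finish()?;
    }
    // Decode: buffers are mapped back into arrays, no per-field dispatch.
    let mut reader = StreamReader::try_new(Cursor::new(buf), None)?;
    reader.next().expect("stream contains one batch")
}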
Wide-schema cache efficiency
Columnar layout reads only the columns a system touches. A pipeline reading 3 of 50 columns loads 24 MB instead of 400 MB per 1M rows — a 2.7× throughput advantage from cache behavior alone.
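Selecting only the columns a system declares is a standard Arrow projection. A small sketch, where the column names are assumptions about your schema:

use arrow_array::RecordBatch;
use arrow_schema::ArrowError;

// Keep only the columns a system reads (e.g. 3 of 50).
fn narrow(wide: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let schema = wide.schema();
    let indices: Vec<usize> = ["id", "amount", "valid"]
        .into_iter()
        .map(|name| schema.index_of(name))
        .collect::<Result<_, _>>()?;
    // Arc clones of the selected columns; untouched columns are never read.
    wide.project(&indices)
}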
Field-granular DAG scheduling
Systems declare reads and writes per Arrow field. PCS builds a dependency graph, places systems with disjoint writes into the same parallel stage, and topologically sorts execution automatically.
Composable System trait
Each transform is an independent, testable Rust struct — or a closure via system_fn. Pipelines are assembled from systems, not configured via a query DSL.
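A closure-based system might look like the sketch below. The exact system_fn signature is not shown in this overview, so the argument order and closure shape are assumptions:

// Hypothetical system_fn usage; the signature shown here is an assumption.
let flag_positive = system_fn(
    SystemMeta::new("flag_positive")
        .reads(Transaction::AMOUNT)
        .writes(Transaction::VALID),
    |data: &mut Dataset| async move {
        let _rows = data.rows(); // same kind of body as a hand-written System::run
        Ok(())
    },
);

let pipeline = Pipeline::builder("etl")
    .with::<Transaction>()
    .with_system(flag_positive)
    .build();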
Distributed with Raft
The distributed and distributed-raft features add a RedbSharedStore that coordinates partition assignment and checkpointing across nodes using an embedded openraft state machine.
Cross-stage zero-copy handoff
Passing a RecordBatch between pipeline stages is an Arc clone — one atomic increment, no buffer copy.
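In plain Rust terms the handoff is an Arc::clone on the shared batch:

use std::sync::Arc;
use arrow_array::RecordBatch;

// Stage handoff: one atomic increment on the Arc; the batch's buffers are untouched.
fn handoff(batch: &Arc<RecordBatch>) -> Arc<RecordBatch> {
    Arc::clone(batch)
}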
Configurable retry
Per-system RetryMode::ExponentialBackoff with max retries, base delay, multiplier, cap, and jitter — all set at pipeline construction time.
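A retry policy might be wired up like the following sketch. The variant fields and the builder method are assumptions, since only the knobs themselves (max retries, base delay, multiplier, cap, jitter) are listed here:

use std::time::Duration;

// Hypothetical field names and builder method; the documented knobs are the same.
let retry = RetryMode::ExponentialBackoff {
    max_retries: 5,
    base_delay: Duration::from_millis(100),
    multiplier: 2.0,
    max_delay: Duration::from_secs(10),
    jitter: true,
};

let pipeline = Pipeline::builder("etl")
    .with::<Transaction>()
    .with_system_retry(ValidateSystem, retry) // hypothetical builder method
    .build();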
Integrated tracing
Span-level instrumentation throughout the pipeline execution path under the tracing feature. Drops into any existing tracing subscriber.
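Turning it on is just installing a subscriber before the pipeline runs; the setup below is standard tracing-subscriber usage (add the tracing and tracing-subscriber crates):

// Any subscriber works; PCS spans appear like those of any other instrumented crate.
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();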
Production-ready service
The service feature ships a pcs-service binary with TOML config, factory registry, axum HTTP control plane, and built-in standalone & Raft cluster runners.
Getting Started
Add PCS to your Cargo.toml:
[dependencies]
pcs-core = { version = "0.1.0", features = ["io"] }
arrow-array = "58.1"
arrow-schema = "58.1"
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
A complete example
Two systems over a Transaction component: ValidateSystem writes
only the valid field; EnrichSystem writes only usd_amount.
Their writes are disjoint, so PCS places them in the same parallel stage automatically.
use std::sync::Arc;
use arrow_array::{BooleanArray, Float64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use pcs_core::prelude::*;
#[derive(Serialize, Deserialize, Clone)]
struct Transaction {
id: u64,
amount: f64,
currency: String,
valid: bool,
usd_amount: f64,
}
impl Component for Transaction {
fn name() -> &'static str { "Transaction" }
fn schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt64, false),
Field::new("amount", DataType::Float64, false),
Field::new("currency", DataType::Utf8, false),
Field::new("valid", DataType::Boolean, false),
Field::new("usd_amount", DataType::Float64, false),
]))
}
}
impl Transaction {
const AMOUNT: FieldRef<Self> = FieldRef::new("amount");
const CURRENCY: FieldRef<Self> = FieldRef::new("currency");
const VALID: FieldRef<Self> = FieldRef::new("valid");
const USD_AMOUNT: FieldRef<Self> = FieldRef::new("usd_amount");
}
struct ValidateSystem;
#[async_trait]
impl System for ValidateSystem {
fn meta(&self) -> SystemMeta {
SystemMeta::new("validate")
.reads(Transaction::AMOUNT)
.writes(Transaction::VALID)
}
async fn run(&self, data: &mut Dataset) -> PcsResult<()> {
let txns = data.view::<Transaction>()?;
let amount = txns.f64(Transaction::AMOUNT)?;
let flags: Vec<bool> = (0..txns.len())
.map(|i| amount.value(i) > 0.0)
.collect();
// ... rebuild the RecordBatch with `flags` as the new `valid` column
Ok(())
}
}
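// EnrichSystem, sketched to match the description above: it reads `amount` and
// `currency` and writes only `usd_amount`, so its writes are disjoint from
// ValidateSystem's and both run in the same parallel stage.
struct EnrichSystem;
#[async_trait]
impl System for EnrichSystem {
fn meta(&self) -> SystemMeta {
SystemMeta::new("enrich")
.reads(Transaction::AMOUNT)
.reads(Transaction::CURRENCY)
.writes(Transaction::USD_AMOUNT)
}
async fn run(&self, data: &mut Dataset) -> PcsResult<()> {
let txns = data.view::<Transaction>()?;
let _amount = txns.f64(Transaction::AMOUNT)?;
// ... look up a rate per `currency` and rebuild `usd_amount`
Ok(())
}
}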
#[tokio::main]
async fn main() -> PcsResult<()> {
let mut pipeline = Pipeline::builder("etl")
.with::<Transaction>()
.with_system(ValidateSystem)
.with_system(EnrichSystem)
// .with_system(ReportSystem)
.build();
pipeline.append::<Transaction>(&seed_data())?;
pipeline.run().await?;
println!("processed {} rows", pipeline.data().rows());
Ok(())
}
fn seed_data() -> Vec<Transaction> { /* ... */ vec![] }
Run an example
cargo run --example scheduler_etl # field-granular ETL
cargo run --example scheduler_etl_parallel # parallel-system slices
cargo run --example scheduler_parquet_etl # Parquet source/sink
cargo run --example distributed_scheduler --features distributed
cargo run --example datafusion_interop --features datafusion
Next steps
Dive deeper into each part of PCS: