Dataset & Components

A columnar container that stores one Arrow RecordBatch per registered component, plus typed Rust singletons (resources).

Overview

A Dataset is the unit of data inside PCS. It owns:

- one Arrow RecordBatch per registered component (the columnar storage),
- an alive bitmap that backs soft deletes,
- a set of typed Rust singletons (resources), keyed by TypeId.

A Pipeline wraps a Dataset together with a set of Systems, but you can use a Dataset on its own for ad-hoc data manipulation.

The Component trait

A Component is any Rust type that names itself and provides an Arrow Schema. Serialization to and from RecordBatch goes through serde_arrow, so deriving Serialize + Deserialize is enough for most types.

use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
use pcs_core::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
struct Price {
    symbol: String,
    value: f64,
}

impl Component for Price {
    fn name() -> &'static str { "Price" }

    fn schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("symbol", DataType::Utf8,    false),
            Field::new("value",  DataType::Float64, false),
        ]))
    }
}

Optionally, declare typed FieldRef constants on your component. Systems use them to declare access in SystemMeta, and ComponentView uses them to fetch typed columns — both with no runtime cost beyond a string compare.

impl Price {
    pub const SYMBOL: FieldRef<Price> = FieldRef::new("symbol");
    pub const VALUE:  FieldRef<Price> = FieldRef::new("value");
}
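
On the SystemMeta side, here is a hedged sketch of what a field-level declaration could look like. The reads/writes method names are assumptions for illustration only; the Systems page documents the real API.

// Hypothetical method names, shown only to illustrate where the
// FieldRef constants plug in.
SystemMeta::new("reprice")
    .reads(Price::SYMBOL)     // shared read of the "symbol" column
    .writes(Price::VALUE);    // exclusive write of the "value" column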

Register and append

Components must be registered before any rows are appended. Registration validates the schema and reserves a slot for the component’s RecordBatch.

let mut data = Dataset::new();
data.register_component::<Price>()?;

let rows = vec![
    Price { symbol: "AAPL".into(), value: 150.0 },
    Price { symbol: "MSFT".into(), value: 300.0 },
    Price { symbol: "GOOG".into(), value: 2_800.0 },
];

let range: std::ops::Range<Row> = data.append::<Price>(&rows)?;
assert_eq!(data.rows(), 3);

Or use the builder for a fluent setup with components, resources, and systems all wired in one chain. The same DatasetBuilder is also used by PipelineBuilder under the hood.

let data = Dataset::builder()
    .with::<Price>()
    .with::<Volume>()
    .with_resource(MarketCalendar::default())
    .build();

Reading columns

Three patterns, increasing in ergonomics:

// 1. Raw Arrow access — useful for batch-level operations
let batch: &RecordBatch  = data.columns::<Price>().expect("registered");
let value: &ArrayRef     = data.column::<Price>("value").expect("field");

// 2. Typed view — borrowing the batch and exposing typed accessors
let prices = data.view::<Price>()?;
let value = prices.f64(Price::VALUE)?;
let symbol = prices.str(Price::SYMBOL)?;
for i in 0..prices.len() {
    println!("{} = {}", symbol.value(i), value.value(i));
}

// 3. Decode back into Rust structs (allocates) — for small batches
let restored: Vec<Price> = Price::from_record_batch(batch)?;

ComponentView exposes typed accessors named after the Arrow data type: f64(...), i64(...), u64(...), bool(...), str(...), and so on. They return concrete Arrow array types like &Float64Array, so you index directly without downcasting.
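
For instance, assuming the Volume component from the builder example exposes an Int64 "shares" column behind a SHARES constant (hypothetical, declared like Price's constants above), the pattern is identical:

// Hypothetical: Volume::SHARES is a FieldRef<Volume> over an Int64 column.
let volumes = data.view::<Volume>()?;
let shares: &Int64Array = volumes.i64(Volume::SHARES)?;
let total: i64 = (0..volumes.len()).map(|i| shares.value(i)).sum();
println!("total shares: {total}");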

Soft delete & compaction

Marking a row dead is O(1) — it flips a bit in the alive bitmap. The row stays in the underlying Arrow buffers until you call compact, which filters every component’s RecordBatch in one pass.

data.mark_dead(Row::new(0));
assert!(!data.is_alive(Row::new(0)));
assert_eq!(data.live_rows(), data.rows() - 1);

if data.should_compact() {
    data.compact()?;  // invalidates existing Row indices
}

Important: compaction renumbers rows. Don’t hold a Row across a compaction unless you’ve recomputed it.
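
A minimal sketch of the hazard, using only the calls shown above:

let row = Row::new(5);          // index into the pre-compaction layout
data.mark_dead(Row::new(0));
data.compact()?;                // rows after index 0 shift down by one
// `row` still holds 5, but its data now lives at index 4.
// Re-derive indices after compaction instead of reusing stale values.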

Resources

Resources are arbitrary Rust singletons stored in the dataset, keyed by TypeId. They’re not columnar — use them for things like configuration values, FX-rate tables, lookup caches, or summary outputs.

struct FxRates { eur: f64, gbp: f64 }

data.insert_resource(FxRates { eur: 1.08, gbp: 1.27 });

let rates = data.get_resource::<FxRates>().expect("inserted");
assert_eq!(rates.eur, 1.08);

// Mutable access requires &mut Dataset
data.get_resource_mut::<FxRates>().unwrap().gbp = 1.30;

Systems can declare resource access too:

SystemMeta::new("convert")
    .read_resource::<FxRates>()    // shared resource read
    .write_resource::<Report>();   // exclusive resource write

Resource conflicts in the DAG are at TypeId granularity (whole resource, not field-level). Two systems writing the same resource type are forced into different stages.
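
For example, these two metas (the names are illustrative) both take an exclusive write on Report, so they can never share a stage, even if they touch disjoint parts of it:

let a = SystemMeta::new("daily_report").write_resource::<Report>();
let b = SystemMeta::new("risk_report").write_resource::<Report>();
// Whole-resource granularity: a and b conflict on Report's TypeId.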

Arrow IPC round-trip

A Dataset serializes to Arrow IPC — the same format the distributed runner uses for checkpoints. Encode is a contiguous buffer write; decode is zero-copy from a borrowed byte slice.

// Encode
let mut buf = Vec::new();
data.write_ipc(&mut buf)?;

// Decode
let restored = Dataset::read_ipc(&buf)?;
assert_eq!(restored.rows(), data.rows());

Resources are not part of the IPC payload — only the columnar component data is. Re-attach resources after decoding.
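
A minimal sketch, reusing the FxRates resource from above:

let mut restored = Dataset::read_ipc(&buf)?;
assert!(restored.get_resource::<FxRates>().is_none());  // resources were dropped
restored.insert_resource(FxRates { eur: 1.08, gbp: 1.27 });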

The Row type

A Row is a transparent u32 wrapper used to index into the dataset's columnar storage; it remains valid until the next compaction renumbers rows. append returns a Range<Row> covering the rows you just inserted.

let range = data.append::<Price>(&rows)?;
for row in range.clone() {
    if /* some condition */ true {
        data.mark_dead(row);
    }
}

Next steps

Systems

Declare field-level access in SystemMeta and let PCS schedule disjoint writes into the same parallel stage.

Pipeline

Wrap a Dataset and a list of Systems; PCS builds the DAG, sorts into stages, and runs with retry.