Dataset & Components
A columnar container that stores one Arrow RecordBatch per registered component, plus typed Rust singletons.
Overview
A Dataset is the unit of data inside PCS. It owns:
- One Arrow RecordBatch per registered Component. All batches share the same row count, so any Row index is valid across every component.
- A SchemaRegistry that records the schema and version of every registered component.
- A ResourceMap — a TypeId-keyed map of arbitrary Rust singletons that live alongside the columns.
- An alive bitmap, so rows can be soft-deleted with mark_dead and removed in bulk by compact.
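Taken together, those pieces trace a short lifecycle. A hedged sketch, reusing the Price component and FxRates resource defined later on this page and assuming a `rows: Vec<Price>` already in scope:

```rust
let mut data = Dataset::new();
data.register_component::<Price>()?;        // schema goes into the SchemaRegistry
let range = data.append::<Price>(&rows)?;   // all components share one row count
data.insert_resource(FxRates { eur: 1.08, gbp: 1.27 }); // lives in the ResourceMap
data.mark_dead(range.start);                // flips a bit in the alive bitmap
data.compact()?;                            // drops dead rows from every RecordBatch
```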
A Pipeline wraps a Dataset together with a set of Systems, but you can use a Dataset on its own for ad-hoc data manipulation.
The Component trait
A Component is any Rust type that names itself and provides an Arrow Schema. Serialization to and from RecordBatch goes through serde_arrow, so deriving Serialize + Deserialize is enough for most types.
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
use pcs_core::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone)]
struct Price {
    symbol: String,
    value: f64,
}

impl Component for Price {
    fn name() -> &'static str { "Price" }

    fn schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("symbol", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]))
    }
}
Optionally, declare typed FieldRef constants on your component. Systems use them to declare access in SystemMeta, and ComponentView uses them to fetch typed columns — both with no runtime cost beyond a string compare.
impl Price {
    pub const SYMBOL: FieldRef<Price> = FieldRef::new("symbol");
    pub const VALUE: FieldRef<Price> = FieldRef::new("value");
}
Register and append
Components must be registered before any rows are appended. Registration validates the schema and reserves a slot for the component’s RecordBatch.
let mut data = Dataset::new();
data.register_component::<Price>()?;
let rows = vec![
    Price { symbol: "AAPL".into(), value: 150.0 },
    Price { symbol: "MSFT".into(), value: 300.0 },
    Price { symbol: "GOOG".into(), value: 2_800.0 },
];
let range: std::ops::Range<Row> = data.append::<Price>(&rows)?;
assert_eq!(data.rows(), 3);
Or use the builder for a fluent setup with components, resources, and systems all wired in one chain. The same DatasetBuilder is also used by PipelineBuilder under the hood.
let data = Dataset::builder()
    .with::<Price>()
    .with::<Volume>()
    .with_resource(MarketCalendar::default())
    .build();
Reading columns
Three patterns, increasing in ergonomics:
// 1. Raw Arrow access — useful for batch-level operations
let batch: &RecordBatch = data.columns::<Price>().expect("registered");
let value: &ArrayRef = data.column::<Price>("value").expect("field");
// 2. Typed view — borrowing the batch and exposing typed accessors
let prices = data.view::<Price>()?;
let value = prices.f64(Price::VALUE)?;
let symbol = prices.str(Price::SYMBOL)?;
for i in 0..prices.len() {
    println!("{} = {}", symbol.value(i), value.value(i));
}
// 3. Decode back into Rust structs (allocates) — for small batches
let restored: Vec<Price> = Price::from_record_batch(batch)?;
ComponentView exposes typed accessors named after the Arrow data type: f64(...), i64(...), u64(...), bool(...), str(...), and so on. They return concrete Arrow array types like &Float64Array, so you index directly without downcasting.
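The same pattern applies to non-float columns. A hedged sketch, assuming the Volume component from the builder example has an i64 field exposed through a TRADED FieldRef constant (both hypothetical here):

```rust
let volumes = data.view::<Volume>()?;
// i64(...) returns &Int64Array directly, so no downcast is needed.
let traded = volumes.i64(Volume::TRADED)?;
let total: i64 = (0..volumes.len()).map(|i| traded.value(i)).sum();
```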
Soft delete & compaction
Marking a row dead is O(1) — it flips a bit in the alive bitmap. The row stays in the underlying Arrow buffers until you call compact, which filters every component’s RecordBatch in one pass.
data.mark_dead(Row::new(0));
assert!(!data.is_alive(Row::new(0)));
assert_eq!(data.live_rows(), data.rows() - 1);
if data.should_compact() {
    data.compact()?; // invalidates existing Row indices
}
Important: compaction renumbers rows. Don’t hold a Row across a compaction unless you’ve recomputed it.
Resources
Resources are arbitrary Rust singletons stored in the dataset, keyed by TypeId. They’re not columnar — use them for things like configuration values, FX-rate tables, lookup caches, or summary outputs.
struct FxRates { eur: f64, gbp: f64 }
data.insert_resource(FxRates { eur: 1.08, gbp: 1.27 });
let rates = data.get_resource::<FxRates>().expect("inserted");
assert_eq!(rates.eur, 1.08);
// Mutable access requires &mut Dataset
data.get_resource_mut::<FxRates>().unwrap().gbp = 1.30;
Systems can declare resource access too:
SystemMeta::new("convert")
    .read_resource::<FxRates>()   // shared resource read
    .write_resource::<Report>();  // exclusive resource write
Resource conflicts in the DAG are at TypeId granularity (whole resource, not field-level). Two systems writing the same resource type are forced into different stages.
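For example (a sketch; the system names and the Report resource type are hypothetical), two declared writers of the same resource type can never share a stage:

```rust
// Both systems declare an exclusive write on Report, so the DAG
// scheduler must place them in different stages.
let a = SystemMeta::new("daily_report").write_resource::<Report>();
let b = SystemMeta::new("risk_report").write_resource::<Report>();
// Readers of Report may still run concurrently with each other,
// just not alongside either writer.
```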
Arrow IPC round-trip
A Dataset serializes to Arrow IPC — the same format the distributed runner uses for checkpoints. Encode is a contiguous buffer write; decode is zero-copy from a borrowed byte slice.
// Encode
let mut buf = Vec::new();
data.write_ipc(&mut buf)?;
// Decode
let restored = Dataset::read_ipc(&buf)?;
assert_eq!(restored.rows(), data.rows());
Resources are not part of the IPC payload — only the columnar component data is. Re-attach resources after decoding.
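A hedged sketch of that re-attachment step, reusing the FxRates resource from above and assuming get_resource returns an Option as the earlier .expect("inserted") suggests:

```rust
let mut restored = Dataset::read_ipc(&buf)?;
// The columns survive the round-trip; the ResourceMap starts empty.
assert!(restored.get_resource::<FxRates>().is_none());
// Re-attach whatever resources downstream systems expect.
restored.insert_resource(FxRates { eur: 1.08, gbp: 1.27 });
```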
The Row type
A Row is a transparent u32 wrapper used as an index into the dataset’s columnar storage; it remains valid until the next compact. append returns a Range<Row> covering the rows you just inserted.
let range = data.append::<Price>(&rows)?;
for row in range.clone() {
    if /* some condition */ true {
        data.mark_dead(row);
    }
}