# Distributed Runner
At-least-once batch processing across instances, backed by an embedded redb store and (optionally) Raft-replicated coordination.
## Overview
The distributed runner takes a `Pipeline` as a template and runs it against many partitions of data spread across instances. The mechanics are:

- An instance calls `PartitionSource::claim_next_batch` to lease a row-range partition.
- It reconstructs a `Dataset` for the partition (via a user-provided `world_factory` closure).
- It calls `Pipeline::run_on(&mut partition_dataset)` — the template’s own data, sources, and sinks are never used.
- Between (configurable) stages, it persists Arrow IPC checkpoints via `CheckpointStore::save_checkpoint`.
- On success it acks the claim; on crash, the lease expires and another instance picks the partition up, optionally resuming from the latest checkpoint.
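The sketch below is a conceptual rendering of that loop. The stub traits, the `Claim` struct, the `ack` method name, and the `Vec<u8>` stand-in for the partition `Dataset` are assumptions made for illustration; lease renewal and error handling are elided, and `DistributedRunner::run` is the real implementation.

```rust
// Conceptual sketch only: stub traits mirroring the roles described above.
struct Claim {
    claim_id: u64,
}

trait PartitionSourceSketch {
    fn claim_next_batch(&self, instance_id: u128) -> Option<Claim>;
    fn ack(&self, claim_id: u64);
}

trait CheckpointStoreSketch {
    fn latest_checkpoint(&self, claim_id: u64) -> Option<(usize, Vec<u8>)>;
    fn save_checkpoint(&self, claim_id: u64, stage_idx: usize, ipc_bytes: &[u8]);
}

fn run_loop(
    source: &impl PartitionSourceSketch,
    checkpoints: &impl CheckpointStoreSketch,
    instance_id: u128,
    stages: &[fn(&mut Vec<u8>)], // stand-in for pipeline stages over a partition Dataset
) -> usize {
    let mut processed = 0;
    // Keep claiming row-range partitions until none are left.
    while let Some(claim) = source.claim_next_batch(instance_id) {
        // Resume from the latest checkpoint if a previous holder crashed mid-pipeline.
        let (start_stage, mut data) = match checkpoints.latest_checkpoint(claim.claim_id) {
            Some((stage_idx, bytes)) => (stage_idx + 1, bytes),
            None => (0, Vec::new()), // a fresh partition snapshot would be loaded here
        };
        for (idx, stage) in stages.iter().enumerate().skip(start_stage) {
            stage(&mut data);
            // Persist an Arrow IPC snapshot after the stage (subject to the strategy).
            checkpoints.save_checkpoint(claim.claim_id, idx, &data);
        }
        // Success: ack the claim so no other instance re-runs this partition.
        source.ack(claim.claim_id);
        processed += 1;
    }
    processed
}
```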
The runner is feature-gated. The plain `distributed` feature gives you the traits, the redb-backed store, and the runner. Adding `distributed-raft` wires the store’s state machine to openraft so coordination state is replicated across nodes.
## Building blocks
| Type | Role |
|---|---|
| `PartitionSource` | Claims, renews, acks, and releases row-range batches across instances. Lease-based to handle crashes. |
| `CheckpointStore` | Persists Arrow IPC snapshots keyed by `(claim_id, stage_idx)` for crash recovery. |
| `RedbSharedStore` | Implements both traits over redb. Single-node mode applies state directly; cluster mode proposes through a Raft channel. |
| `DistributedRunner` | The driver. Holds a pipeline template, a runtime, and a `RunnerConfig`. `run(world_factory)` loops until exhausted. |
| `RunnerConfig` | `instance_id`, `checkpoint_strategy`, `schema_id`, `max_batches`, lease renewal check interval. |
| `CheckpointStrategy` | How often to checkpoint: `EveryStage`, `EveryNStages(n)`, or `None`. |
## Single-node example
The simplest setup uses one process, one redb file, and a single instance running its own coordinator. There is no Raft, but the same code works in cluster mode by switching the store constructor.
```rust
use std::sync::Arc;
use uuid::Uuid;

use pcs_core::prelude::*;
use pcs_service::distributed::{
    DistributedRunner, RunnerConfig, CheckpointStrategy, RedbSharedStore,
};

#[tokio::main]
async fn main() -> PcsResult<()> {
    // 1. Open the shared redb store.
    let store = RedbSharedStore::open("/tmp/pcs.redb")?;

    // 2. Register a master batch — the data this runner will partition over.
    let trades: Vec<Trade> = load_trades_from_parquet("/data/trades.parquet")?;
    let batch = Trade::to_record_batch(&trades)?;
    let mut buf = Vec::new();
    {
        // Write the batch as an Arrow IPC file into `buf`; `finish` writes the footer.
        let mut writer = arrow_ipc::writer::FileWriter::try_new(&mut buf, &Trade::schema())?;
        writer.write(&batch)?;
        writer.finish()?;
    }
    store.register_master_batch(/*schema_id=*/ 1, "Trade", buf).await?;

    // 3. Build the pipeline template.
    let template = Pipeline::builder("trade-processor")
        .with::<Trade>()
        .with_system(ValidateTradeSystem)
        .with_system(EnrichTradeSystem)
        .build();

    // 4. Drive the runner.
    let mut runner = DistributedRunner::new(
        store,
        Box::new(template),
        RunnerConfig {
            instance_id: Uuid::new_v4(),
            checkpoint_strategy: CheckpointStrategy::EveryStage,
            schema_id: 1,
            max_batches: None,
            lease_renewal_check_interval_millis: 250,
            ..Default::default()
        },
    );

    // For each claim, the runner asks `world_factory` to produce a fresh
    // Dataset and copies the partition's IPC snapshot into it before running.
    let processed = runner.run(|| Dataset::new()).await?;
    println!("processed {} batches", processed);
    Ok(())
}
```
## Checkpoints & recovery
Checkpoints serialize the partition’s `Dataset` after a stage completes and persist the bytes via `CheckpointStore`. If the instance crashes mid-pipeline, the next claim of the same partition restores the latest checkpoint and resumes from the next stage.

Three policies trade durability against throughput:
- `EveryStage` — safest; one fsync per stage.
- `EveryNStages(n)` — tunable; pick `n` based on stage cost.
- `None` — fastest; on crash, the entire partition replays from scratch.
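To make the cadence concrete, here is a small hypothetical helper. The `should_checkpoint` function is not part of the crate, and exactly which stage boundaries `EveryNStages(n)` fires on is an assumption:

```rust
// Local re-declaration for the sketch; mirrors the documented variants.
enum CheckpointStrategy {
    EveryStage,
    EveryNStages(usize),
    None,
}

/// Illustrative: decide whether to persist a checkpoint after `stage_idx` (0-based).
fn should_checkpoint(strategy: &CheckpointStrategy, stage_idx: usize) -> bool {
    match strategy {
        CheckpointStrategy::EveryStage => true,
        // Assumed cadence: after every n-th completed stage (stages n-1, 2n-1, ...).
        CheckpointStrategy::EveryNStages(n) => *n > 0 && (stage_idx + 1) % *n == 0,
        CheckpointStrategy::None => false,
    }
}

fn main() {
    let strategy = CheckpointStrategy::EveryNStages(2);
    for stage_idx in 0..5 {
        println!("stage {stage_idx}: checkpoint = {}", should_checkpoint(&strategy, stage_idx));
    }
}
```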
## Cluster mode (feature = "distributed-raft")
The `distributed-raft` feature replaces the single-node store with a Raft-coordinated one. Two redb files back the cluster:

- Log file — written by `ArrowRedbLogStore`, openraft’s `RaftLogStorage` implementation.
- State machine file — written by `ArrowRedbStateMachine`, applying `ConsensusCommand` ops.
A `ConsensusCommand` is the deterministic op type proposed via Raft — claim, renew, ack, release, register-batch, save-checkpoint. The leader serializes ops, replicates them to followers, and applies them to the redb state machine on commit.
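As a rough mental model, the op type could look like the sketch below. The name `ConsensusCommandSketch`, the variant fields, and the payload types are all illustrative assumptions, not the crate’s actual definition:

```rust
// Hypothetical shape of the replicated op type; names and payloads are illustrative.
// Every variant must apply deterministically so leader and followers reach the same
// redb state after commit.
#[allow(dead_code)]
enum ConsensusCommandSketch {
    ClaimNextBatch { instance_id: u128, schema_id: u32, lease_millis: u64 },
    RenewLease { claim_id: u64, instance_id: u128 },
    AckClaim { claim_id: u64 },
    ReleaseClaim { claim_id: u64 },
    RegisterMasterBatch { schema_id: u32, type_name: String, ipc_bytes: Vec<u8> },
    SaveCheckpoint { claim_id: u64, stage_idx: usize, ipc_bytes: Vec<u8> },
}
```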
```rust
use pcs_service::distributed::{
    ArrowRaftDriver, ArrowRaftDriverConfig, TcpNetworkFactory, PcsTypeConfig,
};

let driver = ArrowRaftDriver::new(ArrowRaftDriverConfig {
    node_id: 1,
    listen_addr: "0.0.0.0:7001".parse()?,
    log_db_path: "/var/lib/pcs/raft-log.redb".into(),
    state_db_path: "/var/lib/pcs/state-machine.redb".into(),
    ..Default::default()
}).await?;

let store = RedbSharedStore::with_raft(driver.handle()).await?;

// ... same pipeline template, same runner.run(...) call as single-node.
```
Network framing is length-prefixed TCP (`TcpNetworkFactory` → `TcpNetwork`), so adding nodes only requires reachable TCP ports.
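Length-prefixed framing of this kind typically looks like the sketch below — a generic illustration using tokio, assuming a 4-byte big-endian length prefix. It is not the crate’s `TcpNetwork` code:

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

/// Write one frame: a 4-byte big-endian length prefix followed by the payload.
async fn write_frame(stream: &mut TcpStream, payload: &[u8]) -> std::io::Result<()> {
    stream.write_u32(payload.len() as u32).await?;
    stream.write_all(payload).await?;
    stream.flush().await
}

/// Read one frame: the length prefix, then exactly that many bytes.
async fn read_frame(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let len = stream.read_u32().await? as usize;
    let mut buf = vec![0u8; len];
    stream.read_exact(&mut buf).await?;
    Ok(buf)
}
```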
## Examples
| Example | What it shows |
|---|---|
| `distributed_scheduler.rs` | Single-node `DistributedRunner` over `RedbSharedStore`, with IPC serialization end-to-end. |
| `distributed_windowed.rs` | Distributed windowed aggregation with checkpoint recovery between window stages. |
| `distributed_fulfillment/` | Multi-file order fulfillment example exercising claim/ack/checkpoint flow under Raft. |