Distributed Runner

At-least-once batch processing across instances, backed by an embedded redb store and (optionally) Raft-replicated coordination.

Overview

The distributed runner takes a Pipeline as a template and runs it against many partitions of data spread across instances. The mechanics are:

  1. An instance calls PartitionSource::claim_next_batch to lease a row-range partition.
  2. It reconstructs a Dataset for the partition (via a user-provided world_factory closure).
  3. It calls Pipeline::run_on(&mut partition_dataset) — the template’s own data, sources, and sinks are never used.
  4. Between stages, at a configurable frequency, it persists Arrow IPC checkpoints via CheckpointStore::save_checkpoint.
  5. On success it acks the claim; on crash, the lease expires and another instance picks the partition up, optionally resuming from the latest checkpoint.
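
The claim, crash, and reclaim cycle in the steps above can be sketched with an in-memory lease table. This is a minimal illustration, not the pcs_service API: LeaseTable, claim_next, and ack are hypothetical names, and the clock is passed in as a millisecond value so the behavior is deterministic.

```rust
use std::collections::BTreeMap;

/// Hypothetical partition state; not the real PartitionSource internals.
#[derive(Debug, Clone, PartialEq)]
enum PartitionState {
    Free,
    Leased { owner: u32, expires_at_ms: u64 },
    Done,
}

struct LeaseTable {
    partitions: BTreeMap<u32, PartitionState>,
    lease_ms: u64,
}

impl LeaseTable {
    fn new(partition_count: u32, lease_ms: u64) -> Self {
        let partitions = (0..partition_count)
            .map(|id| (id, PartitionState::Free))
            .collect();
        LeaseTable { partitions, lease_ms }
    }

    /// Lease the first partition that is free or whose lease has expired.
    fn claim_next(&mut self, owner: u32, now_ms: u64) -> Option<u32> {
        let lease_ms = self.lease_ms;
        for (id, state) in self.partitions.iter_mut() {
            let claimable = match state {
                PartitionState::Free => true,
                PartitionState::Leased { expires_at_ms, .. } => *expires_at_ms <= now_ms,
                PartitionState::Done => false,
            };
            if claimable {
                *state = PartitionState::Leased { owner, expires_at_ms: now_ms + lease_ms };
                return Some(*id);
            }
        }
        None
    }

    /// Mark a partition done, but only if `owner` still holds a live lease.
    fn ack(&mut self, id: u32, owner: u32, now_ms: u64) -> bool {
        let holds_lease = matches!(
            self.partitions.get(&id),
            Some(PartitionState::Leased { owner: o, expires_at_ms })
                if *o == owner && *expires_at_ms > now_ms
        );
        if holds_lease {
            self.partitions.insert(id, PartitionState::Done);
        }
        holds_lease
    }
}
```

In this sketch, if instance 1 leases partition 0 at t=0 with a 100 ms lease and then crashes, a claim_next call at t=100 hands partition 0 to another instance, and a late ack from instance 1 is rejected. Work done before the crash may be repeated, which is why the guarantee is at-least-once.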

The runner is feature-gated. The distributed feature on its own provides the traits, the redb-backed store, and the runner. Adding distributed-raft wires the store’s state machine to openraft so that coordination state is replicated across nodes.

Building blocks

Type                 Role
PartitionSource      Claims, renews, acks, and releases row-range batches across instances. Lease-based to handle crashes.
CheckpointStore      Persists Arrow IPC snapshots keyed by (claim_id, stage_idx) for crash recovery.
RedbSharedStore      Implements both traits over redb. Single-node mode applies state directly; cluster mode proposes through a Raft channel.
DistributedRunner    The driver. Holds a pipeline template, a runtime, and a RunnerConfig. run(world_factory) loops until exhausted.
RunnerConfig         Runner settings: instance_id, checkpoint_strategy, schema_id, max_batches, and the lease renewal check interval.
CheckpointStrategy   How often to checkpoint: EveryStage, EveryNStages(n), or None.
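
To make the (claim_id, stage_idx) keying concrete, here is a minimal in-memory sketch of a checkpoint store. MemCheckpointStore and latest_checkpoint are hypothetical names, the integer key types are assumptions, and the real CheckpointStore trait may have different signatures. The byte vectors stand in for Arrow IPC snapshots.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a checkpoint store.
#[derive(Default)]
struct MemCheckpointStore {
    // Ordered by (claim_id, stage_idx), so the latest stage for a claim
    // is the last entry in that claim's key range.
    snapshots: BTreeMap<(u64, usize), Vec<u8>>,
}

impl MemCheckpointStore {
    /// Store (or overwrite) the snapshot taken after `stage_idx`.
    fn save_checkpoint(&mut self, claim_id: u64, stage_idx: usize, ipc_bytes: Vec<u8>) {
        self.snapshots.insert((claim_id, stage_idx), ipc_bytes);
    }

    /// Return the highest stage index saved for `claim_id`, with its bytes.
    fn latest_checkpoint(&self, claim_id: u64) -> Option<(usize, &[u8])> {
        self.snapshots
            .range((claim_id, 0)..=(claim_id, usize::MAX))
            .next_back()
            .map(|(&(_, stage_idx), bytes)| (stage_idx, bytes.as_slice()))
    }
}
```

Using an ordered map keeps the "latest checkpoint for this claim" lookup to a single range query, which is one plausible reason to key snapshots by the pair rather than by claim alone.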

Single-node example

The simplest setup uses one process, one redb file, and a single instance running its own coordinator. There is no Raft, but the same code works in cluster mode by switching the store constructor.

use std::sync::Arc;
use uuid::Uuid;

use pcs_core::prelude::*;
use pcs_service::distributed::{
    DistributedRunner, RunnerConfig, CheckpointStrategy, RedbSharedStore,
};

#[tokio::main]
async fn main() -> PcsResult<()> {
    // 1. Open the shared redb store.
    let store = RedbSharedStore::open("/tmp/pcs.redb")?;

    // 2. Register a master batch: the data this runner will partition over.
    let trades: Vec<Trade> = load_trades_from_parquet("/data/trades.parquet")?;
    let batch = Trade::to_record_batch(&trades)?;
    let mut buf = Vec::new();
    {
        // Scope the writer so its borrow of `buf` ends before `buf` is moved.
        let mut writer =
            arrow_ipc::writer::FileWriter::try_new(&mut buf, &Trade::schema())?;
        writer.write(&batch)?;
        // Write the IPC footer; without it the file is incomplete.
        writer.finish()?;
    }
    store.register_master_batch(/*schema_id=*/ 1, "Trade", buf).await?;

    // 3. Build the pipeline template.
    let template = Pipeline::builder("trade-processor")
        .with::<Trade>()
        .with_system(ValidateTradeSystem)
        .with_system(EnrichTradeSystem)
        .build();

    // 4. Drive the runner.
    let mut runner = DistributedRunner::new(
        store,
        Box::new(template),
        RunnerConfig {
            instance_id: Uuid::new_v4(),
            checkpoint_strategy: CheckpointStrategy::EveryStage,
            schema_id: 1,
            max_batches: None,
            lease_renewal_check_interval_millis: 250,
            ..Default::default()
        },
    );

    // For each claim, the runner asks `world_factory` to produce a fresh
    // Dataset and copies the partition's IPC snapshot into it before running.
    let processed = runner.run(|| Dataset::new()).await?;
    println!("processed {} batches", processed);
    Ok(())
}

Checkpoints & recovery

Checkpoints serialize the partition’s Dataset after a stage completes and persist the bytes via CheckpointStore. If the instance crashes mid-pipeline, the next claim of the same partition restores the latest checkpoint and resumes from the next stage.
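
The resume behavior described above can be illustrated with a small simulation. This is a sketch under simplifying assumptions: a plain integer stands in for the serialized Dataset, a checkpoint is taken after every stage, and run_stages, Checkpoint, and crash_before are hypothetical names rather than part of the runner.

```rust
/// A stage transforms the partition state.
type Stage = fn(i64) -> i64;

/// Index of the last completed stage plus the state after it.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Checkpoint {
    stage_idx: usize,
    state: i64,
}

/// Run `stages`, resuming after `resume_from` if present. `crash_before`
/// simulates a crash just before the given stage index. Returns the final
/// state on success, or the last checkpoint available at the crash.
fn run_stages(
    stages: &[Stage],
    initial: i64,
    resume_from: Option<Checkpoint>,
    crash_before: Option<usize>,
) -> Result<i64, Option<Checkpoint>> {
    let (start, mut state) = match resume_from {
        Some(cp) => (cp.stage_idx + 1, cp.state),
        None => (0, initial),
    };
    let mut last = resume_from;
    for (idx, stage) in stages.iter().enumerate().skip(start) {
        if crash_before == Some(idx) {
            return Err(last);
        }
        state = stage(state);
        // Checkpoint after every completed stage.
        last = Some(Checkpoint { stage_idx: idx, state });
    }
    Ok(state)
}
```

With three stages and a crash before the third, the second run starts at stage index 2 from the saved state instead of repeating the first two stages, and reaches the same final state as an uninterrupted run.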

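Three policies trade durability against throughput:

  EveryStage        Checkpoints after every stage, so a crash loses at most one stage of work, at the cost of the most checkpoint writes.
  EveryNStages(n)   Checkpoints after every n stages, so there are fewer writes but up to n stages may be redone after a crash.
  None              Never checkpoints; a reclaimed partition restarts from the first stage.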
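
One way to read the three policies is as a per-stage decision function. The enum below mirrors the variant names used in this document, but the decision logic (including which stage indices trigger a checkpoint under EveryNStages and how n = 0 is handled) is an assumption, not the crate's implementation.

```rust
/// Mirrors the three variants named in this document.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CheckpointStrategy {
    EveryStage,
    EveryNStages(usize),
    None,
}

/// Decide whether to checkpoint after the stage at `stage_idx` (0-based).
fn should_checkpoint(strategy: CheckpointStrategy, stage_idx: usize) -> bool {
    match strategy {
        CheckpointStrategy::EveryStage => true,
        // Checkpoint after stages n-1, 2n-1, ...; n == 0 is treated as "never".
        CheckpointStrategy::EveryNStages(n) => n > 0 && (stage_idx + 1) % n == 0,
        CheckpointStrategy::None => false,
    }
}
```

Under these assumptions, EveryNStages(3) checkpoints after stage indices 2, 5, 8, and so on, so at most three stages are repeated after a crash.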

Cluster mode (feature = "distributed-raft")

The distributed-raft feature replaces the single-node store with a Raft-coordinated one. Two redb files back the cluster: one for the Raft log (log_db_path) and one for the replicated state machine (state_db_path).

A ConsensusCommand is the deterministic op type proposed via Raft — claim, renew, ack, release, register-batch, save-checkpoint. The leader serializes ops, replicates them to followers, and applies them to the redb state machine on commit.
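
The value of a deterministic op type is that every node replaying the same committed log ends in the same state. The sketch below models a subset of the ops (claim, renew, ack, release) over an in-memory state machine. The field layouts, the StateMachine type, and the replay helper are assumptions for illustration; lease expiry checks, register-batch, and save-checkpoint are left out for brevity.

```rust
use std::collections::BTreeMap;

/// Hypothetical subset of the op types; field layouts are assumptions.
#[derive(Debug, Clone, PartialEq)]
enum ConsensusCommand {
    Claim { partition: u32, instance: u32, expires_at_ms: u64 },
    Renew { partition: u32, instance: u32, expires_at_ms: u64 },
    Ack { partition: u32, instance: u32 },
    Release { partition: u32, instance: u32 },
}

#[derive(Debug, Default, PartialEq)]
struct StateMachine {
    leases: BTreeMap<u32, (u32, u64)>, // partition -> (instance, expires_at_ms)
    done: Vec<u32>,
}

impl StateMachine {
    fn owner_of(&self, partition: u32) -> Option<u32> {
        self.leases.get(&partition).map(|lease| lease.0)
    }

    /// Apply one committed command; returns whether it was accepted.
    /// No clocks or randomness are read: timestamps travel in the command.
    fn apply(&mut self, cmd: &ConsensusCommand) -> bool {
        match *cmd {
            ConsensusCommand::Claim { partition, instance, expires_at_ms } => {
                if self.leases.contains_key(&partition) || self.done.contains(&partition) {
                    return false;
                }
                self.leases.insert(partition, (instance, expires_at_ms));
                true
            }
            ConsensusCommand::Renew { partition, instance, expires_at_ms } => {
                if self.owner_of(partition) != Some(instance) {
                    return false;
                }
                self.leases.insert(partition, (instance, expires_at_ms));
                true
            }
            ConsensusCommand::Ack { partition, instance } => {
                if self.owner_of(partition) != Some(instance) {
                    return false;
                }
                self.leases.remove(&partition);
                self.done.push(partition);
                true
            }
            ConsensusCommand::Release { partition, instance } => {
                if self.owner_of(partition) != Some(instance) {
                    return false;
                }
                self.leases.remove(&partition);
                true
            }
        }
    }
}

/// Replay a committed log from an empty state, as any node would.
fn replay(log: &[ConsensusCommand]) -> StateMachine {
    let mut sm = StateMachine::default();
    for cmd in log {
        sm.apply(cmd);
    }
    sm
}
```

Because apply depends only on the current state and the command, two calls to replay over the same log produce equal state machines, which is the property a Raft-replicated store relies on.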

use pcs_service::distributed::{
    ArrowRaftDriver, ArrowRaftDriverConfig, TcpNetworkFactory, PcsTypeConfig,
};

let driver = ArrowRaftDriver::new(ArrowRaftDriverConfig {
    node_id: 1,
    listen_addr: "0.0.0.0:7001".parse()?,
    log_db_path:   "/var/lib/pcs/raft-log.redb".into(),
    state_db_path: "/var/lib/pcs/state-machine.redb".into(),
    ..Default::default()
}).await?;

let store = RedbSharedStore::with_raft(driver.handle()).await?;
// ... same pipeline template, same runner.run(...) call as single-node.

Network framing is length-prefixed TCP (TcpNetworkFactory / TcpNetwork), so adding nodes only requires reachable TCP ports.
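
Length-prefixed framing means each message is sent as a fixed-size length header followed by that many payload bytes. The sketch below shows the general technique using only the standard library. The 4-byte big-endian prefix and the write_frame / read_frame helpers are assumptions; the actual wire format used by TcpNetwork may differ.

```rust
use std::io::{self, Read, Write};

/// Write one frame: a 4-byte big-endian length followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    let len = payload.len() as u32;
    w.write_all(&len.to_be_bytes())?;
    w.write_all(payload)
}

/// Read one frame. Returns Ok(None) if the stream ends before a full
/// length prefix arrives, and an error if it ends mid-payload.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut len_buf = [0u8; 4];
    match r.read_exact(&mut len_buf) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }
    let mut payload = vec![0u8; u32::from_be_bytes(len_buf) as usize];
    r.read_exact(&mut payload)?;
    Ok(Some(payload))
}
```

Both helpers are generic over Read and Write, so the same code works against a std::net::TcpStream or an in-memory buffer in tests.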

Examples

Example                     What it shows
distributed_scheduler.rs    Single-node DistributedRunner over RedbSharedStore, with IPC serialization end-to-end.
distributed_windowed.rs     Distributed windowed aggregation with checkpoint recovery between window stages.
distributed_fulfillment/    Multi-file order fulfillment example exercising claim/ack/checkpoint flow under Raft.

Next steps

Service (Distributed)
The production pcs-service binary that wires standalone or cluster runners to a TOML config and HTTP control plane.

Tracing (Observability)
Span-level instrumentation for runner ticks, claim lifecycles, and Raft proposals.