Scheduler

A multi-pipeline orchestrator. Drives several independent Pipelines from one process, sequentially or concurrently.

Overview

Pipeline is the unit of one workload. Scheduler is the unit of many. It owns a Vec<Pipeline> and provides two tick modes plus dependency edges between pipelines.

Each pipeline owns its own Dataset, so there is no shared mutable state between pipelines. That makes parallel execution trivially sound: within each scheduling stage, tick_parallel calls Pipeline::run on every pipeline concurrently via futures::future::try_join_all.
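To make the ownership argument concrete, here is a minimal sketch that uses only the standard library. MiniPipeline and run_all_parallel are hypothetical stand-ins, not pcs_core types, and scoped threads replace the async try_join_all call so the example needs no external crates. Each worker receives a disjoint &mut borrow of exactly one pipeline, so the compiler itself rules out shared mutable state.

```rust
use std::thread;

/// Hypothetical stand-in for a pipeline: it owns its rows outright.
struct MiniPipeline {
    name: String,
    dataset: Vec<u64>, // owned by this pipeline alone
}

impl MiniPipeline {
    fn new(name: &str, dataset: Vec<u64>) -> Self {
        Self { name: name.to_string(), dataset }
    }

    /// Doubles every row in this pipeline's own dataset and returns the sum.
    /// Fails on an empty dataset so the error path can be exercised.
    fn run(&mut self) -> Result<u64, String> {
        if self.dataset.is_empty() {
            return Err(format!("{}: empty dataset", self.name));
        }
        for row in self.dataset.iter_mut() {
            *row *= 2;
        }
        Ok(self.dataset.iter().sum())
    }
}

/// Runs every pipeline on its own scoped thread. The scope waits for all
/// threads before returning; the result is the first error in input order,
/// or every pipeline's sum.
fn run_all_parallel(pipelines: &mut [MiniPipeline]) -> Result<Vec<u64>, String> {
    thread::scope(|scope| {
        let handles: Vec<_> = pipelines
            .iter_mut() // disjoint &mut borrows: one per pipeline
            .map(|p| scope.spawn(move || p.run()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("pipeline thread panicked"))
            .collect::<Result<Vec<u64>, String>>()
    })
}

fn main() {
    let mut pipelines = vec![
        MiniPipeline::new("ingest", vec![1, 2, 3]),
        MiniPipeline::new("report", vec![10, 20]),
    ];
    let sums = run_all_parallel(&mut pipelines).expect("all pipelines succeed");
    println!("{:?}", sums); // prints [12, 60]
}
```

One difference from the async version is worth keeping in mind: try_join_all drops the remaining futures as soon as one fails, whereas the scoped threads in this sketch always run to completion before the error is reported.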

Adding pipelines

use pcs_core::prelude::*;

let mut sched = Scheduler::new();

// Pipeline 1: ingest raw transactions.
let ingest = Pipeline::builder("ingest")
    .with::<Transaction>()
    .with_system(IngestSystem)
    .build();
sched.add_pipeline(ingest);

// Pipeline 2: produce daily reports.
let report = Pipeline::builder("report")
    .with::<Transaction>()
    .with_system(ReportSystem)
    .build();
sched.add_pipeline(report);

Tick modes

A tick runs every pipeline once. Choose a mode based on what you’re optimizing for:

// Sequential — pipelines run in registration order, one after another.
// Easiest to reason about; preferred when pipelines are fast or when
// deterministic ordering matters for logging/metrics.
sched.tick().await?;

// Concurrent — pipelines run in parallel within each scheduling stage.
// Preferred when pipelines are IO-heavy or CPU-bound and independent.
sched.tick_parallel().await?;

Dependencies between pipelines

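add_pipeline_with_config attaches a PipelineConfig describing dependency edges, priority, and optional backpressure. An edge has one of two kinds: DependencyKind::Data skips the dependent pipeline when the upstream pipeline is empty, while DependencyKind::Order always runs the dependent, but only after the upstream has finished: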

use pcs_core::DependencyKind;

sched.add_pipeline_with_config(
    enrich,
    PipelineConfig::new()
        .after("ingest", DependencyKind::Data)   // skip if ingest is empty
        .priority(10),
);

sched.add_pipeline_with_config(
    report,
    PipelineConfig::new()
        .after("enrich", DependencyKind::Order), // always run, but after enrich
);
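The difference between the two edge kinds can be sketched as a small decision function. Everything here is hypothetical: EdgeKind, IncomingEdge, and should_run are illustrative names, "empty" is taken to mean that the upstream produced zero rows this tick, and a dependent with several Data edges is assumed to be skipped if any one of those upstreams is empty. The real scheduler may resolve these cases differently.

```rust
/// Hypothetical mirror of the two edge kinds described above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum EdgeKind {
    /// The dependent is skipped when the upstream produced nothing.
    Data,
    /// The dependent always runs, but only after the upstream has finished.
    Order,
}

/// One incoming edge, with the upstream's row count for this tick.
struct IncomingEdge {
    kind: EdgeKind,
    upstream_rows: usize, // assumption: "empty" means zero rows
}

/// Decides whether a dependent pipeline should run this tick, assuming all
/// of its upstreams have already finished. Order edges never block the run;
/// a Data edge blocks it when its upstream produced zero rows.
fn should_run(edges: &[IncomingEdge]) -> bool {
    edges
        .iter()
        .all(|e| e.kind == EdgeKind::Order || e.upstream_rows > 0)
}

fn main() {
    // "enrich" depends on "ingest" via a Data edge; ingest produced no rows.
    let enrich = [IncomingEdge { kind: EdgeKind::Data, upstream_rows: 0 }];
    // "report" depends on "enrich" via an Order edge.
    let report = [IncomingEdge { kind: EdgeKind::Order, upstream_rows: 0 }];

    println!("enrich runs: {}", should_run(&enrich)); // prints "enrich runs: false"
    println!("report runs: {}", should_run(&report)); // prints "report runs: true"
}
```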

Dependency edges define a DAG over pipelines (much like SystemMeta defines one over systems within a single pipeline). tick_parallel respects the DAG: independent pipelines run concurrently; dependents wait.
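One way to picture how a DAG turns into scheduling stages is a level-by-level topological sort (Kahn's algorithm). The sketch below is an illustration under assumptions, not the planner pcs_core actually uses: plan_stages is a hypothetical function, edges are plain (upstream, dependent) name pairs, and pipelines within a stage are listed alphabetically rather than by registration order or priority.

```rust
use std::collections::BTreeMap;

/// Groups pipelines into stages so that every pipeline lands in a stage
/// strictly later than all of its upstreams. Pipelines in the same stage
/// have no edges between them and could run concurrently.
/// Returns an error for unknown pipeline names or dependency cycles.
fn plan_stages<'a>(
    names: &[&'a str],
    edges: &[(&'a str, &'a str)], // (upstream, dependent)
) -> Result<Vec<Vec<String>>, String> {
    // Number of not-yet-finished upstreams per pipeline.
    let mut pending: BTreeMap<&'a str, usize> = names.iter().map(|n| (*n, 0)).collect();
    for (up, down) in edges {
        if !pending.contains_key(up) {
            return Err(format!("unknown pipeline: {up}"));
        }
        match pending.get_mut(down) {
            Some(count) => *count += 1,
            None => return Err(format!("unknown pipeline: {down}")),
        }
    }

    let mut stages = Vec::new();
    while !pending.is_empty() {
        // Everything without an unfinished upstream is ready to run together.
        let ready: Vec<&str> = pending
            .iter()
            .filter(|(_, count)| **count == 0)
            .map(|(name, _)| *name)
            .collect();
        if ready.is_empty() {
            return Err("dependency cycle detected".to_string());
        }
        for name in &ready {
            pending.remove(name);
        }
        // Finishing a stage releases its dependents.
        for (up, down) in edges {
            if ready.contains(up) {
                if let Some(count) = pending.get_mut(down) {
                    *count -= 1;
                }
            }
        }
        stages.push(ready.iter().map(|n| n.to_string()).collect());
    }
    Ok(stages)
}

fn main() {
    // "audit" is a hypothetical fourth pipeline with no dependencies.
    let names = ["ingest", "enrich", "report", "audit"];
    let edges = [("ingest", "enrich"), ("enrich", "report")];
    let stages = plan_stages(&names, &edges).expect("valid DAG");
    println!("{:?}", stages);
    // prints [["audit", "ingest"], ["enrich"], ["report"]]
}
```

In this example "audit" and "ingest" share the first stage because neither waits on anything, while "enrich" and "report" each wait for the stage before them.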

Inspecting pipelines

// All pipelines, in registration order.
for p in sched.pipelines() {
    let stats = p.last_stats();
    println!("{:<12} {:>6} systems  {:>6} ms",
        p.name(), stats.systems_run, stats.duration_millis);
}

// Lookup by name (immutable or mutable).
if let Some(p) = sched.get_mut("ingest") {
    p.append::<Transaction>(&new_rows)?;
}

Full example

use pcs_core::prelude::*;

#[tokio::main]
async fn main() -> PcsResult<()> {
    let mut sched = Scheduler::new();

    sched.add_pipeline(
        Pipeline::builder("ingest")
            .with::<Transaction>()
            .with_system(IngestSystem)
            .build(),
    );

    sched.add_pipeline_with_config(
        Pipeline::builder("enrich")
            .with::<Transaction>()
            .with_system(ValidateSystem)
            .with_system(EnrichSystem)
            .build(),
        PipelineConfig::new().after("ingest", DependencyKind::Data),
    );

    sched.add_pipeline_with_config(
        Pipeline::builder("report")
            .with::<Transaction>()
            .with_system(ReportSystem)
            .build(),
        PipelineConfig::new().after("enrich", DependencyKind::Order),
    );

    // Drive a few ticks; each respects the dependency DAG.
    for _ in 0..3 {
        sched.tick_parallel().await?;
    }

    Ok(())
}

When to use Scheduler vs. just a Pipeline

Use | When
--- | ---
Just a Pipeline | You have a single workload. Stage parallelism inside the pipeline already runs systems concurrently.
Scheduler | You have several independent workloads (different schemas or different cadences) that should share a process, e.g. an ingest pipeline that runs continuously plus a reporting pipeline that runs every 5 minutes.
Distributed Runner | You have a single workload but want it to run across multiple nodes with at-least-once semantics and crash recovery. See Distributed Runner.

Next steps

Sources & Sinks (I/O): Plug pipelines into Parquet, CSV, JSON, or in-memory channel transports.

Distributed Runner (Distributed): Scale a single pipeline template across nodes with Raft-replicated coordination.