# Scheduler

A multi-pipeline orchestrator. Drives several independent `Pipeline`s from one process, sequentially or concurrently.
## Overview

`Pipeline` is the unit of one workload. `Scheduler` is the unit of many. It owns a `Vec<Pipeline>` and provides two tick modes plus dependency edges between pipelines.

Each pipeline owns its own `Dataset` — there is no shared mutable state between pipelines. That makes parallel execution trivially sound: `tick_parallel` just calls `Pipeline::run` on every pipeline concurrently via `futures::try_join_all`.
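The ownership argument can be sketched without `pcs_core` at all. The toy `Pipeline` struct below is a hypothetical stand-in (not the real API), and std threads stand in for futures; the point is that exclusive ownership is what lets each pipeline run concurrently with no locks:

```rust
use std::thread;

// Hypothetical stand-in for a pipeline that owns its dataset outright —
// no Arc, no Mutex, because nothing is shared between pipelines.
struct Pipeline {
    name: &'static str,
    rows: Vec<i64>,
}

impl Pipeline {
    fn run(&mut self) -> usize {
        // Mutates only data this pipeline owns.
        self.rows.push(self.rows.len() as i64);
        self.rows.len()
    }
}

// Move each pipeline into its own thread. The compiler accepts the `move`
// precisely because ownership is exclusive — the same property that makes
// running every pipeline concurrently sound.
fn run_all(pipelines: Vec<Pipeline>) -> Vec<(&'static str, usize)> {
    let handles: Vec<_> = pipelines
        .into_iter()
        .map(|mut p| thread::spawn(move || (p.name, p.run())))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let results = run_all(vec![
        Pipeline { name: "ingest", rows: vec![10, 20] },
        Pipeline { name: "report", rows: vec![] },
    ]);
    println!("{results:?}"); // [("ingest", 3), ("report", 1)]
}
```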
## Adding pipelines

```rust
use pcs_core::prelude::*;

let mut sched = Scheduler::new();

// Pipeline 1: ingest raw transactions.
let ingest = Pipeline::builder("ingest")
    .with::<Transaction>()
    .with_system(IngestSystem)
    .build();
sched.add_pipeline(ingest);

// Pipeline 2: produce daily reports.
let report = Pipeline::builder("report")
    .with::<Transaction>()
    .with_system(ReportSystem)
    .build();
sched.add_pipeline(report);
```
## Tick modes

A tick runs every pipeline once. Choose a mode based on what you’re optimizing for:

```rust
// Sequential — pipelines run in registration order, one after another.
// Easiest to reason about; preferred when pipelines are fast or when
// deterministic ordering matters for logging/metrics.
sched.tick().await?;

// Concurrent — pipelines run in parallel within each scheduling stage.
// Preferred when pipelines are IO-heavy or CPU-bound and independent.
sched.tick_parallel().await?;
```
## Dependencies between pipelines

`add_pipeline_with_config` attaches a `PipelineConfig` describing dependency edges, priority, and optional backpressure. Edges express two semantics:

- `DependencyKind::Order` — B runs after A in the same tick.
- `DependencyKind::Data` — B runs after A and is skipped if A produced zero rows. Useful when B has nothing to do unless A appended new data.
```rust
use pcs_core::DependencyKind;

sched.add_pipeline_with_config(
    enrich,
    PipelineConfig::new()
        .after("ingest", DependencyKind::Data) // skip if ingest is empty
        .priority(10),
);

sched.add_pipeline_with_config(
    report,
    PipelineConfig::new()
        .after("enrich", DependencyKind::Order), // always run, but after enrich
);
```
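The two edge kinds boil down to a small decision rule: `Order` only constrains scheduling order, while `Data` additionally gates on the upstream row count. The function below is a hypothetical sketch of that rule as described above, not `pcs_core`'s actual internals:

```rust
// Hypothetical model of the two edge kinds (not the real pcs_core types).
#[derive(Clone, Copy, PartialEq, Debug)]
enum DependencyKind {
    Order,
    Data,
}

/// Should a dependent pipeline run this tick, given the kind of each of its
/// incoming edges and how many rows that upstream pipeline produced?
fn should_run(edges: &[(DependencyKind, usize)]) -> bool {
    edges
        .iter()
        .all(|&(kind, rows_produced)| kind == DependencyKind::Order || rows_produced > 0)
}

fn main() {
    // enrich depends on ingest via Data: skipped when ingest produced nothing.
    assert!(!should_run(&[(DependencyKind::Data, 0)]));
    assert!(should_run(&[(DependencyKind::Data, 42)]));

    // report depends on enrich via Order: runs even with zero rows.
    assert!(should_run(&[(DependencyKind::Order, 0)]));
    println!("ok");
}
```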
Dependency edges define a DAG over pipelines (much like `SystemMeta` defines one over systems within a single pipeline). `tick_parallel` respects the DAG: independent pipelines run concurrently; dependents wait.
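How stages fall out of the edges can be illustrated with a Kahn-style layering over pipeline names — a std-only sketch of the documented behavior (the real scheduler's algorithm may differ), where each returned stage holds pipelines that can run concurrently:

```rust
use std::collections::HashSet;

/// Group pipelines into stages: a pipeline joins the earliest stage in which
/// every `(before, after)` edge pointing at it is already satisfied.
fn stages(pipelines: &[&str], edges: &[(&str, &str)]) -> Vec<Vec<String>> {
    let mut done: HashSet<&str> = HashSet::new();
    let mut out = Vec::new();
    while done.len() < pipelines.len() {
        // Everything whose upstreams have all run forms the next stage.
        let stage: Vec<&str> = pipelines
            .iter()
            .copied()
            .filter(|p| !done.contains(p))
            .filter(|p| {
                edges
                    .iter()
                    .filter(|(_, after)| after == p)
                    .all(|(before, _)| done.contains(before))
            })
            .collect();
        assert!(!stage.is_empty(), "cycle in pipeline dependencies");
        done.extend(stage.iter().copied());
        out.push(stage.iter().map(|s| s.to_string()).collect());
    }
    out
}

fn main() {
    // "metrics" has no edges, so it shares a stage with "ingest".
    let order = stages(
        &["ingest", "metrics", "enrich", "report"],
        &[("ingest", "enrich"), ("enrich", "report")],
    );
    println!("{order:?}"); // [["ingest", "metrics"], ["enrich"], ["report"]]
}
```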
## Inspecting pipelines

```rust
// All pipelines, in registration order.
for p in sched.pipelines() {
    let stats = p.last_stats();
    println!("{:<12} {:>6} systems {:>6} ms",
        p.name(), stats.systems_run, stats.duration_millis);
}

// Lookup by name (immutable or mutable).
if let Some(p) = sched.get_mut("ingest") {
    p.append::<Transaction>(&new_rows)?;
}
```
## Full example

```rust
use pcs_core::prelude::*;
use pcs_core::DependencyKind;

#[tokio::main]
async fn main() -> PcsResult<()> {
    let mut sched = Scheduler::new();

    sched.add_pipeline(
        Pipeline::builder("ingest")
            .with::<Transaction>()
            .with_system(IngestSystem)
            .build(),
    );

    sched.add_pipeline_with_config(
        Pipeline::builder("enrich")
            .with::<Transaction>()
            .with_system(ValidateSystem)
            .with_system(EnrichSystem)
            .build(),
        PipelineConfig::new().after("ingest", DependencyKind::Data),
    );

    sched.add_pipeline_with_config(
        Pipeline::builder("report")
            .with::<Transaction>()
            .with_system(ReportSystem)
            .build(),
        PipelineConfig::new().after("enrich", DependencyKind::Order),
    );

    // Drive a few ticks; each respects the dependency DAG.
    for _ in 0..3 {
        sched.tick_parallel().await?;
    }
    Ok(())
}
```
## When to use `Scheduler` vs. just a `Pipeline`
| Use | When |
|---|---|
| Just a `Pipeline` | You have a single workload. Stage parallelism inside the pipeline already runs systems concurrently. |
| `Scheduler` | You have several independent workloads (different schemas or different cadences) that should share a process — e.g. an ingest pipeline that runs continuously plus a reporting pipeline that runs every 5 minutes. |
| Distributed Runner | You have a single workload but want it to run across multiple nodes with at-least-once semantics and crash recovery. See Distributed Runner. |