Scheduler
Automate your workflows with flexible scheduling and concurrency.
The scheduler is available behind the scheduler feature gate (features = ["scheduler"]).
The Scheduler provides workflow scheduling capabilities for background jobs and automated workflows.
It supports intervals, cron expressions, and manual triggers. Each registered workflow carries a
Resources dictionary whose setup() and
teardown() lifecycle hooks run once per scheduler.start() /
scheduler.stop() call — not once per scheduled run.
All workflows registered with a single Scheduler instance must share the same
TState type: the scheduler is generic (Scheduler<TState>), so every registered
workflow uses the same state enum. For workflows with different state enums,
create separate Scheduler instances.
Lifecycle: Scheduler → RunningScheduler
The scheduler is split into two halves to make a double-start impossible at the type level:
- Scheduler is the builder. Register workflows with every / cron / manual and, if you want something other than BackoffPolicy::default(), override it per flow via set_backoff. Scheduler is not Clone.
- RunningScheduler is the live handle returned by scheduler.start().await?. It owns the spawned driver and per-flow loop tasks. It is cheap to clone: every clone shares the same command channel and flow registry, so you can call trigger, status, list, reset_flow, and stop from any task.
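The builder/handle split can be illustrated without the library. The following is a minimal std-only sketch of the same typestate idea, not cano's implementation: Builder and Running are hypothetical stand-ins for Scheduler and RunningScheduler, and the flow registry is reduced to a list of ids.

```rust
use std::sync::{Arc, Mutex};

/// Builder half (hypothetical stand-in for `Scheduler`).
/// It owns the registry and deliberately does not implement `Clone`.
pub struct Builder {
    flows: Vec<String>,
}

/// Live half (hypothetical stand-in for `RunningScheduler`).
/// Cloning is cheap because every clone shares the same registry.
#[derive(Clone)]
pub struct Running {
    flows: Arc<Mutex<Vec<String>>>,
}

impl Builder {
    pub fn new() -> Self {
        Builder { flows: Vec::new() }
    }

    /// Registration needs `&mut self`, so it is only possible before `start`.
    pub fn register(&mut self, id: &str) {
        self.flows.push(id.to_string());
    }

    /// Takes `self` by value: the builder is moved into the running handle,
    /// so a second `start` on the same builder does not compile.
    pub fn start(self) -> Running {
        Running {
            flows: Arc::new(Mutex::new(self.flows)),
        }
    }
}

impl Running {
    /// Any clone can read the shared registry.
    pub fn list(&self) -> Vec<String> {
        self.flows.lock().unwrap().clone()
    }
}
```

In this sketch, calling start() twice on the same Builder, or calling register() after start(), is rejected by the compiler as a use of a moved value.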
start consumes the builder, so the compiler prevents you from starting the same scheduler
twice or mutating the registry mid-flight. stop().await on any clone signals graceful
shutdown and waits for it to complete; wait().await blocks without sending Stop, useful for
"main blocks until Ctrl+C handler stops the scheduler" patterns.
Overlap Prevention
The scheduler prevents overlapping executions of the same workflow. If a previous execution is still running when the next interval or cron trigger fires, the new run is skipped. This prevents resource exhaustion from slow-running workflows that accumulate concurrent instances over time.
For example, if a workflow is configured to run every 30 seconds but a particular execution takes 45 seconds, the scheduler will skip the trigger at the 30-second mark and wait for the next interval after the current run completes.
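One common way to get this skip-if-busy behavior is a per-flow flag that a tick must claim before it runs. The sketch below uses only the standard library; OverlapGuard, try_begin, and finish are hypothetical names chosen for illustration and say nothing about how cano implements the check internally.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical per-flow guard: `true` while a run is in flight.
pub struct OverlapGuard {
    running: AtomicBool,
}

impl OverlapGuard {
    pub fn new() -> Self {
        OverlapGuard {
            running: AtomicBool::new(false),
        }
    }

    /// Called when an interval or cron tick fires.
    /// Returns `true` if this tick may start a run, or `false` if a previous
    /// run is still active and the tick should be skipped.
    pub fn try_begin(&self) -> bool {
        self.running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Called when the run ends, whether it succeeded or failed.
    pub fn finish(&self) {
        self.running.store(false, Ordering::Release);
    }
}
```

With a 30-second interval and a 45-second run, the tick at the 30-second mark would see try_begin() return false and be dropped; the first tick after finish() succeeds again.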
Scheduling Strategies
Interval
Run workflows at fixed time intervals.
scheduler.every_seconds(...)
Cron
Run workflows based on cron expressions.
scheduler.cron(..., "0 0 9 * * *")
Manual
Trigger workflows on-demand via API.
scheduler.manual(...)
Scheduling Strategy Examples
The Scheduler supports multiple scheduling strategies. Here are complete examples for each.
1. Interval Scheduling - Fixed Time Intervals
Run workflows at regular time intervals. Best for periodic tasks like health checks or data syncing.
[Diagram: Interval Scheduling Timeline]
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct HealthCheckTask;

#[task(state = State)]
impl HealthCheckTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("Running health check...");
        // Check system health
        let store = res.get::<MemoryStore, _>("store")?;
        let status = "healthy".to_string();
        store.put("last_health_check", status)?;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, HealthCheckTask)
        .add_exit_state(State::Complete);

    // Run every 30 seconds
    scheduler.every_seconds("health_check", workflow, State::Start, 30)?;

    // start() consumes the builder and returns a clone-able RunningScheduler.
    // wait() blocks until somebody calls stop() on a clone.
    let running = scheduler.start().await?;
    running.wait().await?;
    Ok(())
}
Runnable examples: cargo run --example scheduler_duration_scheduling --features scheduler
(interval-only) and cargo run --example scheduler_scheduling --features scheduler (intervals
plus cron and manual flows).
2. Cron Scheduling - Time-Based Expressions
Run workflows based on cron expressions. Perfect for scheduled reports, backups, or time-specific tasks.
[Diagram: Cron Scheduling Timeline]
use cano::prelude::*;
use chrono::Utc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct DailyReport {
    report_type: String,
}

#[task(state = State)]
impl DailyReport {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("Preparing {} report...", self.report_type);
        let store = res.get::<MemoryStore, _>("store")?;

        // Load data for report
        let data = vec!["metric1".to_string(), "metric2".to_string(), "metric3".to_string()];
        store.put("report_start", Utc::now().to_rfc3339())?;

        println!("Generating report with {} records", data.len());
        let result = format!("{} report: {} records processed", self.report_type, data.len());

        println!("Report completed: {}", result);
        store.put("last_report", result)?;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Morning report workflow
    let morning_report = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DailyReport {
            report_type: "Morning".to_string()
        })
        .add_exit_state(State::Complete);

    // Evening report workflow
    let evening_report = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DailyReport {
            report_type: "Evening".to_string()
        })
        .add_exit_state(State::Complete);

    // Run daily at 9 AM: "0 0 9 * * *"
    scheduler.cron("morning_report", morning_report, State::Start, "0 0 9 * * *")?;

    // Run daily at 6 PM: "0 0 18 * * *"
    scheduler.cron("evening_report", evening_report, State::Start, "0 0 18 * * *")?;

    let running = scheduler.start().await?;
    running.wait().await?;
    Ok(())
}
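The expressions in this example have six fields, and the comments ("0 0 9 * * *" for 9 AM) suggest the first field is seconds. The helper below is a small std-only sketch that labels each field under that assumption; label_cron_fields and the field names are hypothetical and are not part of the scheduler API, and the helper does no validation beyond counting fields.

```rust
/// Field names for a six-field, seconds-first cron expression.
/// This layout is an assumption inferred from the comments in the example.
const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day_of_month",
    "month",
    "day_of_week",
];

/// Splits an expression on whitespace and pairs each part with its field name.
/// Returns `None` when the expression does not have exactly six fields.
pub fn label_cron_fields(expr: &str) -> Option<Vec<(&'static str, String)>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if parts.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(
        FIELD_NAMES
            .iter()
            .copied()
            .zip(parts.into_iter().map(String::from))
            .collect(),
    )
}
```

Read this way, "0 0 18 * * *" is second 0, minute 0, hour 18 on every day, and a five-field expression such as "0 9 * * *" is rejected by the helper because it lacks the seconds field.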
3. Manual Triggering - On-Demand Execution
Trigger workflows manually via API. Ideal for user-initiated tasks or event-driven processing.
trigger() lives on RunningScheduler, which is only obtained by calling
scheduler.start().await?. The compiler will not let you trigger a workflow before the
scheduler is running.
[Diagram: Manual Trigger Sequence]
use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct DataExportTask;

#[task(state = State)]
impl DataExportTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("Starting data export...");
        // Export data to CSV
        let store = res.get::<MemoryStore, _>("store")?;
        let export_path = "/tmp/export.csv".to_string();
        store.put("export_path", export_path)?;
        println!("Export completed");
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let export_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DataExportTask)
        .add_exit_state(State::Complete);

    // Register as manual-only workflow
    scheduler.manual("data_export", export_workflow, State::Start)?;

    // Start consumes the builder and returns a live, clone-able handle.
    let running = scheduler.start().await?;

    // Trigger manually when needed
    println!("Triggering export...");
    running.trigger("data_export").await?;

    // Can be triggered again later
    tokio::time::sleep(Duration::from_secs(5)).await;
    running.trigger("data_export").await?;

    // stop() sends the Stop command and waits for graceful shutdown.
    running.stop().await?;
    Ok(())
}
4. Mixed Scheduling - Combining Strategies
Use multiple scheduling strategies together for complex automation scenarios.
[Diagram: Mixed Strategy Overview]
use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Define simple tasks
    #[derive(Clone)]
    struct DataSyncTask;

    #[task(state = State)]
    impl DataSyncTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Syncing data...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct BackupTask;

    #[task(state = State)]
    impl BackupTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Running backup...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct WeeklyReportTask;

    #[task(state = State)]
    impl WeeklyReportTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Generating weekly report...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct EmergencyExportTask;

    #[task(state = State)]
    impl EmergencyExportTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Emergency export...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    // 1. Interval: Data sync every 5 minutes
    let sync_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DataSyncTask)
        .add_exit_state(State::Complete);
    scheduler.every_seconds("data_sync", sync_workflow, State::Start, 300)?;

    // 2. Cron: Daily backup at 3 AM
    let backup_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, BackupTask)
        .add_exit_state(State::Complete);
    scheduler.cron("daily_backup", backup_workflow, State::Start, "0 0 3 * * *")?;

    // 3. Cron: Weekly report on Mondays at 9 AM
    let report_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, WeeklyReportTask)
        .add_exit_state(State::Complete);
    scheduler.cron("weekly_report", report_workflow, State::Start, "0 0 9 * * MON")?;

    // 4. Manual: Emergency data export
    let export_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, EmergencyExportTask)
        .add_exit_state(State::Complete);
    scheduler.manual("emergency_export", export_workflow, State::Start)?;

    // Start consumes the builder and returns a live handle.
    let running = scheduler.start().await?;

    // Monitor and trigger as needed
    loop {
        tokio::time::sleep(Duration::from_secs(60)).await;

        // Check status of all workflows
        let workflows = running.list().await;
        for info in workflows {
            println!("{}: {:?} (runs: {})", info.id, info.status, info.run_count);
        }

        // Example: Trigger emergency export if needed based on some condition
        // running.trigger("emergency_export").await?;
    }
}
Runnable example: cargo run --example scheduler_mixed_workflows --features scheduler —
interval, cron, and manual flows side by side, plus a trigger on the manual one.
Backoff & Trip State
A flow that fails repeatedly shouldn't keep re-firing on its base schedule, so every flow has a
BackoffPolicy. After a failure the scheduler parks the flow in Status::Backoff
for a growing delay; with a streak_limit set, the flow eventually moves to Status::Tripped and stops
dispatching until you intervene. Each flow starts with BackoffPolicy::default(); call
set_backoff to use a different one.
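The transition described above can be written as a small pure function. This is a std-only sketch under stated assumptions, not the scheduler's code: Outcome and on_run_finished are hypothetical names, the error payload is omitted, and the trip condition is taken to be "streak has reached streak_limit".

```rust
/// Hypothetical terminal status of one run (error details omitted).
#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    Completed,
    Backoff { streak: u32 },
    Tripped { streak: u32 },
}

/// Decides the status after a run finishes.
/// `prior_streak` is the number of consecutive failures before this run.
pub fn on_run_finished(
    succeeded: bool,
    prior_streak: u32,
    streak_limit: Option<u32>,
) -> Outcome {
    if succeeded {
        return Outcome::Completed;
    }
    let streak = prior_streak + 1;
    match streak_limit {
        // With a limit set, reaching it parks the flow until it is reset.
        Some(limit) if streak >= limit => Outcome::Tripped { streak },
        // Otherwise (or with no limit at all) the flow keeps backing off.
        _ => Outcome::Backoff { streak },
    }
}
```

With streak_limit set to Some(3), the first two consecutive failures yield Backoff and the third yields Tripped; with None the flow backs off indefinitely.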
Because BackoffPolicy::default() has a 1s initial delay and is applied to
every flow, a flow that fails waits ~1s before its next attempt (the Every loop sleeps
max(interval, next_eligible - now)) — even if its base interval is shorter. If you run a flow
on a sub-second interval and want fast retries after a failure, lower
BackoffPolicy { initial: … } via set_backoff.
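The max(interval, next_eligible - now) rule is easy to check in isolation. The function below is a std-only sketch of that arithmetic; next_sleep is a hypothetical name, and modelling "no backoff pending" as None is an assumption made for the example.

```rust
use std::time::{Duration, Instant};

/// Computes how long an interval loop would sleep before its next attempt:
/// the base interval, extended if a backoff window is still open.
pub fn next_sleep(
    interval: Duration,
    now: Instant,
    next_eligible: Option<Instant>,
) -> Duration {
    // Time left in the backoff window, or zero if there is none
    // (or if `next_eligible` is already in the past).
    let backoff_remaining = next_eligible
        .map(|t| t.saturating_duration_since(now))
        .unwrap_or(Duration::ZERO);
    interval.max(backoff_remaining)
}
```

For a 200ms interval with a 1s backoff pending, the result is 1s; for a 30s interval with the same backoff, the interval dominates and the result stays 30s.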
Flow-level Tripped is scoped to the scheduler and is separate from the task-level
CanoError::CircuitOpen emitted by a CircuitBreaker.
The breaker gates a single task's call to a dependency; this policy gates the scheduler from re-firing
an entire flow.
Overriding the Default Policy
Register the workflow normally, then call set_backoff before start().
The policy controls the initial delay after the first failure, the multiplier applied per
additional consecutive failure, a hard cap on the computed delay, jitter, and an optional streak limit.
use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum FlowState { Start, Done }

#[derive(Clone)]
struct NoopTask;

#[task(state = FlowState)]
impl NoopTask {
    async fn run_bare(&self) -> Result<TaskResult<FlowState>, CanoError> {
        Ok(TaskResult::Single(FlowState::Done))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler: Scheduler<FlowState> = Scheduler::new();

    let workflow = Workflow::new(Resources::new())
        .register(FlowState::Start, NoopTask)
        .add_exit_state(FlowState::Done);

    scheduler.every(
        "flaky",
        workflow,
        FlowState::Start,
        Duration::from_millis(200),
    )?;

    scheduler.set_backoff(
        "flaky",
        BackoffPolicy {
            initial: Duration::from_millis(300),
            multiplier: 2.0,
            max_delay: Duration::from_secs(2),
            jitter: 0.1,
            streak_limit: Some(3),
        },
    )?;

    let running = scheduler.start().await?;
    running.wait().await?;
    Ok(())
}
Computed delay is initial * multiplier^(streak-1), capped at max_delay, then
multiplied by a random factor in 1 ± jitter. The Every loop's sleep extends to
max(interval, next_eligible - now), and the Cron loop suppresses ticks inside the
backoff window. BackoffPolicy::default() gives 1s initial, 2.0× multiplier, 5min cap,
0.1 jitter, and no trip limit. Use BackoffPolicy::with_trip(n) to ask for a
trip after n consecutive failures.
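The delay formula can be worked through with a few lines of arithmetic. This is a std-only sketch, not the library's implementation: Policy is a hypothetical mirror of the fields named above, and the random draw is replaced by a caller-supplied unit value in [-1.0, 1.0] so the result is reproducible. It also assumes jitter lies in [0.0, 1.0].

```rust
use std::time::Duration;

/// Hypothetical mirror of the policy fields described above
/// (the streak limit is left out because it does not affect the delay).
pub struct Policy {
    pub initial: Duration,
    pub multiplier: f64,
    pub max_delay: Duration,
    pub jitter: f64,
}

/// Delay after `streak` consecutive failures (1-based).
/// `unit` stands in for a random sample in [-1.0, 1.0].
pub fn backoff_delay(policy: &Policy, streak: u32, unit: f64) -> Duration {
    // The first failure uses `initial` unchanged: exponent is streak - 1.
    let exponent = streak.saturating_sub(1).min(i32::MAX as u32) as i32;
    let raw = policy.initial.as_secs_f64() * policy.multiplier.powi(exponent);
    // Cap first, then apply jitter, matching the order in the text.
    let capped = raw.min(policy.max_delay.as_secs_f64());
    let factor = 1.0 + policy.jitter * unit.clamp(-1.0, 1.0);
    Duration::from_secs_f64(capped * factor)
}
```

With the policy from the example above (300ms initial, 2.0 multiplier, 2s cap), the un-jittered delays for streaks 1 through 4 are roughly 0.3s, 0.6s, 1.2s, and 2s (capped from 2.4s). Because jitter is applied after the cap, a jittered delay can exceed max_delay by up to the jitter fraction.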
Status Variants
Status is #[non_exhaustive] — external match arms must include
a wildcard. The variants are:
- Idle: registered, never run or finished cleanly.
- Running: currently executing.
- Completed: last run reached an exit state.
- Backoff { until, streak, last_error }: last run errored; the flow is waiting until until before its next dispatch, per its BackoffPolicy.
- Tripped { streak, last_error }: streak reached streak_limit; the scheduler will not dispatch this flow again until reset_flow is called.
Outcome writes are atomic: a single write decides this run's terminal status (Completed on
success, otherwise Backoff or Tripped), so observers never see a transient
intermediate state. FlowInfo exposes failure_streak and
next_eligible for observability.
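The wildcard requirement looks like this in practice. The enum below is a hypothetical local stand-in named FlowStatus, with field types (Instant, u32, String) assumed for illustration; the real Status type may differ. Note that inside a single crate #[non_exhaustive] does not force the wildcard, whereas a match written in a downstream crate would not compile without it.

```rust
use std::time::Instant;

/// Hypothetical stand-in for the scheduler's `Status` enum.
/// Field types are assumptions made for this sketch.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum FlowStatus {
    Idle,
    Running,
    Completed,
    Backoff { until: Instant, streak: u32, last_error: String },
    Tripped { streak: u32, last_error: String },
}

/// Renders a short description, handling the variants we care about
/// and falling back to a wildcard arm for everything else.
pub fn describe(status: &FlowStatus) -> String {
    match status {
        FlowStatus::Idle => "idle".to_string(),
        FlowStatus::Running => "running".to_string(),
        FlowStatus::Backoff { streak, last_error, .. } => {
            format!("backing off after {} failure(s): {}", streak, last_error)
        }
        FlowStatus::Tripped { streak, .. } => {
            format!("tripped after {} failures; needs reset", streak)
        }
        // Covers `Completed` here, and any variant added in a later release.
        _ => "other".to_string(),
    }
}
```

The wildcard arm is what keeps downstream code compiling when a new variant is added to the enum.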
Recovery via reset_flow
A Tripped flow stays parked until you clear it. RunningScheduler::reset_flow(id)
clears the failure streak and next_eligible, and (when the flow is not currently running) sets
the status back to Idle. Manual trigger() is rejected on a tripped flow — call
reset_flow first.
let snap = running.status("flaky").await.expect("flow exists");
if matches!(snap.status, Status::Tripped { .. }) {
    running.reset_flow("flaky").await?;
}
See the scheduler_backoff example
(cargo run --example scheduler_backoff --features scheduler) for an end-to-end walk-through
that exercises the trip and recovery path.
Graceful Shutdown
The scheduler supports graceful shutdown, allowing currently running workflows to complete before stopping. This includes workflows started by interval or cron triggers as well as manually-triggered workflows. All active executions are tracked and included in the shutdown wait.
// Stop the scheduler and wait for running flows to finish.
running.stop().await?;
When stop() is called, the scheduler signals all scheduling loops to stop,
waits up to 30 seconds for any in-progress workflow executions to finish, and runs each
workflow's resource teardown_all in reverse registration order before returning.
A second stop() call after success is idempotent — it returns the same cached result.
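Two of the properties above, reverse-order teardown and an idempotent stop(), can be sketched with the standard library alone. Handle, teardown_log, and the string-named resources are hypothetical; the 30-second wait for in-flight runs is not modelled, and this is not a description of how cano caches its result.

```rust
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical clone-able handle that tears resources down exactly once.
#[derive(Clone)]
pub struct Handle {
    /// Resource names in registration order.
    resources: Arc<Vec<String>>,
    /// Records the order in which teardown actually ran.
    torn_down: Arc<Mutex<Vec<String>>>,
    /// Cached outcome of the first `stop` call, shared by all clones.
    result: Arc<OnceLock<Result<(), String>>>,
}

impl Handle {
    pub fn new(resources: &[&str]) -> Self {
        Handle {
            resources: Arc::new(resources.iter().map(|r| r.to_string()).collect()),
            torn_down: Arc::new(Mutex::new(Vec::new())),
            result: Arc::new(OnceLock::new()),
        }
    }

    /// The first call runs teardown in reverse registration order and caches
    /// the result; later calls on any clone return the cached result.
    pub fn stop(&self) -> Result<(), String> {
        self.result
            .get_or_init(|| {
                let mut log = self.torn_down.lock().unwrap();
                for name in self.resources.iter().rev() {
                    log.push(name.clone());
                }
                Ok(())
            })
            .clone()
    }

    pub fn teardown_log(&self) -> Vec<String> {
        self.torn_down.lock().unwrap().clone()
    }
}
```

Calling stop() on two clones of a handle built from ["db", "cache"] tears down "cache" then "db" once, and both calls return the same Ok(()).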
Runnable example: cargo run --example scheduler_graceful_shutdown --features scheduler —
spawns a Ctrl-C handler, runs scheduled flows, and shuts down cleanly on signal.
Advanced Pattern: Multi-Level Map-Reduce
The scheduler composes naturally with split/join to give you
a two-level map-reduce. Level 1 lives inside a single workflow: a state registered with
register_split fans a batch out across parallel tasks, and a JoinConfig reduces
their results back into one summary state. Level 2 lives at the scheduler: register several
of those batch workflows — each with different parameters (a different batch of records, a different region,
a different tenant) — as manual flows or interval flows, and trigger them concurrently. Because
every workflow carries its own Resources dictionary, you hand each
one a shared accumulator (an Arc<RwLock<…>> wrapped in a Resource); every
batch independently appends its summary, and a final reduce step folds them together.
- Map (level 1): register_split runs N tasks over a batch in parallel.
- Reduce (level 1): JoinConfig (e.g. JoinStrategy::All or Percentage(0.75)) merges the parallel results into a single batch summary.
- Map (level 2): the scheduler holds several batch workflows and fires them concurrently via trigger (or on intervals), each with its own parameters and Resources.
- Reduce (level 2): a shared accumulator resource collects every batch summary; once all flows finish, a final pass aggregates across batches.
The skeleton — one batch workflow with a parallel state, plus a scheduler wiring up a couple of batches:
// Level 1: a workflow that map-reduces over one batch.
fn batch_workflow(batch: Vec<Item>, results: SharedResults) -> Workflow<State> {
    Workflow::new(Resources::new().insert("results", results))
        .register_split(
            State::Process,
            batch.iter().map(|item| ProcessTask::new(item)).collect::<Vec<_>>(),
            JoinConfig::new(JoinStrategy::All, State::Summarize)
                .with_timeout(Duration::from_secs(60)),
        )
        .register(State::Summarize, SummarizeTask) // appends a batch summary into `results`
        .add_exit_states(vec![State::Done, State::Error])
}

// Level 2: the scheduler runs several batch workflows concurrently.
let results = SharedResults::default();
let mut scheduler = Scheduler::new();
scheduler.manual("batch-a", batch_workflow(batch_a, results.clone()), State::Start)?;
scheduler.manual("batch-b", batch_workflow(batch_b, results.clone()), State::Start)?;

let running = scheduler.start().await?;
running.trigger("batch-a").await?;
running.trigger("batch-b").await?;
// ...wait for both flows to finish, then reduce across all batch summaries in `results`.
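The shared-accumulator half of this pattern can be tried without the scheduler. The sketch below uses plain threads in place of split tasks and scheduled flows, and a word count in place of real processing; SharedTotals, run_batch, and run_all are hypothetical names, so treat it as an illustration of the two reduce levels rather than of the cano API.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical shared accumulator: one entry per finished batch.
pub type SharedTotals = Arc<RwLock<Vec<usize>>>;

/// Level 1: map over one batch in parallel (one thread per item),
/// reduce to a single batch summary, and append it to the accumulator.
pub fn run_batch(batch: Vec<String>, results: SharedTotals) {
    let workers: Vec<_> = batch
        .into_iter()
        .map(|item| thread::spawn(move || item.split_whitespace().count()))
        .collect();
    let summary: usize = workers
        .into_iter()
        .map(|w| w.join().expect("worker panicked"))
        .sum();
    results.write().unwrap().push(summary);
}

/// Level 2: run every batch concurrently, wait for all of them,
/// then fold the per-batch summaries into one total.
pub fn run_all(batches: Vec<Vec<String>>) -> usize {
    let results: SharedTotals = Arc::new(RwLock::new(Vec::new()));
    let flows: Vec<_> = batches
        .into_iter()
        .map(|batch| {
            let results = Arc::clone(&results);
            thread::spawn(move || run_batch(batch, results))
        })
        .collect();
    for flow in flows {
        flow.join().expect("batch panicked");
    }
    let total: usize = results.read().unwrap().iter().sum();
    total
}
```

Each batch takes the write lock only once, to push its summary, so contention on the accumulator stays low even when many batches run at the same time.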
The full runnable program is examples/scheduler_mapreduce_books.rs —
cargo run --example scheduler_mapreduce_books --features scheduler. It downloads several books
from Project Gutenberg, splits download + analysis across parallel tasks within each batch workflow, runs
multiple batch workflows concurrently, and reduces all results into global rankings. See also
examples/scheduler_book_prepositions.rs for the single-workflow variant.