Scheduler

Automate your workflows with flexible scheduling and concurrency.

Behind the scheduler feature gate (features = ["scheduler"]).

The Scheduler provides workflow scheduling capabilities for background jobs and automated workflows. It supports intervals, cron expressions, and manual triggers. Each registered workflow carries a Resources dictionary whose setup() and teardown() lifecycle hooks run once per scheduler.start() / scheduler.stop() call — not once per scheduled run.

Type Constraint

All workflows registered with a single Scheduler instance must share the same TState type. Scheduler<TState> is generic over the state enum, so one instance cannot hold workflows that use different state enums. For those, create separate Scheduler instances.
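To make the constraint concrete, here is a minimal std-only sketch. MiniScheduler, OrderState, and ReportState are hypothetical names invented for illustration; this is not the library's Scheduler, only a generic container that behaves the same way with respect to its type parameter.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a scheduler that is generic over one state type.
struct MiniScheduler<TState> {
    flows: HashMap<String, TState>, // flow id -> initial state
}

impl<TState> MiniScheduler<TState> {
    fn new() -> Self {
        Self { flows: HashMap::new() }
    }

    fn register(&mut self, id: &str, initial: TState) {
        self.flows.insert(id.to_string(), initial);
    }
}

#[derive(Debug, PartialEq)]
enum OrderState {
    Start,
}

#[derive(Debug, PartialEq)]
enum ReportState {
    Begin,
}

// One instance per state enum.
fn build() -> (MiniScheduler<OrderState>, MiniScheduler<ReportState>) {
    let mut orders = MiniScheduler::new();
    orders.register("orders", OrderState::Start);
    // orders.register("reports", ReportState::Begin); // would not compile: expected OrderState

    let mut reports = MiniScheduler::new();
    reports.register("reports", ReportState::Begin);

    (orders, reports)
}
```

Uncommenting the marked line produces a type mismatch at compile time, which is the same kind of error you would expect when mixing state enums in one scheduler.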


Lifecycle: Scheduler → RunningScheduler

The scheduler is split into two halves, the Scheduler builder and the live RunningScheduler handle, so that a double start is impossible at the type level.

start consumes the builder, so the compiler prevents you from starting the same scheduler twice or mutating the registry mid-flight. stop().await on any clone signals graceful shutdown and waits for it to complete; wait().await blocks without sending Stop, useful for "main blocks until Ctrl+C handler stops the scheduler" patterns.
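The builder/handle split described above is an instance of the typestate pattern. The sketch below shows the idea with std only; Builder and Running are hypothetical stand-ins, not the library's types, and the stop flag is a simplification of real shutdown signalling.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical builder half: owns the registry until start() is called.
struct Builder {
    flows: Vec<String>,
}

// Hypothetical running half: cheap to clone, all clones share the same state.
#[derive(Clone)]
struct Running {
    flows: Arc<Vec<String>>,
    stopped: Arc<AtomicBool>,
}

impl Builder {
    fn new() -> Self {
        Self { flows: Vec::new() }
    }

    fn register(&mut self, id: &str) {
        self.flows.push(id.to_string());
    }

    // Takes `self` by value: the builder is moved, so it cannot be started
    // twice or mutated after this call.
    fn start(self) -> Running {
        Running {
            flows: Arc::new(self.flows),
            stopped: Arc::new(AtomicBool::new(false)),
        }
    }
}

impl Running {
    // Any clone can request a stop; every clone observes it.
    fn stop(&self) {
        self.stopped.store(true, Ordering::SeqCst);
    }

    fn is_stopped(&self) -> bool {
        self.stopped.load(Ordering::SeqCst)
    }

    fn flow_count(&self) -> usize {
        self.flows.len()
    }
}
```

With these definitions, calling builder.start() a second time, or builder.register(...) after start(), is rejected by the compiler as a use of a moved value.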


Overlap Prevention

The scheduler prevents overlapping executions of the same workflow. If a previous execution is still running when the next interval or cron trigger fires, the new run is skipped. This prevents resource exhaustion from slow-running workflows that accumulate concurrent instances over time.

For example, if a workflow is configured to run every 30 seconds but a particular execution takes 45 seconds, the scheduler will skip the trigger at the 30-second mark and wait for the next interval after the current run completes.
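The 30-second / 45-second example can be traced with a small simulation. This is a sketch under stated assumptions, not the scheduler's implementation: it assumes ticks fall at fixed multiples of the interval starting at 0, and the simulate function and Tick type are hypothetical names.

```rust
/// Outcome of one scheduled tick in the simulation.
#[derive(Debug, PartialEq)]
enum Tick {
    Fired { at: u64 },
    Skipped { at: u64 },
}

/// Simulates ticks at 0, interval, 2 * interval, ... (an assumption of this sketch).
/// `durations` lists how long each run that actually fires takes, in seconds;
/// runs beyond the end of the list are treated as instantaneous.
fn simulate(interval: u64, ticks: u64, durations: &[u64]) -> Vec<Tick> {
    let mut out = Vec::new();
    let mut busy_until = 0u64; // time at which the current run finishes
    let mut runs = durations.iter();

    for n in 0..ticks {
        let at = n * interval;
        if at < busy_until {
            // The previous run is still in progress: skip this trigger.
            out.push(Tick::Skipped { at });
            continue;
        }
        let took = runs.next().copied().unwrap_or(0);
        busy_until = at + took;
        out.push(Tick::Fired { at });
    }
    out
}
```

Calling simulate(30, 3, &[45, 2]) yields a fired tick at 0, a skipped tick at 30, and a fired tick at 60, matching the prose above.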


Scheduling Strategies

Interval

Run workflows at fixed time intervals.

scheduler.every_seconds(...)

Cron

Run workflows based on cron expressions.

scheduler.cron(..., "0 0 9 * * *")
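The cron expressions on this page have six whitespace-separated fields with seconds first, so "0 0 9 * * *" reads as second 0, minute 0, hour 9, every day. The helper below only labels the fields to make that layout visible; split_cron and CronFields are hypothetical names, and the sketch does no validation and says nothing about which other forms the scheduler may accept.

```rust
/// Named view of a six-field cron expression (seconds first).
#[derive(Debug, PartialEq)]
struct CronFields<'a> {
    second: &'a str,
    minute: &'a str,
    hour: &'a str,
    day_of_month: &'a str,
    month: &'a str,
    day_of_week: &'a str,
}

/// Splits an expression into labeled fields.
/// Returns None unless there are exactly six fields.
fn split_cron(expr: &str) -> Option<CronFields<'_>> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    match parts.as_slice() {
        &[second, minute, hour, day_of_month, month, day_of_week] => Some(CronFields {
            second,
            minute,
            hour,
            day_of_month,
            month,
            day_of_week,
        }),
        _ => None,
    }
}
```

For "0 0 9 * * MON" the hour field is "9" and the day_of_week field is "MON", which corresponds to Mondays at 9 AM.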

Manual

Trigger workflows on-demand via API.

scheduler.manual(...)


Scheduling Strategy Examples

The Scheduler supports multiple scheduling strategies. Here are complete examples for each.

1. Interval Scheduling - Fixed Time Intervals

Run workflows at regular time intervals. Best for periodic tasks like health checks or data syncing.

Interval Scheduling Timeline

gantt
    title Interval Scheduling (Every 30 seconds)
    dateFormat ss
    axisFormat %Ss
    section Workflow
    Run 1 :0, 2s
    Wait :2, 28s
    Run 2 :30, 2s
    Wait :32, 28s
    Run 3 :60, 2s

use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct HealthCheckTask;

#[task(state = State)]
impl HealthCheckTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("Running health check...");

        // Check system health
        let store = res.get::<MemoryStore, _>("store")?;
        let status = "healthy".to_string();
        store.put("last_health_check", status)?;

        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, HealthCheckTask)
        .add_exit_state(State::Complete);

    // Run every 30 seconds
    scheduler.every_seconds("health_check", workflow, State::Start, 30)?;

    // start() consumes the builder and returns a clone-able RunningScheduler.
    // wait() blocks until somebody calls stop() on a clone.
    let running = scheduler.start().await?;
    running.wait().await?;
    Ok(())
}

Runnable examples: cargo run --example scheduler_duration_scheduling --features scheduler (interval-only) and cargo run --example scheduler_scheduling --features scheduler (intervals plus cron and manual flows).

2. Cron Scheduling - Time-Based Expressions

Run workflows based on cron expressions. Perfect for scheduled reports, backups, or time-specific tasks.

Cron Scheduling Timeline

gantt
    title Cron Scheduling (Daily at 9 AM and 6 PM)
    dateFormat HH
    axisFormat %H:00
    section Workflow
    Run 1 :09, 1h
    Run 2 :18, 1h
    %% Add empty space to ensure full visibility
    Space :20, 0h

use cano::prelude::*;
use chrono::Utc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct DailyReport {
    report_type: String,
}

#[task(state = State)]
impl DailyReport {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("Preparing {} report...", self.report_type);

        let store = res.get::<MemoryStore, _>("store")?;

        // Load data for report
        let data = vec!["metric1".to_string(), "metric2".to_string(), "metric3".to_string()];
        store.put("report_start", Utc::now().to_rfc3339())?;

        println!("Generating report with {} records", data.len());
        let result = format!("{} report: {} records processed", self.report_type, data.len());

        println!("Report completed: {}", result);
        store.put("last_report", result)?;

        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Morning report workflow
    let morning_report = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DailyReport {
            report_type: "Morning".to_string()
        })
        .add_exit_state(State::Complete);

    // Evening report workflow
    let evening_report = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DailyReport {
            report_type: "Evening".to_string()
        })
        .add_exit_state(State::Complete);

    // Run daily at 9 AM: "0 0 9 * * *"
    scheduler.cron("morning_report", morning_report, State::Start, "0 0 9 * * *")?;

    // Run daily at 6 PM: "0 0 18 * * *"
    scheduler.cron("evening_report", evening_report, State::Start, "0 0 18 * * *")?;

    let running = scheduler.start().await?;
    running.wait().await?;
    Ok(())
}

3. Manual Triggering - On-Demand Execution

Trigger workflows manually via API. Ideal for user-initiated tasks or event-driven processing.

Type-safe lifecycle

trigger() lives on RunningScheduler, which is only obtained by calling scheduler.start().await?. The compiler will not let you trigger a workflow before the scheduler is running.

Manual Trigger Sequence

sequenceDiagram
    participant API as API Request
    participant S as Scheduler
    participant W as Workflow
    API->>S: trigger("data_export")
    S->>W: Start Workflow
    W-->>S: Complete
    S-->>API: Success

use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct DataExportTask;

#[task(state = State)]
impl DataExportTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("Starting data export...");

        // Export data to CSV
        let store = res.get::<MemoryStore, _>("store")?;
        let export_path = "/tmp/export.csv".to_string();
        store.put("export_path", export_path)?;

        println!("Export completed");
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let export_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DataExportTask)
        .add_exit_state(State::Complete);

    // Register as manual-only workflow
    scheduler.manual("data_export", export_workflow, State::Start)?;

    // Start consumes the builder and returns a live, clone-able handle.
    let running = scheduler.start().await?;

    // Trigger manually when needed
    println!("Triggering export...");
    running.trigger("data_export").await?;

    // Can be triggered again later
    tokio::time::sleep(Duration::from_secs(5)).await;
    running.trigger("data_export").await?;

    // stop() sends the Stop command and waits for graceful shutdown.
    running.stop().await?;
    Ok(())
}

4. Mixed Scheduling - Combining Strategies

Use multiple scheduling strategies together for complex automation scenarios.

Mixed Strategy Overview

gantt
    title Mixed Scheduling Strategies
    dateFormat HH:mm
    axisFormat %H:%M
    section Interval Tasks
    Sync Every 5min :00:00, 24h
    section Cron Tasks
    Daily Backup :03:00, 1h
    Weekly Report :09:00, 1h
    section Manual Tasks
    Emergency Export :done, 14:30, 15m

use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Define simple tasks
    #[derive(Clone)]
    struct DataSyncTask;

    #[task(state = State)]
    impl DataSyncTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Syncing data...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct BackupTask;
    #[task(state = State)]
    impl BackupTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Running backup...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct WeeklyReportTask;

    #[task(state = State)]
    impl WeeklyReportTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Generating weekly report...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct EmergencyExportTask;

    #[task(state = State)]
    impl EmergencyExportTask {
        async fn run(&self, _res: &Resources) -> Result<TaskResult<State>, CanoError> {
            println!("Emergency export...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    // 1. Interval: Data sync every 5 minutes
    let sync_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, DataSyncTask)
        .add_exit_state(State::Complete);

    scheduler.every_seconds("data_sync", sync_workflow, State::Start, 300)?;

    // 2. Cron: Daily backup at 3 AM
    let backup_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, BackupTask)
        .add_exit_state(State::Complete);

    scheduler.cron("daily_backup", backup_workflow, State::Start, "0 0 3 * * *")?;

    // 3. Cron: Weekly report on Mondays at 9 AM
    let report_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, WeeklyReportTask)
        .add_exit_state(State::Complete);

    scheduler.cron("weekly_report", report_workflow, State::Start, "0 0 9 * * MON")?;

    // 4. Manual: Emergency data export
    let export_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(State::Start, EmergencyExportTask)
        .add_exit_state(State::Complete);

    scheduler.manual("emergency_export", export_workflow, State::Start)?;

    // Start consumes the builder and returns a live handle.
    let running = scheduler.start().await?;

    // Monitor and trigger as needed
    loop {
        tokio::time::sleep(Duration::from_secs(60)).await;

        // Check status of all workflows
        let workflows = running.list().await;
        for info in workflows {
            println!("{}: {:?} (runs: {})", info.id, info.status, info.run_count);
        }

        // Example: Trigger emergency export if needed based on some condition
        // running.trigger("emergency_export").await?;
    }
}

Runnable example: cargo run --example scheduler_mixed_workflows --features scheduler — interval, cron, and manual flows side by side, plus a trigger on the manual one.


Backoff & Trip State

A flow that fails repeatedly shouldn't keep re-firing on its base schedule, so every flow has a BackoffPolicy. After a failure the scheduler parks the flow in Status::Backoff for a growing delay; with a streak_limit set, it eventually moves to Status::Tripped and stops dispatching until you intervene. Each flow starts with BackoffPolicy::default(); call set_backoff to use a different one.

Heads-up: failure delays are at least 1s by default

Because BackoffPolicy::default() has a 1s initial delay and is applied to every flow, a flow that fails waits ~1s before its next attempt (the Every loop sleeps max(interval, next_eligible - now)) — even if its base interval is shorter. If you run a flow on a sub-second interval and want fast retries after a failure, lower BackoffPolicy { initial: … } via set_backoff.
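The sleep rule quoted above, max(interval, next_eligible - now), can be written out directly. This is a sketch of that one expression only; every_sleep is a hypothetical helper name, and treating a missing or already-passed next_eligible as zero remaining backoff is an assumption of the sketch.

```rust
use std::time::{Duration, Instant};

/// How long an interval loop would sleep before its next attempt.
/// `next_eligible` is None when the flow is not in backoff.
fn every_sleep(interval: Duration, now: Instant, next_eligible: Option<Instant>) -> Duration {
    // Remaining backoff, or zero if there is none or it has already elapsed.
    let remaining = next_eligible
        .map(|t| t.saturating_duration_since(now))
        .unwrap_or(Duration::ZERO);
    interval.max(remaining)
}
```

With a 200ms interval and one second of backoff remaining, the result is one second, which is why a sub-second flow slows down after a failure. With a 30s interval the backoff is invisible because the interval already dominates.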

Distinct from CircuitBreaker

Flow-level Tripped is scoped to the scheduler and is separate from the task-level CanoError::CircuitOpen emitted by a CircuitBreaker. The breaker gates a single task's call to a dependency; this policy gates the scheduler from re-firing an entire flow.

Overriding the Default Policy

Register the workflow normally, then call set_backoff before start(). The policy controls the initial delay after the first failure, the multiplier applied per additional consecutive failure, a hard cap on the computed delay, jitter, and an optional streak limit.

use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum FlowState { Start, Done }

#[derive(Clone)]
struct NoopTask;

#[task(state = FlowState)]
impl NoopTask {
    async fn run_bare(&self) -> Result<TaskResult<FlowState>, CanoError> {
        Ok(TaskResult::Single(FlowState::Done))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler: Scheduler<FlowState> = Scheduler::new();

    let workflow = Workflow::new(Resources::new())
        .register(FlowState::Start, NoopTask)
        .add_exit_state(FlowState::Done);

    scheduler.every(
        "flaky",
        workflow,
        FlowState::Start,
        Duration::from_millis(200),
    )?;

    scheduler.set_backoff(
        "flaky",
        BackoffPolicy {
            initial: Duration::from_millis(300),
            multiplier: 2.0,
            max_delay: Duration::from_secs(2),
            jitter: 0.1,
            streak_limit: Some(3),
        },
    )?;

    let running = scheduler.start().await?;
    running.wait().await?;
    Ok(())
}

Computed delay is initial * multiplier^(streak-1), capped at max_delay, then multiplied by a random factor in 1 ± jitter. The Every loop's sleep extends to max(interval, next_eligible - now), and the Cron loop suppresses ticks inside the backoff window. BackoffPolicy::default() gives 1s initial, 2.0× multiplier, 5min cap, 0.1 jitter, and no trip limit. Use BackoffPolicy::with_trip(n) to ask for a trip after n consecutive failures.
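The delay formula can be checked by hand with a small std-only sketch. Policy below is a hypothetical mirror of the documented fields, not the library's BackoffPolicy, and the random sample is passed in as a parameter (unit, in the range -1.0 to 1.0) so the arithmetic stays deterministic.

```rust
use std::time::Duration;

// Hypothetical mirror of the documented policy fields; not the library's type.
#[derive(Clone, Copy, Debug)]
struct Policy {
    initial: Duration,
    multiplier: f64,
    max_delay: Duration,
    jitter: f64,
}

impl Policy {
    /// The defaults as described in the text: 1s, 2.0x, 5min cap, 0.1 jitter.
    fn documented_default() -> Self {
        Self {
            initial: Duration::from_secs(1),
            multiplier: 2.0,
            max_delay: Duration::from_secs(300),
            jitter: 0.1,
        }
    }

    /// initial * multiplier^(streak - 1), capped at max_delay,
    /// then scaled by 1 + jitter * unit, where unit is in [-1.0, 1.0].
    fn delay(&self, streak: u32, unit: f64) -> Duration {
        let exp = streak.saturating_sub(1).min(i32::MAX as u32) as i32;
        let base = self.initial.as_secs_f64() * self.multiplier.powi(exp);
        // Cap before applying jitter, so the result can exceed the cap slightly.
        let capped = base.min(self.max_delay.as_secs_f64());
        let factor = 1.0 + self.jitter * unit.clamp(-1.0, 1.0);
        Duration::from_secs_f64((capped * factor).max(0.0))
    }
}
```

With the documented defaults and no jitter (unit = 0.0), streaks 1, 2, 3 give 1s, 2s, 4s, and a long streak settles at the 300s cap. With unit = 1.0 the third failure waits about 4.4s.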

Status Variants

Status is #[non_exhaustive], so external match arms must include a wildcard. The variants referenced on this page are Idle, Completed, Backoff, and Tripped.

Outcome writes are atomic: a single write decides this run's terminal status (Completed on success, otherwise Backoff or Tripped), so observers never see a transient intermediate state. FlowInfo exposes failure_streak and next_eligible for observability.

Recovery via reset_flow

A Tripped flow stays parked until you clear it. RunningScheduler::reset_flow(id) clears the failure streak and next_eligible, and (when the flow is not currently running) sets the status back to Idle. Manual trigger() is rejected on a tripped flow — call reset_flow first.

let snap = running.status("flaky").await.expect("flow exists");
if matches!(snap.status, Status::Tripped { .. }) {
    running.reset_flow("flaky").await?;
}
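The trip and recovery path can be pictured as a small state machine. The sketch below is a simplified model, not the scheduler's code: FlowStatus and FlowState are hypothetical types, the real Status carries more data, and clearing the streak on success is an assumption of this sketch.

```rust
// Hypothetical, simplified status type for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FlowStatus {
    Idle,
    Completed,
    Backoff,
    Tripped,
}

struct FlowState {
    status: FlowStatus,
    failure_streak: u32,
    streak_limit: Option<u32>,
}

impl FlowState {
    fn new(streak_limit: Option<u32>) -> Self {
        Self { status: FlowStatus::Idle, failure_streak: 0, streak_limit }
    }

    // Assumption: a successful run clears the failure streak.
    fn record_success(&mut self) {
        self.failure_streak = 0;
        self.status = FlowStatus::Completed;
    }

    // A failure extends the streak; reaching the limit trips the flow.
    fn record_failure(&mut self) {
        self.failure_streak += 1;
        let tripped = self
            .streak_limit
            .map_or(false, |limit| self.failure_streak >= limit);
        self.status = if tripped { FlowStatus::Tripped } else { FlowStatus::Backoff };
    }

    // A tripped flow is not dispatched, whether by schedule or manual trigger.
    fn can_dispatch(&self) -> bool {
        self.status != FlowStatus::Tripped
    }

    // Mirrors the documented effect of reset_flow on a flow that is not running.
    fn reset(&mut self) {
        self.failure_streak = 0;
        self.status = FlowStatus::Idle;
    }
}
```

With a limit of 3, two failures leave the flow in Backoff, the third moves it to Tripped, and only reset() makes it dispatchable again.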

See the scheduler_backoff example (cargo run --example scheduler_backoff --features scheduler) for an end-to-end walk-through that exercises the trip and recovery path.


Graceful Shutdown

The scheduler supports graceful shutdown, allowing currently running workflows to complete before stopping. This includes workflows started by interval or cron triggers as well as manually-triggered workflows. All active executions are tracked and included in the shutdown wait.

// Stop the scheduler and wait for running flows to finish.
running.stop().await?;

When stop() is called, the scheduler signals all scheduling loops to stop, waits up to 30 seconds for any in-progress workflow executions to finish, and runs each workflow's resource teardown_all in reverse registration order before returning. A second stop() call after success is idempotent — it returns the same cached result.
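Two of the properties above, reverse-order teardown and an idempotent second stop(), can be sketched with std only. Shutdown is a hypothetical type, resources are represented by their names, and the 30-second wait for in-progress runs is deliberately left out of this model.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

struct Shutdown {
    resources: Vec<&'static str>,        // in registration order
    result: OnceLock<Vec<&'static str>>, // teardown order cached by the first stop()
    teardown_runs: AtomicUsize,          // how many times teardown actually ran
}

impl Shutdown {
    fn new(resources: Vec<&'static str>) -> Self {
        Self {
            resources,
            result: OnceLock::new(),
            teardown_runs: AtomicUsize::new(0),
        }
    }

    /// The first call tears resources down in reverse registration order.
    /// Later calls return the cached result without repeating the work.
    fn stop(&self) -> &[&'static str] {
        self.result.get_or_init(|| {
            self.teardown_runs.fetch_add(1, Ordering::SeqCst);
            self.resources.iter().rev().copied().collect()
        })
    }
}
```

For resources registered as store, db, cache, the first stop() reports cache, db, store, and a second stop() returns the same slice while the teardown counter stays at 1.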

Runnable example: cargo run --example scheduler_graceful_shutdown --features scheduler — spawns a Ctrl-C handler, runs scheduled flows, and shuts down cleanly on signal.


Advanced Pattern: Multi-Level Map-Reduce

The scheduler composes naturally with split/join to give you a two-level map-reduce. Level 1 lives inside a single workflow: a state registered with register_split fans a batch out across parallel tasks, and a JoinConfig reduces their results back into one summary state. Level 2 lives at the scheduler: register several of those batch workflows — each with different parameters (a different batch of records, a different region, a different tenant) — as manual flows or interval flows, and trigger them concurrently. Because every workflow carries its own Resources dictionary, you hand each one a shared accumulator (an Arc<RwLock<…>> wrapped in a Resource); every batch independently appends its summary, and a final reduce step folds them together.

The skeleton — one batch workflow with a parallel state, plus a scheduler wiring up a couple of batches:

// Level 1: a workflow that map-reduces over one batch.
fn batch_workflow(batch: Vec<Item>, results: SharedResults) -> Workflow<State> {
    Workflow::new(Resources::new().insert("results", results))
        .register_split(
            State::Process,
            batch.iter().map(|item| ProcessTask::new(item)).collect::<Vec<_>>(),
            JoinConfig::new(JoinStrategy::All, State::Summarize)
                .with_timeout(Duration::from_secs(60)),
        )
        .register(State::Summarize, SummarizeTask) // appends a batch summary into `results`
        .add_exit_states(vec![State::Done, State::Error])
}

// Level 2: the scheduler runs several batch workflows concurrently.
let results = SharedResults::default();
let mut scheduler = Scheduler::new();
// Each flow starts at State::Process, the split state registered in batch_workflow.
scheduler.manual("batch-a", batch_workflow(batch_a, results.clone()), State::Process)?;
scheduler.manual("batch-b", batch_workflow(batch_b, results.clone()), State::Process)?;
let running = scheduler.start().await?;
running.trigger("batch-a").await?;
running.trigger("batch-b").await?;
// ...wait for both flows to finish, then reduce across all batch summaries in `results`.

The full runnable program is examples/scheduler_mapreduce_books.rs (cargo run --example scheduler_mapreduce_books --features scheduler). It downloads several books from Project Gutenberg, splits download + analysis across parallel tasks within each batch workflow, runs multiple batch workflows concurrently, and reduces all results into global rankings. See also examples/scheduler_book_prepositions.rs for the single-workflow variant.
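For readers who want the shape of the two-level reduce without the library, the following std-only sketch uses scoped threads in place of split/join and of scheduler flows. It is an analogy under that substitution, not how the scheduler runs workflows; run_batch, run_all, and the word-count "analysis" are hypothetical.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Shared accumulator: one summary per batch.
type SharedResults = Arc<RwLock<Vec<usize>>>;

// Level 1: map every item of one batch in parallel, then reduce to one summary.
fn run_batch(batch: &[&str], results: &SharedResults) {
    let summary = thread::scope(|s| {
        let handles: Vec<_> = batch
            .iter()
            .map(|item| s.spawn(move || item.split_whitespace().count()))
            .collect();
        // Join: wait for all parallel tasks and fold their results.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<usize>()
    });
    results.write().unwrap().push(summary);
}

// Level 2: run several batches concurrently, then reduce across their summaries.
fn run_all(batches: &[Vec<&str>]) -> usize {
    let results: SharedResults = Arc::default();
    thread::scope(|s| {
        for batch in batches {
            let results = &results;
            s.spawn(move || run_batch(batch, results));
        }
    }); // all batch threads have finished here
    let total = results.read().unwrap().iter().sum::<usize>();
    total
}
```

Each batch appends exactly one summary, so the final reduce does not depend on the order in which batches finish.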