Scheduler

Automate your workflows with flexible scheduling and concurrency.

The Scheduler provides workflow scheduling capabilities for background jobs and automated workflows. It supports intervals, cron expressions, and manual triggers.

Scheduling Strategies

Interval

Run workflows at fixed time intervals.

scheduler.every_seconds(...)

Cron

Run workflows based on cron expressions.

scheduler.cron(..., "0 0 9 * * *")

Manual

Trigger workflows on-demand via API.

scheduler.manual(...)

Scheduling Strategy Examples

The Scheduler supports multiple scheduling strategies. Here are complete examples for each.

1. Interval Scheduling - Fixed Time Intervals

Run workflows at regular time intervals. Best for periodic tasks like health checks or data syncing.

gantt title Interval Scheduling (Every 30 seconds) dateFormat ss axisFormat %Ss section Workflow Run 1 :0, 2s Wait :2, 28s Run 2 :30, 2s Wait :32, 28s Run 3 :60, 2s
use cano::prelude::*;
use async_trait::async_trait;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct HealthCheckTask;

#[async_trait]
impl Task for HealthCheckTask {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("Running health check...");
        
        // Check system health
        let status = "healthy".to_string();
        store.put("last_health_check", status)?;
        
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let workflow = Workflow::new(store.clone())
        .register(State::Start, HealthCheckTask)
        .add_exit_state(State::Complete);

    // Run every 30 seconds
    scheduler.every_seconds("health_check", workflow, State::Start, 30)?;

    scheduler.start().await?;
    Ok(())
}

2. Cron Scheduling - Time-Based Expressions

Run workflows based on cron expressions. Perfect for scheduled reports, backups, or time-specific tasks.

gantt title Cron Scheduling (Daily at 9 AM and 6 PM) dateFormat HH axisFormat %H:00 section Workflow Run 1 :09, 1h Run 2 :18, 1h %% Add empty space to ensure full visibility Space :20, 0h
use cano::prelude::*;
use async_trait::async_trait;
use chrono::Utc;

#[derive(Clone)]
struct DailyReportNode {
    report_type: String,
}

#[async_trait]
impl Node for DailyReportNode {
    type PrepResult = Vec;
    type ExecResult = String;

    async fn prep(&self, store: &MemoryStore) -> Result {
        println!("📊 Preparing {} report...", self.report_type);
        
        // Load data for report
        let data = vec!["metric1".to_string(), "metric2".to_string(), "metric3".to_string()];
        store.put("report_start", Utc::now().to_rfc3339())?;
        
        Ok(data)
    }

    async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
        println!("📊 Generating report with {} records", data.len());
        
        // Generate report
        format!("{} report: {} records processed", self.report_type, data.len())
    }

    async fn post(&self, store: &MemoryStore, result: Self::ExecResult) -> Result {
        println!("📊 Report completed: {}", result);
        store.put("last_report", result)?;
        
        Ok(State::Complete)
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Morning report workflow
    let morning_report = Workflow::new(store.clone())
        .register(State::Start, DailyReportNode { 
            report_type: "Morning".to_string() 
        })
        .add_exit_state(State::Complete);

    // Evening report workflow
    let evening_report = Workflow::new(store.clone())
        .register(State::Start, DailyReportNode { 
            report_type: "Evening".to_string() 
        })
        .add_exit_state(State::Complete);

    // Run daily at 9 AM: "0 0 9 * * *"
    scheduler.cron("morning_report", morning_report, State::Start, "0 0 9 * * *")?;
    
    // Run daily at 6 PM: "0 0 18 * * *"
    scheduler.cron("evening_report", evening_report, State::Start, "0 0 18 * * *")?;

    scheduler.start().await?;
    Ok(())
}

3. Manual Triggering - On-Demand Execution

Trigger workflows manually via API. Ideal for user-initiated tasks or event-driven processing.

sequenceDiagram participant API as API Request participant S as Scheduler participant W as Workflow API->>S: trigger("data_export") S->>W: Start Workflow W-->>S: Complete S-->>API: Success
use cano::prelude::*;

#[derive(Clone)]
struct DataExportTask;

#[async_trait]
impl Task for DataExportTask {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("Starting data export...");
        
        // Export data to CSV
        let export_path = "/tmp/export.csv".to_string();
        store.put("export_path", export_path)?;
        
        println!("Export completed");
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let export_workflow = Workflow::new(store.clone())
        .register(State::Start, DataExportTask)
        .add_exit_state(State::Complete);

    // Register as manual-only workflow
    scheduler.manual("data_export", export_workflow, State::Start)?;

    // Start scheduler in background
    let scheduler_handle = scheduler.clone();
    tokio::spawn(async move {
        scheduler_handle.start().await.unwrap();
    });

    // Trigger manually when needed
    println!("Triggering export...");
    scheduler.trigger("data_export").await?;
    
    // Can be triggered again later
    tokio::time::sleep(Duration::from_secs(5)).await;
    scheduler.trigger("data_export").await?;

    scheduler.stop().await?;
    Ok(())
}

4. Mixed Scheduling - Combining Strategies

Use multiple scheduling strategies together for complex automation scenarios.

gantt title Mixed Scheduling Strategies dateFormat HH:mm axisFormat %H:%M section Interval Tasks Sync Every 5min :00:00, 24h section Cron Tasks Daily Backup :03:00, 1h Weekly Report :09:00, 1h section Manual Tasks Emergency Export :done, 14:30, 15m
use cano::prelude::*;

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Define simple tasks
    #[derive(Clone)]
    struct DataSyncTask;

    #[async_trait]
    impl Task for DataSyncTask {
        async fn run(&self, _store: &MemoryStore) -> Result, CanoError> {
            println!("Syncing data...");
            Ok(TaskResult::Single(State::Complete))
        }
    }
    
    #[derive(Clone)]
    struct BackupTask;
    #[async_trait]
    impl Task for BackupTask {
        async fn run(&self, _store: &MemoryStore) -> Result, CanoError> {
            println!("Running backup...");
            Ok(TaskResult::Single(State::Complete))
        }
    }
    
    #[derive(Clone)]
    struct WeeklyReportTask;

    #[async_trait]
    impl Task for WeeklyReportTask {
        async fn run(&self, _store: &MemoryStore) -> Result, CanoError> {
            println!("Generating weekly report...");
            Ok(TaskResult::Single(State::Complete))
        }
    }
    
    #[derive(Clone)]
    struct EmergencyExportTask;
    
    #[async_trait]
    impl Task for EmergencyExportTask {
        async fn run(&self, _store: &MemoryStore) -> Result, CanoError> {
            println!("Emergency export...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    // 1. Interval: Data sync every 5 minutes
    let sync_workflow = Workflow::new(store.clone())
        .register(State::Start, DataSyncTask)
        .add_exit_state(State::Complete);
    
    scheduler.every_seconds("data_sync", sync_workflow, State::Start, 300)?;

    // 2. Cron: Daily backup at 3 AM
    let backup_workflow = Workflow::new(store.clone())
        .register(State::Start, BackupTask)
        .add_exit_state(State::Complete);
    
    scheduler.cron("daily_backup", backup_workflow, State::Start, "0 0 3 * * *")?;

    // 3. Cron: Weekly report on Mondays at 9 AM
    let report_workflow = Workflow::new(store.clone())
        .register(State::Start, WeeklyReportTask)
        .add_exit_state(State::Complete);
    
    scheduler.cron("weekly_report", report_workflow, State::Start, "0 0 9 * * MON")?;

    // 4. Manual: Emergency data export
    let export_workflow = Workflow::new(store.clone())
        .register(State::Start, EmergencyExportTask)
        .add_exit_state(State::Complete);
    
    scheduler.manual("emergency_export", export_workflow, State::Start)?;

    // Start scheduler
    let scheduler_handle = scheduler.clone();
    tokio::spawn(async move {
        scheduler_handle.start().await.unwrap();
    });

    // Monitor and trigger as needed
    loop {
        tokio::time::sleep(Duration::from_secs(60)).await;
        
        // Check status of all workflows
        let workflows = scheduler.list().await;
        for info in workflows {
            println!("{}: {:?} (runs: {})", info.id, info.status, info.run_count);
        }
        
        // Example: Trigger emergency export if needed based on some condition
        // scheduler.trigger("emergency_export").await?;
    }
}

Concurrent Workflows

use cano::prelude::*;
use async_trait::async_trait;
use chrono::Utc;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowAction {
    Start,
    Complete,
    Error,
}

#[derive(Clone)]
struct ReportNode {
    report_type: String,
}

impl ReportNode {
    fn new(report_type: &str) -> Self {
        Self { report_type: report_type.to_string() }
    }
}

#[async_trait]
impl Node for ReportNode {
    type PrepResult = String;
    type ExecResult = String;

    async fn prep(&self, store: &MemoryStore) -> Result {
        println!("📊 Preparing {} report...", self.report_type);
        store.put("report_start_time", Utc::now().to_rfc3339())?;
        Ok(format!("Preparing {} report", self.report_type))
    }

    async fn exec(&self, _prep_result: Self::PrepResult) -> Self::ExecResult {
        // Simulate work
        tokio::time::sleep(Duration::from_millis(500)).await;
        format!("{} report generated", self.report_type)
    }

    async fn post(&self, store: &MemoryStore, exec_result: Self::ExecResult) -> Result {
        println!("📊 Report completed: {}", exec_result);
        store.put("report_result", exec_result)?;
        Ok(WorkflowAction::Complete)
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    let mut scheduler = Scheduler::new();

    // Define workflows
    let hourly_report = Workflow::new(store.clone())
        .register(WorkflowAction::Start, ReportNode::new("Hourly"))
        .add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);

    let daily_cleanup = Workflow::new(store.clone())
        .register(WorkflowAction::Start, ReportNode::new("Cleanup")) // Reusing node for demo
        .add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);

    // 1. Interval: Run every 30 seconds
    scheduler.every_seconds("hourly_report", hourly_report.clone(), WorkflowAction::Start, 30)?;

    // 2. Cron: Run daily at 9 AM
    scheduler.cron("daily_cleanup", daily_cleanup, WorkflowAction::Start, "0 0 9 * * *")?;

    // 3. Manual: Only run when triggered
    scheduler.manual("on_demand_report", hourly_report.clone(), WorkflowAction::Start)?;

    // Start Scheduler in background
    let scheduler_handle = scheduler.clone();
    tokio::spawn(async move {
        scheduler_handle.start().await.unwrap();
    });

    // Trigger manual job
    println!("Triggering manual report...");
    scheduler.trigger("on_demand_report").await?;

    // Keep running for a bit
    tokio::time::sleep(Duration::from_secs(60)).await;
    
    // Graceful shutdown
    scheduler.stop().await?;
    
    Ok(())
}

Concurrent Workflows

Execute multiple instances of the same workflow in parallel. This is useful for high-throughput processing where you don't want one long-running job to block others.

How Concurrent Workflows Work

gantt title Concurrent Execution (Max 3, Trigger Every 2s) dateFormat ss axisFormat %Ss section Instance 1 Execute :0, 5s section Instance 2 Execute :2, 5s section Instance 3 Execute :4, 5s section Waiting Wait (slots full) :6, 1s section Instance 4 Execute :7, 5s

Wait Strategy Examples

Control how the scheduler behaves when concurrency limit is reached.

1. WaitForever - Queue and Wait

Waits indefinitely until a slot becomes available. Ensures all scheduled executions eventually run.

sequenceDiagram participant S as Scheduler participant Q as Queue participant W as Workflow S->>Q: Check slots (Full) Note over Q: Wait for slot... W->>Q: Slot available Q->>W: Start execution
use cano::prelude::*;

#[derive(Clone)]
struct BatchProcessor { batch_id: usize }

#[async_trait]
impl Task for BatchProcessor {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("Processing batch {}...", self.batch_id);
        
        // Simulate long processing
        tokio::time::sleep(Duration::from_secs(5)).await;
        
        store.put(&format!("batch_{}_result", self.batch_id), "completed")?;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
    concurrent_workflow.register(State::Start, BatchProcessor { batch_id: 1 });
    concurrent_workflow.add_exit_state(State::Complete);

    // WaitForever: Queue all executions
    scheduler.every_seconds_concurrent(
        "batch_processor",
        concurrent_workflow,
        State::Start,
        1,  // Trigger every 1 second
        3,  // Max 3 concurrent
        WaitStrategy::WaitForever  // Queue and wait
    )?;

    scheduler.start().await?;
    Ok(())
}

2. WaitForQuota(n) - Wait for N Slots

Waits until a specific number of slots are available. Useful for batch operations requiring multiple workers.

sequenceDiagram participant S as Scheduler participant Q as Queue participant W as Workers S->>Q: Check slots (1 available) Note over Q: Need 2 slots, wait... W->>Q: Slot available (2 total) Q->>W: Start with 2 slots
use cano::prelude::*;

#[derive(Clone)]
struct MultiSlotTask;

#[async_trait]
impl Task for MultiSlotTask {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("Running multi-slot task...");
        
        // Task requires multiple workers/resources
        tokio::time::sleep(Duration::from_secs(3)).await;
        
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
    concurrent_workflow.register(State::Start, MultiSlotTask);
    concurrent_workflow.add_exit_state(State::Complete);

    // WaitForQuota(2): Wait until at least 2 slots available
    scheduler.every_seconds_concurrent(
        "multi_slot_task",
        concurrent_workflow,
        State::Start,
        5,  // Trigger every 5 seconds
        10, // Max 10 concurrent
        WaitStrategy::WaitForQuota(2)  // Need 2 free slots to start
    )?;

    scheduler.start().await?;
    Ok(())
}

3. NoWait - Skip if Busy

Skips execution if no slots are available. Best for non-critical periodic tasks.

sequenceDiagram participant S as Scheduler participant Q as Queue participant W as Workflow S->>Q: Check slots (Full) Note over Q: No slots available Q-->>S: Skip execution Note over S: Continue scheduling
use cano::prelude::*;

#[derive(Clone)]
struct MetricsCollector;

#[async_trait]
impl Task for MetricsCollector {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("Collecting metrics...");
        
        // Collect current metrics
        let metrics = collect_system_metrics().await?;
        store.put("latest_metrics", metrics)?;
        
        tokio::time::sleep(Duration::from_millis(500)).await;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
    concurrent_workflow.register(State::Start, MetricsCollector);
    concurrent_workflow.add_exit_state(State::Complete);

    // NoWait: Skip if busy (OK to miss some metric collections)
    scheduler.every_seconds_concurrent(
        "metrics_collector",
        concurrent_workflow,
        State::Start,
        1,  // Try every 1 second
        5,  // Max 5 concurrent
        WaitStrategy::NoWait  // Skip if all slots busy
    )?;

    println!("Starting metrics collection...");
    println!("Note: Will skip collections when system is busy");
    scheduler.start().await?;
    Ok(())
}

Comparison of Wait Strategies

Strategy Behavior When Full Best Use Case
WaitForever Queue and wait indefinitely Critical tasks that must run
WaitForQuota(n) Wait for N slots to be free Batch operations needing multiple workers
NoWait Skip this execution Optional periodic tasks

Graceful Shutdown

The scheduler supports graceful shutdown, allowing currently running workflows to complete before stopping.

// Stop scheduler but allow running flows to finish
scheduler.stop().await?;