Scheduler
Automate your workflows with flexible scheduling and concurrency.
The Scheduler runs workflows as background jobs on a schedule. It supports fixed intervals, cron expressions, and manual triggers.
Scheduling Strategies
Interval
Run workflows at fixed time intervals.
scheduler.every_seconds(...)
Cron
Run workflows based on cron expressions.
scheduler.cron(..., "0 0 9 * * *")
Manual
Trigger workflows on-demand via API.
scheduler.manual(...)
Scheduling Strategy Examples
The Scheduler supports multiple scheduling strategies. Here are complete examples for each.
1. Interval Scheduling - Fixed Time Intervals
Run workflows at regular time intervals. Best for periodic tasks like health checks or data syncing.
use cano::prelude::*;
use async_trait::async_trait;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct HealthCheckTask;

#[async_trait]
impl Task<State> for HealthCheckTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
        println!("Running health check...");
        // Check system health and record the result in the shared store
        let status = "healthy".to_string();
        store.put("last_health_check", status)?;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let workflow = Workflow::new(store.clone())
        .register(State::Start, HealthCheckTask)
        .add_exit_state(State::Complete);

    // Run every 30 seconds
    scheduler.every_seconds("health_check", workflow, State::Start, 30)?;
    scheduler.start().await?;
    Ok(())
}
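The page does not say how interval ticks are timed when a run takes longer than expected. As a rough illustration only, the sketch below shows one common fixed-rate policy: ticks fall on exact multiples of the interval measured from the start, so a slow run does not shift later ticks. `next_tick` is a hypothetical helper, not part of Cano, and Cano's own timing may differ.

```rust
use std::time::Duration;

/// Hypothetical helper (not a Cano API): given the time elapsed since the
/// schedule started, return the offset of the next tick strictly after it.
/// Ticks are assumed to fall at interval, 2 * interval, 3 * interval, ...
fn next_tick(elapsed: Duration, interval: Duration) -> Duration {
    assert!(!interval.is_zero(), "interval must be non-zero");
    // Number of whole intervals that have already passed.
    // The sketch assumes this count fits in a u32.
    let completed = (elapsed.as_nanos() / interval.as_nanos()) as u32;
    interval * (completed + 1)
}
```

With a 30-second interval, a call made 70 seconds after the start yields the tick at 90 seconds, and a call made exactly on a tick yields the following one.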
2. Cron Scheduling - Time-Based Expressions
Run workflows based on cron expressions. Perfect for scheduled reports, backups, or time-specific tasks.
use cano::prelude::*;
use async_trait::async_trait;
use chrono::Utc;

// Same State enum as in the interval example
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct DailyReportNode {
    report_type: String,
}

#[async_trait]
impl Node<State> for DailyReportNode {
    type PrepResult = Vec<String>;
    type ExecResult = String;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        println!("📊 Preparing {} report...", self.report_type);
        // Load data for report
        let data = vec!["metric1".to_string(), "metric2".to_string(), "metric3".to_string()];
        store.put("report_start", Utc::now().to_rfc3339())?;
        Ok(data)
    }

    async fn exec(&self, data: Self::PrepResult) -> Self::ExecResult {
        println!("📊 Generating report with {} records", data.len());
        // Generate report
        format!("{} report: {} records processed", self.report_type, data.len())
    }

    async fn post(&self, store: &MemoryStore, result: Self::ExecResult) -> Result<State, CanoError> {
        println!("📊 Report completed: {}", result);
        store.put("last_report", result)?;
        Ok(State::Complete)
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Morning report workflow
    let morning_report = Workflow::new(store.clone())
        .register(State::Start, DailyReportNode {
            report_type: "Morning".to_string()
        })
        .add_exit_state(State::Complete);

    // Evening report workflow
    let evening_report = Workflow::new(store.clone())
        .register(State::Start, DailyReportNode {
            report_type: "Evening".to_string()
        })
        .add_exit_state(State::Complete);

    // Run daily at 9 AM: "0 0 9 * * *"
    scheduler.cron("morning_report", morning_report, State::Start, "0 0 9 * * *")?;
    // Run daily at 6 PM: "0 0 18 * * *"
    scheduler.cron("evening_report", evening_report, State::Start, "0 0 18 * * *")?;

    scheduler.start().await?;
    Ok(())
}
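The expressions in these examples have six fields, and "0 0 9 * * *" is described as "daily at 9 AM". That suggests the order is second, minute, hour, day of month, month, day of week. The sketch below only splits an expression into those named fields to make the layout visible. `CronFields` and `split_cron` are hypothetical names, the field order is an inference from the examples, and nothing here validates field values or reflects Cano's own parser.

```rust
/// Hypothetical view of a six-field cron expression (seconds first).
#[derive(Debug, PartialEq)]
struct CronFields<'a> {
    second: &'a str,
    minute: &'a str,
    hour: &'a str,
    day_of_month: &'a str,
    month: &'a str,
    day_of_week: &'a str,
}

/// Splits `expr` on whitespace and names the fields.
/// Returns an error message unless there are exactly six fields.
fn split_cron<'a>(expr: &'a str) -> Result<CronFields<'a>, String> {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    if let [second, minute, hour, day_of_month, month, day_of_week] = parts[..] {
        Ok(CronFields { second, minute, hour, day_of_month, month, day_of_week })
    } else {
        Err(format!("expected 6 fields, found {}", parts.len()))
    }
}
```

Under that reading, "0 0 18 * * *" has second 0, minute 0, and hour 18, while a classic five-field expression such as "0 9 * * *" is rejected by this sketch because the seconds field is missing.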
3. Manual Triggering - On-Demand Execution
Trigger workflows manually via API. Ideal for user-initiated tasks or event-driven processing.
use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;

// Same State enum as in the interval example
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct DataExportTask;

#[async_trait]
impl Task<State> for DataExportTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
        println!("Starting data export...");
        // Export data to CSV
        let export_path = "/tmp/export.csv".to_string();
        store.put("export_path", export_path)?;
        println!("Export completed");
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let export_workflow = Workflow::new(store.clone())
        .register(State::Start, DataExportTask)
        .add_exit_state(State::Complete);

    // Register as manual-only workflow
    scheduler.manual("data_export", export_workflow, State::Start)?;

    // Start scheduler in background
    let scheduler_handle = scheduler.clone();
    tokio::spawn(async move {
        scheduler_handle.start().await.unwrap();
    });

    // Trigger manually when needed
    println!("Triggering export...");
    scheduler.trigger("data_export").await?;

    // Can be triggered again later
    tokio::time::sleep(Duration::from_secs(5)).await;
    scheduler.trigger("data_export").await?;

    scheduler.stop().await?;
    Ok(())
}
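To illustrate the idea behind manual triggering without any scheduler at all, here is a minimal standard-library sketch: a worker thread sits idle until a trigger message arrives, runs the job once per message, and exits when the trigger handle is dropped. `spawn_manual_worker` is a hypothetical name, and this is not how Cano implements `trigger`; it only models the "run on demand" behaviour.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns a worker that runs `job` once per trigger.
/// Returns the trigger handle and a join handle that yields the run count.
fn spawn_manual_worker<F>(mut job: F) -> (mpsc::Sender<()>, thread::JoinHandle<u32>)
where
    F: FnMut() + Send + 'static,
{
    let (trigger, inbox) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let mut runs = 0;
        // recv() blocks until a trigger arrives and returns Err once every
        // sender has been dropped and the queue is empty, which ends the loop.
        while inbox.recv().is_ok() {
            job();
            runs += 1;
        }
        runs
    });
    (trigger, handle)
}
```

Sending two messages and then dropping the sender makes the worker run the job twice and return 2 from `join()`, mirroring the two `trigger("data_export")` calls above.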
4. Mixed Scheduling - Combining Strategies
Use multiple scheduling strategies together for complex automation scenarios.
use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;

// Same State enum as in the interval example
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    // Define simple tasks
    #[derive(Clone)]
    struct DataSyncTask;

    #[async_trait]
    impl Task<State> for DataSyncTask {
        async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
            println!("Syncing data...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct BackupTask;

    #[async_trait]
    impl Task<State> for BackupTask {
        async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
            println!("Running backup...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct WeeklyReportTask;

    #[async_trait]
    impl Task<State> for WeeklyReportTask {
        async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
            println!("Generating weekly report...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    #[derive(Clone)]
    struct EmergencyExportTask;

    #[async_trait]
    impl Task<State> for EmergencyExportTask {
        async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
            println!("Emergency export...");
            Ok(TaskResult::Single(State::Complete))
        }
    }

    // 1. Interval: Data sync every 5 minutes
    let sync_workflow = Workflow::new(store.clone())
        .register(State::Start, DataSyncTask)
        .add_exit_state(State::Complete);
    scheduler.every_seconds("data_sync", sync_workflow, State::Start, 300)?;

    // 2. Cron: Daily backup at 3 AM
    let backup_workflow = Workflow::new(store.clone())
        .register(State::Start, BackupTask)
        .add_exit_state(State::Complete);
    scheduler.cron("daily_backup", backup_workflow, State::Start, "0 0 3 * * *")?;

    // 3. Cron: Weekly report on Mondays at 9 AM
    let report_workflow = Workflow::new(store.clone())
        .register(State::Start, WeeklyReportTask)
        .add_exit_state(State::Complete);
    scheduler.cron("weekly_report", report_workflow, State::Start, "0 0 9 * * MON")?;

    // 4. Manual: Emergency data export
    let export_workflow = Workflow::new(store.clone())
        .register(State::Start, EmergencyExportTask)
        .add_exit_state(State::Complete);
    scheduler.manual("emergency_export", export_workflow, State::Start)?;

    // Start scheduler in background
    let scheduler_handle = scheduler.clone();
    tokio::spawn(async move {
        scheduler_handle.start().await.unwrap();
    });

    // Monitor and trigger as needed (this loop runs until the process is killed)
    loop {
        tokio::time::sleep(Duration::from_secs(60)).await;

        // Check status of all workflows
        let workflows = scheduler.list().await;
        for info in workflows {
            println!("{}: {:?} (runs: {})", info.id, info.status, info.run_count);
        }

        // Example: Trigger emergency export if needed based on some condition
        // scheduler.trigger("emergency_export").await?;
    }
}
Complete Example
use cano::prelude::*;
use async_trait::async_trait;
use chrono::Utc;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowAction {
    Start,
    Complete,
    Error,
}

#[derive(Clone)]
struct ReportNode {
    report_type: String,
}

impl ReportNode {
    fn new(report_type: &str) -> Self {
        Self { report_type: report_type.to_string() }
    }
}

#[async_trait]
impl Node<WorkflowAction> for ReportNode {
    type PrepResult = String;
    type ExecResult = String;

    async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
        println!("📊 Preparing {} report...", self.report_type);
        store.put("report_start_time", Utc::now().to_rfc3339())?;
        Ok(format!("Preparing {} report", self.report_type))
    }

    async fn exec(&self, _prep_result: Self::PrepResult) -> Self::ExecResult {
        // Simulate work
        tokio::time::sleep(Duration::from_millis(500)).await;
        format!("{} report generated", self.report_type)
    }

    async fn post(&self, store: &MemoryStore, exec_result: Self::ExecResult) -> Result<WorkflowAction, CanoError> {
        println!("📊 Report completed: {}", exec_result);
        store.put("report_result", exec_result)?;
        Ok(WorkflowAction::Complete)
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    let mut scheduler = Scheduler::new();

    // Define workflows
    let hourly_report = Workflow::new(store.clone())
        .register(WorkflowAction::Start, ReportNode::new("Hourly"))
        .add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
    let daily_cleanup = Workflow::new(store.clone())
        .register(WorkflowAction::Start, ReportNode::new("Cleanup")) // Reusing node for demo
        .add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);

    // 1. Interval: Run every 30 seconds
    scheduler.every_seconds("hourly_report", hourly_report.clone(), WorkflowAction::Start, 30)?;
    // 2. Cron: Run daily at 9 AM
    scheduler.cron("daily_cleanup", daily_cleanup, WorkflowAction::Start, "0 0 9 * * *")?;
    // 3. Manual: Only run when triggered
    scheduler.manual("on_demand_report", hourly_report.clone(), WorkflowAction::Start)?;

    // Start Scheduler in background
    let scheduler_handle = scheduler.clone();
    tokio::spawn(async move {
        scheduler_handle.start().await.unwrap();
    });

    // Trigger manual job
    println!("Triggering manual report...");
    scheduler.trigger("on_demand_report").await?;

    // Keep running for a bit
    tokio::time::sleep(Duration::from_secs(60)).await;

    // Graceful shutdown
    scheduler.stop().await?;
    Ok(())
}
Concurrent Workflows
Execute multiple instances of the same workflow in parallel. This is useful for high-throughput processing where you don't want one long-running job to block others.
How Concurrent Workflows Work
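The core idea is a fixed pool of slots: each running instance holds one slot, and a new instance can only start when a slot is free. The sketch below models that with a small counting semaphore built on the standard library and reports the highest number of instances that ran at once. `Slots` and `run_limited` are hypothetical names for illustration; Cano's internals are not described on this page and may work differently.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical counting semaphore: `free` is the number of unused slots.
struct Slots {
    free: Mutex<usize>,
    freed: Condvar,
}

impl Slots {
    fn new(max: usize) -> Self {
        Slots { free: Mutex::new(max), freed: Condvar::new() }
    }

    /// Blocks until a slot is free, then takes it.
    fn acquire(&self) {
        let mut free = self.free.lock().unwrap();
        while *free == 0 {
            free = self.freed.wait(free).unwrap();
        }
        *free -= 1;
    }

    /// Returns a slot and wakes one waiting thread.
    fn release(&self) {
        *self.free.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

/// Runs `jobs` simulated instances with at most `max_concurrent` at a time
/// and returns the peak number that were running simultaneously.
fn run_limited(jobs: usize, max_concurrent: usize) -> usize {
    let slots = Arc::new(Slots::new(max_concurrent));
    let running = Arc::new(Mutex::new((0usize, 0usize))); // (current, peak)
    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let slots = Arc::clone(&slots);
            let running = Arc::clone(&running);
            thread::spawn(move || {
                slots.acquire();
                {
                    let mut r = running.lock().unwrap();
                    r.0 += 1;
                    r.1 = r.1.max(r.0);
                }
                thread::sleep(Duration::from_millis(20)); // simulated work
                running.lock().unwrap().0 -= 1;
                slots.release();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let peak = running.lock().unwrap().1;
    peak
}
```

Calling `run_limited(8, 3)` starts eight instances, but the returned peak never exceeds 3 because each instance increments the counter only after taking a slot and decrements it before giving the slot back.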
Wait Strategy Examples
Control how the scheduler behaves when the concurrency limit is reached.
1. WaitForever - Queue and Wait
Waits indefinitely until a slot becomes available. Ensures all scheduled executions eventually run.
use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct BatchProcessor { batch_id: usize }

#[async_trait]
impl Task<State> for BatchProcessor {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
        println!("Processing batch {}...", self.batch_id);
        // Simulate long processing
        tokio::time::sleep(Duration::from_secs(5)).await;
        store.put(&format!("batch_{}_result", self.batch_id), "completed")?;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
    concurrent_workflow.register(State::Start, BatchProcessor { batch_id: 1 });
    concurrent_workflow.add_exit_state(State::Complete);

    // WaitForever: Queue all executions
    scheduler.every_seconds_concurrent(
        "batch_processor",
        concurrent_workflow,
        State::Start,
        1, // Trigger every 1 second
        3, // Max 3 concurrent
        WaitStrategy::WaitForever // Queue and wait
    )?;

    scheduler.start().await?;
    Ok(())
}
2. WaitForQuota(n) - Wait for N Slots
Waits until a specific number of slots are available. Useful for batch operations requiring multiple workers.
use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct MultiSlotTask;

#[async_trait]
impl Task<State> for MultiSlotTask {
    async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
        println!("Running multi-slot task...");
        // Task requires multiple workers/resources
        tokio::time::sleep(Duration::from_secs(3)).await;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
    concurrent_workflow.register(State::Start, MultiSlotTask);
    concurrent_workflow.add_exit_state(State::Complete);

    // WaitForQuota(2): Wait until at least 2 slots available
    scheduler.every_seconds_concurrent(
        "multi_slot_task",
        concurrent_workflow,
        State::Start,
        5, // Trigger every 5 seconds
        10, // Max 10 concurrent
        WaitStrategy::WaitForQuota(2) // Need 2 free slots to start
    )?;

    scheduler.start().await?;
    Ok(())
}
3. NoWait - Skip if Busy
Skips execution if no slots are available. Best for non-critical periodic tasks.
use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Complete }

#[derive(Clone)]
struct MetricsCollector;

#[async_trait]
impl Task<State> for MetricsCollector {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<State>, CanoError> {
        println!("Collecting metrics...");
        // Collect current metrics.
        // collect_system_metrics() is a placeholder for your own async metrics function.
        let metrics = collect_system_metrics().await?;
        store.put("latest_metrics", metrics)?;
        tokio::time::sleep(Duration::from_millis(500)).await;
        Ok(TaskResult::Single(State::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let mut scheduler = Scheduler::new();
    let store = MemoryStore::new();

    let mut concurrent_workflow = ConcurrentWorkflow::new(State::Start);
    concurrent_workflow.register(State::Start, MetricsCollector);
    concurrent_workflow.add_exit_state(State::Complete);

    // NoWait: Skip if busy (OK to miss some metric collections)
    scheduler.every_seconds_concurrent(
        "metrics_collector",
        concurrent_workflow,
        State::Start,
        1, // Try every 1 second
        5, // Max 5 concurrent
        WaitStrategy::NoWait // Skip if all slots busy
    )?;

    println!("Starting metrics collection...");
    println!("Note: Will skip collections when system is busy");
    scheduler.start().await?;
    Ok(())
}
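Comparison of Wait Strategies
WaitForever: waits indefinitely for a free slot, so every scheduled execution eventually runs.
WaitForQuota(n): waits until n slots are free; suited to batch operations that need several workers.
NoWait: skips the execution when no slot is free; suited to non-critical periodic tasks.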
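The three strategies differ only in what happens when too few slots are free at trigger time. The following sketch captures that decision as a pure function so the behaviours can be compared side by side. `WaitPolicy`, `Decision`, and `decide` are hypothetical names that mirror the descriptions in this section; they are not Cano types, and the real scheduler may handle edge cases differently.

```rust
/// Hypothetical stand-in for the three wait strategies described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WaitPolicy {
    WaitForever,
    WaitForQuota(usize),
    NoWait,
}

/// What a scheduler could do with a triggered execution.
#[derive(Debug, PartialEq)]
enum Decision {
    StartNow,
    Wait,
    Skip,
}

/// Decides what to do with a new execution given the number of free slots.
fn decide(policy: WaitPolicy, free_slots: usize) -> Decision {
    match policy {
        // Start as soon as one slot is free, otherwise queue indefinitely.
        WaitPolicy::WaitForever if free_slots >= 1 => Decision::StartNow,
        WaitPolicy::WaitForever => Decision::Wait,
        // Start only once at least `n` slots are free (treating 0 as 1).
        WaitPolicy::WaitForQuota(n) if free_slots >= n.max(1) => Decision::StartNow,
        WaitPolicy::WaitForQuota(_) => Decision::Wait,
        // Start if a slot is free, otherwise drop this execution.
        WaitPolicy::NoWait if free_slots >= 1 => Decision::StartNow,
        WaitPolicy::NoWait => Decision::Skip,
    }
}
```

With one free slot, `WaitForever` and `NoWait` both start immediately while `WaitForQuota(2)` keeps waiting; with no free slots, only `NoWait` gives up on the execution.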
Graceful Shutdown
The scheduler supports graceful shutdown, allowing currently running workflows to complete before stopping.
// Stop the scheduler, letting running workflows finish first
scheduler.stop().await?;
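As a rough model of what "graceful" means here, the sketch below uses plain threads: a shared stop flag is checked only between jobs, so a job that has already started always runs to completion, and the caller waits for every worker before returning. `run_then_stop` is a hypothetical helper and says nothing about how `Scheduler::stop` is implemented.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `workers` job loops for `run_for`, then stops
/// them gracefully. Returns (jobs started, jobs finished).
fn run_then_stop(workers: usize, run_for: Duration) -> (usize, usize) {
    let stop = Arc::new(AtomicBool::new(false));
    let started = Arc::new(AtomicUsize::new(0));
    let finished = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let stop = Arc::clone(&stop);
            let started = Arc::clone(&started);
            let finished = Arc::clone(&finished);
            thread::spawn(move || {
                // The flag is read only between jobs, never during one.
                while !stop.load(Ordering::SeqCst) {
                    started.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(5)); // simulated work
                    finished.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    thread::sleep(run_for);
    // Request shutdown, then wait for in-flight jobs to complete.
    stop.store(true, Ordering::SeqCst);
    for handle in handles {
        handle.join().unwrap();
    }
    (started.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}
```

After the call returns, the two counts are equal: no job was cut off partway, which is the property a graceful stop is meant to provide.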