Workflows

Orchestrate complex processes with state machine semantics.

Workflows in Cano are state machines. You define a set of states (usually an enum) and register a Task or Node for each state. The workflow engine manages the transitions between these states until an exit state is reached.

Defining States

States are ordinary Rust enums. Deriving Debug, Clone, PartialEq, Eq and Hash lets the workflow engine compare, hash and print them:

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

Building a Workflow

graph TD
    A[Start] --> B[Validate]
    B -->|Valid| C[Process]
    B -->|Invalid| D[Failed]
    C --> E[Complete]

Linear Workflow Example

use cano::prelude::*;
use async_trait::async_trait;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

// Define simple tasks (omitted for brevity, see Tasks page)
struct ValidateTask; 
struct ProcessTask;

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Create Workflow
    let mut workflow = Workflow::new(store.clone());
    
    workflow
        // 1. Register Tasks for each state
        .register(OrderState::Start, |_: &MemoryStore| async {
            Ok(TaskResult::Single(OrderState::Validate))
        })
        .register(OrderState::Validate, ValidateTask)
        .register(OrderState::Process, ProcessTask)
        
        // 2. Define Exit States (Workflow stops here)
        .add_exit_states(vec![OrderState::Complete, OrderState::Failed]);

    // 3. Execute
    let result = workflow.orchestrate(OrderState::Start).await?;
    
    println!("Final State: {:?}", result);
    Ok(())
}
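
ValidateTask and ProcessTask are left to the Tasks page; purely as a sketch, assuming the same Task trait shape used in the split/join example further down (the store keys and the validity check are illustrative, not part of the real example), they might look like:

#[async_trait]
impl Task<OrderState> for ValidateTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<OrderState>, CanoError> {
        // Hypothetical check: an order flagged invalid (or missing) routes to Failed
        let order_ok = store.get::<bool>("order_is_valid").unwrap_or(false);
        if order_ok {
            Ok(TaskResult::Single(OrderState::Process))
        } else {
            Ok(TaskResult::Single(OrderState::Failed))
        }
    }
}

#[async_trait]
impl Task<OrderState> for ProcessTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<OrderState>, CanoError> {
        // Hypothetical processing step: record completion and finish the workflow
        store.put("order_processed", true)?;
        Ok(TaskResult::Single(OrderState::Complete))
    }
}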

Split/Join Workflows

Execute multiple tasks in parallel and control how they proceed using flexible join strategies. This is essential for scatter-gather patterns, redundant API calls, and performance optimization.

graph TD
    A[Process State] -->|Split| B[Task 1]
    A -->|Split| C[Task 2]
    A -->|Split| D[Task 3]
    B --> E{Join Strategy}
    C --> E
    D --> E
    E -->|Satisfied| F[Aggregate State]
    E -->|Failed/Timeout| G[Error State]

Join Strategies

Cano provides several strategies to control how parallel tasks are aggregated; a consolidated sketch of how each one plugs into a JoinConfig follows the list.

All

Wait for all tasks to complete successfully.

JoinStrategy::All

Any

Proceed after the first task completes successfully.

JoinStrategy::Any

Quorum(n)

Wait for n tasks to complete successfully.

JoinStrategy::Quorum(2)

Percentage(p)

Wait for a given fraction p of the tasks to complete successfully (e.g. 0.5 = 50%).

JoinStrategy::Percentage(0.5)

PartialResults(n)

Proceed after n tasks succeed, cancel the rest, and optionally record every outcome (successes, errors, cancellations).

JoinStrategy::PartialResults(3)

PartialTimeout

Proceed with whatever has completed once the configured timeout expires.

JoinStrategy::PartialTimeout
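
Every strategy is passed to a JoinConfig together with the state to enter once the join condition is satisfied. As a minimal sketch of the pattern used throughout the examples below (the strategy, target state, and timeout shown here are illustrative):

// Pick any strategy from the list above; the surrounding pattern stays the same.
let join_config = JoinConfig::new(
    JoinStrategy::Quorum(2),           // or All, Any, Percentage(0.5), ...
    DataState::Aggregate,              // state to enter once the join is satisfied
)
.with_timeout(Duration::from_secs(5)); // optional deadline for the whole split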

Complete Example

Here is a complete, runnable example demonstrating how to use Split/Join with different strategies.

use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;

// 1. Define Workflow State
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState {
    Start,
    LoadData,
    ParallelProcessing,
    Aggregate,
    Complete,
}

// 2. Task to load initial data
#[derive(Clone)]
struct DataLoader;

#[async_trait]
impl Task<DataState> for DataLoader {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        println!("Loading initial data...");
        
        // Load some data to process
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        store.put("input_data", data)?;
        
        println!("Data loaded: 10 numbers");
        Ok(TaskResult::Single(DataState::ParallelProcessing))
    }
}

// 3. Parallel processing task
#[derive(Clone)]
struct ProcessorTask {
    task_id: usize,
}

impl ProcessorTask {
    fn new(task_id: usize) -> Self {
        Self { task_id }
    }
}

#[async_trait]
impl Task<DataState> for ProcessorTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        println!("Processor {} starting...", self.task_id);
        
        // Get input data
        let data: Vec<i32> = store.get("input_data")?;
        
        // Simulate processing time
        tokio::time::sleep(Duration::from_millis(100 * self.task_id as u64)).await;
        
        // Process data (simple example: multiply by task_id)
        let result: i32 = data.iter().map(|&x| x * self.task_id as i32).sum();
        
        // Store individual result
        store.put(&format!("result_{}", self.task_id), result)?;
        
        println!("Processor {} completed with result: {}", self.task_id, result);
        Ok(TaskResult::Single(DataState::Aggregate))
    }
}

// 4. Aggregator task
#[derive(Clone)]
struct Aggregator;

#[async_trait]
impl Task<DataState> for Aggregator {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        println!("Aggregating results...");
        
        // Collect all results
        let mut total = 0;
        let mut count = 0;
        
        for i in 1..=3 {
            if let Ok(result) = store.get::<i32>(&format!("result_{}", i)) {
                total += result;
                count += 1;
            }
        }
        
        store.put("final_result", total)?;
        store.put("processor_count", count)?;
        
        println!("Aggregation complete: {} processors, total: {}", count, total);
        Ok(TaskResult::Single(DataState::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    
    // Define tasks to run in parallel
    let processors = vec![
        ProcessorTask::new(1),
        ProcessorTask::new(2),
        ProcessorTask::new(3),
    ];

    // Configure Join Strategy: Wait for ALL tasks
    let join_config = JoinConfig::new(
        JoinStrategy::All,
        DataState::Aggregate
    ).with_timeout(Duration::from_secs(5));

    // Build Workflow
    let workflow = Workflow::new(store.clone())
        .register(DataState::Start, DataLoader)
        .register_split(
            DataState::ParallelProcessing,
            processors,
            join_config
        )
        .register(DataState::Aggregate, Aggregator)
        .add_exit_state(DataState::Complete);

    // Run Workflow
    let result = workflow.orchestrate(DataState::Start).await?;
    
    let final_result: i32 = store.get("final_result")?;
    println!("Workflow completed: {:?}", result);
    println!("Final result: {}", final_result);
    
    Ok(())
}
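
Assuming all three processors finish inside the 5-second timeout, each one multiplies the sum of the ten input numbers (55) by its task_id, so the run ends with "Aggregation complete: 3 processors, total: 330" and a final result of 330.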

Join Strategy Examples

Each strategy handles parallel task completion differently. Here are detailed examples for each strategy.

1. All Strategy - Wait for All Tasks

Waits for all tasks to complete successfully. Fails if any task fails.

sequenceDiagram
    participant W as Workflow
    participant T1 as Task 1
    participant T2 as Task 2
    participant T3 as Task 3
    W->>T1: Start
    W->>T2: Start
    W->>T3: Start
    T1-->>W: Complete ✓
    T2-->>W: Complete ✓
    T3-->>W: Complete ✓
    Note over W: All Complete → Proceed

// All Strategy: Best for workflows requiring complete data
let join_config = JoinConfig::new(
    JoinStrategy::All, 
    DataState::Aggregate
).with_timeout(Duration::from_secs(10));

let workflow = Workflow::new(store.clone())
    .register(DataState::Start, DataLoader)
    .register_split(
        DataState::ParallelProcessing,
        vec![ProcessorTask::new(1), ProcessorTask::new(2), ProcessorTask::new(3)],
        join_config
    )
    .register(DataState::Aggregate, Aggregator)
    .add_exit_state(DataState::Complete);

2. Any Strategy - First to Complete

Proceeds as soon as the first task completes successfully. Other tasks are cancelled.

sequenceDiagram
    participant W as Workflow
    participant T1 as Task 1 (slow)
    participant T2 as Task 2 (fast)
    participant T3 as Task 3 (slow)
    W->>T1: Start
    W->>T2: Start
    W->>T3: Start
    T2-->>W: Complete ✓
    Note over W: First Complete → Proceed
    W->>T1: Cancel
    W->>T3: Cancel

// Any Strategy: Best for redundant API calls or fastest-wins scenarios
let join_config = JoinConfig::new(
    JoinStrategy::Any, 
    DataState::Complete  // Skip aggregation, proceed directly
);

// Example: Call 3 different data sources, use whoever responds first
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, DataLoader)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            ApiCallTask::new("provider1"),
            ApiCallTask::new("provider2"),
            ApiCallTask::new("provider3"),
        ],
        join_config
    )
    .add_exit_state(DataState::Complete);
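
ApiCallTask is not defined on this page; it stands in for any redundant call to an external provider. A hypothetical sketch, following the same Task shape as ProcessorTask above (the provider field, simulated latency, and store key are illustrative):

#[derive(Clone)]
struct ApiCallTask {
    provider: String,
}

impl ApiCallTask {
    fn new(provider: &str) -> Self {
        Self { provider: provider.to_string() }
    }
}

#[async_trait]
impl Task<DataState> for ApiCallTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        // Simulate a network call; a real task would query the provider here
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Under JoinStrategy::Any the first task to finish wins; the rest are cancelled
        store.put("fastest_provider", self.provider.clone())?;
        Ok(TaskResult::Single(DataState::Complete))
    }
}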

3. Quorum Strategy - Wait for N Tasks

Proceeds after a specific number of tasks complete successfully. Useful for consensus systems.

sequenceDiagram
    participant W as Workflow
    participant T1 as Task 1
    participant T2 as Task 2
    participant T3 as Task 3
    participant T4 as Task 4
    W->>T1: Start
    W->>T2: Start
    W->>T3: Start
    W->>T4: Start
    T1-->>W: Complete ✓
    T3-->>W: Complete ✓
    T2-->>W: Complete ✓
    Note over W: Quorum (3/4) Met → Proceed
    W->>T4: Cancel

// Quorum Strategy: Best for distributed consensus or majority voting
let join_config = JoinConfig::new(
    JoinStrategy::Quorum(3),  // Need 3 out of 5 to succeed
    DataState::Aggregate
).with_timeout(Duration::from_secs(5));

// Example: Write to 5 replicas, succeed when 3 confirm
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, PrepareData)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            WriteReplica::new(1),
            WriteReplica::new(2),
            WriteReplica::new(3),
            WriteReplica::new(4),
            WriteReplica::new(5),
        ],
        join_config
    )
    .register(DataState::Aggregate, ConfirmWrite)
    .add_exit_state(DataState::Complete);

4. Percentage Strategy - Wait for % of Tasks

Proceeds after a percentage of tasks complete successfully. Flexible for varying batch sizes.

sequenceDiagram
    participant W as Workflow
    participant T1 as Task 1
    participant T2 as Task 2
    participant T3 as Task 3
    participant T4 as Task 4
    W->>T1: Start
    W->>T2: Start
    W->>T3: Start
    W->>T4: Start
    T1-->>W: Complete ✓
    T2-->>W: Complete ✓
    T4-->>W: Complete ✓
    Note over W: 75% (3/4) Met → Proceed
    W->>T3: Cancel

// Percentage Strategy: Best for batch processing with acceptable partial results
let join_config = JoinConfig::new(
    JoinStrategy::Percentage(0.75),  // Need 75% to succeed
    DataState::Aggregate
).with_timeout(Duration::from_secs(10));

// Example: Process 100 records, proceed when 75 complete
let mut tasks = Vec::new();
for i in 0..100 {
    tasks.push(RecordProcessor::new(i));
}

let workflow = Workflow::new(store.clone())
    .register(DataState::Start, LoadRecords)
    .register_split(
        DataState::ParallelProcessing,
        tasks,
        join_config
    )
    .register(DataState::Aggregate, SummarizeResults)
    .add_exit_state(DataState::Complete);

5. PartialResults Strategy - Accept Partial Completion

Proceeds after N tasks complete successfully and cancels the remaining tasks, while tracking every outcome (success, error, cancelled).

sequenceDiagram
    participant W as Workflow
    participant T1 as Task 1
    participant T2 as Task 2
    participant T3 as Task 3
    participant T4 as Task 4
    W->>T1: Start
    W->>T2: Start
    W->>T3: Start
    W->>T4: Start
    T1-->>W: Complete ✓
    T2-->>W: Failed ✗
    T3-->>W: Complete ✓
    Note over W: 2 Successes → Proceed
    W->>T4: Cancel
    Note over W: Track: 2 success, 1 error, 1 cancelled

// PartialResults Strategy: Best for fault-tolerant systems with latency optimization
let join_config = JoinConfig::new(
    JoinStrategy::PartialResults(2),  // Proceed after any 2 succeed
    DataState::Aggregate
)
.with_timeout(Duration::from_secs(5))
.with_store_partial_results(true);  // Store detailed results

// Example: Call multiple services, use fastest 3 responses
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, PrepareRequest)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            ServiceCall::new("fast-service"),
            ServiceCall::new("medium-service"),
            ServiceCall::new("slow-service"),
            ServiceCall::new("backup-service"),
        ],
        join_config
    )
    .register(DataState::Aggregate, MergePartialResults)
    .add_exit_state(DataState::Complete);

// After execution, check stored results
let split_result: SplitResult = store.get("split_results")?;
println!("Successes: {}", split_result.successes.len());
println!("Errors: {}", split_result.errors.len());
println!("Cancelled: {}", split_result.cancelled.len());

6. PartialTimeout Strategy - Deadline-Based Completion

Accepts whatever completes before the timeout expires and proceeds with the available results.

sequenceDiagram
    participant W as Workflow
    participant T1 as Task 1
    participant T2 as Task 2
    participant T3 as Task 3
    participant T4 as Task 4
    W->>T1: Start
    W->>T2: Start
    W->>T3: Start
    W->>T4: Start
    T1-->>W: Complete ✓
    T3-->>W: Complete ✓
    Note over W: Timeout Reached
    W->>T2: Cancel
    W->>T4: Cancel
    Note over W: Proceed with 2 results

// PartialTimeout Strategy: Best for real-time systems with strict SLAs
let join_config = JoinConfig::new(
    JoinStrategy::PartialTimeout,  // Must specify timeout
    DataState::Aggregate
)
.with_timeout(Duration::from_millis(500))  // 500ms deadline
.with_store_partial_results(true);

// Example: Real-time recommendation system with 500ms SLA
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, LoadUserContext)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            RecommendationEngine::new("collaborative"),
            RecommendationEngine::new("content-based"),
            RecommendationEngine::new("trending"),
            RecommendationEngine::new("personalized"),
        ],
        join_config
    )
    .register(DataState::Aggregate, |store: &MemoryStore| async move {
        // Aggregate whatever results we got within deadline
        let split_result: SplitResult = store.get("split_results")?;
        println!("Got {} recommendations within SLA", split_result.successes.len());
        Ok(TaskResult::Single(DataState::Complete))
    })
    .add_exit_state(DataState::Complete);

Comparison Table

Strategy             Trigger Condition     Cancels Others   Best Use Case
All                  All tasks succeed     No               Complete data required
Any                  First success         Yes              Redundant API calls
Quorum(n)            n tasks succeed       Yes              Distributed consensus
Percentage(p)        p% succeed            Yes              Batch processing
PartialResults(n)    n tasks succeed       Yes              Latency optimization
PartialTimeout       Timeout reached       Yes              Strict SLA requirements