Workflows
Orchestrate complex processes with state machine semantics.
Workflows in Cano are state machines. You define a set of states (usually an enum) and register a Task or Node for each state.
The workflow engine manages the transitions between these states until an exit state is reached.
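The loop below is a minimal, std-only sketch of that idea, not Cano's implementation. Handlers are plain closures keyed by state, and the `Handler` alias and `orchestrate` function are illustrative names. The loop stops as soon as the current state is in the exit set, and returns an error if it reaches a state with no handler.

```rust
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;

// A handler returns the next state, or an error message.
// (Cano tasks also receive the shared store; that is omitted here.)
type Handler<S> = Box<dyn Fn() -> Result<S, String>>;

/// Run handlers starting at `start` until an exit state is reached.
fn orchestrate<S>(
    handlers: &HashMap<S, Handler<S>>,
    exit_states: &HashSet<S>,
    start: S,
) -> Result<S, String>
where
    S: Eq + Hash + Debug,
{
    let mut current = start;
    loop {
        // Stop as soon as the current state is an exit state.
        if exit_states.contains(&current) {
            return Ok(current);
        }
        // Every non-exit state must have a handler.
        let handler = handlers
            .get(&current)
            .ok_or_else(|| format!("no handler registered for {:?}", current))?;
        // The handler decides which state comes next.
        current = handler()?;
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step {
    Start,
    Work,
    Done,
}

fn main() {
    let mut handlers: HashMap<Step, Handler<Step>> = HashMap::new();
    handlers.insert(Step::Start, Box::new(|| Ok(Step::Work)));
    handlers.insert(Step::Work, Box::new(|| Ok(Step::Done)));
    let exits = HashSet::from([Step::Done]);
    println!("{:?}", orchestrate(&handlers, &exits, Step::Start)); // Ok(Done)
}
```

The sketch uses states as `HashMap` and `HashSet` keys, so the state type must implement `Eq` and `Hash`. The derives on the enums on this page cover that requirement.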
Defining States
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}
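Deriving `Hash` and `Eq` lets a state serve as a key in a `HashMap` or `HashSet`. The std-only sketch below illustrates this with the same enum. It looks for states that have neither a registered handler nor an exit-state marking. The `unhandled_states` check and the string handler names are hypothetical and are not part of Cano.

```rust
use std::collections::{HashMap, HashSet};

// Same derives as above: Hash + Eq make the enum usable as a map/set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

/// Hypothetical pre-flight check (not a Cano API): return the states that
/// have neither a registered handler nor an exit-state marking.
fn unhandled_states(
    all: &[OrderState],
    registered: &HashMap<OrderState, &'static str>,
    exits: &HashSet<OrderState>,
) -> Vec<OrderState> {
    all.iter()
        .filter(|s| !registered.contains_key(*s) && !exits.contains(*s))
        .cloned()
        .collect()
}

fn main() {
    let all = [
        OrderState::Start,
        OrderState::Validate,
        OrderState::Process,
        OrderState::Complete,
        OrderState::Failed,
    ];
    // Process is deliberately left unregistered.
    let registered = HashMap::from([
        (OrderState::Start, "start closure"),
        (OrderState::Validate, "ValidateTask"),
    ]);
    let exits = HashSet::from([OrderState::Complete, OrderState::Failed]);
    println!("{:?}", unhandled_states(&all, &registered, &exits)); // [Process]
}
```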
Building a Workflow
Linear Workflow Example
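use cano::prelude::*;
use async_trait::async_trait;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}
// Minimal tasks (see the Tasks page for details)
struct ValidateTask;
struct ProcessTask;
#[async_trait]
impl Task<OrderState> for ValidateTask {
    async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<OrderState>, CanoError> {
        // Real validation logic goes here; return OrderState::Failed to stop early
        Ok(TaskResult::Single(OrderState::Process))
    }
}
#[async_trait]
impl Task<OrderState> for ProcessTask {
    async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<OrderState>, CanoError> {
        Ok(TaskResult::Single(OrderState::Complete))
    }
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    // Create the workflow (builder style, as in the Split/Join example below)
    let workflow = Workflow::new(store.clone())
        // 1. Register a task for each non-exit state
        .register(OrderState::Start, |_: &MemoryStore| async {
            Ok(TaskResult::Single(OrderState::Validate))
        })
        .register(OrderState::Validate, ValidateTask)
        .register(OrderState::Process, ProcessTask)
        // 2. Define exit states (the workflow stops when it reaches one)
        .add_exit_states(vec![OrderState::Complete, OrderState::Failed]);
    // 3. Execute from the initial state
    let result = workflow.orchestrate(OrderState::Start).await?;
    println!("Final State: {:?}", result);
    Ok(())
}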
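The example keeps the task bodies minimal. The std-only sketch below traces the paths this state machine can take. It assumes a hypothetical validation rule, that quantity must be greater than zero, which sends invalid orders to the `Failed` exit state. Transitions are modelled with a plain `match` instead of Cano tasks, and `Copy` is added to the enum for convenience.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum OrderState {
    Start,
    Validate,
    Process,
    Complete,
    Failed,
}

/// One transition. "quantity > 0" is a hypothetical stand-in for ValidateTask's check.
fn next_state(state: OrderState, quantity: u32) -> OrderState {
    match state {
        OrderState::Start => OrderState::Validate,
        OrderState::Validate if quantity > 0 => OrderState::Process,
        OrderState::Validate => OrderState::Failed,
        OrderState::Process => OrderState::Complete,
        // Exit states map to themselves; run_order stops before calling this.
        OrderState::Complete | OrderState::Failed => state,
    }
}

/// Drive the machine from Start until an exit state, recording every state visited.
fn run_order(quantity: u32) -> Vec<OrderState> {
    let exits = [OrderState::Complete, OrderState::Failed];
    let mut path = vec![OrderState::Start];
    let mut current = OrderState::Start;
    while !exits.contains(&current) {
        current = next_state(current, quantity);
        path.push(current);
    }
    path
}

fn main() {
    println!("{:?}", run_order(2)); // [Start, Validate, Process, Complete]
    println!("{:?}", run_order(0)); // [Start, Validate, Failed]
}
```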
Split/Join Workflows
Execute multiple tasks in parallel and control how the workflow proceeds using flexible join strategies. Typical uses are scatter-gather processing, redundant API calls where the first successful response wins, and reducing latency by running independent work concurrently.
Join Strategies
Cano provides several strategies to control how parallel tasks are aggregated.
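All
Wait for all tasks to complete successfully. The split fails if any task fails.
JoinStrategy::All
Any
Proceed as soon as the first task completes successfully. The remaining tasks are cancelled.
JoinStrategy::Any
Quorum(n)
Proceed once n tasks have completed successfully.
JoinStrategy::Quorum(2)
Percentage(p)
Proceed once a fraction p of the tasks has completed successfully (0.5 means half).
JoinStrategy::Percentage(0.5)
PartialResults(n)
Proceed once n tasks have completed successfully, cancel the rest, and track every outcome (successes, errors, cancellations).
JoinStrategy::PartialResults(3)
PartialTimeout
Proceed with whatever tasks have completed successfully when the configured timeout expires.
JoinStrategy::PartialTimeout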
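One way to read the count-based strategies is as a success threshold. The std-only sketch below uses a local `JoinStrategy` enum, which is a stand-in and not Cano's type. It computes how many successes each strategy needs out of `total` tasks. Rounding a fractional `Percentage` threshold up is an assumption of this sketch. `PartialTimeout` is left out because it is governed by the deadline rather than by a count.

```rust
/// Local stand-in for the strategy names above (not Cano's type).
#[derive(Debug, Clone, Copy)]
enum JoinStrategy {
    All,
    Any,
    Quorum(usize),
    Percentage(f64),
    PartialResults(usize),
}

/// Successful tasks needed out of `total` before the join proceeds.
fn required_successes(strategy: JoinStrategy, total: usize) -> usize {
    match strategy {
        JoinStrategy::All => total,
        JoinStrategy::Any => 1,
        JoinStrategy::Quorum(n) | JoinStrategy::PartialResults(n) => n,
        // Assumption: a fractional threshold is rounded up (7.5 -> 8).
        JoinStrategy::Percentage(p) => (p * total as f64).ceil() as usize,
    }
}

fn main() {
    for s in [
        JoinStrategy::All,
        JoinStrategy::Any,
        JoinStrategy::Quorum(2),
        JoinStrategy::Percentage(0.5),
        JoinStrategy::PartialResults(3),
    ] {
        println!("{:?} with 4 tasks needs {} successes", s, required_successes(s, 4));
    }
}
```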
Complete Example
Here is a complete, runnable example of Split/Join using the All strategy. The other strategies are configured the same way, by changing the JoinStrategy passed to JoinConfig::new.
use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;
// 1. Define Workflow State
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState {
    Start,
    LoadData,
    ParallelProcessing,
    Aggregate,
    Complete,
}
// 2. Task to load initial data
#[derive(Clone)]
struct DataLoader;
#[async_trait]
impl Task<DataState> for DataLoader {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        println!("Loading initial data...");
        // Load some data to process (ProcessorTask reads it back as Vec<i32>)
        let data: Vec<i32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        store.put("input_data", data)?;
        println!("Data loaded: 10 numbers");
        Ok(TaskResult::Single(DataState::ParallelProcessing))
    }
}
// 3. Parallel processing task
#[derive(Clone)]
struct ProcessorTask {
    task_id: usize,
}
impl ProcessorTask {
    fn new(task_id: usize) -> Self {
        Self { task_id }
    }
}
#[async_trait]
impl Task<DataState> for ProcessorTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        println!("Processor {} starting...", self.task_id);
        // Get input data
        let data: Vec<i32> = store.get("input_data")?;
        // Simulate processing time (processor N sleeps N * 100 ms)
        tokio::time::sleep(Duration::from_millis(100 * self.task_id as u64)).await;
        // Process data (simple example: multiply each value by task_id and sum)
        let result: i32 = data.iter().map(|&x| x * self.task_id as i32).sum();
        // Store this processor's result under its own key
        store.put(&format!("result_{}", self.task_id), result)?;
        println!("Processor {} completed with result: {}", self.task_id, result);
        Ok(TaskResult::Single(DataState::Aggregate))
    }
}
// 4. Aggregator task
#[derive(Clone)]
struct Aggregator;
#[async_trait]
impl Task<DataState> for Aggregator {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
        println!("Aggregating results...");
        // Collect the results of processors 1..=3 (missing keys are skipped)
        let mut total = 0;
        let mut count = 0;
        for i in 1..=3 {
            if let Ok(result) = store.get::<i32>(&format!("result_{}", i)) {
                total += result;
                count += 1;
            }
        }
        store.put("final_result", total)?;
        store.put("processor_count", count)?;
        println!("Aggregation complete: {} processors, total: {}", count, total);
        Ok(TaskResult::Single(DataState::Complete))
    }
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    // Define tasks to run in parallel
    let processors = vec![
        ProcessorTask::new(1),
        ProcessorTask::new(2),
        ProcessorTask::new(3),
    ];
    // Configure Join Strategy: wait for ALL tasks, then go to Aggregate
    let join_config = JoinConfig::new(
        JoinStrategy::All,
        DataState::Aggregate
    ).with_timeout(Duration::from_secs(5));
    // Build Workflow
    let workflow = Workflow::new(store.clone())
        .register(DataState::Start, DataLoader)
        .register_split(
            DataState::ParallelProcessing,
            processors,
            join_config
        )
        .register(DataState::Aggregate, Aggregator)
        .add_exit_state(DataState::Complete);
    // Run Workflow
    let result = workflow.orchestrate(DataState::Start).await?;
    let final_result: i32 = store.get("final_result")?;
    println!("Workflow completed: {:?}", result);
    println!("Final result: {}", final_result);
    Ok(())
}
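With `JoinStrategy::All` and all three processors succeeding, the input sums to 55. The processors therefore store 55, 110, and 165, and `final_result` should be 55 × (1 + 2 + 3) = 330. The std-only sketch below reproduces that scatter-gather arithmetic with `std::thread::scope` instead of Cano and Tokio. `process` and `scatter_gather` are illustrative names.

```rust
use std::thread;

/// Mirrors ProcessorTask::run: multiply each input by the processor id and sum.
fn process(data: &[i32], task_id: i32) -> i32 {
    data.iter().map(|&x| x * task_id).sum()
}

/// Scatter the same input to one scoped thread per processor, then gather every
/// result (All-style: wait for each one) and total them like Aggregator does.
fn scatter_gather(data: &[i32], task_ids: &[i32]) -> i32 {
    thread::scope(|s| {
        let handles: Vec<_> = task_ids
            .iter()
            .map(|&id| s.spawn(move || process(data, id)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    let data: Vec<i32> = (1..=10).collect();
    let total = scatter_gather(&data, &[1, 2, 3]);
    println!("final_result = {}", total); // final_result = 330
}
```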
Join Strategy Examples
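Each strategy handles parallel task completion differently. The snippets below reuse DataState and store from the complete example. Task types that example does not define (ApiCallTask, PrepareData, WriteReplica, RecordProcessor, ServiceCall, RecommendationEngine, and so on) are placeholders for your own Task implementations.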
1. All Strategy - Wait for All Tasks
Waits for all tasks to complete successfully. Fails if any task fails.
// All Strategy: Best for workflows requiring complete data
let join_config = JoinConfig::new(
    JoinStrategy::All,
    DataState::Aggregate
).with_timeout(Duration::from_secs(10));
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, DataLoader)
    .register_split(
        DataState::ParallelProcessing,
        vec![ProcessorTask::new(1), ProcessorTask::new(2), ProcessorTask::new(3)],
        join_config
    )
    .register(DataState::Aggregate, Aggregator)
    .add_exit_state(DataState::Complete);
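The all-or-nothing rule can be sketched over finished task outcomes with std only. Collecting an iterator of `Result`s into `Result<Vec<_>, _>` yields every value if all tasks succeeded, and otherwise the first error. This models only the success/failure decision, not how Cano schedules or stops the tasks. `join_all` is a hypothetical helper.

```rust
/// All-style join over finished outcomes: every value if all succeeded,
/// otherwise the first error in task order.
fn join_all<T, E>(outcomes: Vec<Result<T, E>>) -> Result<Vec<T>, E> {
    // FromIterator for Result stops at the first Err it encounters.
    outcomes.into_iter().collect()
}

fn main() {
    let ok: Vec<Result<i32, String>> = vec![Ok(55), Ok(110), Ok(165)];
    println!("{:?}", join_all(ok)); // Ok([55, 110, 165])

    let one_failed: Vec<Result<i32, String>> =
        vec![Ok(55), Err("processor 2 failed".to_string()), Ok(165)];
    println!("{:?}", join_all(one_failed)); // Err("processor 2 failed")
}
```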
2. Any Strategy - First to Complete
Proceeds as soon as the first task completes successfully. Other tasks are cancelled.
// Any Strategy: Best for redundant API calls or fastest-wins scenarios
let join_config = JoinConfig::new(
    JoinStrategy::Any,
    DataState::Complete // Skip aggregation, proceed directly
);
// Example: Call 3 different data sources, use the first successful response
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, DataLoader)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            ApiCallTask::new("provider1"),
            ApiCallTask::new("provider2"),
            ApiCallTask::new("provider3"),
        ],
        join_config
    )
    .add_exit_state(DataState::Complete);
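The distinction between "first to complete successfully" and "first to finish" is easy to miss. The std-only sketch below illustrates it by walking task outcomes in the order they finished and returning the first success, skipping any earlier failures. The outcome data is invented and `first_success` is a hypothetical helper. The real strategy also cancels the remaining tasks, which a list of finished outcomes cannot show.

```rust
/// A finished task: provider name plus its outcome.
type Outcome = (&'static str, Result<String, String>);

/// Any-style sketch: take the first success in completion order, skipping failures.
fn first_success(completed_in_order: &[Outcome]) -> Option<(&'static str, &str)> {
    completed_in_order
        .iter()
        .find_map(|(name, res)| res.as_deref().ok().map(|v| (*name, v)))
}

fn main() {
    // provider2 finished first but failed; provider3 is the first success.
    let completed: Vec<Outcome> = vec![
        ("provider2", Err("timeout".to_string())),
        ("provider3", Ok("price=42".to_string())),
        ("provider1", Ok("price=41".to_string())),
    ];
    println!("{:?}", first_success(&completed)); // Some(("provider3", "price=42"))
}
```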
3. Quorum Strategy - Wait for N Tasks
Proceeds after a specific number of tasks complete successfully. Useful for consensus systems.
// Quorum Strategy: Best for distributed consensus or majority voting
let join_config = JoinConfig::new(
    JoinStrategy::Quorum(3), // Need 3 out of 5 to succeed
    DataState::Aggregate
).with_timeout(Duration::from_secs(5));
// Example: Write to 5 replicas, succeed when 3 confirm
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, PrepareData)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            WriteReplica::new(1),
            WriteReplica::new(2),
            WriteReplica::new(3),
            WriteReplica::new(4),
            WriteReplica::new(5),
        ],
        join_config
    )
    .register(DataState::Aggregate, ConfirmWrite)
    .add_exit_state(DataState::Complete);
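The quorum count can be sketched with std only. Given replica acknowledgements in completion order, the function reports how many completions were needed to collect `n` successes, or `None` if the quorum is never reached. `completions_until_quorum` and the sample data are illustrative, not Cano APIs.

```rust
/// Given replica write outcomes in completion order (true = acknowledged),
/// return how many completions it took to collect `n` acknowledgements,
/// or None if the quorum is never reached.
fn completions_until_quorum(acks_in_order: &[bool], n: usize) -> Option<usize> {
    if n == 0 {
        return Some(0);
    }
    let mut acks = 0;
    for (i, &acked) in acks_in_order.iter().enumerate() {
        if acked {
            acks += 1;
            if acks == n {
                // Quorum reached; replicas that finish later are not needed.
                return Some(i + 1);
            }
        }
    }
    None
}

fn main() {
    // Five replicas, Quorum(3): the second completion is a failure,
    // so the third acknowledgement arrives with the fourth completion.
    let outcomes = [true, false, true, true, true];
    println!("{:?}", completions_until_quorum(&outcomes, 3)); // Some(4)
}
```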
4. Percentage Strategy - Wait for % of Tasks
Proceeds after a given fraction of the tasks completes successfully. Because the threshold is relative, the same configuration works for batches of any size.
// Percentage Strategy: Best for batch processing with acceptable partial results
let join_config = JoinConfig::new(
    JoinStrategy::Percentage(0.75), // Need 75% to succeed
    DataState::Aggregate
).with_timeout(Duration::from_secs(10));
// Example: Process 100 records, proceed when 75 succeed
let mut tasks = Vec::new();
for i in 0..100 {
    tasks.push(RecordProcessor::new(i));
}
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, LoadRecords)
    .register_split(
        DataState::ParallelProcessing,
        tasks,
        join_config
    )
    .register(DataState::Aggregate, SummarizeResults)
    .add_exit_state(DataState::Complete);
5. PartialResults Strategy - Accept Partial Completion
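Proceeds after N tasks complete successfully and cancels the remaining tasks. With with_store_partial_results(true), every outcome (successes, errors, and cancellations) is recorded in the store.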
// PartialResults Strategy: Best for fault-tolerant systems with latency optimization
let join_config = JoinConfig::new(
    JoinStrategy::PartialResults(2), // Proceed after any 2 succeed
    DataState::Aggregate
)
.with_timeout(Duration::from_secs(5))
.with_store_partial_results(true); // Store detailed results
// Example: Call multiple services, use the fastest 2 successful responses
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, PrepareRequest)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            ServiceCall::new("fast-service"),
            ServiceCall::new("medium-service"),
            ServiceCall::new("slow-service"),
            ServiceCall::new("backup-service"),
        ],
        join_config
    )
    .register(DataState::Aggregate, MergePartialResults)
    .add_exit_state(DataState::Complete);
// Run the workflow, then check the stored results
workflow.orchestrate(DataState::Start).await?;
let split_result: SplitResult = store.get("split_results")?;
println!("Successes: {}", split_result.successes.len());
println!("Errors: {}", split_result.errors.len());
println!("Cancelled: {}", split_result.cancelled.len());
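The std-only sketch below shows how the three buckets read above could be filled for `PartialResults(2)`. Tasks are processed in the order they would finish. Once two successes are recorded, every task still outstanding is counted as cancelled. The `Tally` struct and the finish order are hypothetical stand-ins for Cano's `SplitResult` and real timing.

```rust
/// Hypothetical tally mirroring the successes / errors / cancelled fields.
#[derive(Debug, Default, PartialEq)]
struct Tally {
    successes: Vec<&'static str>,
    errors: Vec<&'static str>,
    cancelled: Vec<&'static str>,
}

/// PartialResults(n) sketch: walk tasks in the order they would finish; once
/// `n` successes are recorded, every task that has not finished is cancelled.
fn partial_results(finish_order: &[(&'static str, bool)], n: usize) -> Tally {
    let mut tally = Tally::default();
    for &(name, ok) in finish_order {
        if tally.successes.len() >= n {
            tally.cancelled.push(name);
        } else if ok {
            tally.successes.push(name);
        } else {
            tally.errors.push(name);
        }
    }
    tally
}

fn main() {
    let finish_order = [
        ("fast-service", true),
        ("medium-service", false),
        ("slow-service", true),
        ("backup-service", true),
    ];
    let t = partial_results(&finish_order, 2);
    println!("successes: {:?}", t.successes); // successes: ["fast-service", "slow-service"]
    println!("errors: {:?}", t.errors); // errors: ["medium-service"]
    println!("cancelled: {:?}", t.cancelled); // cancelled: ["backup-service"]
}
```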
6. PartialTimeout Strategy - Deadline-Based Completion
Accepts whatever tasks complete before the timeout expires and proceeds with the available results. A timeout must be set with with_timeout for this strategy.
// PartialTimeout Strategy: Best for real-time systems with strict SLAs
let join_config = JoinConfig::new(
    JoinStrategy::PartialTimeout, // Must specify timeout
    DataState::Aggregate
)
.with_timeout(Duration::from_millis(500)) // 500ms deadline
.with_store_partial_results(true);
// Example: Real-time recommendation system with 500ms SLA
let workflow = Workflow::new(store.clone())
    .register(DataState::Start, LoadUserContext)
    .register_split(
        DataState::ParallelProcessing,
        vec![
            RecommendationEngine::new("collaborative"),
            RecommendationEngine::new("content-based"),
            RecommendationEngine::new("trending"),
            RecommendationEngine::new("personalized"),
        ],
        join_config
    )
    .register(DataState::Aggregate, |store: &MemoryStore| {
        // Clone the shared store so the returned future does not borrow the closure argument
        let store = store.clone();
        async move {
            // Aggregate whatever results arrived within the deadline
            let split_result: SplitResult = store.get("split_results")?;
            println!("Got {} recommendations within SLA", split_result.successes.len());
            // Name the error type so `?` above can infer its conversion target
            Ok::<_, CanoError>(TaskResult::Single(DataState::Complete))
        }
    })
    .add_exit_state(DataState::Complete);
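The deadline rule can be sketched with std only. Given simulated latencies for the four engines, the function separates those that finish within the 500 ms deadline from those that are cut off. The latencies are invented for illustration and `within_deadline` is a hypothetical helper. Treating a latency exactly equal to the deadline as on time is an assumption.

```rust
use std::time::Duration;

/// PartialTimeout sketch: split engines into those whose simulated latency
/// fits within the deadline and those that would be cut off.
fn within_deadline(
    latencies: &[(&'static str, Duration)],
    deadline: Duration,
) -> (Vec<&'static str>, Vec<&'static str>) {
    let mut finished = Vec::new();
    let mut cut_off = Vec::new();
    for &(name, latency) in latencies {
        // Assumption: finishing exactly at the deadline still counts.
        if latency <= deadline {
            finished.push(name);
        } else {
            cut_off.push(name);
        }
    }
    (finished, cut_off)
}

fn main() {
    let latencies = [
        ("collaborative", Duration::from_millis(120)),
        ("content-based", Duration::from_millis(480)),
        ("trending", Duration::from_millis(40)),
        ("personalized", Duration::from_millis(900)),
    ];
    let (finished, cut_off) = within_deadline(&latencies, Duration::from_millis(500));
    println!("within SLA: {:?}", finished); // within SLA: ["collaborative", "content-based", "trending"]
    println!("cut off: {:?}", cut_off); // cut off: ["personalized"]
}
```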