Tasks
Simple, flexible processing units for your workflows.
A Task provides a simplified interface with a single run method.
Use tasks when you want simplicity and direct control over the execution logic.
Tasks are the fundamental building blocks of Cano workflows.
Implementing a Task
To create a task, implement the `Task` trait for your struct. The trait requires a `run` method and an optional `config` method.
use async_trait::async_trait;
use cano::prelude::*;
use rand::Rng;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Action { Generate, Count, Complete }
struct GeneratorTask;
#[async_trait]
impl Task for GeneratorTask {
// Optional: Configure retries
fn config(&self) -> TaskConfig {
TaskConfig::default().with_fixed_retry(3, std::time::Duration::from_secs(1))
}
async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
println!("🎲 GeneratorTask: Creating random numbers...");
// 1. Perform logic
let mut rng = rand::rng();
let numbers: Vec = (0..10).map(|_| rng.random_range(1..=100)).collect();
// 2. Store results
store.put("numbers", numbers)?;
println!("✅ Stored numbers");
// 3. Return next state
Ok(TaskResult::Single(Action::Count))
}
}
Closure Tasks
For very simple logic, you can register a closure directly as a task without defining a struct.
workflow.register(Action::Count, |store: &MemoryStore| async move {
let numbers: Vec = store.get("numbers")?;
println!("Count: {}", numbers.len());
Ok(TaskResult::Single(Action::Complete))
});
Configuration & Retries
Tasks can be configured with retry strategies to handle transient failures.
The TaskConfig struct allows you to specify the retry behavior.
Retry Strategy Examples
Fixed Retry
Retry a fixed number of times with a constant delay between attempts.
TaskConfig::default()
.with_fixed_retry(3, Duration::from_secs(1))
Exponential Backoff
Retry with exponentially increasing delays, useful for rate-limited APIs.
TaskConfig::default()
.with_exponential_retry(5)
Minimal Config
Fast execution with minimal retry overhead for reliable operations.
TaskConfig::minimal()
Real-World Example: API Client with Retry
use cano::prelude::*;
use async_trait::async_trait;
#[derive(Clone)]
struct ApiClientTask {
endpoint: String,
}
#[async_trait]
impl Task for ApiClientTask {
fn config(&self) -> TaskConfig {
// Exponential backoff for API rate limiting
TaskConfig::default()
.with_exponential_retry(5)
}
async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
println!("📡 Calling API: {}", self.endpoint);
// Simulate API call that might fail
let response = reqwest::get(&self.endpoint)
.await
.map_err(|e| CanoError::task_execution(e.to_string()))?;
let data = response.text().await
.map_err(|e| CanoError::task_execution(e.to_string()))?;
store.put("api_response", data)?;
println!("✅ API call successful");
Ok(TaskResult::Single(State::Complete))
}
}
Real-World Task Patterns
Tasks excel at various workflow scenarios. Here are proven patterns from production use.
1. Data Transformation Task
Simple, direct data processing without complex setup.
#[derive(Clone)]
struct DataTransformer;
#[async_trait]
impl Task for DataTransformer {
async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
let raw_data: Vec = store.get("raw_data")?;
// Transform: filter and multiply
let processed: Vec = raw_data
.into_iter()
.filter(|&x| x > 0)
.map(|x| x * 2)
.collect();
store.put("processed_data", processed)?;
Ok(TaskResult::Single(State::Complete))
}
}
2. Validation Task
Quick validation logic with multiple outcomes.
#[derive(Clone)]
struct ValidatorTask;
#[async_trait]
impl Task for ValidatorTask {
async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
let data: Vec = store.get("processed_data")?;
let mut errors = Vec::new();
if data.is_empty() {
errors.push("Data is empty");
}
if data.iter().any(|&x| x.is_nan()) {
errors.push("Contains NaN values");
}
store.put("validation_errors", errors.clone())?;
if errors.is_empty() {
Ok(TaskResult::Single(State::Process))
} else {
Ok(TaskResult::Single(State::ValidationFailed))
}
}
}
3. Conditional Routing Task
Dynamic workflow routing based on runtime conditions.
#[derive(Clone)]
struct RoutingTask;
#[async_trait]
impl Task for RoutingTask {
async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
let item_count: usize = store.get("item_count")?;
let priority: String = store.get("priority")?;
// Dynamic routing based on conditions
let next_state = match (item_count, priority.as_str()) {
(n, "high") if n > 100 => State::ParallelProcess,
(n, "high") if n > 0 => State::FastTrack,
(n, _) if n > 50 => State::BatchProcess,
(n, _) if n > 0 => State::SimpleProcess,
_ => State::Skip,
};
println!("Routing to: {:?}", next_state);
Ok(TaskResult::Single(next_state))
}
}
4. Aggregation Task
Collect and combine results from previous steps.
#[derive(Clone)]
struct AggregatorTask;
#[async_trait]
impl Task for AggregatorTask {
async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
println!("Aggregating results...");
let mut total = 0;
let mut count = 0;
// Collect results from parallel tasks
for i in 1..=3 {
if let Ok(result) = store.get::(&format!("result_{}", i)) {
total += result;
count += 1;
}
}
store.put("total", total)?;
store.put("count", count)?;
println!("Aggregated {} results, total: {}", count, total);
Ok(TaskResult::Single(State::Complete))
}
}
Task vs Node
Cano supports both Task and Node interfaces. Every Node automatically implements Task, so they can be mixed in the same workflow.
Task
Best for: Simple logic, quick prototyping, functional style.
- Single
run()method - Direct control over flow
- Can be a closure
Node
Best for: Complex operations, robust error handling, structured data flow.
- 3 Phases:
prep,exec,post - Built-in retry logic for
execphase - Separation of concerns (IO vs Compute)
// Mixing Tasks and Nodes in one workflow
let workflow = Workflow::new(store.clone())
.register(State::Init, SimpleTask) // Task
.register(State::Process, ComplexNode::new()) // Node
.register(State::Finish, |_: &MemoryStore| async { // Closure Task
Ok(TaskResult::Single(State::Done))
});
When to Use Tasks vs Nodes?
Choose the right abstraction for your use case: