Tasks

Simple, flexible processing units for your workflows.

A Task provides a simplified interface with a single run method. Use tasks when you want simplicity and direct control over the execution logic. Tasks are the fundamental building blocks of Cano workflows.

Implementing a Task

To create a task, implement the `Task` trait for your struct. The trait requires a `run` method and an optional `config` method.

use async_trait::async_trait;
use cano::prelude::*;
use rand::Rng;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Action { Generate, Count, Complete }

struct GeneratorTask;

#[async_trait]
impl Task for GeneratorTask {
    // Optional: Configure retries
    fn config(&self) -> TaskConfig {
        TaskConfig::default().with_fixed_retry(3, std::time::Duration::from_secs(1))
    }

    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("🎲 GeneratorTask: Creating random numbers...");

        // 1. Perform logic
        let mut rng = rand::rng();
        let numbers: Vec = (0..10).map(|_| rng.random_range(1..=100)).collect();
        
        // 2. Store results
        store.put("numbers", numbers)?;
        println!("✅ Stored numbers");

        // 3. Return next state
        Ok(TaskResult::Single(Action::Count))
    }
}

Closure Tasks

For very simple logic, you can register a closure directly as a task without defining a struct.

workflow.register(Action::Count, |store: &MemoryStore| async move {
    let numbers: Vec = store.get("numbers")?;
    println!("Count: {}", numbers.len());
    Ok(TaskResult::Single(Action::Complete))
});

Configuration & Retries

Tasks can be configured with retry strategies to handle transient failures. The TaskConfig struct allows you to specify the retry behavior.

Retry Strategy Examples

sequenceDiagram participant W as Workflow participant T as Task W->>T: Execute T-->>W: Fail Note over W: Wait (backoff) W->>T: Retry 1 T-->>W: Fail Note over W: Wait (longer) W->>T: Retry 2 T-->>W: Success ✓

Fixed Retry

Retry a fixed number of times with a constant delay between attempts.

TaskConfig::default()
    .with_fixed_retry(3, Duration::from_secs(1))

Exponential Backoff

Retry with exponentially increasing delays, useful for rate-limited APIs.

TaskConfig::default()
    .with_exponential_retry(5)

Minimal Config

Fast execution with minimal retry overhead for reliable operations.

TaskConfig::minimal()

Real-World Example: API Client with Retry

use cano::prelude::*;
use async_trait::async_trait;

#[derive(Clone)]
struct ApiClientTask {
    endpoint: String,
}

#[async_trait]
impl Task for ApiClientTask {
    fn config(&self) -> TaskConfig {
        // Exponential backoff for API rate limiting
        TaskConfig::default()
            .with_exponential_retry(5)
    }

    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("📡 Calling API: {}", self.endpoint);
        
        // Simulate API call that might fail
        let response = reqwest::get(&self.endpoint)
            .await
            .map_err(|e| CanoError::task_execution(e.to_string()))?;
        
        let data = response.text().await
            .map_err(|e| CanoError::task_execution(e.to_string()))?;
        
        store.put("api_response", data)?;
        println!("✅ API call successful");
        
        Ok(TaskResult::Single(State::Complete))
    }
}

Real-World Task Patterns

Tasks excel at various workflow scenarios. Here are proven patterns from production use.

1. Data Transformation Task

Simple, direct data processing without complex setup.

#[derive(Clone)]
struct DataTransformer;

#[async_trait]
impl Task for DataTransformer {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        let raw_data: Vec = store.get("raw_data")?;
        
        // Transform: filter and multiply
        let processed: Vec = raw_data
            .into_iter()
            .filter(|&x| x > 0)
            .map(|x| x * 2)
            .collect();
        
        store.put("processed_data", processed)?;
        Ok(TaskResult::Single(State::Complete))
    }
}

2. Validation Task

Quick validation logic with multiple outcomes.

#[derive(Clone)]
struct ValidatorTask;

#[async_trait]
impl Task for ValidatorTask {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        let data: Vec = store.get("processed_data")?;
        
        let mut errors = Vec::new();
        
        if data.is_empty() {
            errors.push("Data is empty");
        }
        
        if data.iter().any(|&x| x.is_nan()) {
            errors.push("Contains NaN values");
        }
        
        store.put("validation_errors", errors.clone())?;
        
        if errors.is_empty() {
            Ok(TaskResult::Single(State::Process))
        } else {
            Ok(TaskResult::Single(State::ValidationFailed))
        }
    }
}

3. Conditional Routing Task

Dynamic workflow routing based on runtime conditions.

#[derive(Clone)]
struct RoutingTask;

#[async_trait]
impl Task for RoutingTask {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        let item_count: usize = store.get("item_count")?;
        let priority: String = store.get("priority")?;
        
        // Dynamic routing based on conditions
        let next_state = match (item_count, priority.as_str()) {
            (n, "high") if n > 100 => State::ParallelProcess,
            (n, "high") if n > 0 => State::FastTrack,
            (n, _) if n > 50 => State::BatchProcess,
            (n, _) if n > 0 => State::SimpleProcess,
            _ => State::Skip,
        };
        
        println!("Routing to: {:?}", next_state);
        Ok(TaskResult::Single(next_state))
    }
}

4. Aggregation Task

Collect and combine results from previous steps.

#[derive(Clone)]
struct AggregatorTask;

#[async_trait]
impl Task for AggregatorTask {
    async fn run(&self, store: &MemoryStore) -> Result, CanoError> {
        println!("Aggregating results...");
        
        let mut total = 0;
        let mut count = 0;
        
        // Collect results from parallel tasks
        for i in 1..=3 {
            if let Ok(result) = store.get::(&format!("result_{}", i)) {
                total += result;
                count += 1;
            }
        }
        
        store.put("total", total)?;
        store.put("count", count)?;
        
        println!("Aggregated {} results, total: {}", count, total);
        Ok(TaskResult::Single(State::Complete))
    }
}

Task vs Node

Cano supports both Task and Node interfaces. Every Node automatically implements Task, so they can be mixed in the same workflow.

Task

Best for: Simple logic, quick prototyping, functional style.

  • Single run() method
  • Direct control over flow
  • Can be a closure

Node

Best for: Complex operations, robust error handling, structured data flow.

  • 3 Phases: prep, exec, post
  • Built-in retry logic for exec phase
  • Separation of concerns (IO vs Compute)
// Mixing Tasks and Nodes in one workflow
let workflow = Workflow::new(store.clone())
    .register(State::Init, SimpleTask)           // Task
    .register(State::Process, ComplexNode::new()) // Node
    .register(State::Finish, |_: &MemoryStore| async { // Closure Task
        Ok(TaskResult::Single(State::Done))
    });

When to Use Tasks vs Nodes?

Choose the right abstraction for your use case:

Scenario Use Task Use Node
Data transformation ✅ Simple transform ✅ Complex with validation
API calls ✅ Simple requests ✅ With auth & retry logic
Validation ✅ Quick checks ⚪ Usually overkill
File operations ⚪ For simple cases ✅ Load, process, save pattern
Prototyping ✅ Fastest iteration ⚪ More structure
Production systems ✅ When simple is sufficient ✅ For robust operations