Task

Simple, flexible processing units for your workflows.

A Task is the fundamental building block of a Cano workflow: a single run method that decides the next state. Start here β€” Task is the default choice for every processing unit. The other four processing models (RouterTask, PollTask, BatchTask, SteppedTask) are specialisations you reach for only when a task has a shape that one of them fits better β€” see The Task Family below for the decision matrix. Tasks receive a &Resources reference at dispatch time β€” see Resources for how to register and retrieve typed dependencies.

New to Cano?

Read Workflows and Resources first β€” every example on this page wires a task into a Workflow and pulls dependencies from a Resources map. Then come back here.


Implementing a Task

To create a task, implement the Task trait for your struct. The trait requires a run method and an optional config method.

Implementing Task trait
use cano::prelude::*;
use rand::Rng;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Action { Generate, Count, Complete }

struct GeneratorTask;

#[task(state = Action)]
impl GeneratorTask {
    // Optional: Configure retries
    fn config(&self) -> TaskConfig {
        TaskConfig::default().with_fixed_retry(3, std::time::Duration::from_secs(1))
    }

    async fn run(&self, res: &Resources) -> Result<TaskResult<Action>, CanoError> {
        println!("🎲 GeneratorTask: Creating random numbers...");

        // 1. Look up the shared store from resources
        let store = res.get::<MemoryStore, _>("store")?;

        // 2. Perform logic
        let mut rng = rand::rng();
        let numbers: Vec<u32> = (0..10).map(|_| rng.random_range(1..=100)).collect();

        // 3. Store results
        store.put("numbers", numbers)?;
        println!("βœ… Stored numbers");

        // 4. Return next state
        Ok(TaskResult::Single(Action::Count))
    }
}

Runnable example: cargo run --example task_simple β€” a two-task generate/count workflow like the snippet above.


Resource-Free Tasks

When a task performs pure computation and needs no resources, override run_bare() instead of run(). This skips the Resources parameter entirely, giving you a cleaner signature for self-contained logic.

Task using run_bare
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Compute, Done }

struct PureTask;

#[task(state = Step)]
impl PureTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        // No resources needed β€” pure computation
        let answer = 40 + 2;
        println!("Computed answer: {}", answer);
        Ok(TaskResult::Single(Step::Done))
    }
}

Tip

Pair run_bare() with Workflow::bare() (or Resources::empty()) when building workflows where no tasks need shared state β€” for example, pure pipelines or computational benchmarks.


Configuration & Retries

Tasks can be configured with retry strategies to handle transient failures. The TaskConfig struct allows you to specify the retry behavior.

Retry Strategy Examples

Retry with backoff between attempts

sequenceDiagram participant W as Workflow participant T as Task W->>T: Execute T-->>W: Fail Note over W: Wait (backoff) W->>T: Retry 1 T-->>W: Fail Note over W: Wait (longer) W->>T: Retry 2 T-->>W: Success βœ“

Fixed Retry

Retry a fixed number of times with a constant delay between attempts.

Fixed retry config
TaskConfig::default()
    .with_fixed_retry(3, Duration::from_secs(1))

Exponential Backoff

Retry with exponentially increasing delays, useful for rate-limited APIs.

Exponential backoff config
TaskConfig::default()
    .with_exponential_retry(5)

Minimal Config

Fast execution with minimal retry overhead for reliable operations.

Minimal config
TaskConfig::minimal()

Per-Attempt Timeout

Bound each attempt with a fresh deadline. Composes with any retry mode.

Attempt timeout config
TaskConfig::default()
    .with_exponential_retry(3)
    .with_attempt_timeout(Duration::from_secs(2))

How attempt timeouts compose with retries

When attempt_timeout is set, each attempt inside run_with_retries is wrapped in tokio::time::timeout. An expired attempt produces a CanoError::Timeout, which is fed through the same retry path as any other failure β€” so the configured RetryMode decides whether to retry. The deadline resets on every attempt, and retry exhaustion still surfaces as CanoError::RetryExhausted wrapping the underlying timeout context.

Wiring a Circuit Breaker

A CircuitBreaker can be attached to a task's config via TaskConfig::with_circuit_breaker(Arc::clone(&breaker)). The retry loop consults it before each attempt; an open breaker short-circuits the whole loop with CanoError::CircuitOpen (returned raw, not wrapped in RetryExhausted), so a dependency that is already down is not hammered. Share one Arc<CircuitBreaker> across every task that hits the same dependency so they trip together.

Attaching a breaker to a task config
fn build_config(breaker: Arc<CircuitBreaker>) -> TaskConfig {
    TaskConfig::default()
        .with_exponential_retry(3)
        .with_circuit_breaker(breaker)
}

The breaker itself β€” its Closed β†’ Open { until } β†’ HalfOpen state machine, CircuitPolicy, the lazy Open β†’ HalfOpen transition, and the manual try_acquire / record_success / record_failure RAII API β€” is documented in the Resilience guide.

Real-World Example: API Client with Retry

🌐 API client with exponential backoff
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Call, Complete }

#[derive(Clone)]
struct ApiClientTask {
    endpoint: String,
}

#[task(state = State)]
impl ApiClientTask {
    fn config(&self) -> TaskConfig {
        // Exponential backoff for API rate limiting
        TaskConfig::default()
            .with_exponential_retry(5)
    }

    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("πŸ“‘ Calling API: {}", self.endpoint);

        let store = res.get::<MemoryStore, _>("store")?;

        // Replace this with your HTTP client of choice (reqwest, hyper, etc.)
        let data = String::new();

        store.put("api_response", data)?;
        println!("βœ… API call successful");

        Ok(TaskResult::Single(State::Complete))
    }
}

Real-World Task Patterns

Tasks excel at various workflow scenarios. Here are proven patterns from production use.

1

Data Transformation Task

Simple, direct data processing without complex setup.

Data transformation
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Transform, Complete }

#[derive(Clone)]
struct DataTransformer;

#[task(state = State)]
impl DataTransformer {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let raw_data: Vec<i32> = store.get("raw_data")?;

        // Transform: filter and multiply
        let processed: Vec<i32> = raw_data
            .into_iter()
            .filter(|&x| x > 0)
            .map(|x| x * 2)
            .collect();

        store.put("processed_data", processed)?;
        Ok(TaskResult::Single(State::Complete))
    }
}

2

Validation Task

Quick validation logic with multiple outcomes.

Validation with branching
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Validate, Process, ValidationFailed }

#[derive(Clone)]
struct ValidatorTask;

#[task(state = State)]
impl ValidatorTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let data: Vec<f64> = store.get("processed_data")?;

        let mut errors = Vec::new();

        if data.is_empty() {
            errors.push("Data is empty");
        }

        if data.iter().any(|&x| x.is_nan()) {
            errors.push("Contains NaN values");
        }

        store.put("validation_errors", errors.clone())?;

        if errors.is_empty() {
            Ok(TaskResult::Single(State::Process))
        } else {
            Ok(TaskResult::Single(State::ValidationFailed))
        }
    }
}

3

Conditional Routing Task

Dynamic workflow routing based on runtime conditions.

Dynamic routing with match
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { ParallelProcess, FastTrack, BatchProcess, SimpleProcess, Skip }

#[derive(Clone)]
struct RoutingTask;

#[task(state = State)]
impl RoutingTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let item_count: usize = store.get("item_count")?;
        let priority: String = store.get("priority")?;

        // Dynamic routing based on conditions
        let next_state = match (item_count, priority.as_str()) {
            (n, "high") if n > 100 => State::ParallelProcess,
            (n, "high") if n > 0 => State::FastTrack,
            (n, _) if n > 50 => State::BatchProcess,
            (n, _) if n > 0 => State::SimpleProcess,
            _ => State::Skip,
        };

        println!("Routing to: {:?}", next_state);
        Ok(TaskResult::Single(next_state))
    }
}

4

Aggregation Task

Collect and combine results from previous steps.

Aggregating parallel results
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Aggregate, Complete }

#[derive(Clone)]
struct AggregatorTask;

#[task(state = State)]
impl AggregatorTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        println!("Aggregating results...");

        let mut total = 0;
        let mut count = 0;

        // Collect results from parallel tasks
        for i in 1..=3 {
            if let Ok(result) = store.get::<i32>(&format!("result_{}", i)) {
                total += result;
                count += 1;
            }
        }

        store.put("total", total)?;
        store.put("count", count)?;

        println!("Aggregated {} results, total: {}", count, total);
        Ok(TaskResult::Single(State::Complete))
    }
}


The Task Family: Four More Processing Models

Beyond the plain Task, Cano ships four more Task-derived processing models. Each is a specialised shape β€” they all ultimately dispatch as a Task, so you mix them freely in one workflow β€” and each has its own page with the full reference.

RouterTask

Side-effect-free branching: a route method that picks the next state and writes nothing. Registered with register_router; leaves no checkpoint row.

Reach for it when: branching on a flag / data shape with no side effects.

PollTask

"Wait-until" loops: a poll method that returns Ready or Pending { delay_ms }, looping with adaptive backoff β€” no blocked thread.

Reach for it when: waiting on an external job, queue, or flag flip.

BatchTask

Fan out over data: load β†’ process_item (Γ—N, bounded concurrency, per-item retry) β†’ finish, re-joined in one state.

Reach for it when: mapping a sub-operation over a collection with a single re-join.

SteppedTask

Resumable iterative work: a step(cursor) method whose cursor is checkpointed each step, so a crash resumes mid-loop. Registered with register_stepped.

Reach for it when: long page-by-page scans, chunked migrations β€” crash-resume finer than per-state.


Choosing a Processing Model

All five models dispatch as a Task, so you can mix them in one workflow. Start from Task and move to a specialised model only when your work has its shape:

Model Reach for it when… Register with
Task The default β€” a single run that does the work and picks the next state. Everything else is a specialisation of this. register
RouterTask You're only deciding the next state β€” branching on a flag or data shape β€” with no side effects and no recovery footprint. register_router
PollTask You need to wait until something is ready β€” an external job, a queue, a flag flip β€” without blocking a thread. register
BatchTask You're doing the same sub-operation over a collection, with bounded concurrency and per-item retry, re-joined in one state. register
SteppedTask You have a long iterative job (page-by-page scan, chunked migration) you want to crash-resume mid-loop, finer than per-state. register_stepped