Task
Simple, flexible processing units for your workflows.
A Task is the fundamental building block of a Cano workflow: a single run
method that decides the next state. Start here: Task is the default
choice for every processing unit. The other four processing models
(RouterTask, PollTask, BatchTask, SteppedTask) are specialisations you reach
for only when a task has a shape that one of them fits better; see
The Task Family below for the decision matrix. Tasks receive a &Resources
reference at dispatch time; see Resources for how to register and retrieve
typed dependencies.
Read Workflows and Resources first, because every
example on this page wires a task into a Workflow and pulls dependencies from a
Resources map. Then come back here.
Implementing a Task
To create a task, implement the Task trait for your struct. The trait requires a run method; a config method is optional.
use cano::prelude::*;
use rand::Rng;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Action { Generate, Count, Complete }

struct GeneratorTask;

#[task(state = Action)]
impl GeneratorTask {
    // Optional: Configure retries
    fn config(&self) -> TaskConfig {
        TaskConfig::default().with_fixed_retry(3, std::time::Duration::from_secs(1))
    }

    async fn run(&self, res: &Resources) -> Result<TaskResult<Action>, CanoError> {
        println!("🎲 GeneratorTask: Creating random numbers...");

        // 1. Look up the shared store from resources
        let store = res.get::<MemoryStore, _>("store")?;

        // 2. Perform logic
        let mut rng = rand::rng();
        let numbers: Vec<u32> = (0..10).map(|_| rng.random_range(1..=100)).collect();

        // 3. Store results
        store.put("numbers", numbers)?;
        println!("✅ Stored numbers");

        // 4. Return next state
        Ok(TaskResult::Single(Action::Count))
    }
}
Runnable example: cargo run --example task_simple runs a two-task generate/count workflow
like the snippet above.
Resource-Free Tasks
When a task performs pure computation and needs no resources, override run_bare() instead of
run(). This skips the Resources parameter entirely, giving you a cleaner signature
for self-contained logic.
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Compute, Done }

struct PureTask;

#[task(state = Step)]
impl PureTask {
    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
        // No resources needed: pure computation
        let answer = 40 + 2;
        println!("Computed answer: {}", answer);
        Ok(TaskResult::Single(Step::Done))
    }
}
Pair run_bare() with Workflow::bare() (or Resources::empty())
when building workflows where no tasks need shared state, for example pure pipelines or
computational benchmarks.
Configuration & Retries
Tasks can be configured with retry strategies to handle transient failures.
The TaskConfig struct allows you to specify the retry behavior.
Retry Strategy Examples
Retry with backoff between attempts
Fixed Retry
Retry a fixed number of times with a constant delay between attempts.
TaskConfig::default()
    .with_fixed_retry(3, Duration::from_secs(1))
Exponential Backoff
Retry with exponentially increasing delays, useful for rate-limited APIs.
TaskConfig::default()
    .with_exponential_retry(5)
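To make the shape of an exponential schedule concrete, here is a minimal std-only sketch. It is not Cano's implementation: `backoff_delays` is a hypothetical helper, and the base delay, doubling factor, and cap are illustrative assumptions rather than the crate's defaults.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of Cano): the delay to wait before each retry.
/// Assumes the delay doubles per retry starting from `base` and is capped at `max`.
fn backoff_delays(retries: u32, base: Duration, max: Duration) -> Vec<Duration> {
    (0..retries)
        .map(|attempt| {
            // base * 2^attempt, saturating instead of overflowing on large inputs
            let factor = 2u32.saturating_pow(attempt);
            base.saturating_mul(factor).min(max)
        })
        .collect()
}
```

With five retries, a 100 ms base, and a 1 s cap, the schedule is 100, 200, 400, 800, and 1000 ms: the delay grows quickly at first and then flattens at the cap, which is what makes this shape friendly to rate-limited APIs.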
Minimal Config
Fast execution with minimal retry overhead for reliable operations.
TaskConfig::minimal()
Per-Attempt Timeout
Bound each attempt with a fresh deadline. Composes with any retry mode.
TaskConfig::default()
    .with_exponential_retry(3)
    .with_attempt_timeout(Duration::from_secs(2))
How attempt timeouts compose with retries
When attempt_timeout is set, each attempt inside run_with_retries is wrapped in
tokio::time::timeout. An expired attempt produces a CanoError::Timeout, which is
fed through the same retry path as any other failure, so the configured RetryMode decides
whether to retry. The deadline resets on every attempt, and retry exhaustion still surfaces as
CanoError::RetryExhausted wrapping the underlying timeout context.
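The composition described above can be simulated without an async runtime. The sketch below is an assumption-laden model, not Cano's retry loop: `SketchError` and `run_with_attempt_timeout` are hypothetical names, and each attempt is represented only by how long it would have taken.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the error variants named above; not Cano's real `CanoError`.
#[derive(Debug, PartialEq)]
enum SketchError {
    Timeout,
    RetryExhausted(Box<SketchError>),
}

/// Each entry of `attempt_costs` is the simulated running time of one attempt.
/// Every attempt is checked against the same `timeout`, so the deadline resets per attempt.
/// Returns the index of the first attempt that finished in time.
fn run_with_attempt_timeout(
    attempt_costs: &[Duration],
    timeout: Duration,
    max_retries: usize,
) -> Result<usize, SketchError> {
    // One initial attempt plus up to `max_retries` retries.
    for (attempt, cost) in attempt_costs.iter().take(max_retries + 1).enumerate() {
        if *cost <= timeout {
            return Ok(attempt);
        }
        // An expired attempt is treated like any other failure: fall through and retry.
    }
    // Retries exhausted: surface the wrapper, keeping the timeout as the underlying cause.
    Err(SketchError::RetryExhausted(Box::new(SketchError::Timeout)))
}
```

Attempts costing 3 s, 3 s, and 1 s against a 2 s timeout with two retries succeed on the third attempt, even though the total elapsed time is well over 2 s. That is the practical meaning of a fresh deadline per attempt.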
Wiring a Circuit Breaker
A CircuitBreaker can be attached to a task's config via
TaskConfig::with_circuit_breaker(Arc::clone(&breaker)). The retry loop consults it
before each attempt; an open breaker short-circuits the whole loop with
CanoError::CircuitOpen (returned raw, not wrapped in RetryExhausted), so a
dependency that is already down is not hammered. Share one Arc<CircuitBreaker>
across every task that hits the same dependency so they trip together.
fn build_config(breaker: Arc<CircuitBreaker>) -> TaskConfig {
    TaskConfig::default()
        .with_exponential_retry(3)
        .with_circuit_breaker(breaker)
}
The breaker itself (its Closed → Open { until } → HalfOpen state machine,
CircuitPolicy, the lazy Open → HalfOpen transition, and the manual
try_acquire / record_success / record_failure RAII API) is
documented in the Resilience guide.
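For readers who want an intuition for that state machine before opening the Resilience guide, here is a deliberately simplified std-only sketch. `SketchBreaker`, its fields, and the threshold/cooldown policy are hypothetical and do not mirror Cano's `CircuitBreaker` or `CircuitPolicy`; in particular, it has no RAII permit.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified breaker states; not Cano's real types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BreakerState {
    Closed,
    Open { until: Instant },
    HalfOpen,
}

struct SketchBreaker {
    state: BreakerState,
    failures: u32,
    threshold: u32,     // assumed policy: consecutive failures before opening
    cooldown: Duration, // assumed policy: how long the breaker stays open
}

impl SketchBreaker {
    fn new(threshold: u32, cooldown: Duration) -> Self {
        Self { state: BreakerState::Closed, failures: 0, threshold, cooldown }
    }

    /// Lazy transition: Open becomes HalfOpen only when a caller asks after `until`.
    fn allows(&mut self, now: Instant) -> bool {
        if let BreakerState::Open { until } = self.state {
            if now < until {
                return false; // still open: the caller should short-circuit
            }
            self.state = BreakerState::HalfOpen;
        }
        true
    }

    /// A failed probe in HalfOpen, or too many failures in Closed, opens the breaker.
    fn record_failure(&mut self, now: Instant) {
        self.failures += 1;
        if self.state == BreakerState::HalfOpen || self.failures >= self.threshold {
            self.state = BreakerState::Open { until: now + self.cooldown };
        }
    }

    /// Any success closes the breaker and clears the failure count.
    fn record_success(&mut self) {
        self.failures = 0;
        self.state = BreakerState::Closed;
    }
}
```

Note that nothing in the sketch runs on a timer: the breaker only leaves Open when `allows` is called after the cooldown, which is what "lazy" means here.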
Real-World Example: API Client with Retry
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Call, Complete }

#[derive(Clone)]
struct ApiClientTask {
    endpoint: String,
}

#[task(state = State)]
impl ApiClientTask {
    fn config(&self) -> TaskConfig {
        // Exponential backoff for API rate limiting
        TaskConfig::default()
            .with_exponential_retry(5)
    }

    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        println!("📡 Calling API: {}", self.endpoint);
        let store = res.get::<MemoryStore, _>("store")?;

        // Replace this with your HTTP client of choice (reqwest, hyper, etc.)
        let data = String::new();

        store.put("api_response", data)?;
        println!("✅ API call successful");
        Ok(TaskResult::Single(State::Complete))
    }
}
Real-World Task Patterns
Tasks excel at various workflow scenarios. Here are proven patterns from production use.
Data Transformation Task
Simple, direct data processing without complex setup.
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Transform, Complete }

#[derive(Clone)]
struct DataTransformer;

#[task(state = State)]
impl DataTransformer {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let raw_data: Vec<i32> = store.get("raw_data")?;

        // Transform: filter and multiply
        let processed: Vec<i32> = raw_data
            .into_iter()
            .filter(|&x| x > 0)
            .map(|x| x * 2)
            .collect();

        store.put("processed_data", processed)?;
        Ok(TaskResult::Single(State::Complete))
    }
}
Validation Task
Quick validation logic with multiple outcomes.
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Validate, Process, ValidationFailed }

#[derive(Clone)]
struct ValidatorTask;

#[task(state = State)]
impl ValidatorTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let data: Vec<f64> = store.get("processed_data")?;

        let mut errors = Vec::new();
        if data.is_empty() {
            errors.push("Data is empty");
        }
        if data.iter().any(|&x| x.is_nan()) {
            errors.push("Contains NaN values");
        }

        store.put("validation_errors", errors.clone())?;

        if errors.is_empty() {
            Ok(TaskResult::Single(State::Process))
        } else {
            Ok(TaskResult::Single(State::ValidationFailed))
        }
    }
}
Conditional Routing Task
Dynamic workflow routing based on runtime conditions.
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { ParallelProcess, FastTrack, BatchProcess, SimpleProcess, Skip }

#[derive(Clone)]
struct RoutingTask;

#[task(state = State)]
impl RoutingTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let item_count: usize = store.get("item_count")?;
        let priority: String = store.get("priority")?;

        // Dynamic routing based on conditions
        let next_state = match (item_count, priority.as_str()) {
            (n, "high") if n > 100 => State::ParallelProcess,
            (n, "high") if n > 0 => State::FastTrack,
            (n, _) if n > 50 => State::BatchProcess,
            (n, _) if n > 0 => State::SimpleProcess,
            _ => State::Skip,
        };

        println!("Routing to: {:?}", next_state);
        Ok(TaskResult::Single(next_state))
    }
}
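Because match arms are tried top to bottom, the order of the guards above is part of the routing policy. One way to check that policy in isolation is to lift the match into a pure function, as in this std-only sketch; `Route` and `route` are hypothetical names used for illustration and are not part of Cano.

```rust
/// Hypothetical copy of the routing targets above, kept separate from any workflow state.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Route { ParallelProcess, FastTrack, BatchProcess, SimpleProcess, Skip }

/// Hypothetical helper: the same decision as the match above, minus the store lookups.
fn route(item_count: usize, priority: &str) -> Route {
    match (item_count, priority) {
        // High-priority arms come first, so they win over the size-only arms below.
        (n, "high") if n > 100 => Route::ParallelProcess,
        (n, "high") if n > 0 => Route::FastTrack,
        (n, _) if n > 50 => Route::BatchProcess,
        (n, _) if n > 0 => Route::SimpleProcess,
        _ => Route::Skip,
    }
}
```

A high-priority job with 60 items goes to FastTrack rather than BatchProcess, and exactly 100 high-priority items is still FastTrack because the first guard is strictly greater than 100. Keeping the decision in a plain function makes such boundary cases cheap to unit test.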
Aggregation Task
Collect and combine results from previous steps.
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Aggregate, Complete }

#[derive(Clone)]
struct AggregatorTask;

#[task(state = State)]
impl AggregatorTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<State>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        println!("Aggregating results...");

        let mut total = 0;
        let mut count = 0;

        // Collect results from parallel tasks
        for i in 1..=3 {
            if let Ok(result) = store.get::<i32>(&format!("result_{}", i)) {
                total += result;
                count += 1;
            }
        }

        store.put("total", total)?;
        store.put("count", count)?;
        println!("Aggregated {} results, total: {}", count, total);
        Ok(TaskResult::Single(State::Complete))
    }
}
The Task Family: Four More Processing Models
Beyond the plain Task, Cano ships
four more Task-derived processing models. Each is a specialised shape
(they all ultimately dispatch as a Task, so you can mix them freely in one workflow), and
each has its own page with the full reference.
RouterTask
Side-effect-free branching: a route method that picks the next state and writes
nothing. Registered with register_router; leaves no checkpoint row.
Reach for it when: branching on a flag / data shape with no side effects.
PollTask
"Wait-until" loops: a poll method that returns Ready or
Pending { delay_ms }, looping with adaptive backoff and no blocked thread.
Reach for it when: waiting on an external job, queue, or flag flip.
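The Ready / Pending { delay_ms } contract can be pictured with a small std-only driver. This is a conceptual sketch only: `PollOutcome` and `drive` are hypothetical names, not the PollTask API, and the driver records the requested delays instead of sleeping so that it stays deterministic.

```rust
/// Hypothetical stand-in for the poll outcome described above; not Cano's real type.
#[derive(Debug, PartialEq)]
enum PollOutcome<T> {
    Ready(T),
    Pending { delay_ms: u64 },
}

/// Calls `poll` until it reports Ready or `max_polls` is reached.
/// Returns the ready value together with every delay that was requested along the way.
fn drive<T>(
    mut poll: impl FnMut(u32) -> PollOutcome<T>,
    max_polls: u32,
) -> Option<(T, Vec<u64>)> {
    let mut delays = Vec::new();
    for attempt in 0..max_polls {
        match poll(attempt) {
            PollOutcome::Ready(value) => return Some((value, delays)),
            // An async runtime would await a timer here rather than block a thread.
            PollOutcome::Pending { delay_ms } => delays.push(delay_ms),
        }
    }
    None
}
```

Because the poll function chooses `delay_ms` on every call, it can widen the interval as it goes (for example 100, 200, 400 ms), which is one way to read "adaptive backoff".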
BatchTask
Fan out over data: load → process_item (×N, bounded concurrency, per-item retry) → finish,
re-joined in one state.
Reach for it when: mapping a sub-operation over a collection with a single re-join.
SteppedTask
Resumable iterative work: a step(cursor) method whose cursor is checkpointed each
step, so a crash resumes mid-loop. Registered with register_stepped.
Reach for it when: long page-by-page scans or chunked migrations need crash-resume finer than per-state.
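The value of a checkpointed cursor is easiest to see with a simulated crash. The std-only sketch below is illustrative and makes several assumptions: `StepOutcome`, `step`, and `run_from` are hypothetical names rather than the SteppedTask API, the cursor is a plain page index, and the checkpoint is an in-memory variable standing in for durable storage.

```rust
/// Hypothetical stand-in for a step result; not Cano's real type.
#[derive(Debug, PartialEq)]
enum StepOutcome {
    More(usize), // the next cursor, to be checkpointed
    Done,
}

/// One unit of work: handle the page at `cursor` out of `total_pages`.
fn step(cursor: usize, total_pages: usize) -> StepOutcome {
    if cursor + 1 >= total_pages {
        StepOutcome::Done
    } else {
        StepOutcome::More(cursor + 1)
    }
}

/// Runs steps from `start`, saving the cursor into `checkpoint` after each step.
/// `crash_after` stops the loop after that many steps to simulate a crash.
/// Returns true if the job ran to completion.
fn run_from(
    start: usize,
    total_pages: usize,
    crash_after: Option<usize>,
    checkpoint: &mut usize,
) -> bool {
    let mut cursor = start;
    let mut steps = 0;
    loop {
        if crash_after == Some(steps) {
            return false; // simulated crash: only the checkpoint survives
        }
        match step(cursor, total_pages) {
            StepOutcome::Done => return true,
            StepOutcome::More(next) => {
                cursor = next;
                *checkpoint = next;
            }
        }
        steps += 1;
    }
}
```

If a five-page job crashes after three steps, the checkpoint holds cursor 3, and a second run started from that cursor finishes the remaining pages without redoing the first three.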
Choosing a Processing Model
All five models dispatch as a Task, so you can mix them in one workflow. Start from
Task and move to a specialised model only when your work has its shape:
| Model | Reach for it when… | Register with |
|---|---|---|
| Task | The default: a single run that does the work and picks the next state. Everything else is a specialisation of this. | register |
| RouterTask | You're only deciding the next state (branching on a flag or data shape) with no side effects and no recovery footprint. | register_router |
| PollTask | You need to wait until something is ready (an external job, a queue, a flag flip) without blocking a thread. | register |
| BatchTask | You're doing the same sub-operation over a collection, with bounded concurrency and per-item retry, re-joined in one state. | register |
| SteppedTask | You have a long iterative job (page-by-page scan, chunked migration) you want to crash-resume mid-loop, finer than per-state. | register_stepped |