BatchTask
Fan out over data items — bounded concurrency, per-item retry, then re-join.
A BatchTask loads a Vec of work items, processes them with bounded
concurrency (each item independently retryable), collects the per-item results in input
order, and decides the next state from the aggregate — all within one workflow state. It is
one of the Task family of processing models, alongside
RouterTask,
PollTask, and SteppedTask, and it reads
typed dependencies from Resources like the rest. New to Cano? Read
Workflows and Resources first.
#[task::batch] infers Item / ItemOutput from the signatures
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Crunch, Done }

struct Crunch;

#[task::batch(state = Stage)]
impl Crunch {
    async fn load(&self, _res: &Resources) -> Result<Vec<u32>, CanoError> {
        Ok((1..=100).collect())
    }

    // Returning Err here doesn't fail the batch; it lands in that slot of `finish`'s `outputs`.
    async fn process_item(&self, n: &u32) -> Result<u64, CanoError> {
        Ok((*n as u64) * (*n as u64))
    }

    async fn finish(&self, _res: &Resources, outputs: Vec<Result<u64, CanoError>>)
        -> Result<TaskResult<Stage>, CanoError>
    {
        let total: u64 = outputs.into_iter().flatten().sum();
        println!("sum of squares = {total}");
        Ok(TaskResult::Single(Stage::Done))
    }

    fn concurrency(&self) -> usize { 8 } // up to 8 process_item in flight
    fn item_retry(&self) -> RetryMode { RetryMode::fixed(2, std::time::Duration::from_millis(50)) }
}
TaskResult::Split
Split / Join fans out over states: the FSM re-enters with one
workflow branch per state, then a JoinStrategy reconciles them. A BatchTask
fans out over data: many items, all inside one state, re-joined before the
transition. Use Split for "run these N workflows in parallel"; use a BatchTask
for "map this one operation over N items".
The Three Methods
BatchTask: load → process_item (×N) → finish
A BatchTask has two associated types — type Item: Send + Sync + 'static and
type ItemOutput: Send + 'static (inferred by #[task::batch(state = ...)] from
the method signatures, or written explicitly) — and three methods:
| Method | Role |
|---|---|
| async fn load(&self, res) -> Result<Vec<Self::Item>, CanoError> | Produce the items. An Err here fails the batch (and triggers the outer config() retry). |
| async fn process_item(&self, item: &Self::Item) -> Result<Self::ItemOutput, CanoError> | Process one item. Takes &Item because item_retry may re-invoke it. Returning Err does not fail the batch; it lands in that slot of finish's outputs. |
| async fn finish(&self, res, outputs: Vec<Result<Self::ItemOutput, CanoError>>) -> Result<TaskResult<TState>, CanoError> | Aggregate (one slot per input item, in input order) and decide the next state. This is where the partial-failure policy lives (e.g. "≥ 50% must succeed"). An Err here fails the batch. |
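The division of failure handling in the table can be modelled without Cano at all. The following is a minimal synchronous sketch, not the library's implementation: run_cycle and its closure parameters are hypothetical stand-ins, String replaces CanoError, and concurrency and retries are left out so that only the error routing is visible.

```rust
// Hypothetical, synchronous stand-in for load -> process_item (xN) -> finish.
// This is NOT Cano's API; it only models where each Err ends up.
fn run_cycle<I, O, S>(
    load: impl Fn() -> Result<Vec<I>, String>,
    process_item: impl Fn(&I) -> Result<O, String>,
    finish: impl Fn(Vec<Result<O, String>>) -> Result<S, String>,
) -> Result<S, String> {
    // An Err from `load` aborts the whole cycle.
    let items = load()?;
    // An Err from `process_item` is kept in that item's slot, in input order.
    let outputs: Vec<Result<O, String>> =
        items.iter().map(|item| process_item(item)).collect();
    // `finish` sees every slot and decides; an Err here also fails the cycle.
    finish(outputs)
}

fn main() {
    let next = run_cycle(
        || Ok(vec![1u32, 2, 0, 4]),
        |n| if *n == 0 { Err("division by zero".to_string()) } else { Ok(100 / n) },
        |outputs| {
            let ok = outputs.iter().filter(|r| r.is_ok()).count();
            Ok(format!("{ok} of {} items succeeded", outputs.len()))
        },
    );
    println!("{next:?}");
}
```

The third item fails, yet the cycle still reaches finish, which is the only place that decides whether that failure matters.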
Optional knobs, all with defaults:
| Method | Default | Purpose |
|---|---|---|
| fn concurrency(&self) -> usize | 1 (sequential) | How many items run at once. |
| fn item_retry(&self) -> RetryMode | RetryMode::None | Per-item retry policy, independent of the outer dispatch. |
| fn config(&self) -> TaskConfig | TaskConfig::default() | Retry config for the whole load → process → finish cycle; the batch only Errs (triggering this) when load or finish Errs. |
| fn name(&self) -> Cow<'static, str> | type name | Identifies the task in logs / observers. |
Quick Start with #[task::batch]
Attach #[task::batch(state = MyState)] (optionally key = MyKey) to an
inherent impl block. The macro infers Item / ItemOutput from
the signatures, injects default bodies for concurrency / item_retry /
config / name if absent, synthesises the
impl BatchTask<MyState> for Fetcher header, and emits a companion
impl Task<MyState> for Fetcher whose run runs the bounded-concurrency
loop (via cano::task::batch::run_batch, which uses
futures_util::buffer_unordered). No engine changes.
use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Summarise, Done }

struct FetchUrls;

#[task::batch(state = Step)]
impl FetchUrls {
    fn concurrency(&self) -> usize { 8 }
    fn item_retry(&self) -> RetryMode { RetryMode::fixed(2, Duration::from_millis(50)) }

    async fn load(&self, res: &Resources) -> Result<Vec<String>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        Ok(store.get("urls")?)
    }

    async fn process_item(&self, url: &String) -> Result<usize, CanoError> {
        // pretend to fetch; return the byte count
        Ok(url.len() * 100)
    }

    async fn finish(&self, res: &Resources, outputs: Vec<Result<usize, CanoError>>)
        -> Result<TaskResult<Step>, CanoError>
    {
        let ok = outputs.iter().filter(|r| r.is_ok()).count();
        let total: usize = outputs.iter().filter_map(|r| r.as_ref().ok()).sum();
        let store = res.get::<MemoryStore, _>("store")?;
        store.put("total_bytes", total)?;
        if ok * 2 >= outputs.len() { // partial-failure policy: ≥ 50% must succeed
            Ok(TaskResult::Single(Step::Summarise))
        } else {
            Err(CanoError::task_execution("too many fetch failures"))
        }
    }
}
process_item returning Err is not a batch failure — that error
lands in the matching slot of finish's outputs vector. Decide the
partial-failure policy in finish: count the Oks, set a threshold, and
either transition or Err out (which then triggers the outer config()
retry, re-running the whole load → process → finish cycle).
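The threshold check is easy to factor out and test on its own. A minimal sketch, assuming a hypothetical helper named meets_threshold (not part of Cano) that compares in integers so no floating-point rounding is involved:

```rust
/// Returns true when at least `num / den` of the slots are Ok.
/// Hypothetical helper for illustration; not part of Cano.
fn meets_threshold<T, E>(outputs: &[Result<T, E>], num: usize, den: usize) -> bool {
    let ok = outputs.iter().filter(|r| r.is_ok()).count();
    // Cross-multiplied form of ok / len >= num / den.
    ok * den >= outputs.len() * num
}

fn main() {
    let outputs: Vec<Result<u32, String>> = vec![
        Ok(1),
        Err("timeout".to_string()),
        Ok(3),
        Err("not found".to_string()),
    ];
    println!("at least half ok: {}", meets_threshold(&outputs, 1, 2));
    println!("at least three quarters ok: {}", meets_threshold(&outputs, 3, 4));
}
```

Note that an empty batch passes any threshold written this way (0 >= 0), as does the `ok * 2 >= outputs.len()` check above. If an empty load should count as a failure, finish needs to test for that case explicitly.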
Registering a Batch Task
Register a batch task with plain Workflow::register — to the FSM it's an ordinary
Single state; the fan-out and re-join happen inside the generated run.
use cano::prelude::*;

// `resources` is a Resources value and `Summarise` is another task; both are defined elsewhere.
let workflow = Workflow::new(resources)
    .register(Step::Fetch, FetchUrls)
    .register(Step::Summarise, Summarise)
    .add_exit_state(Step::Done);
Concurrency & Retry Tuning
concurrency() — width of the fan-out
Defaults to 1 (fully sequential). Raise it to run that many process_item
calls at once; results are still collected in input order regardless.
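To see how a bounded fan-out can still return results in input order, here is a simplified std-only sketch. It is an illustration under assumptions, not Cano's mechanism: map_bounded is a hypothetical name, it uses OS threads instead of async futures, and it runs items in fixed waves of `width` rather than the sliding window a buffered stream provides.

```rust
use std::thread;

/// Runs `f` over `items` with at most `width` calls in flight at a time.
/// Results come back in input order. Hypothetical helper, not Cano's API.
fn map_bounded<I, O, F>(items: &[I], width: usize, f: F) -> Vec<O>
where
    I: Sync,
    O: Send,
    F: Fn(&I) -> O + Sync,
{
    let width = width.max(1); // a width of 0 is treated as sequential
    let f = &f; // share one reference to `f` across all worker threads
    let mut out = Vec::with_capacity(items.len());
    for wave in items.chunks(width) {
        thread::scope(|s| {
            // Spawn one thread per item in this wave.
            let handles: Vec<_> = wave
                .iter()
                .map(|item| s.spawn(move || f(item)))
                .collect();
            // Join in spawn order, which is input order, whatever the finish order was.
            for handle in handles {
                out.push(handle.join().expect("worker panicked"));
            }
        });
    }
    out
}

fn main() {
    let items: Vec<u64> = (1..=10).collect();
    let squares = map_bounded(&items, 4, |n| n * n);
    println!("{squares:?}");
}
```

Because each result is placed by the position of its input rather than by completion time, raising the width changes throughput but not the order of the collected results.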
item_retry() — per-item resilience
Defaults to RetryMode::None. Set e.g.
RetryMode::fixed(2, Duration::from_millis(50)) to retry an individual item before its
slot becomes an Err — orthogonal to the outer config() retry.
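The effect of a fixed per-item retry can be sketched in plain Rust. This is a blocking illustration only: retry_fixed is a hypothetical function, and it assumes the count passed to RetryMode::fixed means retries after the initial attempt, which this page does not confirm.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `op` once, then up to `max_retries` more times while it keeps failing,
/// sleeping `delay` between attempts. Hypothetical model, not Cano's code.
fn retry_fixed<T, E>(
    max_retries: u32,
    delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut retries_used = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Retries exhausted: this error is what the item's slot would hold.
            Err(e) if retries_used >= max_retries => return Err(e),
            Err(_) => {
                retries_used += 1;
                sleep(delay);
            }
        }
    }
}

fn main() {
    let mut calls = 0;
    // Fails twice with a transient error, then succeeds on the third call.
    let result: Result<&str, &str> = retry_fixed(2, Duration::from_millis(50), || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok("fetched") }
    });
    println!("{result:?} after {calls} calls");
}
```

Only the final outcome reaches the item's slot: intermediate failures are absorbed by the retry loop and never appear in finish's outputs.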
config() — whole-cycle retry
Defaults to TaskConfig::default(). This governs retries of the entire
load → process → finish cycle, which only happens when load or
finish returns Err.
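Because a whole-cycle retry re-runs process_item for every item, the two retry layers multiply. A rough upper bound can be sketched as below, assuming both settings count retries after the initial attempt (an assumption, not something this page states) and using a hypothetical function name and example numbers:

```rust
/// Upper bound on `process_item` invocations for one batch, assuming every
/// attempt fails. Both retry counts exclude the initial attempt.
/// Hypothetical helper for illustration only.
fn worst_case_item_calls(items: u64, item_retries: u64, cycle_retries: u64) -> u64 {
    items * (item_retries + 1) * (cycle_retries + 1)
}

fn main() {
    // Example numbers: 100 items, 2 per-item retries, 3 whole-cycle retries.
    println!("{}", worst_case_item_calls(100, 2, 3));
}
```

When process_item is expensive or has side effects, it is usually cheaper to absorb flakiness with item_retry and keep the whole-cycle retry count low.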
BatchTask vs Split & Join
BatchTask
Fans out over: data items, inside one state.
- One state, one re-join point (finish)
- Per-item retry via item_retry()
- Item failures collected, not fatal; policy lives in finish
- Bounded concurrency via concurrency()
Split & Join
Fans out over: workflow states / branches.
- One state ⇒ many parallel Tasks, each transitioning
- JoinStrategy: All, Any, Quorum, …
- Optional bulkhead caps concurrency
- See the Split & Join guide
Explicit Trait-Impl Form
Prefer an explicit trait header? Put a bare #[task::batch] on an
impl BatchTask<...> for ... block and declare type Item and
type ItemOutput yourself. The companion impl Task is still emitted.
#[task::batch] on a trait impl
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Summarise, Done }

struct FetchUrls;

#[task::batch]
impl BatchTask<Step> for FetchUrls {
    type Item = String;
    type ItemOutput = usize;

    fn concurrency(&self) -> usize { 8 }

    async fn load(&self, res: &Resources) -> Result<Vec<String>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        Ok(store.get("urls")?)
    }

    async fn process_item(&self, url: &String) -> Result<usize, CanoError> {
        Ok(url.len() * 100)
    }

    async fn finish(&self, _res: &Resources, outputs: Vec<Result<usize, CanoError>>)
        -> Result<TaskResult<Step>, CanoError>
    {
        let ok = outputs.iter().filter(|r| r.is_ok()).count();
        if ok * 2 >= outputs.len() {
            Ok(TaskResult::Single(Step::Summarise))
        } else {
            Err(CanoError::task_execution("too many fetch failures"))
        }
    }
}
Type-Erased Aliases
The object-safe aliases pin the associated types: Item to
Box<dyn Any + Send + Sync> and ItemOutput to
Box<dyn Any + Send>.
| Alias | Expands to |
|---|---|
| DynBatchTask<TState, TResourceKey> | dyn BatchTask<TState, TResourceKey, Item = …, ItemOutput = …> |
| BatchTaskObject<TState, TResourceKey> | Arc<dyn BatchTask<TState, TResourceKey, Item = …, ItemOutput = …>> |
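With the associated types pinned to boxed Any values, concrete types have to be recovered by downcasting. The sketch below shows that round trip using only std::any::Any. The aliases ErasedItem and ErasedOutput and the function process_erased are hypothetical names for illustration, String stands in for CanoError, and the URL is a placeholder value.

```rust
use std::any::Any;

// Hypothetical local aliases mirroring the pinned associated types.
type ErasedItem = Box<dyn Any + Send + Sync>;
type ErasedOutput = Box<dyn Any + Send>;

/// Hypothetical erased per-item step: expects a String, returns its length.
fn process_erased(item: &ErasedItem) -> Result<ErasedOutput, String> {
    // Borrow the concrete type back out of the box; a wrong type becomes an Err.
    let url = item
        .downcast_ref::<String>()
        .ok_or_else(|| "item was not a String".to_string())?;
    Ok(Box::new(url.len()))
}

fn main() {
    let item: ErasedItem = Box::new(String::from("https://example.com"));
    let out = process_erased(&item).expect("item should be a String");
    // An aggregation step would downcast each output to its concrete type.
    let len = out.downcast::<usize>().expect("output should be a usize");
    println!("{}", *len);
}
```

The cost of type erasure is that a type mismatch surfaces at run time as a failed downcast instead of at compile time, so the typed form is preferable wherever the concrete types are known.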
When to Use BatchTask
Reach for a BatchTask when you want to map a sub-operation over a collection — fetch
N URLs, process N records, validate N inputs — with:
- bounded parallelism (don't open 10 000 sockets at once);
- per-item retry (one flaky record shouldn't sink the rest);
- a single re-join point where you apply the partial-failure policy and decide the transition;
- and you'd rather not spawn N workflow branches to do it.
If instead each item is really its own multi-step workflow, that's a Split job, not a batch.
The crate ships a complete example — run it with cargo run --example batch_task.