BatchTask

Fan out over data items — bounded concurrency, per-item retry, then re-join.

A BatchTask loads a Vec of work items, processes them with bounded concurrency (each item independently retryable), collects the per-item results in input order, and decides the next state from the aggregate — all within one workflow state. It is one of the Task family of processing models, alongside RouterTask, PollTask, and SteppedTask, and it reads typed dependencies from Resources like the rest. New to Cano? Read Workflows and Resources first.

At a glance — #[task::batch] infers Item / ItemOutput from the signatures
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Crunch, Done }

struct Crunch;

#[task::batch(state = Stage)]
impl Crunch {
    async fn load(&self, _res: &Resources) -> Result<Vec<u32>, CanoError> {
        Ok((1..=100).collect())
    }
    // Returning Err here doesn't fail the batch — it lands in that slot of `finish`'s `outputs`.
    async fn process_item(&self, n: &u32) -> Result<u64, CanoError> {
        Ok((*n as u64) * (*n as u64))
    }
    async fn finish(&self, _res: &Resources, outputs: Vec<Result<u64, CanoError>>)
        -> Result<TaskResult<Stage>, CanoError>
    {
        let total: u64 = outputs.into_iter().flatten().sum();
        println!("sum of squares = {total}");
        Ok(TaskResult::Single(Stage::Done))
    }
    fn concurrency(&self) -> usize { 8 }                  // up to 8 process_item in flight
    fn item_retry(&self) -> RetryMode { RetryMode::fixed(2, std::time::Duration::from_millis(50)) }
}
Not the same as TaskResult::Split

Split / Join fans out over states: the FSM re-enters with one workflow branch per state, then a JoinStrategy reconciles them. A BatchTask fans out over data: many items, all inside one state, re-joined before the transition. Use Split for "run these N workflows in parallel"; use a BatchTask for "map this one operation over N items".


The Three Methods

BatchTask: load → process_item (×N) → finish

graph LR
    A[load] -->|"items: Vec of Item"| B[process_item ×N]
    B -->|"per-item Results — input order"| C[finish]
    C -->|TaskResult| D[Next State]

A BatchTask has two associated types — type Item: Send + Sync + 'static and type ItemOutput: Send + 'static (inferred by #[task::batch(state = ...)] from the method signatures, or written explicitly) — and three methods:

Method | Role
--- | ---
async fn load(&self, res) -> Result<Vec<Self::Item>, CanoError> | Produce the items. An Err here fails the batch (and triggers the outer config() retry).
async fn process_item(&self, item: &Self::Item) -> Result<Self::ItemOutput, CanoError> | Process one item. Takes &Item because item_retry may re-invoke it. Returning Err does not fail the batch; the error lands in that slot of finish's outputs.
async fn finish(&self, res, outputs: Vec<Result<Self::ItemOutput, CanoError>>) -> Result<TaskResult<TState>, CanoError> | Aggregate the outputs (one slot per input item, in input order) and decide the next state. This is where the partial-failure policy lives (e.g. "≥ 50% must succeed"). An Err here fails the batch.
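To make the contract concrete outside of Cano, here is a minimal synchronous, std-only model of the three methods. The trait `SyncBatch`, the driver `run_sequential`, and the `String` error type are hypothetical stand-ins, not Cano's API. The sketch only shows which errors fail the batch (load, finish) and which land in an output slot (process_item).

```rust
/// Hypothetical synchronous stand-in for the BatchTask contract (not Cano's API).
trait SyncBatch {
    type Item;
    type ItemOutput;
    type Next;

    fn load(&self) -> Result<Vec<Self::Item>, String>;
    fn process_item(&self, item: &Self::Item) -> Result<Self::ItemOutput, String>;
    fn finish(&self, outputs: Vec<Result<Self::ItemOutput, String>>) -> Result<Self::Next, String>;
}

/// Sequential driver: an Err from load or finish fails the whole run,
/// while an Err from process_item only fills that item's slot.
fn run_sequential<B: SyncBatch>(task: &B) -> Result<B::Next, String> {
    let items = task.load()?; // a load error fails the batch
    let outputs: Vec<_> = items.iter().map(|item| task.process_item(item)).collect();
    task.finish(outputs) // one slot per item, in input order
}

struct Squares;

impl SyncBatch for Squares {
    type Item = u32;
    type ItemOutput = u64;
    type Next = u64;

    fn load(&self) -> Result<Vec<u32>, String> {
        Ok((1..=5).collect())
    }

    fn process_item(&self, n: &u32) -> Result<u64, String> {
        if *n == 3 {
            return Err(format!("item {n} failed")); // recorded in its slot, not fatal
        }
        Ok(u64::from(*n) * u64::from(*n))
    }

    fn finish(&self, outputs: Vec<Result<u64, String>>) -> Result<u64, String> {
        // Skip failed slots and sum the successful ones.
        Ok(outputs.into_iter().flatten().sum())
    }
}

fn main() {
    let total = run_sequential(&Squares).unwrap();
    println!("sum of squares without item 3 = {total}"); // 1 + 4 + 16 + 25 = 46
}
```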

Optional knobs, all with defaults:

Method | Default | Purpose
--- | --- | ---
fn concurrency(&self) -> usize | 1 (sequential) | How many process_item calls run at once.
fn item_retry(&self) -> RetryMode | RetryMode::None | Per-item retry policy, independent of the outer config() retry.
fn config(&self) -> TaskConfig | TaskConfig::default() | Retry config for the whole load → process → finish cycle. The batch itself returns Err (which triggers this retry) only when load or finish does.
fn name(&self) -> Cow<'static, str> | type name | Identifies the task in logs / observers.

Quick Start with #[task::batch]

Attach #[task::batch(state = MyState)] (optionally key = MyKey) to an inherent impl block. The macro does four things:

- infers Item / ItemOutput from the method signatures;
- injects default bodies for concurrency / item_retry / config / name when they are absent;
- synthesises the impl BatchTask<MyState> for MyTask header;
- emits a companion impl Task<MyState> for MyTask.

The run method of that companion impl drives the bounded-concurrency loop via cano::task::batch::run_batch, which uses futures_util's StreamExt::buffer_unordered. No changes to the workflow engine are needed.

🌐 Inference form — fan out over a list of URLs
use cano::prelude::*;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Summarise, Done }

struct FetchUrls;

#[task::batch(state = Step)]
impl FetchUrls {
    fn concurrency(&self) -> usize { 8 }
    fn item_retry(&self) -> RetryMode { RetryMode::fixed(2, Duration::from_millis(50)) }

    async fn load(&self, res: &Resources) -> Result<Vec<String>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        Ok(store.get("urls")?)
    }

    async fn process_item(&self, url: &String) -> Result<usize, CanoError> {
        // pretend to fetch; return the byte count
        Ok(url.len() * 100)
    }

    async fn finish(&self, res: &Resources, outputs: Vec<Result<usize, CanoError>>)
        -> Result<TaskResult<Step>, CanoError>
    {
        let ok = outputs.iter().filter(|r| r.is_ok()).count();
        let total: usize = outputs.iter().filter_map(|r| r.as_ref().ok()).sum();
        let store = res.get::<MemoryStore, _>("store")?;
        store.put("total_bytes", total)?;
        if ok * 2 >= outputs.len() {                 // partial-failure policy: ≥ 50% must succeed
            Ok(TaskResult::Single(Step::Summarise))
        } else {
            Err(CanoError::task_execution("too many fetch failures"))
        }
    }
}
Tip

process_item returning Err is not a batch failure — that error lands in the matching slot of finish's outputs vector. Decide the partial-failure policy in finish: count the Oks, set a threshold, and either transition or Err out (which then triggers the outer config() retry, re-running the whole load → process → finish cycle).
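The ≥ 50% policy from the FetchUrls example can be factored into a small helper. This is a std-only sketch. `meets_threshold` and its percentage parameter are hypothetical names. An empty batch counts as passing, matching the `ok * 2 >= outputs.len()` check in that example.

```rust
/// Hypothetical helper: true when at least `min_percent`% of the slots are Ok.
/// Integer arithmetic avoids float rounding; an empty batch passes (0 >= 0).
fn meets_threshold<T, E>(outputs: &[Result<T, E>], min_percent: usize) -> bool {
    let ok = outputs.iter().filter(|r| r.is_ok()).count();
    ok * 100 >= outputs.len() * min_percent
}

fn main() {
    let outputs: Vec<Result<usize, String>> =
        vec![Ok(300), Err("timeout".to_string()), Ok(120), Err("404".to_string())];

    // 2 of 4 succeeded: exactly 50%, so a 50% threshold passes but 75% does not.
    println!("{}", meets_threshold(&outputs, 50)); // true
    println!("{}", meets_threshold(&outputs, 75)); // false
}
```

Inside finish, `meets_threshold(&outputs, 50)` would replace the inline comparison. Keeping the policy in one function makes the threshold easy to test on its own.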


Registering a Batch Task

Register a batch task with plain Workflow::register — to the FSM it's an ordinary Single state; the fan-out and re-join happen inside the generated run.

Wiring a batch task into a workflow
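use cano::prelude::*;

// Assumes `resources` is a Resources value that already holds the "store" MemoryStore,
// and that Summarise is the task for the next state (not shown here).
let workflow = Workflow::new(resources)
    .register(Step::Fetch, FetchUrls)       // batch task: fan-out and re-join happen inside its run
    .register(Step::Summarise, Summarise)
    .add_exit_state(Step::Done);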

Concurrency & Retry Tuning

concurrency() — width of the fan-out

Defaults to 1 (fully sequential). Raise it to run up to that many process_item calls at once. finish still receives the results in input order.
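The sketch below shows one way to get bounded concurrency and input-order results together. It uses std threads instead of async futures.

- At most `concurrency` workers pull the next index from a shared counter, so a new item starts as soon as a worker frees up. This is similar in spirit to buffer_unordered.
- Each result is written into the slot matching its input index, so completion order does not matter.

`run_bounded` is a hypothetical helper, not Cano's run_batch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: run `f` over `items` with at most `concurrency`
/// calls in flight, returning the results in input order.
fn run_bounded<I, O, F>(items: &[I], concurrency: usize, f: F) -> Vec<O>
where
    I: Sync,
    O: Send,
    F: Fn(&I) -> O + Sync,
{
    let next = AtomicUsize::new(0);
    // One slot per input item, filled by index as work completes.
    let slots: Mutex<Vec<Option<O>>> = Mutex::new((0..items.len()).map(|_| None).collect());
    let workers = concurrency.max(1).min(items.len().max(1));

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                let i = next.fetch_add(1, Ordering::Relaxed);
                if i >= items.len() {
                    break; // no items left for this worker
                }
                let out = f(&items[i]); // the work itself runs outside the lock
                slots.lock().unwrap()[i] = Some(out);
            });
        }
    });

    slots
        .into_inner()
        .unwrap()
        .into_iter()
        .map(|o| o.expect("every index is processed exactly once"))
        .collect()
}

fn main() {
    let items: Vec<u64> = (1..=10).collect();
    // Later items sleep less, so they tend to finish first; the output is still in input order.
    let out = run_bounded(&items, 4, |n| {
        let n = *n;
        thread::sleep(std::time::Duration::from_millis(20 - n));
        n * n
    });
    println!("{out:?}"); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}
```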

item_retry() — per-item resilience

Defaults to RetryMode::None. Set e.g. RetryMode::fixed(2, Duration::from_millis(50)) to retry an individual item before its slot becomes an Err — orthogonal to the outer config() retry.
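Here is a std-only sketch of what a fixed per-item retry does. The hypothetical `retry_fixed` re-invokes the closure up to `retries` extra times with a constant delay and keeps only the last error. This is also why process_item borrows the item (&Item) rather than consuming it: the same item must be available for every attempt. Whether RetryMode::fixed's first argument counts retries or total attempts is an assumption here (this sketch treats it as retries).

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical fixed-delay retry: one initial attempt plus up to `retries`
/// re-attempts, sleeping `delay` between them. Returns the last error.
fn retry_fixed<T, E>(
    retries: usize,
    delay: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut remaining = retries;
    loop {
        match attempt() {
            Ok(v) => return Ok(v),
            Err(e) if remaining == 0 => return Err(e), // out of retries: the slot becomes Err
            Err(_) => {
                remaining -= 1;
                thread::sleep(delay);
            }
        }
    }
}

fn main() {
    let item = String::from("https://example.com");
    let mut calls = 0;
    // Fails twice, then succeeds on the third attempt (initial + 2 retries).
    let result: Result<usize, String> = retry_fixed(2, Duration::from_millis(50), || {
        calls += 1;
        if calls < 3 { Err(format!("attempt {calls} failed")) } else { Ok(item.len() * 100) }
    });
    println!("{result:?} after {calls} calls"); // Ok(1900) after 3 calls
}
```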

config() — whole-cycle retry

Defaults to TaskConfig::default(). A retry re-runs the entire load → process → finish cycle, and it is triggered only when load or finish returns Err.
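To show how the two retry layers nest, here is a std-only sketch. `run_cycle_with_retry` is hypothetical and is not Cano's TaskConfig machinery.

- The outer loop re-runs the simulated load → process → finish cycle only when the cycle as a whole returns Err.
- A failing item never triggers the outer retry by itself.
- Each outer attempt reloads and reprocesses every item.

```rust
/// Hypothetical outer retry: run `cycle` at least once and up to `max_attempts`
/// times in total, stopping at the first Ok.
fn run_cycle_with_retry<T, E>(
    max_attempts: usize,
    mut cycle: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match cycle() {
            Err(_) if attempt < max_attempts => attempt += 1,
            done => return done,
        }
    }
}

fn main() {
    let mut cycles = 0;
    let result: Result<usize, String> = run_cycle_with_retry(3, || {
        cycles += 1; // "load" runs once per cycle
        let items = vec![1u32, 2, 3, 4];
        // "process_item": item 4 always fails, but that alone is not fatal.
        let outputs: Vec<Result<u32, String>> = items
            .iter()
            .map(|&n| if n == 4 { Err(format!("item {n} failed")) } else { Ok(n * 10) })
            .collect();
        // "finish": on the first cycle, pretend a downstream write failed.
        if cycles == 1 {
            return Err("finish: store unavailable".to_string());
        }
        Ok(outputs.iter().filter(|r| r.is_ok()).count())
    });
    println!("{result:?} after {cycles} cycles"); // Ok(3) after 2 cycles
}
```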


BatchTask vs Split & Join

BatchTask

Fans out over: data items, inside one state.

  • One state, one re-join point (finish)
  • Per-item retry via item_retry()
  • Item failures collected, not fatal — policy lives in finish
  • Bounded concurrency via concurrency()

Split & Join

Fans out over: workflow states / branches.

  • One state ⇒ many parallel Tasks, each transitioning
  • JoinStrategy: All, Any, Quorum, …
  • Optional bulkhead caps concurrency
  • See the Split & Join guide

Explicit Trait-Impl Form

Prefer to write the trait header yourself? Put a bare #[task::batch] on an impl BatchTask<...> for ... block and declare type Item / type ItemOutput explicitly. The companion impl Task is still emitted.

Explicit form — #[task::batch] on a trait impl
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Step { Fetch, Summarise, Done }

struct FetchUrls;

#[task::batch]
impl BatchTask<Step> for FetchUrls {
    type Item = String;
    type ItemOutput = usize;

    fn concurrency(&self) -> usize { 8 }

    async fn load(&self, res: &Resources) -> Result<Vec<String>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        Ok(store.get("urls")?)
    }

    async fn process_item(&self, url: &String) -> Result<usize, CanoError> {
        Ok(url.len() * 100)
    }

    async fn finish(&self, _res: &Resources, outputs: Vec<Result<usize, CanoError>>)
        -> Result<TaskResult<Step>, CanoError>
    {
        let ok = outputs.iter().filter(|r| r.is_ok()).count();
        if ok * 2 >= outputs.len() {
            Ok(TaskResult::Single(Step::Summarise))
        } else {
            Err(CanoError::task_execution("too many fetch failures"))
        }
    }
}

Type-Erased Aliases

The object-safe aliases pin the associated types: Item to Box<dyn Any + Send + Sync> and ItemOutput to Box<dyn Any + Send>.

Alias | Expands to
--- | ---
DynBatchTask<TState, TResourceKey> | dyn BatchTask<TState, TResourceKey, Item = …, ItemOutput = …>
BatchTaskObject<TState, TResourceKey> | Arc<dyn BatchTask<TState, TResourceKey, Item = …, ItemOutput = …>>
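The pinning technique can be illustrated with a std-only sketch. `Mapper`, `DynMapper`, `MapperObject`, and `Doubler` are hypothetical names that mirror the shape of the aliases; they are not Cano's definitions. Once the associated types are fixed to boxed `dyn Any` values, tasks with different concrete item types can share one trait-object type. The cost is downcasting inside each implementation.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical trait with associated types, shaped like BatchTask (sync for brevity).
trait Mapper {
    type Item;
    type ItemOutput;
    fn process_item(&self, item: &Self::Item) -> Result<Self::ItemOutput, String>;
}

/// Pin the associated types to boxed `dyn Any` so the trait can be used as an object.
type DynMapper = dyn Mapper<Item = Box<dyn Any + Send + Sync>, ItemOutput = Box<dyn Any + Send>>;
type MapperObject = Arc<DynMapper>;

/// A concrete task that expects u32 items behind the erased type.
struct Doubler;

impl Mapper for Doubler {
    type Item = Box<dyn Any + Send + Sync>;
    type ItemOutput = Box<dyn Any + Send>;

    fn process_item(&self, item: &Self::Item) -> Result<Self::ItemOutput, String> {
        // Recover the concrete type; a mismatch becomes an ordinary per-item error.
        let n = item.downcast_ref::<u32>().ok_or("expected a u32 item")?;
        Ok(Box::new(n * 2))
    }
}

fn main() {
    let task: MapperObject = Arc::new(Doubler);
    let mut items: Vec<Box<dyn Any + Send + Sync>> = Vec::new();
    items.push(Box::new(21u32));
    items.push(Box::new("not a number"));

    for item in &items {
        match task.process_item(item) {
            Ok(out) => println!("ok: {:?}", out.downcast_ref::<u32>()), // ok: Some(42)
            Err(e) => println!("err: {e}"),                            // err: expected a u32 item
        }
    }
}
```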

When to Use BatchTask

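Reach for a BatchTask when you want to map a sub-operation over a collection (fetch N URLs, process N records, validate N inputs) with bounded concurrency, independent per-item retry, and a single aggregate decision in finish about the next state.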

If instead each item is really its own multi-step workflow, that's a Split job, not a batch.

Runnable example

The crate ships a complete example — run it with cargo run --example batch_task.