Split & Join
Fan work out across many tasks, then join the results back into the FSM.
A workflow normally runs one task per state. When you need parallelism —
scatter-gather queries, redundant API calls, batch processing, hedged requests against an SLA —
register a split for that state: a list of tasks that run concurrently, plus a
JoinConfig that decides when the workflow may advance and which next state it lands on.
The join strategy controls the termination condition; an optional bulkhead bounds how
many of those tasks run at once.
Splitting a State
Use register_split(state, tasks, join_config) in place of register() to map a
state to a set of tasks that run in parallel. JoinConfig::new(strategy, next_state) defines
the termination condition and the state the workflow transitions to once the strategy is satisfied;
.with_timeout(duration) caps how long the join waits, and .with_bulkhead(n)
caps how many split tasks run concurrently. Like every other builder method, register_split()
consumes self and returns a new Workflow — capture the return value.
Split / Join Pattern
let join_config = JoinConfig::new(JoinStrategy::All, State::Aggregate)
.with_timeout(Duration::from_secs(5));
Workflow::new(Resources::new().insert("store", store))
.register(State::Start, Loader)
.register_split(State::Fanout, vec![Worker { id: 1 }, Worker { id: 2 }, Worker { id: 3 }], join_config)
.register(State::Aggregate, Aggregator)
.add_exit_state(State::Complete)
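Because register_split() and the other builder methods take self by value, discarding the returned Workflow discards the registration. The sketch below reproduces that ownership pattern with a hypothetical MiniWorkflow type (not part of cano) so the rule is easy to see in isolation.

```rust
// Hypothetical stand-in for a consuming builder. It is NOT cano's Workflow,
// just the same ownership pattern in miniature.
#[derive(Debug, Default)]
struct MiniWorkflow {
    states: Vec<String>,
}

impl MiniWorkflow {
    fn new() -> Self {
        Self::default()
    }

    // Takes `self` by value and hands back the updated builder,
    // so the caller must keep the returned value.
    fn register(mut self, state: &str) -> Self {
        self.states.push(state.to_string());
        self
    }
}

fn build() -> MiniWorkflow {
    // Chaining threads ownership through every call.
    MiniWorkflow::new()
        .register("Start")
        .register("Fanout")
        .register("Aggregate")
}

fn main() {
    let wf = build();
    println!("{:?}", wf.states);

    // When not chaining, rebind the result to keep the update.
    let wf2 = MiniWorkflow::new();
    let wf2 = wf2.register("Start");
    println!("{:?}", wf2.states);
}
```

Calling `wf2.register("Start");` without rebinding moves `wf2` into the call, so any later use of `wf2` is a compile-time "use of moved value" error rather than a silent loss of the registration.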
A state is a split because you registered it with register_split() (the tasks
and join strategy are fixed at build time), not because a task returned
TaskResult::Split(...) at runtime. A single task
that returns TaskResult::Split for a state registered with plain register()
fails with CanoError::Workflow; use register_split() for that state instead.
Join Strategies
The JoinStrategy enum controls when a split is considered done and the workflow may advance.
All
Wait for all tasks to complete successfully.
JoinStrategy::All
Any
Proceed after the first task completes successfully.
JoinStrategy::Any
Quorum(n)
Wait for n tasks to complete successfully.
JoinStrategy::Quorum(2)
Percentage(p)
Wait for a fraction p of tasks to complete successfully (p in (0.0, 1.0], so 0.5 means 50%).
JoinStrategy::Percentage(0.5)
PartialResults(n)
Proceed once n tasks complete (successes or failures).
JoinStrategy::PartialResults(3)
PartialTimeout
Accept whatever completes before the timeout expires. Requires .with_timeout().
JoinStrategy::PartialTimeout
A JoinConfig using PartialTimeout without a configured timeout fails at execution
time with CanoError::Configuration. Always pair it with .with_timeout(duration).
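One way to pin down the differences between strategies is a pure decision function over a split's progress. The sketch below is a hypothetical model, not cano's implementation: Strategy and Progress are illustrative names, Percentage is assumed to round the required count up, PartialTimeout is assumed to also advance early once every task has finished, and failure handling (for example All failing when any task fails) is left out.

```rust
// Hypothetical model of the join conditions; not cano's code.
#[derive(Debug, Clone, Copy)]
enum Strategy {
    All,
    Any,
    Quorum(usize),
    Percentage(f64),
    PartialResults(usize),
    PartialTimeout,
}

/// Snapshot of a split's progress at one instant.
struct Progress {
    total: usize,
    succeeded: usize,
    failed: usize,
    timed_out: bool,
}

/// Returns true once the strategy would let the workflow advance.
fn is_satisfied(strategy: Strategy, p: &Progress) -> bool {
    match strategy {
        Strategy::All => p.succeeded == p.total,
        Strategy::Any => p.succeeded >= 1,
        Strategy::Quorum(n) => p.succeeded >= n,
        Strategy::Percentage(frac) => {
            // Assumption: the required count is rounded up.
            let needed = (frac * p.total as f64).ceil() as usize;
            p.succeeded >= needed
        }
        // Successes and failures both count toward n.
        Strategy::PartialResults(n) => p.succeeded + p.failed >= n,
        // Assumption: advance at the deadline, or earlier if everything finished.
        Strategy::PartialTimeout => p.timed_out || p.succeeded + p.failed == p.total,
    }
}

fn main() {
    // 4 tasks: 2 succeeded, 1 failed, 1 still running, deadline not reached.
    let progress = Progress { total: 4, succeeded: 2, failed: 1, timed_out: false };
    let strategies = [
        Strategy::All,
        Strategy::Any,
        Strategy::Quorum(2),
        Strategy::Percentage(0.75),
        Strategy::PartialResults(3),
        Strategy::PartialTimeout,
    ];
    for s in strategies {
        println!("{:?}: {}", s, is_satisfied(s, &progress));
    }
}
```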
Bounding Concurrency with a Bulkhead
By default, a split starts all of its tasks at once. When you need to cap concurrent in-flight
work (to bound resource use, protect a downstream service, or stabilise tail latency), set a
bulkhead on the JoinConfig. Internally this gates each spawned task body on
a shared tokio::sync::Semaphore: excess tasks wait until a permit is free, and the join
strategy still applies as results come in.
let join_config = JoinConfig::new(JoinStrategy::All, State::Aggregate)
.with_bulkhead(4); // at most 4 split tasks run at once
with_bulkhead(0) is rejected at execution time with CanoError::Configuration.
Leave the bulkhead unset (None) for unbounded concurrency. Bulkheads compose with
PartialTimeout and any other join strategy.
Runnable example: cargo run --example split_bulkhead — 8 split tasks behind a
with_bulkhead(2), with start/end timestamps printed so you can see the 4 waves of 2.
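The bulkhead mechanism can be sketched with the standard library alone. std has no semaphore, so this hypothetical Bulkhead is a counting semaphore built from Mutex and Condvar; every task is spawned up front and only its body waits for a permit, mirroring the description above. run_split is an illustrative helper that reports the peak number of bodies running at once.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical counting semaphore standing in for the bulkhead's permit pool.
struct Bulkhead {
    permits: Mutex<usize>,
    freed: Condvar,
}

impl Bulkhead {
    fn new(limit: usize) -> Self {
        Self { permits: Mutex::new(limit), freed: Condvar::new() }
    }

    fn acquire(&self) {
        let mut available = self.permits.lock().unwrap();
        // Park until a permit is free; `while` guards against spurious wakeups.
        while *available == 0 {
            available = self.freed.wait(available).unwrap();
        }
        *available -= 1;
    }

    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

/// Spawns `tasks` threads behind a bulkhead of `limit` and returns the peak
/// number of task bodies that were running at the same time.
fn run_split(tasks: usize, limit: usize) -> usize {
    let bulkhead = Arc::new(Bulkhead::new(limit));
    // (currently running, peak observed)
    let stats = Arc::new(Mutex::new((0usize, 0usize)));

    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let bulkhead = Arc::clone(&bulkhead);
            let stats = Arc::clone(&stats);
            // Every task is spawned immediately; only its body is gated.
            thread::spawn(move || {
                bulkhead.acquire();
                {
                    let mut s = stats.lock().unwrap();
                    s.0 += 1;
                    let running = s.0;
                    if running > s.1 {
                        s.1 = running;
                    }
                }
                thread::sleep(Duration::from_millis(20)); // simulated work
                stats.lock().unwrap().0 -= 1;
                bulkhead.release();
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    // Bind before returning so the guard is dropped before `stats`.
    let peak = stats.lock().unwrap().1;
    peak
}

fn main() {
    let peak = run_split(8, 2);
    println!("peak concurrency: {peak}");
}
```

With 8 tasks and a limit of 2 the bodies drain in ceil(8 / 2) = 4 waves. The peak cannot exceed the limit because a task counts itself as running only after acquiring a permit and stops counting before releasing it.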
Complete Example
A runnable end-to-end split/join: a loader writes input data into the store, three processor tasks
fan out in parallel, the All strategy waits for every one of them, and an aggregator reads
the per-task results back out. Run the full version with cargo run --example workflow_split_join.
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Aggregate, Complete }
#[derive(Clone)]
struct DataLoader;
#[task(state = DataState)]
impl DataLoader {
async fn run(&self, res: &Resources) -> Result<TaskResult<DataState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
store.put("input_data", vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])?;
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[derive(Clone)]
struct ProcessorTask { task_id: usize }
impl ProcessorTask { fn new(task_id: usize) -> Self { Self { task_id } } }
#[task(state = DataState)]
impl ProcessorTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<DataState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let data: Vec<i32> = store.get("input_data")?;
tokio::time::sleep(Duration::from_millis(100 * self.task_id as u64)).await;
let result: i32 = data.iter().map(|&x| x * self.task_id as i32).sum();
store.put(&format!("result_{}", self.task_id), result)?;
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[derive(Clone)]
struct Aggregator;
#[task(state = DataState)]
impl Aggregator {
async fn run(&self, res: &Resources) -> Result<TaskResult<DataState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let total: i32 = (1..=3)
.filter_map(|i| store.get::<i32>(&format!("result_{}", i)).ok())
.sum();
store.put("final_result", total)?;
Ok(TaskResult::Single(DataState::Complete))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
// Wait for ALL processors, give up after 5 seconds.
let join_config = JoinConfig::new(JoinStrategy::All, DataState::Aggregate)
.with_timeout(Duration::from_secs(5));
let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(DataState::Start, DataLoader)
.register_split(DataState::ParallelProcessing, processors, join_config)
.register(DataState::Aggregate, Aggregator)
.add_exit_state(DataState::Complete);
let result = workflow.orchestrate(DataState::Start).await?;
let final_result: i32 = store.get("final_result")?;
println!("Workflow completed: {:?} — total {}", result, final_result);
Ok(())
}
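To check what the complete example should print, the arithmetic can be reproduced without cano. The input 1..=10 sums to 55, and processor i contributes 55 × i, so the aggregator stores 55 × (1 + 2 + 3) = 330. The helper names below are illustrative only.

```rust
/// Mirrors ProcessorTask: multiply every input by the task id, then sum.
fn processor_result(data: &[i32], task_id: i32) -> i32 {
    data.iter().map(|&x| x * task_id).sum()
}

/// Mirrors Aggregator: add result_1 + result_2 + result_3.
fn expected_total() -> i32 {
    let data: Vec<i32> = (1..=10).collect();
    (1..=3).map(|id| processor_result(&data, id)).sum()
}

fn main() {
    println!("expected final_result: {}", expected_total());
}
```

With All, the join waits for the slowest processor (task 3 sleeps 300 ms), well inside the 5-second timeout.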
Join Strategy Examples
Each strategy handles parallel task completion differently. The examples below isolate the
JoinConfig wiring for each one.
Runnable example: cargo run --example join_strategies — runs the same parallel split four
times with Any, Quorum, Percentage, and PartialResults,
with staggered task delays so you can see exactly when each one returns.
All — Wait for Every Task
Waits for all tasks to complete successfully. Fails if any task fails. Use it when every result is required.
All Strategy
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Aggregate, Complete }
#[derive(Clone)]
struct DataLoader;
#[derive(Clone)]
struct ProcessorTask { id: u32 }
impl ProcessorTask { fn new(id: u32) -> Self { Self { id } } }
#[derive(Clone)]
struct Aggregator;
#[task(state = DataState)]
impl DataLoader {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[task(state = DataState)]
impl ProcessorTask {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[task(state = DataState)]
impl Aggregator {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<DataState> {
// All Strategy: best for workflows requiring complete data
let join_config = JoinConfig::new(JoinStrategy::All, DataState::Aggregate)
.with_timeout(Duration::from_secs(10));
Workflow::new(Resources::new().insert("store", store))
.register(DataState::Start, DataLoader)
.register_split(
DataState::ParallelProcessing,
vec![ProcessorTask::new(1), ProcessorTask::new(2), ProcessorTask::new(3)],
join_config,
)
.register(DataState::Aggregate, Aggregator)
.add_exit_state(DataState::Complete)
}
Any — First to Complete
Proceeds as soon as the first task completes successfully; the rest are cancelled. Ideal for redundant calls where the fastest response wins.
Any Strategy
use cano::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Complete }
#[derive(Clone)]
struct DataLoader;
#[task(state = DataState)]
impl DataLoader {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[derive(Clone)]
struct ApiCallTask { provider: String }
impl ApiCallTask { fn new(provider: &str) -> Self { Self { provider: provider.into() } } }
#[task(state = DataState)]
impl ApiCallTask {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<DataState> {
// Any Strategy: best for redundant API calls or fastest-wins scenarios
let join_config = JoinConfig::new(JoinStrategy::Any, DataState::Complete);
// Call 3 different data sources, use whoever responds first
Workflow::new(Resources::new().insert("store", store))
.register(DataState::Start, DataLoader)
.register_split(
DataState::ParallelProcessing,
vec![
ApiCallTask::new("provider1"),
ApiCallTask::new("provider2"),
ApiCallTask::new("provider3"),
],
join_config,
)
.add_exit_state(DataState::Complete)
}
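The fastest-wins behaviour of Any can be sketched with std threads and a channel: each simulated provider sleeps for its latency, then sends its index, and the caller keeps only the first message. cano cancels the remaining tasks, but std threads cannot be cancelled, so this sketch simply ignores the late senders. first_response and the latencies are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Races one simulated provider per latency and returns the index of the first to answer.
fn first_response(delays_ms: &[u64]) -> Option<usize> {
    let (tx, rx) = mpsc::channel();
    for (idx, &delay) in delays_ms.iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay)); // simulated network latency
            // Losers find the receiver gone; ignore the send error.
            let _ = tx.send(idx);
        });
    }
    // Drop our own sender so recv() returns an error instead of hanging
    // when there are no providers at all.
    drop(tx);
    rx.recv().ok()
}

fn main() {
    match first_response(&[300, 10, 600]) {
        Some(idx) => println!("provider{} answered first", idx + 1),
        None => println!("no providers"),
    }
}
```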
Quorum(n) — Wait for N Tasks
Proceeds once a specific number of tasks complete successfully; the rest are cancelled. Useful for distributed consensus and majority-vote writes.
Quorum Strategy
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Aggregate, Complete }
#[derive(Clone)]
struct PrepareData;
#[derive(Clone)]
struct WriteReplica { id: u32 }
impl WriteReplica { fn new(id: u32) -> Self { Self { id } } }
#[derive(Clone)]
struct ConfirmWrite;
#[task(state = DataState)]
impl PrepareData {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[task(state = DataState)]
impl WriteReplica {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[task(state = DataState)]
impl ConfirmWrite {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<DataState> {
// Quorum Strategy: write to 5 replicas, succeed when 3 confirm
let join_config = JoinConfig::new(JoinStrategy::Quorum(3), DataState::Aggregate)
.with_timeout(Duration::from_secs(5));
Workflow::new(Resources::new().insert("store", store))
.register(DataState::Start, PrepareData)
.register_split(
DataState::ParallelProcessing,
vec![
WriteReplica::new(1),
WriteReplica::new(2),
WriteReplica::new(3),
WriteReplica::new(4),
WriteReplica::new(5),
],
join_config,
)
.register(DataState::Aggregate, ConfirmWrite)
.add_exit_state(DataState::Complete)
}
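The Quorum(3)-of-5 choice above is a strict majority. The helpers below (hypothetical, not part of cano) show the arithmetic behind it: how large a majority is, how many replica writes may fail before the quorum becomes unreachable, and why majority quorums suit consensus-style writes: any two quorums with 2 × quorum > replicas share at least one replica.

```rust
/// Smallest strict majority of `replicas`.
fn majority(replicas: usize) -> usize {
    replicas / 2 + 1
}

/// Failures a Quorum(quorum) join can absorb; None if the quorum is unreachable.
fn tolerated_failures(replicas: usize, quorum: usize) -> Option<usize> {
    replicas.checked_sub(quorum)
}

/// Two quorums of this size always overlap in at least one replica.
fn quorums_overlap(replicas: usize, quorum: usize) -> bool {
    2 * quorum > replicas
}

fn main() {
    let replicas = 5;
    let q = majority(replicas);
    println!(
        "quorum {q} of {replicas}: tolerates {:?} failures, overlapping: {}",
        tolerated_failures(replicas, q),
        quorums_overlap(replicas, q)
    );
}
```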
Percentage(p) — Wait for a Fraction of Tasks
Proceeds once a percentage of tasks complete successfully. Scales with the batch size, so it works well when the number of split tasks varies.
Percentage Strategy
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Aggregate, Complete }
#[derive(Clone)]
struct LoadRecords;
#[derive(Clone)]
struct RecordProcessor { idx: usize }
impl RecordProcessor { fn new(idx: usize) -> Self { Self { idx } } }
#[derive(Clone)]
struct SummarizeResults;
#[task(state = DataState)]
impl LoadRecords {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[task(state = DataState)]
impl RecordProcessor {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[task(state = DataState)]
impl SummarizeResults {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<DataState> {
// Percentage Strategy: process 100 records, proceed when 75 complete
let join_config = JoinConfig::new(JoinStrategy::Percentage(0.75), DataState::Aggregate)
.with_timeout(Duration::from_secs(10));
let tasks: Vec<RecordProcessor> = (0..100).map(RecordProcessor::new).collect();
Workflow::new(Resources::new().insert("store", store))
.register(DataState::Start, LoadRecords)
.register_split(DataState::ParallelProcessing, tasks, join_config)
.register(DataState::Aggregate, SummarizeResults)
.add_exit_state(DataState::Complete)
}
PartialResults(n) — Accept Partial Completion
Proceeds once n tasks have completed — successes or failures both count — and
cancels the rest. All outcomes are tracked, so a downstream task can inspect how many succeeded versus
failed. Good for latency-bounded fan-out where some failures are tolerable. A fuller, runnable
walk-through lives in cargo run --example workflow_partial_results.
PartialResults Strategy
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Aggregate, Complete }
#[derive(Clone)]
struct PrepareRequest;
#[derive(Clone)]
struct ServiceCall { name: String }
impl ServiceCall { fn new(name: &str) -> Self { Self { name: name.into() } } }
#[derive(Clone)]
struct MergePartialResults;
#[task(state = DataState)]
impl PrepareRequest {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[task(state = DataState)]
impl ServiceCall {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[task(state = DataState)]
impl MergePartialResults {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<DataState> {
// PartialResults Strategy: proceed after any 3 of 4 service calls finish
let join_config = JoinConfig::new(JoinStrategy::PartialResults(3), DataState::Aggregate)
.with_timeout(Duration::from_secs(5));
Workflow::new(Resources::new().insert("store", store))
.register(DataState::Start, PrepareRequest)
.register_split(
DataState::ParallelProcessing,
vec![
ServiceCall::new("fast-service"),
ServiceCall::new("medium-service"),
ServiceCall::new("slow-service"),
ServiceCall::new("backup-service"),
],
join_config,
)
.register(DataState::Aggregate, MergePartialResults)
.add_exit_state(DataState::Complete)
}
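Once a PartialResults(n) join fires, the downstream task typically needs to separate successes from failures. The sketch below assumes the outcomes arrive in completion order as Result values; take_partial and Outcome are hypothetical names, and how cano actually exposes per-task outcomes to MergePartialResults is not shown here.

```rust
/// A finished task: Ok(service name) or Err(failure description).
type Outcome = Result<&'static str, &'static str>;

/// Keeps the first `n` outcomes in completion order (what PartialResults(n)
/// waits for) and splits them into successes and failures.
fn take_partial(outcomes: &[Outcome], n: usize) -> (Vec<&'static str>, Vec<&'static str>) {
    let mut ok = Vec::new();
    let mut failed = Vec::new();
    for outcome in outcomes.iter().take(n) {
        match outcome {
            Ok(name) => ok.push(*name),
            Err(reason) => failed.push(*reason),
        }
    }
    (ok, failed)
}

fn main() {
    // Hypothetical completion order for the four service calls above.
    let arrivals: [Outcome; 4] = [
        Ok("fast-service"),
        Err("medium-service failed"),
        Ok("slow-service"),
        Ok("backup-service"),
    ];
    let (ok, failed) = take_partial(&arrivals, 3);
    println!("{} succeeded, {} failed", ok.len(), failed.len());
}
```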
PartialTimeout — Deadline-Based Completion
Accepts whatever has completed when the timeout expires and proceeds with those results — the
remaining tasks are cancelled. The go-to strategy for hard SLAs. Requires .with_timeout().
PartialTimeout Strategy
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { Start, ParallelProcessing, Aggregate, Complete }
#[derive(Clone)]
struct LoadUserContext;
#[derive(Clone)]
struct RecommendationEngine { kind: String }
impl RecommendationEngine { fn new(kind: &str) -> Self { Self { kind: kind.into() } } }
#[derive(Clone)]
struct AggregateWithinSla;
#[task(state = DataState)]
impl LoadUserContext {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
#[task(state = DataState)]
impl RecommendationEngine {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[task(state = DataState)]
impl AggregateWithinSla {
async fn run_bare(&self) -> Result<TaskResult<DataState>, CanoError> {
Ok(TaskResult::Single(DataState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<DataState> {
// PartialTimeout Strategy: real-time recommendations with a 500ms SLA
let join_config = JoinConfig::new(JoinStrategy::PartialTimeout, DataState::Aggregate)
.with_timeout(Duration::from_millis(500));
Workflow::new(Resources::new().insert("store", store))
.register(DataState::Start, LoadUserContext)
.register_split(
DataState::ParallelProcessing,
vec![
RecommendationEngine::new("collaborative"),
RecommendationEngine::new("content-based"),
RecommendationEngine::new("trending"),
RecommendationEngine::new("personalized"),
],
join_config,
)
.register(DataState::Aggregate, AggregateWithinSla)
.add_exit_state(DataState::Complete)
}
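The deadline logic of PartialTimeout can be sketched with std::sync::mpsc::Receiver::recv_timeout: keep receiving until the time budget is spent, then proceed with whatever arrived. The engine names mirror the example above, the latencies are made up, and collect_until_deadline is a hypothetical helper. Unlike cano, it cannot cancel the late threads; their sends simply fail once the receiver is dropped.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Spawns one simulated engine per (name, latency) pair and returns the
/// names that answered before `budget` elapsed, in arrival order.
fn collect_until_deadline(engines: &[(&'static str, u64)], budget: Duration) -> Vec<&'static str> {
    let (tx, rx) = mpsc::channel();
    for &(name, latency_ms) in engines {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(latency_ms));
            let _ = tx.send(name); // fails harmlessly after the deadline
        });
    }
    drop(tx);

    let deadline = Instant::now() + budget;
    let mut done = Vec::new();
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(name) => done.push(name),
            // Timeout: the budget is spent. Disconnected: every engine answered.
            Err(_) => break,
        }
    }
    done
}

fn main() {
    let engines = [
        ("collaborative", 20),
        ("content-based", 60),
        ("trending", 900),
        ("personalized", 1500),
    ];
    let within_sla = collect_until_deadline(&engines, Duration::from_millis(400));
    println!("answered within SLA: {:?}", within_sla);
}
```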
Comparison Table
| Strategy | Trigger Condition | Cancels Others | Best Use Case |
|---|---|---|---|
| All | All tasks succeed | No | Complete data required |
| Any | First success | Yes | Redundant API calls |
| Quorum(n) | n tasks succeed | Yes | Distributed consensus |
| Percentage(p) | Fraction p of tasks succeed | Yes | Batch processing |
| PartialResults(n) | n tasks complete (success or failure) | Yes | Latency optimization |
| PartialTimeout | Timeout reached | Yes | Strict SLA requirements |
Common Parallel Patterns
Split/join handles complex parallel processing inside a single workflow. The patterns below fan work out across many tasks and join the results back into the FSM. Use the Queue Consumer pattern to drain external queues (SQS, Redis, Kafka), Dynamic Task Generation when the task count depends on runtime data, Resource-Limited Processing to cap concurrent operations, and the Continuous Workflow pattern for scheduled parallel batches. Each gives you the parallelism of running many workflow instances concurrently, with a simpler mental model, tighter resource control, and type-safe state management.
Pattern 1: Queue Consumer with Batch Processing
Process items from a queue in parallel batches. Instead of running multiple workflow instances concurrently,
use a single workflow that pulls a batch into the store, fans it out to a fixed set of per-slot processors
registered with register_split(), and loops until the queue drains. Because a split's task list is fixed at
build time, the batch size doubles as the number of processor slots; slots past the end of a short batch do nothing.
use cano::prelude::*;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum QueueState { PullBatch, ProcessBatch, Complete }
// Simulated queue (in production, use Redis, SQS, etc.)
type SharedQueue = Arc<Mutex<VecDeque<String>>>;
// Maximum items per pull; also the number of processor slots in the split.
const BATCH_SIZE: usize = 10;
#[derive(Clone)]
struct QueuePuller { queue: SharedQueue, batch_size: usize }
#[task(state = QueueState)]
impl QueuePuller {
async fn run(&self, res: &Resources) -> Result<TaskResult<QueueState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let mut queue = self.queue.lock().await;
let mut batch = Vec::new();
for _ in 0..self.batch_size {
match queue.pop_front() {
Some(item) => batch.push(item),
None => break,
}
}
if batch.is_empty() {
// Release the lock before sleeping so producers can keep enqueueing.
drop(queue);
tokio::time::sleep(Duration::from_secs(1)).await;
return Ok(TaskResult::Single(QueueState::PullBatch));
}
store.put("current_batch", batch)?;
Ok(TaskResult::Single(QueueState::ProcessBatch))
}
}
#[derive(Clone)]
struct ItemProcessor { slot: usize }
#[task(state = QueueState)]
impl ItemProcessor {
async fn run(&self, res: &Resources) -> Result<TaskResult<QueueState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let batch: Vec<String> = store.get("current_batch")?;
// A short batch leaves the higher slots with nothing to do.
if let Some(item_id) = batch.get(self.slot) {
tokio::time::sleep(Duration::from_millis(500)).await; // simulated work
store.put(&format!("result_{}", item_id), "completed")?;
}
Ok(TaskResult::Single(QueueState::Complete))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
let queue = Arc::new(Mutex::new(VecDeque::from(vec![
"order1".into(), "order2".into(), "order3".into(), "order4".into(), "order5".into(),
])));
// One processor per slot; the All join waits for every slot before completing.
let processors: Vec<ItemProcessor> = (0..BATCH_SIZE).map(|slot| ItemProcessor { slot }).collect();
let join_config = JoinConfig::new(JoinStrategy::All, QueueState::Complete);
let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(QueueState::PullBatch, QueuePuller { queue: queue.clone(), batch_size: BATCH_SIZE })
.register_split(QueueState::ProcessBatch, processors, join_config)
.add_exit_state(QueueState::Complete);
loop {
let result = workflow.orchestrate(QueueState::PullBatch).await?;
if result == QueueState::Complete && queue.lock().await.is_empty() {
break;
}
}
println!("All items processed");
Ok(())
}
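The pull step's batching arithmetic is easy to check in isolation. This std-only sketch (drain_in_batches is a hypothetical helper) drains a VecDeque in FIFO batches of at most batch_size, which is what repeated QueuePuller runs do to the queue: 25 items with a batch size of 10 take three pulls.

```rust
use std::collections::VecDeque;

/// Drains `queue` in FIFO batches of at most `batch_size` items.
fn drain_in_batches(queue: &mut VecDeque<String>, batch_size: usize) -> Vec<Vec<String>> {
    // A zero batch size would never make progress.
    assert!(batch_size > 0, "batch_size must be positive");
    let mut batches = Vec::new();
    while !queue.is_empty() {
        let take = batch_size.min(queue.len());
        // drain(..take) removes the front `take` items in order.
        let batch: Vec<String> = queue.drain(..take).collect();
        batches.push(batch);
    }
    batches
}

fn main() {
    let mut queue: VecDeque<String> = (1..=25).map(|i| format!("order{i}")).collect();
    for (n, batch) in drain_in_batches(&mut queue, 10).iter().enumerate() {
        println!("pull {}: {} items, first {}", n + 1, batch.len(), batch[0]);
    }
}
```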
Pattern 2: Dynamic Task Generation
Size the list of parallel tasks from runtime data before constructing the workflow. The records are fetched up front, one processor is built per record, each processor reads its record by index from the store, and an aggregator runs once after the split joins.
use cano::prelude::*;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState { ProcessBatch, Aggregate, Complete }
// Stand-in for a database query or API call whose result size is only known at runtime.
async fn load_records() -> Result<Vec<i32>, CanoError> {
Ok((1..=100).collect())
}
#[derive(Clone)]
struct RecordProcessor { index: usize }
#[task(state = DataState)]
impl RecordProcessor {
async fn run(&self, res: &Resources) -> Result<TaskResult<DataState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let records: Vec<i32> = store.get("records")?;
let record = records.get(self.index).copied().ok_or_else(|| {
CanoError::task_execution(format!("no record at index {}", self.index))
})?;
tokio::time::sleep(Duration::from_millis(10)).await; // simulated work
store.put(&format!("result_{}", self.index), record * 2)?;
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[derive(Clone)]
struct FinishAggregate { count: usize }
#[task(state = DataState)]
impl FinishAggregate {
async fn run(&self, res: &Resources) -> Result<TaskResult<DataState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let total: i32 = (0..self.count)
.filter_map(|i| store.get::<i32>(&format!("result_{}", i)).ok())
.sum();
println!("Aggregated total: {}", total);
Ok(TaskResult::Single(DataState::Complete))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
// Fetch the data first so its size can drive the task count.
let records = load_records().await?;
let count = records.len();
store.put("records", records)?;
let processors: Vec<RecordProcessor> = (0..count).map(|index| RecordProcessor { index }).collect();
let join_config = JoinConfig::new(JoinStrategy::All, DataState::Aggregate);
let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
.register_split(DataState::ProcessBatch, processors, join_config)
.register(DataState::Aggregate, FinishAggregate { count })
.add_exit_state(DataState::Complete);
workflow.orchestrate(DataState::ProcessBatch).await?;
Ok(())
}
Pattern 3: Resource-Limited Parallel Processing
Cap parallelism when a downstream resource (API keys, connections) is scarce. The
bulkhead on JoinConfig is the built-in way to do this; the example
below shows the manual alternative — a shared tokio::sync::Semaphore acquired inside each
task — for cases where you need the limit to span more than one split.
use cano::prelude::*;
use tokio::sync::Semaphore;
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ApiState { Start, Complete }
async fn make_api_call(_id: usize) -> Result<String, CanoError> {
Ok("ok".to_string())
}
#[derive(Clone)]
struct RateLimitedApiTask { api_id: usize, semaphore: Arc<Semaphore> }
#[task(state = ApiState)]
impl RateLimitedApiTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<ApiState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let _permit = self.semaphore.acquire().await
.map_err(|e| CanoError::task_execution(e.to_string()))?;
let result = make_api_call(self.api_id).await?;
store.put(&format!("api_result_{}", self.api_id), result)?;
Ok(TaskResult::Single(ApiState::Complete))
}
}
fn build_workflow(store: MemoryStore) -> Workflow<ApiState> {
// 20 tasks, at most 5 in flight at once.
let semaphore = Arc::new(Semaphore::new(5));
let tasks: Vec<RateLimitedApiTask> = (0..20)
.map(|i| RateLimitedApiTask { api_id: i, semaphore: semaphore.clone() })
.collect();
let join_config = JoinConfig::new(JoinStrategy::All, ApiState::Complete);
Workflow::new(Resources::new().insert("store", store))
.register_split(ApiState::Start, tasks, join_config)
.add_exit_state(ApiState::Complete)
}
For most cases, JoinConfig::with_bulkhead(n) is simpler than a hand-rolled semaphore —
it gates the split's task bodies on a semaphore for you and still applies the join strategy normally.
Reach for the manual approach only when the limit must be shared across multiple splits or workflows.
Pattern 4: Continuous Workflow with Split/Join
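Combine the scheduler with split/join for continuous parallel processing. Because the split tasks are registered statically, the batch size is fixed per workflow instance: size it for your throughput and let the scheduler re-run it on an interval. Each WorkProcessor handles only the item at its own index, so fetch_pending_work should return at most batch_size items per run; any extra items have no processor to pick them up.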
use cano::prelude::*;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ProcessState { Start, ProcessBatch, Complete }
async fn fetch_pending_work() -> Result<Vec<String>, CanoError> {
Ok(vec!["job-1".to_string(), "job-2".to_string()])
}
#[derive(Clone)]
struct WorkProcessor { item_index: usize }
#[task(state = ProcessState)]
impl WorkProcessor {
async fn run(&self, res: &Resources) -> Result<TaskResult<ProcessState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let items: Vec<String> = store.get("work_items")?;
if let Some(item) = items.get(self.item_index) {
println!("Processing item: {}", item);
}
Ok(TaskResult::Single(ProcessState::Complete))
}
}
#[derive(Clone)]
struct BatchLoaderTask;
#[task(state = ProcessState)]
impl BatchLoaderTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<ProcessState>, CanoError> {
let store = res.get::<MemoryStore, _>("store")?;
let items = fetch_pending_work().await?;
if items.is_empty() {
return Ok(TaskResult::Single(ProcessState::Complete));
}
store.put("work_items", items)?;
Ok(TaskResult::Single(ProcessState::ProcessBatch))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let mut scheduler = Scheduler::new();
let store = MemoryStore::new();
let batch_size = 10usize;
let processors: Vec<WorkProcessor> = (0..batch_size).map(|i| WorkProcessor { item_index: i }).collect();
let join_config = JoinConfig::new(JoinStrategy::All, ProcessState::Complete);
let batch_workflow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(ProcessState::Start, BatchLoaderTask)
.register_split(ProcessState::ProcessBatch, processors, join_config)
.add_exit_state(ProcessState::Complete);
scheduler.every_seconds("batch_processor", batch_workflow, ProcessState::Start, 10)?;
// Keep the handle alive — dropping the `RunningScheduler` aborts the spawned loops.
let running = scheduler.start().await?;
// ...run until shut down (e.g. a Ctrl-C handler), then stop gracefully.
running.wait().await?;
Ok(())
}