MemoryStore

The default in-memory shared state for passing data between workflow stages.

The store is the shared data layer that connects workflow stages. Tasks write values during their execution and read values produced by upstream stages. The default implementation, MemoryStore, wraps an Arc<RwLock<HashMap>>, so it is cheap to clone and safe to share across async tasks. Custom backends can be registered as typed Resource values and retrieved by concrete type.

MemoryStore is registered inside a Resources dictionary and retrieved in tasks with res.get::<MemoryStore, _>("store")?. This makes the store one of many named dependencies a workflow can carry, alongside database pools, HTTP clients, configuration, or any other type that implements the Resource trait. See Resources for lifecycle semantics, key-type options, and the full insert / get API.
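Conceptually, a Resources lookup pairs a name with a concrete type. The std-only sketch below illustrates that idea. SketchResources and its methods are hypothetical stand-ins, not cano's Resources API, and they ignore lifecycle hooks and alternative key types.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a typed, named dependency map.
/// Not cano's `Resources`: it only illustrates lookup by name plus concrete type.
#[derive(Default)]
struct SketchResources {
    entries: HashMap<String, Arc<dyn Any + Send + Sync>>,
}

impl SketchResources {
    /// Builder-style insert, loosely mirroring `Resources::new().insert(...)`.
    fn insert<T: Any + Send + Sync>(mut self, name: &str, value: T) -> Self {
        self.entries.insert(name.to_string(), Arc::new(value));
        self
    }

    /// Look up by name, then downcast to the requested concrete type.
    fn get<T: Any + Send + Sync>(&self, name: &str) -> Result<Arc<T>, String> {
        let entry = self
            .entries
            .get(name)
            .ok_or_else(|| format!("no resource named {name:?}"))?;
        Arc::clone(entry)
            .downcast::<T>()
            .map_err(|_| format!("resource {name:?} has a different type"))
    }
}

fn main() {
    let res = SketchResources::default()
        .insert("store", vec![1u32, 2, 3])
        .insert("base_url", String::from("http://localhost"));

    let store: Arc<Vec<u32>> = res.get("store").unwrap();
    assert_eq!(store.len(), 3);
    // Asking for the wrong type fails even though the name exists.
    assert!(res.get::<String>("store").is_err());
}
```

Keying by name as well as type is what lets a workflow carry several values of the same type (for example two stores) side by side.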


Thread Safety Model

One shared map, many readers and writers

Diagram: a single Arc<RwLock<HashMap>> shared by Task A (read), Task B (read), and Task C (write).

MemoryStore combines Arc for shared ownership with RwLock for interior mutability. Multiple readers can hold the read lock concurrently, while a writer gets exclusive access. Because the inner map is wrapped in Arc, cloning the store produces a second handle to the same underlying data, not a copy.

🔒 Clone shares the same data
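use cano::prelude::*;

fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    let store2 = store.clone(); // same backing map, not a copy

    store.put("key", 42i32)?;
    let val: i32 = store2.get("key")?; // read through the clone
    assert_eq!(val, 42);
    Ok(())
}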
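The sharing semantics above can be reproduced with the standard library alone. The sketch assumes a type-erased map whose values are Arc<dyn Any + Send + Sync>; SketchStore is hypothetical and is not MemoryStore's actual implementation.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

type Value = Arc<dyn Any + Send + Sync>;

/// Hypothetical minimal store: one map behind Arc<RwLock<_>>.
/// Cloning copies the Arc handle, never the map.
#[derive(Clone, Default)]
struct SketchStore {
    inner: Arc<RwLock<HashMap<String, Value>>>,
}

impl SketchStore {
    fn put<T: Any + Send + Sync>(&self, key: &str, value: T) {
        // Exclusive write lock for the duration of the insert.
        self.inner.write().unwrap().insert(key.to_string(), Arc::new(value));
    }

    fn get<T: Any + Clone>(&self, key: &str) -> Option<T> {
        // Shared read lock: many readers may hold it at once.
        let map = self.inner.read().unwrap();
        map.get(key)?.downcast_ref::<T>().cloned()
    }
}

fn main() {
    let store = SketchStore::default();
    let store2 = store.clone(); // second handle, same map
    store.put("key", 42i32);
    assert_eq!(store2.get::<i32>("key"), Some(42));

    // Several threads read concurrently through their own clones.
    let readers: Vec<_> = (0..4)
        .map(|_| {
            let handle = store.clone();
            thread::spawn(move || handle.get::<i32>("key"))
        })
        .collect();
    for r in readers {
        assert_eq!(r.join().unwrap(), Some(42));
    }
}
```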


API Reference

| Method | Signature | Notes |
|---|---|---|
| get | get<T: Clone>(&self, key: &str) -> StoreResult<T> | Returns a clone. Errors on missing key or type mismatch. |
| get_shared | get_shared<T: Send + Sync>(&self, key: &str) -> StoreResult<Arc<T>> | Zero-copy read. Returns the stored Arc pointer directly when possible. |
| put | put<T: Send + Sync>(&self, key: &str, value: T) -> StoreResult<()> | Inserts or replaces. Values do not need to implement Clone. |
| remove | remove(&self, key: &str) -> StoreResult<()> | Silent no-op on missing keys. Errors only on lock failure. |
| append | append<T: Send + Sync + Clone>(&self, key: &str, item: T) -> StoreResult<()> | Appends to an existing Vec<T>, or creates one. Returns a type-mismatch error if the key holds a non-Vec value. |
| contains_key | contains_key(&self, key: &str) -> StoreResult<bool> | Returns Ok(true) if the key exists. |
| keys | keys(&self) -> StoreResult<Vec<Arc<str>>> | Returns all keys. Order is unspecified (HashMap). |
| len | len(&self) -> StoreResult<usize> | Number of key-value pairs currently stored. |
| is_empty | is_empty(&self) -> StoreResult<bool> | Default impl delegates to len(). |
| clear | clear(&self) -> StoreResult<()> | Removes all entries. |

Basic Usage

📌 Common store operations
use cano::prelude::*;

fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Store primitive and composite types
    store.put("count", 42u32)?;
    store.put("labels", vec!["a".to_string(), "b".to_string()])?;

    // Retrieve with explicit type annotation
    let count: u32 = store.get("count")?;
    let labels: Vec<String> = store.get("labels")?;

    // Append to a Vec (creates if absent)
    store.append("log", "step 1 complete".to_string())?;
    store.append("log", "step 2 complete".to_string())?;
    let log: Vec<String> = store.get("log")?;

    // Check existence before reading optional keys
    if store.contains_key("result")? {
        let result: String = store.get("result")?;
    }

    // Remove a key — silent no-op if absent
    store.remove("scratch")?;
    Ok(())
}

Tip

Use contains_key() before get() for optional keys, or handle the KeyNotFound error directly. The append() method is handy for building up collections across multiple workflow stages without overwriting previous entries.
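A minimal sketch of create-or-extend append semantics, assuming values are stored type-erased behind a RwLock. The append and read_vec functions and SketchError are hypothetical; in this sketch any other stored type, including a Vec with a different element type, is rejected, which may differ from MemoryStore's exact behavior.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Type-erased map, standing in for the store's internals (an assumption).
type Map = RwLock<HashMap<String, Arc<dyn Any + Send + Sync>>>;

#[derive(Debug, PartialEq)]
enum SketchError {
    AppendTypeMismatch(String),
}

/// Hypothetical append: extend the Vec<T> under `key`, or create it if absent.
fn append<T: Any + Send + Sync + Clone>(map: &Map, key: &str, item: T) -> Result<(), SketchError> {
    // Hold the write lock across the whole read-modify-write so writers cannot interleave.
    let mut guard = map.write().unwrap();
    let mut items: Vec<T> = match guard.get(key) {
        None => Vec::new(),
        // Copy the existing items out; the stored Arc may be shared with readers.
        Some(existing) => existing
            .downcast_ref::<Vec<T>>()
            .ok_or_else(|| SketchError::AppendTypeMismatch(key.to_string()))?
            .clone(),
    };
    items.push(item);
    guard.insert(key.to_string(), Arc::new(items));
    Ok(())
}

/// Read a Vec<T> back out, cloning it.
fn read_vec<T: Any + Clone>(map: &Map, key: &str) -> Option<Vec<T>> {
    let guard = map.read().unwrap();
    guard.get(key)?.downcast_ref::<Vec<T>>().cloned()
}

fn main() {
    let map: Map = RwLock::new(HashMap::new());
    append(&map, "log", "step 1 complete".to_string()).unwrap();
    append(&map, "log", "step 2 complete".to_string()).unwrap();
    assert_eq!(read_vec::<String>(&map, "log").map(|v| v.len()), Some(2));

    // A scalar under the same key makes a later append fail instead of overwriting it.
    map.write().unwrap().insert("count".to_string(), Arc::new(42u32));
    assert_eq!(
        append(&map, "count", 1u32),
        Err(SketchError::AppendTypeMismatch("count".to_string()))
    );
}
```

Copying the existing Vec keeps any Arc already handed out to readers unchanged, at the cost of an O(n) copy per append in this sketch.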


Zero-Copy Reads with get_shared()

get_shared() returns an Arc<T> pointing to the value in the store. MemoryStore wraps each value in an Arc when it is stored, so get_shared() only clones that pointer: a cheap reference-count increment, not a copy of the data. This is useful when passing large values (e.g., a model's weight tensor or a large dataset) to multiple downstream tasks.

Zero-copy with Arc
use cano::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    store.put("dataset", vec![0u8; 1_000_000])?; // 1 MB

    // Both handles point to the same allocation — no copy
    let handle_a: Arc<Vec<u8>> = store.get_shared("dataset")?;
    let handle_b: Arc<Vec<u8>> = store.get_shared("dataset")?;

    assert!(Arc::ptr_eq(&handle_a, &handle_b));
    Ok(())
}

get() clones data: it always clones the underlying value. Use it when you need an owned copy.

get_shared() is zero-copy: prefer it for large or frequently read values.
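The difference between the two reads can be sketched with std types, assuming values are stored as Arc<dyn Any + Send + Sync>: get_cloned deep-copies the value, while get_shared only clones the pointer and converts it with Arc::downcast. Both function names are hypothetical stand-ins for get() and get_shared(), not cano's code.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type Map = RwLock<HashMap<String, Arc<dyn Any + Send + Sync>>>;

/// Hypothetical `get`: borrows under the read lock, then deep-clones the value.
fn get_cloned<T: Any + Clone>(map: &Map, key: &str) -> Option<T> {
    let guard = map.read().unwrap();
    guard.get(key)?.downcast_ref::<T>().cloned()
}

/// Hypothetical `get_shared`: clones only the Arc pointer (a refcount bump).
fn get_shared<T: Any + Send + Sync>(map: &Map, key: &str) -> Option<Arc<T>> {
    let guard = map.read().unwrap();
    let entry = Arc::clone(guard.get(key)?);
    entry.downcast::<T>().ok()
}

fn main() {
    let map: Map = RwLock::new(HashMap::new());
    map.write()
        .unwrap()
        .insert("dataset".to_string(), Arc::new(vec![0u8; 1_000_000]));

    let a: Arc<Vec<u8>> = get_shared(&map, "dataset").unwrap();
    let b: Arc<Vec<u8>> = get_shared(&map, "dataset").unwrap();
    assert!(Arc::ptr_eq(&a, &b)); // same allocation

    let owned: Vec<u8> = get_cloned(&map, "dataset").unwrap();
    assert_ne!(owned.as_ptr(), a.as_ptr()); // a separate 1 MB buffer
}
```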


Store in a Workflow

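Register the store inside a Resources dictionary and pass it to Workflow::new(). Each task retrieves the store via res.get::<MemoryStore, _>("store")?, so the same handle is shared across all registered tasks for the duration of a single orchestrate() call. Because clones share the backing map, main can insert one clone into Resources, keep its own, and read the results once orchestrate() returns.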

End-to-end workflow with store
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Ingest, Transform, Complete }

struct IngestTask;

#[task(state = Stage)]
impl IngestTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Stage>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let batch: usize = store.get("batch_size")?;
        let records: Vec<u32> = (0..batch as u32).collect();
        store.put("records", records)?;
        Ok(TaskResult::Single(Stage::Transform))
    }
}

struct TransformTask;

#[task(state = Stage)]
impl TransformTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Stage>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let records: Vec<u32> = store.get("records")?;
        let transformed: Vec<u32> = records.into_iter().map(|x| x * 2).collect();
        store.put("result", transformed)?;
        Ok(TaskResult::Single(Stage::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Pre-populate before the workflow starts
    store.put("batch_size", 256usize)?;

    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(Stage::Ingest, IngestTask)
        .register(Stage::Transform, TransformTask)
        .add_exit_state(Stage::Complete);

    workflow.orchestrate(Stage::Ingest).await?;

    // Read results after the workflow completes
    let result: Vec<u32> = store.get("result")?;
    println!("Processed {} records", result.len());

    Ok(())
}

Runnable example: cargo run --example workflow_stack_store — uses a MemoryStore as a shared stack passed between workflow stages.
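The same data flow can be sketched without cano: a hand-written loop stands in for orchestrate(), a worker thread receives one clone of the store, and main reads the result through its own clone. The Store alias, the stage functions, and run() are hypothetical simplifications, not how cano schedules tasks; the batch size is passed as an argument so the map holds a single value type.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stage {
    Ingest,
    Transform,
    Complete,
}

/// Hypothetical store holding only Vec<u32> values, to keep the sketch short.
type Store = Arc<RwLock<HashMap<String, Vec<u32>>>>;

fn ingest(store: &Store, batch: u32) -> Stage {
    let records: Vec<u32> = (0..batch).collect();
    store.write().unwrap().insert("records".into(), records);
    Stage::Transform
}

fn transform(store: &Store) -> Stage {
    // The read guard is dropped at the end of this statement, before the write below.
    let records = store.read().unwrap()["records"].clone();
    let doubled: Vec<u32> = records.into_iter().map(|x| x * 2).collect();
    store.write().unwrap().insert("result".into(), doubled);
    Stage::Complete
}

/// Runs stages on a worker thread until the exit state, loosely mimicking a state machine.
fn run(store: Store, start: Stage, batch: u32) -> Stage {
    thread::spawn(move || {
        let mut state = start;
        while state != Stage::Complete {
            state = match state {
                Stage::Ingest => ingest(&store, batch),
                Stage::Transform => transform(&store),
                Stage::Complete => unreachable!(),
            };
        }
        state
    })
    .join()
    .unwrap()
}

fn main() {
    let store: Store = Arc::new(RwLock::new(HashMap::new()));
    // The worker gets a clone; main keeps its own handle to read results later.
    let end = run(Arc::clone(&store), Stage::Ingest, 256);
    assert_eq!(end, Stage::Complete);
    let result = store.read().unwrap()["result"].clone();
    println!("Processed {} records", result.len());
}
```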


Custom Store Resources

For custom storage, define a concrete type with the API your workflow needs and implement Resource for it. Register that type in Resources, then retrieve it by concrete type inside tasks. This keeps storage dependencies explicit and avoids an unnecessary storage trait layer now that Resources provides typed dependency lookup.

🔧 Custom namespaced store resource
use cano::prelude::*;
use cano::store::StoreResult;

/// Example: a store that prefixes all keys with a namespace.
#[derive(Clone)]
pub struct NamespacedStore {
    namespace: String,
    inner: MemoryStore,
}

impl NamespacedStore {
    pub fn new(namespace: impl Into<String>) -> Self {
        Self {
            namespace: namespace.into(),
            inner: MemoryStore::new(),
        }
    }

    fn ns_key(&self, key: &str) -> String {
        format!("{}::{}", self.namespace, key)
    }

    pub fn get<T: 'static + Clone>(&self, key: &str) -> StoreResult<T> {
        self.inner.get(&self.ns_key(key))
    }

    pub fn put<T: 'static + Send + Sync>(&self, key: &str, value: T) -> StoreResult<()> {
        self.inner.put(&self.ns_key(key), value)
    }

    pub fn remove(&self, key: &str) -> StoreResult<()> {
        self.inner.remove(&self.ns_key(key))
    }

    pub fn append<T: 'static + Send + Sync + Clone>(&self, key: &str, item: T) -> StoreResult<()> {
        self.inner.append(&self.ns_key(key), item)
    }

    pub fn contains_key(&self, key: &str) -> StoreResult<bool> {
        self.inner.contains_key(&self.ns_key(key))
    }

    pub fn keys(&self) -> StoreResult<Vec<String>> {
        // Strip the namespace prefix before returning keys to callers
        let prefix = format!("{}::", self.namespace);
        let raw_keys = self.inner.keys()?;
        Ok(raw_keys
            .into_iter()
            .filter_map(|k| k.strip_prefix(&prefix).map(str::to_string))
            .collect())
    }

    pub fn len(&self) -> StoreResult<usize> {
        // Count only keys in this namespace
        Ok(self.keys()?.len())
    }

    pub fn clear(&self) -> StoreResult<()> {
        // Remove only keys in this namespace
        for key in self.keys()? {
            self.remove(&key)?;
        }
        Ok(())
    }
}

impl Resource for NamespacedStore {}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Done }

fn main() -> Result<(), CanoError> {
    // Use it like any other store — register it in Resources with your own key
    let store = NamespacedStore::new("pipeline_v2");
    store.put("status", "running".to_string())?;

    let _workflow: Workflow<State> = Workflow::new(
        Resources::new().insert("namespaced_store", store.clone()),
    )
    // ... register tasks that retrieve it via
    //     res.get::<NamespacedStore, _>("namespaced_store")? ...
    .add_exit_state(State::Done);
    Ok(())
}

Typed resources keep it simple

Because Resources retrieves dependencies by concrete type, your custom store can expose exactly the methods it needs. If you need dynamic dispatch, wrap your backend in a concrete resource type that holds a trait object (for example an Arc<dyn Trait>) for an object-safe client or repository trait.
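A sketch of that wrapper pattern, assuming a hypothetical object-safe KvBackend trait. BackendStore is the concrete type you would register (in cano, by also implementing Resource for it), while the backend behind its Arc<dyn KvBackend> can be swapped at construction time. All names here are illustrative.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical object-safe backend trait: no generic methods, only &self receivers.
trait KvBackend: Send + Sync {
    fn get_raw(&self, key: &str) -> Option<Vec<u8>>;
    fn put_raw(&self, key: &str, value: Vec<u8>);
}

/// One possible backend; a networked client could implement the same trait.
#[derive(Default)]
struct InMemoryBackend {
    map: Mutex<HashMap<String, Vec<u8>>>,
}

impl KvBackend for InMemoryBackend {
    fn get_raw(&self, key: &str) -> Option<Vec<u8>> {
        self.map.lock().unwrap().get(key).cloned()
    }
    fn put_raw(&self, key: &str, value: Vec<u8>) {
        self.map.lock().unwrap().insert(key.to_string(), value);
    }
}

/// Concrete wrapper: the type you would register and look up by type.
/// Dynamic dispatch stays inside, behind the trait object.
#[derive(Clone)]
struct BackendStore {
    backend: Arc<dyn KvBackend>,
}

impl BackendStore {
    fn new(backend: impl KvBackend + 'static) -> Self {
        Self { backend: Arc::new(backend) }
    }
    fn put_str(&self, key: &str, value: &str) {
        self.backend.put_raw(key, value.as_bytes().to_vec());
    }
    fn get_str(&self, key: &str) -> Option<String> {
        self.backend.get_raw(key).and_then(|bytes| String::from_utf8(bytes).ok())
    }
}

fn main() {
    let store = BackendStore::new(InMemoryBackend::default());
    let handle = store.clone(); // clones the Arc, shares the backend
    store.put_str("status", "running");
    assert_eq!(handle.get_str("status").as_deref(), Some("running"));
}
```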

Runnable example: cargo run --example store_custom_backend — a small NamespacedStore wrapping a MemoryStore with key prefixing, registered as a resource and retrieved by type, plus a get_shared::<T>() demo (Arc::ptr_eq proves the zero-copy share of a large value between tasks).


Error Handling

Store operations return StoreResult<T>, which is Result<T, StoreError>. StoreError converts automatically to CanoError::Store via the From impl, so the ? operator works transparently in task methods.

KeyNotFound

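The requested key does not exist. Use contains_key() to guard optional reads, or fall back with .unwrap_or_default(), keeping in mind that it also swallows TypeMismatch and LockError. Match on KeyNotFound when only a missing key should be tolerated.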

TypeMismatch

The stored value cannot be downcast to the requested type. Use the same type for a key across all pipeline stages: a value stored as 42u32, for example, cannot be read back as usize.

LockError

The internal RwLock is poisoned because a thread panicked while holding the lock. This is typically fatal; log the error and restart.

AppendTypeMismatch

The key exists but holds a non-Vec value. Mixing scalar and collection writes to the same key is a logic error.
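The ? conversion and selective recovery can be sketched with stand-in types. StoreError mirrors the variant names listed above and AppError plays the role of CanoError; both are hypothetical, and their String payloads are assumptions rather than cano's definitions.

```rust
/// Hypothetical mirror of the documented StoreError variant names.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum StoreError {
    KeyNotFound(String),
    TypeMismatch(String),
    LockError(String),
    AppendTypeMismatch(String),
}

/// Hypothetical top-level error standing in for CanoError.
#[derive(Debug, PartialEq)]
enum AppError {
    Store(StoreError),
}

impl From<StoreError> for AppError {
    fn from(e: StoreError) -> Self {
        AppError::Store(e)
    }
}

type StoreResult<T> = Result<T, StoreError>;

/// Hypothetical lookup that only knows one key.
fn lookup(key: &str) -> StoreResult<u32> {
    match key {
        "count" => Ok(42),
        _ => Err(StoreError::KeyNotFound(key.to_string())),
    }
}

/// `?` calls From::from, turning StoreError into AppError automatically.
fn task(key: &str) -> Result<u32, AppError> {
    let n = lookup(key)?;
    Ok(n + 1)
}

fn main() {
    assert_eq!(task("count"), Ok(43));
    assert_eq!(
        task("missing"),
        Err(AppError::Store(StoreError::KeyNotFound("missing".to_string())))
    );

    // Recover only from a missing key; anything else is treated as a real failure.
    let n = match lookup("missing") {
        Ok(n) => n,
        Err(StoreError::KeyNotFound(_)) => 0,
        Err(other) => panic!("unexpected store error: {other:?}"),
    };
    assert_eq!(n, 0);
}
```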