MemoryStore
The default in-memory shared state for passing data between workflow stages.
The store is the shared data layer that connects workflow stages. Tasks write
values during their execution and read values produced by upstream stages. The default
implementation, MemoryStore, is an Arc<RwLock<HashMap>>
that is safe to clone and share across async tasks. Custom backends can be registered
as typed Resource values and retrieved by concrete type.
MemoryStore is registered inside a Resources
dictionary and retrieved in tasks with
res.get::<MemoryStore, _>("store")?. This makes the store one
of many named dependencies a workflow can carry, alongside database pools, HTTP clients,
configuration, or any other type that implements the Resource trait. See
Resources for lifecycle semantics, key-type options, and
the full insert / get API.
Thread Safety Model
One shared map, many readers and writers
MemoryStore uses Arc<RwLock<_>> for interior mutability.
Multiple readers hold shared locks concurrently; writers get exclusive access.
Because MemoryStore wraps its inner map in Arc, cloning the store
produces a second handle to the same underlying data, not a copy.
let store = MemoryStore::new();
let store2 = store.clone(); // same backing map — not a copy
store.put("key", 42i32)?;
let val: i32 = store2.get("key")?; // returns 42
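The shared-handle behavior comes from the Arc<RwLock<_>> layout itself, so it can be reproduced with the standard library alone. The sketch below is illustrative only: SharedMap and write_from_threads are hypothetical names, and the map holds plain i32 values, whereas MemoryStore accepts arbitrary types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for a shared store: one map behind Arc<RwLock<_>>.
#[derive(Clone, Default)]
pub struct SharedMap {
    inner: Arc<RwLock<HashMap<String, i32>>>,
}

impl SharedMap {
    /// Takes the exclusive write lock for the duration of the insert.
    pub fn put(&self, key: &str, value: i32) {
        self.inner
            .write()
            .expect("lock poisoned")
            .insert(key.to_string(), value);
    }

    /// Takes a shared read lock; many readers may hold it at once.
    pub fn get(&self, key: &str) -> Option<i32> {
        self.inner.read().expect("lock poisoned").get(key).copied()
    }

    /// Number of entries visible through this handle.
    pub fn len(&self) -> usize {
        self.inner.read().expect("lock poisoned").len()
    }
}

/// Spawns `writers` threads, each writing one key through its own clone,
/// then returns how many keys the original handle can see.
pub fn write_from_threads(writers: usize) -> usize {
    let store = SharedMap::default();
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            // Cloning bumps the Arc count; the map itself is not copied.
            let handle = store.clone();
            thread::spawn(move || handle.put(&format!("worker-{}", i), i as i32))
        })
        .collect();
    for h in handles {
        h.join().expect("writer thread panicked");
    }
    store.len()
}
```

Every write made through a clone is visible through the original handle, which is why a store can be cloned into tasks and read back by the caller afterwards.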
API Reference
| Method | Signature | Notes |
|---|---|---|
| get | get<T: Clone>(&self, key: &str) -> StoreResult<T> | Returns a clone. Errors on missing key or type mismatch. |
| get_shared | get_shared<T: Send + Sync>(&self, key: &str) -> StoreResult<Arc<T>> | Zero-copy read. Returns the stored Arc pointer directly when possible. |
| put | put<T: Send + Sync>(&self, key: &str, value: T) -> StoreResult<()> | Inserts or replaces. Values do not need to implement Clone. |
| remove | remove(&self, key: &str) -> StoreResult<()> | Silent no-op on missing keys. Errors only on lock failure. |
| append | append<T: Send + Sync + Clone>(&self, key: &str, item: T) -> StoreResult<()> | Appends to an existing Vec<T>, or creates one. Type mismatch errors if the key holds a non-Vec. |
| contains_key | contains_key(&self, key: &str) -> StoreResult<bool> | Returns Ok(true) if the key exists. |
| keys | keys(&self) -> StoreResult<Vec<Arc<str>>> | Returns all keys. Order is unspecified (HashMap). |
| len | len(&self) -> StoreResult<usize> | Number of key-value pairs currently stored. |
| is_empty | is_empty(&self) -> StoreResult<bool> | Default impl delegates to len(). |
| clear | clear(&self) -> StoreResult<()> | Removes all entries. |
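The create-or-extend behavior of append() can be pictured with a small type-erased map. This is a minimal sketch under stated assumptions, not MemoryStore's actual implementation: ErasedMap, AppendError, and get_vec are hypothetical names, and the sketch takes &mut self instead of locking internally.

```rust
use std::any::Any;
use std::collections::HashMap;

/// Hypothetical error type used only by this sketch.
#[derive(Debug, PartialEq)]
pub enum AppendError {
    /// The key exists but does not hold a Vec<T> of the requested T.
    TypeMismatch,
}

/// Values are stored type-erased; callers name the type when they read or append.
#[derive(Default)]
pub struct ErasedMap {
    entries: HashMap<String, Box<dyn Any + Send + Sync>>,
}

impl ErasedMap {
    /// Inserts or replaces the value under `key`.
    pub fn put<T: Send + Sync + 'static>(&mut self, key: &str, value: T) {
        self.entries.insert(key.to_string(), Box::new(value));
    }

    /// Pushes onto an existing Vec<T>, or starts a new one when the key is absent.
    pub fn append<T: Send + Sync + 'static>(
        &mut self,
        key: &str,
        item: T,
    ) -> Result<(), AppendError> {
        match self.entries.get_mut(key) {
            // Key exists: it must already hold a Vec<T> for the push to be valid.
            Some(existing) => match existing.downcast_mut::<Vec<T>>() {
                Some(items) => {
                    items.push(item);
                    Ok(())
                }
                None => Err(AppendError::TypeMismatch),
            },
            // Key absent: create a one-element Vec<T>.
            None => {
                self.put(key, vec![item]);
                Ok(())
            }
        }
    }

    /// Returns a clone of the Vec<T> under `key`, if present and of that type.
    pub fn get_vec<T: Clone + 'static>(&self, key: &str) -> Option<Vec<T>> {
        self.entries.get(key)?.downcast_ref::<Vec<T>>().cloned()
    }
}
```

Appending a u32 to a key that already holds a scalar u32 fails in this sketch, which mirrors the rule in the table that a non-Vec value under the key is an error rather than something to overwrite.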
Basic Usage
use cano::prelude::*;

fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Store primitive and composite types
    store.put("count", 42u32)?;
    store.put("labels", vec!["a".to_string(), "b".to_string()])?;

    // Retrieve with explicit type annotation
    let count: u32 = store.get("count")?;
    let labels: Vec<String> = store.get("labels")?;

    // Append to a Vec (creates if absent)
    store.append("log", "step 1 complete".to_string())?;
    store.append("log", "step 2 complete".to_string())?;
    let log: Vec<String> = store.get("log")?;

    // Check existence before reading optional keys
    if store.contains_key("result")? {
        let result: String = store.get("result")?;
    }

    // Remove a key; silent no-op if absent
    store.remove("scratch")?;

    Ok(())
}
Use contains_key() before get() for optional keys, or handle the
KeyNotFound error directly. The append() method is handy for
building up collections across multiple workflow stages without overwriting previous entries.
Zero-Copy Reads with get_shared()
get_shared() returns an Arc<T> pointing to the value in the store.
MemoryStore wraps each value in an Arc when it is stored, so
get_shared() returns a clone of that pointer: a cheap reference-count
bump, not a data copy. This is useful when passing large values
(e.g., a model's weight tensor or a large dataset) to multiple downstream tasks.
use cano::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    store.put("dataset", vec![0u8; 1_000_000])?; // 1 MB

    // Both handles point to the same allocation, so nothing is copied
    let handle_a: Arc<Vec<u8>> = store.get_shared("dataset")?;
    let handle_b: Arc<Vec<u8>> = store.get_shared("dataset")?;
    assert!(Arc::ptr_eq(&handle_a, &handle_b));

    Ok(())
}
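get()
get() always clones the underlying value. Use it when you need an owned copy that you can modify independently of the store.
get_shared()
get_shared() hands back a shared Arc<T> without copying the data. Prefer it for large or frequently read values.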
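The difference between the two read paths can be sketched with a map that keeps every value behind a type-erased Arc. This is an assumption-laden illustration rather than cano's code: ArcMap is a hypothetical type, it returns Option instead of StoreResult, and it omits locking.

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical store that keeps every value behind a type-erased Arc.
#[derive(Default)]
pub struct ArcMap {
    entries: HashMap<String, Arc<dyn Any + Send + Sync>>,
}

impl ArcMap {
    /// Wraps the value in an Arc once, at write time.
    pub fn put<T: Send + Sync + 'static>(&mut self, key: &str, value: T) {
        self.entries.insert(key.to_string(), Arc::new(value));
    }

    /// Shared read: clone the Arc (a refcount bump) and downcast it to Arc<T>.
    /// Returns None when the key is missing or holds a different type.
    pub fn get_shared<T: Send + Sync + 'static>(&self, key: &str) -> Option<Arc<T>> {
        let erased = Arc::clone(self.entries.get(key)?);
        erased.downcast::<T>().ok()
    }

    /// Owned read: clones the value out of the shared allocation.
    pub fn get<T: Clone + Send + Sync + 'static>(&self, key: &str) -> Option<T> {
        self.get_shared::<T>(key).map(|shared| (*shared).clone())
    }
}
```

In this sketch, two get_shared() calls yield pointers to the same allocation, while get() produces a fresh Vec with its own buffer. The cost of get() therefore grows with the size of the value, and the cost of get_shared() does not.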
Store in a Workflow
Register the store inside a Resources dictionary and pass it to
Workflow::new(). Each task retrieves the store via
res.get::<MemoryStore, _>("store")?, so the same handle is shared
across all registered tasks for the duration of a single orchestrate() call.
use cano::prelude::*;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Stage { Ingest, Transform, Complete }

struct IngestTask;

#[task(state = Stage)]
impl IngestTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Stage>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let batch: usize = store.get("batch_size")?;
        let records: Vec<u32> = (0..batch as u32).collect();
        store.put("records", records)?;
        Ok(TaskResult::Single(Stage::Transform))
    }
}

struct TransformTask;

#[task(state = Stage)]
impl TransformTask {
    async fn run(&self, res: &Resources) -> Result<TaskResult<Stage>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;
        let records: Vec<u32> = store.get("records")?;
        let transformed: Vec<u32> = records.into_iter().map(|x| x * 2).collect();
        store.put("result", transformed)?;
        Ok(TaskResult::Single(Stage::Complete))
    }
}

#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();

    // Pre-populate before the workflow starts
    store.put("batch_size", 256usize)?;

    let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
        .register(Stage::Ingest, IngestTask)
        .register(Stage::Transform, TransformTask)
        .add_exit_state(Stage::Complete);

    workflow.orchestrate(Stage::Ingest).await?;

    // Read results after the workflow completes
    let result: Vec<u32> = store.get("result")?;
    println!("Processed {} records", result.len());

    Ok(())
}
Runnable example: cargo run --example workflow_stack_store — uses a MemoryStore
as a shared stack passed between workflow stages.
Custom Store Resources
For custom storage, define a concrete type with the API your workflow needs and
implement Resource for it. Register that type in Resources, then retrieve it by
concrete type inside tasks. This keeps storage dependencies explicit and avoids
an unnecessary storage trait layer now that Resources provides typed dependency lookup.
use cano::prelude::*;
use cano::store::StoreResult;
use std::sync::Arc;

/// Example: a store that prefixes all keys with a namespace.
#[derive(Clone)]
pub struct NamespacedStore {
    namespace: String,
    inner: MemoryStore,
}

impl NamespacedStore {
    pub fn new(namespace: impl Into<String>) -> Self {
        Self {
            namespace: namespace.into(),
            inner: MemoryStore::new(),
        }
    }

    fn ns_key(&self, key: &str) -> String {
        format!("{}::{}", self.namespace, key)
    }

    pub fn get<T: 'static + Clone>(&self, key: &str) -> StoreResult<T> {
        self.inner.get(&self.ns_key(key))
    }

    pub fn put<T: 'static + Send + Sync>(&self, key: &str, value: T) -> StoreResult<()> {
        self.inner.put(&self.ns_key(key), value)
    }

    pub fn remove(&self, key: &str) -> StoreResult<()> {
        self.inner.remove(&self.ns_key(key))
    }

    pub fn append<T: 'static + Send + Sync + Clone>(&self, key: &str, item: T) -> StoreResult<()> {
        self.inner.append(&self.ns_key(key), item)
    }

    pub fn contains_key(&self, key: &str) -> StoreResult<bool> {
        self.inner.contains_key(&self.ns_key(key))
    }

    pub fn keys(&self) -> StoreResult<Vec<String>> {
        // Strip the namespace prefix before returning keys to callers
        let prefix = format!("{}::", self.namespace);
        let raw_keys = self.inner.keys()?;
        Ok(raw_keys
            .into_iter()
            .filter_map(|k| k.strip_prefix(&prefix).map(str::to_string))
            .collect())
    }

    pub fn len(&self) -> StoreResult<usize> {
        // Count only keys in this namespace
        Ok(self.keys()?.len())
    }

    pub fn clear(&self) -> StoreResult<()> {
        // Remove only keys in this namespace
        for key in self.keys()? {
            self.remove(&key)?;
        }
        Ok(())
    }
}

impl Resource for NamespacedStore {}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum State { Start, Done }

fn main() -> Result<(), CanoError> {
    // Use it like any other store: register it in Resources with your own key
    let store = NamespacedStore::new("pipeline_v2");
    store.put("status", "running".to_string())?;

    let _workflow: Workflow<State> = Workflow::new(
        Resources::new().insert("namespaced_store", store.clone()),
    )
    // ... register tasks that retrieve it via
    // res.get::<NamespacedStore, _>("namespaced_store")? ...
    .add_exit_state(State::Done);

    Ok(())
}
Because Resources retrieves dependencies by concrete type, your custom store can expose
the exact methods it needs. If you need dynamic dispatch, wrap your backend in a concrete
resource type that owns an object-safe client or repository trait.
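One way to read the dynamic-dispatch advice is sketched below using the standard library only. Everything here is hypothetical and introduced for illustration: the KvBackend trait, InProcessBackend, and BackendStore are not part of cano. To register BackendStore with a workflow, it would additionally need an `impl Resource for BackendStore {}`, as NamespacedStore has.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical object-safe backend trait (an assumption, not a cano API).
pub trait KvBackend: Send + Sync {
    fn read(&self, key: &str) -> Option<String>;
    fn write(&self, key: &str, value: String);
}

/// One possible backend: an in-process map guarded by a Mutex.
#[derive(Default)]
pub struct InProcessBackend {
    data: Mutex<HashMap<String, String>>,
}

impl KvBackend for InProcessBackend {
    fn read(&self, key: &str) -> Option<String> {
        self.data.lock().expect("lock poisoned").get(key).cloned()
    }

    fn write(&self, key: &str, value: String) {
        self.data
            .lock()
            .expect("lock poisoned")
            .insert(key.to_string(), value);
    }
}

/// Concrete wrapper type: tasks look this up by its concrete type,
/// while the backend behind it is chosen at runtime.
#[derive(Clone)]
pub struct BackendStore {
    backend: Arc<dyn KvBackend>,
}

impl BackendStore {
    pub fn new(backend: Arc<dyn KvBackend>) -> Self {
        Self { backend }
    }

    pub fn get(&self, key: &str) -> Option<String> {
        self.backend.read(key)
    }

    pub fn put(&self, key: &str, value: &str) {
        self.backend.write(key, value.to_string());
    }
}
```

Tasks depend only on BackendStore, so swapping InProcessBackend for another KvBackend implementation changes the construction site and nothing else.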
Runnable example: cargo run --example store_custom_backend — a small
NamespacedStore wrapping a MemoryStore with key prefixing, registered as a
resource and retrieved by type, plus a get_shared::<T>() demo (Arc::ptr_eq
proves the zero-copy share of a large value between tasks).
Error Handling
Store operations return StoreResult<T>, which is
Result<T, StoreError>. StoreError converts automatically
to CanoError::Store via the From impl, so the ?
operator works transparently in task methods.
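The mechanism behind that conversion is ordinary Rust: when a function returns the outer error type, the ? operator calls From::from on the inner error. The sketch below uses stand-in types, so SketchStoreError, SketchAppError, lookup, and run_stage are all hypothetical, and the String payload on KeyNotFound is an assumption rather than cano's definition.

```rust
/// Hypothetical store-level error; the payload is an assumption.
#[derive(Debug, PartialEq)]
pub enum SketchStoreError {
    KeyNotFound(String),
}

/// Hypothetical workflow-level error that wraps store errors.
#[derive(Debug, PartialEq)]
pub enum SketchAppError {
    Store(SketchStoreError),
}

impl From<SketchStoreError> for SketchAppError {
    fn from(err: SketchStoreError) -> Self {
        SketchAppError::Store(err)
    }
}

/// Stand-in for a store read that knows a single key.
fn lookup(key: &str) -> Result<u32, SketchStoreError> {
    if key == "batch_size" {
        Ok(256)
    } else {
        Err(SketchStoreError::KeyNotFound(key.to_string()))
    }
}

/// Returns the outer error type; `?` converts the store error via From.
pub fn run_stage(key: &str) -> Result<u32, SketchAppError> {
    let batch = lookup(key)?;
    Ok(batch * 2)
}
```

No explicit map_err is needed in run_stage; the From impl is what lets store calls sit directly inside task methods.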
KeyNotFound
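The requested key does not exist. Use contains_key() to guard optional reads, or handle the error with .unwrap_or_default(). Note that .unwrap_or_default() replaces any error with the default value, not only KeyNotFound, so match on the variant when other failures must still surface.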
TypeMismatch
The stored value cannot be downcast to the requested type. Ensure consistent type usage per key across all pipeline stages.
LockError
The internal RwLock is poisoned because a thread panicked while holding the lock. This is typically fatal; log and restart.
AppendTypeMismatch
The key exists but holds a non-Vec value. Mixing scalar and collection writes to the same key is a logic error.
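When a missing key should fall back to a default but other failures should still propagate, matching on the variant is more precise than a blanket default. The helper below is a sketch with a stand-in error type: ReadError and or_default_if_missing are hypothetical names, and the real StoreError variants may carry payloads that this sketch omits.

```rust
/// Hypothetical two-variant error used only by this sketch.
#[derive(Debug, PartialEq)]
pub enum ReadError {
    KeyNotFound,
    TypeMismatch,
}

/// Falls back to `default` only when the key is missing.
/// Every other error is passed through unchanged.
pub fn or_default_if_missing<T>(
    result: Result<T, ReadError>,
    default: T,
) -> Result<T, ReadError> {
    match result {
        Err(ReadError::KeyNotFound) => Ok(default),
        other => other,
    }
}
```

By contrast, calling unwrap_or_default() on the same Result would turn a TypeMismatch into the default value as well, hiding the inconsistent key usage that the error is meant to report.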