Tracing
Span-level instrumentation for every pipeline, stage, system, retry, and distributed-runner boundary, available behind the tracing feature flag.
Enabling the feature
[dependencies]
pcs-core = { version = "0.1.0", features = ["tracing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Set up the subscriber in main and PCS spans flow into your sink of choice (terminal, JSON, OTLP, …):
use tracing_subscriber::EnvFilter;

fn main() {
    tracing_subscriber::fmt()
        // Reads filter directives from RUST_LOG, e.g. RUST_LOG=info,pcs_core=debug
        .with_env_filter(EnvFilter::from_default_env())
        .with_target(false)
        .compact()
        .init();

    // run your pipeline...
}
Spans you’ll see
| Span | Fields | Where |
|---|---|---|
| pipeline.run | name, systems | Wraps an entire Pipeline::run call. |
| pipeline.stage | idx, systems | One per stage; child of pipeline.run. |
| system.execute | name, retries | Per-system run, wrapped in retry. |
| system.retry | attempt, delay_ms | Emitted whenever a system retry kicks in. |
| dataset.compact | before, after | Compaction sweep over all components. |
| distributed.claim | claim_id, partition | Lifecycle of a row-range claim. |
| distributed.checkpoint | stage_idx, bytes | Per-stage Arrow IPC checkpoint write. |
| raft.propose | op, term | One per ConsensusCommand proposed via Raft. |
Span names use a stable dotted scheme (area.event) so filtering with RUST_LOG or EnvFilter remains predictable.
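If a custom sink or dashboard needs to group spans by area, a dotted name can be split on its first dot. The helper below is a minimal std-only sketch, assuming every span name has the area.event form listed in the table; split_span_name is a hypothetical function, not part of the PCS API.

```rust
/// Splits a dotted span name such as "pipeline.run" into (area, event).
/// Hypothetical helper for illustration; not part of the PCS API.
fn split_span_name(name: &str) -> Option<(&str, &str)> {
    // Split on the first dot only, and reject names with an empty half.
    let (area, event) = name.split_once('.')?;
    if area.is_empty() || event.is_empty() {
        return None;
    }
    Some((area, event))
}

fn main() {
    for name in ["pipeline.run", "system.retry", "raft.propose"] {
        match split_span_name(name) {
            Some((area, event)) => println!("area={area} event={event}"),
            None => println!("unrecognized span name: {name}"),
        }
    }
}
```

Returning an Option keeps names that do not follow the scheme from being silently grouped under an empty area.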
Useful filter recipes
# Pipeline-level info, system retries at debug:
RUST_LOG="info,pcs_core::pipeline=debug" cargo run --example scheduler_etl
# Run quietly, but capture every Raft proposal:
RUST_LOG="warn,pcs_service::distributed::raft=trace" pcs-service serve --config service.toml
# Drop everything except the distributed runner:
RUST_LOG="off,pcs_service::distributed::runner=info" cargo run --example distributed_scheduler --features distributed
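To reason about what a recipe lets through, it can help to model how its directives resolve. The sketch below is a simplified std-only model, not the EnvFilter implementation. It assumes only bare level and target=level directives, treats targets as ::-separated module paths, lets the longest matching target prefix win, and disables unmatched targets when no bare level is given. Level, parse_level, matches_target, max_level_for, and the some_other_crate target are hypothetical names.

```rust
/// Verbosity levels understood by this simplified model.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Level { Off, Error, Warn, Info, Debug, Trace }

fn parse_level(s: &str) -> Option<Level> {
    match s.trim().to_ascii_lowercase().as_str() {
        "off" => Some(Level::Off),
        "error" => Some(Level::Error),
        "warn" => Some(Level::Warn),
        "info" => Some(Level::Info),
        "debug" => Some(Level::Debug),
        "trace" => Some(Level::Trace),
        _ => None,
    }
}

/// True when `target` is `prefix` itself or a module nested under it.
fn matches_target(target: &str, prefix: &str) -> bool {
    target == prefix
        || (target.starts_with(prefix) && target[prefix.len()..].starts_with("::"))
}

/// Most verbose level enabled for `target` under `filter` (model only).
fn max_level_for(filter: &str, target: &str) -> Level {
    // Model assumption: with no bare level, unmatched targets are disabled.
    let mut default = Level::Off;
    // Longest matching prefix seen so far, with its level.
    let mut best: Option<(usize, Level)> = None;
    for directive in filter.split(',').map(str::trim) {
        match directive.split_once('=') {
            Some((prefix, level)) => {
                if let Some(level) = parse_level(level) {
                    let more_specific = best.map_or(true, |(len, _)| prefix.len() >= len);
                    if matches_target(target, prefix) && more_specific {
                        best = Some((prefix.len(), level));
                    }
                }
            }
            None => {
                // A bare level sets the default; other bare tokens are ignored here.
                if let Some(level) = parse_level(directive) {
                    default = level;
                }
            }
        }
    }
    best.map_or(default, |(_, level)| level)
}

fn main() {
    let filter = "off,pcs_service::distributed::runner=info";
    for target in [
        "pcs_service::distributed::runner",
        "pcs_service::distributed::raft",
        "some_other_crate",
    ] {
        println!("{target}: {:?}", max_level_for(filter, target));
    }
}
```

Under this model, the third recipe resolves the runner module to Info and every other target, including the Raft module, to Off.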
Exporting to OpenTelemetry
PCS does not depend on any specific exporter; it only emits standard tracing spans and events. To send them to OTLP, Honeycomb, Jaeger, or any other backend, stack a tracing-opentelemetry layer onto the subscriber.
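use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
// Add `use opentelemetry_otlp::WithExportConfig;` if you call `.with_endpoint(...)` on the exporter.

// `init_tracing` is an illustrative name. Call it from inside a Tokio runtime:
// the batch exporter spawns a background task.
// This uses the `new_pipeline()` builder; the exact calls vary between
// opentelemetry-otlp releases, so check the version you depend on.
fn init_tracing() -> Result<(), Box<dyn std::error::Error>> {
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .install_batch(opentelemetry_sdk::runtime::Tokio)?;

    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::from_default_env())
        .with(tracing_opentelemetry::layer().with_tracer(tracer))
        .with(tracing_subscriber::fmt::layer())
        .init();

    Ok(())
}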