API Reference
Complete API documentation for OnlineStatsChains.jl.
Core Types
StatDAG
OnlineStatsChains.StatDAG - Type
StatDAG(; strategy=:eager)
Create a new Directed Acyclic Graph (DAG) for chaining OnlineStat computations.
Fields
- nodes: Dictionary mapping node IDs to Node instances
- edges: Dictionary mapping (from, to) tuples to Edge instances
- topological_order: Cached topological ordering of nodes
- order_valid: Flag indicating if topological order is current
- strategy: Evaluation strategy (:eager, :lazy, or :partial)
- dirty_nodes: Set of nodes that need recomputation (lazy mode)
Evaluation Strategies
- :eager (default): Propagation happens immediately when fit!() is called
- :lazy: Updates are recorded but not propagated until value() is requested
- :partial: Only the affected subgraph is recomputed
Example
# Eager evaluation (default)
dag = StatDAG()
# Lazy evaluation
dag = StatDAG(strategy=:lazy)
# Partial evaluation
dag = StatDAG(strategy=:partial)
CycleError
Custom exception type raised when attempting to create a cycle in the DAG.
Type: CycleError <: Exception
Fields:
- msg::String: Error message describing the cycle
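A minimal sketch of how the exception might be handled. It assumes CycleError is reachable as OnlineStatsChains.CycleError (whether the type is exported is not stated here, so the name is qualified); the node names :a and :b are illustrative.

```julia
using OnlineStatsChains, OnlineStats

dag = StatDAG()
add_node!(dag, :a, Mean())
add_node!(dag, :b, Mean())
connect!(dag, :a, :b)

# Connecting :b back to :a would close a loop, so connect! should reject it.
cycle_rejected = try
    connect!(dag, :b, :a)
    false
catch err
    # Only swallow the documented exception type; anything else is re-raised.
    err isa OnlineStatsChains.CycleError || rethrow()
    true
end

println("cycle rejected: ", cycle_rejected)
```

Catching the specific type rather than a bare catch keeps unrelated errors, such as a misspelled node ID raising ArgumentError, visible.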
DAG Construction
OnlineStatsChains.add_node! - Function
add_node!(dag::StatDAG, id::Symbol, stat::OnlineStat)
Add a node to the DAG with the given identifier and OnlineStat instance.
Throws ArgumentError if a node with the same id already exists.
Arguments
- dag: The StatDAG instance
- id: Unique symbol identifier for the node
- stat: An OnlineStat instance to associate with this node
Example
dag = StatDAG()
add_node!(dag, :mean, Mean())
OnlineStatsChains.connect! - Function
connect!(dag::StatDAG, from_id::Symbol, to_id::Symbol; filter::Union{Function,Nothing}=nothing, transform::Union{Function,Nothing}=nothing)
Create a directed edge from the from_id node to the to_id node.
Throws ArgumentError if either node doesn't exist. Throws CycleError if the connection would create a cycle.
Arguments
- dag: The StatDAG instance
- from_id: Source node identifier
- to_id: Destination node identifier
- filter: Optional filter function to conditionally propagate values
- transform: Optional transform function to modify values before propagation
When both filter and transform are provided, the filter is evaluated first. The transform is only applied if the filter returns true.
Example
connect!(dag, :source, :sink)
connect!(dag, :ema1, :ema2, filter = !ismissing)
connect!(dag, :price, :alert, filter = x -> x > 100)
connect!(dag, :raw, :scaled, transform = x -> x * 100)
connect!(dag, :temp_c, :temp_f, filter = !ismissing, transform = c -> c * 9/5 + 32)
connect!(dag::StatDAG, from_ids::Vector{Symbol}, to_id::Symbol; filter::Union{Function,Nothing}=nothing, transform::Union{Function,Nothing}=nothing)
Connect multiple source nodes to a single destination node (fan-in).
Arguments
- dag: The StatDAG instance
- from_ids: Vector of source node identifiers
- to_id: Destination node identifier
- filter: Optional filter function applied to combined parent values
- transform: Optional transform function applied to combined parent values
Example
connect!(dag, [:input1, :input2], :combined)
connect!(dag, [:high, :low], :spread, filter = vals -> all(!ismissing, vals))
connect!(dag, [:price, :qty], :total, transform = vals -> vals[1] * vals[2])
Data Input
StatsAPI.fit! - Function
fit!(dag::StatDAG, data::Pair{Symbol, <:Any})
Update a source node with data and propagate through the DAG.
Arguments
- dag: The StatDAG instance
- data: A Pair of node_id => value or node_id => iterable
If the value is iterable, each element is processed sequentially with propagation.
Example
# Single value
fit!(dag, :source => 42)
# Batch mode
fit!(dag, :source => [1, 2, 3, 4, 5])
fit!(dag::StatDAG, data::Dict{Symbol, <:Any})
Update multiple source nodes simultaneously.
If the values in the Dict are iterables of different lengths, fit! processes elements only up to the shortest length and issues a warning.
Example
fit!(dag, Dict(:input1 => [1, 2, 3], :input2 => [4, 5, 6]))
Value Retrieval
value
OnlineStatsBase.value - Method
value(dag::StatDAG, id::Symbol)
Get the current value of a node's OnlineStat.
In lazy mode, this triggers recomputation of dirty nodes if necessary.
Throws KeyError if the node doesn't exist.
Example
val = value(dag, :mean)
values
OnlineStatsChains.values - Function
values(dag::StatDAG)
Get a dictionary of all node values.
Example
all_vals = values(dag)
Evaluation Strategies
OnlineStatsChains.set_strategy! - Function
set_strategy!(dag::StatDAG, strategy::Symbol)
Change the evaluation strategy of an existing DAG.
Arguments
- dag: The StatDAG instance
- strategy: New strategy (:eager, :lazy, or :partial)
Example
dag = StatDAG() # Default eager
set_strategy!(dag, :lazy) # Switch to lazy
OnlineStatsChains.invalidate! - Function
invalidate!(dag::StatDAG, id::Symbol)
Mark a node and all its descendants as needing recomputation (lazy mode).
Arguments
- dag: The StatDAG instance
- id: Node identifier to invalidate
Example
dag = StatDAG(strategy=:lazy)
add_node!(dag, :source, Mean())
fit!(dag, :source => 1.0)
invalidate!(dag, :source) # Mark for recomputation
OnlineStatsChains.recompute! - Function
recompute!(dag::StatDAG)
Force recomputation of all dirty nodes (lazy mode).
Example
dag = StatDAG(strategy=:lazy)
# ... add nodes and fit data ...
recompute!(dag) # Recompute all dirty nodes
Edge Transformations
Overview
Edge transformations allow you to apply functions to data as it flows through the DAG. This enables data preprocessing, unit conversions, feature extraction, and other transformations without modifying the source nodes.
Key Features:
- Transform data on-the-fly as it propagates
- Combine with filters for conditional transformations
- Supports both single-input and multi-input transformations
- Backward compatible: edges without transforms use computed values
transform parameter
The transform keyword argument in connect!() accepts a function that will be applied to data propagating through the edge.
Signature: connect!(dag, source, target; transform=nothing, filter=nothing)
Parameters:
- transform::Union{Function, Nothing}: Function to apply to propagating data
- filter::Union{Function, Nothing}: Optional filter to apply before transform
Important: When a transform or filter is present on an edge, that edge propagates raw data values instead of computed statistics. This enables meaningful transformations of the original data.
Basic Example
using OnlineStatsChains, OnlineStats
# Temperature sensor in Celsius, convert to Fahrenheit
dag = StatDAG()
add_node!(dag, :celsius, Mean())
add_node!(dag, :fahrenheit, Mean())
# Convert celsius to fahrenheit
connect!(dag, :celsius, :fahrenheit, transform = c -> c * 9/5 + 32)
# Input temperature readings in Celsius
fit!(dag, :celsius => [0.0, 10.0, 20.0, 30.0])
value(dag, :celsius) # 15.0 (mean in Celsius)
value(dag, :fahrenheit) # 59.0 (mean in Fahrenheit)
Data Extraction Example
Extract specific fields from structured data:
dag = StatDAG()
add_node!(dag, :measurements, Mean())
add_node!(dag, :prices, Mean())
# Extract price field from measurement objects
connect!(dag, :measurements, :prices, transform = m -> m.price)
# Simulate measurements with price and quantity
measurements = [(price=10.0, qty=2), (price=15.0, qty=3), (price=12.0, qty=1)]
fit!(dag, :measurements => measurements)
Combining Filter and Transform
Filters and transforms work together: filter is applied first, then transform.
dag = StatDAG()
add_node!(dag, :temperature, Mean())
add_node!(dag, :fahrenheit_high, Mean())
# Only propagate temperatures above 20°C, convert to Fahrenheit
connect!(dag, :temperature, :fahrenheit_high,
         filter = t -> t > 20,
         transform = t -> t * 9/5 + 32)
fit!(dag, :temperature => [15.0, 25.0, 18.0, 30.0])
# fahrenheit_high receives only [77.0, 86.0] (from 25°C and 30°C)
Multi-Input Transformations
Transform functions can process data from multiple parents:
dag = StatDAG()
add_node!(dag, :price, Mean())
add_node!(dag, :quantity, Mean())
add_node!(dag, :revenue, Mean())
# Connect multiple sources with transformation
connect!(dag, [:price, :quantity], :revenue,
         transform = inputs -> inputs[1] * inputs[2])
fit!(dag, Dict(:price => 10.0, :quantity => 5.0))
value(dag, :revenue) # 50.0
Introspection Functions
Query edge transformations:
# Check if an edge has a transform
has_transform(dag::StatDAG, source::Symbol, target::Symbol) -> Bool
# Get the transform function (or nothing)
get_transform(dag::StatDAG, source::Symbol, target::Symbol) -> Union{Function, Nothing}
Example:
dag = StatDAG()
add_node!(dag, :a, Mean())
add_node!(dag, :b, Mean())
connect!(dag, :a, :b, transform = x -> x * 2)
has_transform(dag, :a, :b) # true
get_transform(dag, :a, :b) # returns the transform function
Execution Order
When both filter and transform are present on an edge:
- Filter is evaluated first with the raw data value
- If filter returns true, transform is applied
- Transformed value is fitted to the target node
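The ordering above can be sketched in plain Julia without the package. The helper apply_edge below is hypothetical and only mirrors the documented sequence; it is not how OnlineStatsChains is implemented, and it uses nothing as a stand-in for "value not propagated".

```julia
# Hypothetical helper, not part of OnlineStatsChains: mirrors the documented order.
function apply_edge(x; filter=nothing, transform=nothing)
    # Step 1: the filter sees the raw value; a false result stops propagation.
    if filter !== nothing && !filter(x)
        return nothing
    end
    # Step 2: the transform runs only on values that passed the filter.
    return transform === nothing ? x : transform(x)
end

to_fahrenheit(t) = t * 9/5 + 32
readings = [15.0, 25.0, 18.0, 30.0]

# Step 3 would fit each surviving value to the target node; here we just collect them.
results = [apply_edge(t; filter = t -> t > 20, transform = to_fahrenheit) for t in readings]
forwarded = filter(!isnothing, results)

println(forwarded)  # [77.0, 86.0]
```

Because the filter runs on the untransformed value, the threshold of 20 is expressed in Celsius, not Fahrenheit.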
Hybrid Propagation Model
OnlineStatsChains uses a hybrid propagation model for backward compatibility:
- Edges WITHOUT filter/transform: Propagate computed values (statistics like means, variances)
- Edges WITH filter OR transform: Propagate raw data values (original input data)
This ensures existing code continues to work while enabling powerful transformations.
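A small sketch contrasting the two edge kinds on the same source, using only the calls documented on this page. The node names are illustrative, and identity is used as a transform purely to switch the edge to raw-data propagation; the printed values are left unannotated because they depend on the package's propagation details.

```julia
using OnlineStatsChains, OnlineStats

dag = StatDAG()
add_node!(dag, :source, Mean())
add_node!(dag, :from_stats, Mean())  # fed by a plain edge
add_node!(dag, :from_raw, Mean())    # fed by an edge with a transform

connect!(dag, :source, :from_stats)                      # no filter/transform
connect!(dag, :source, :from_raw, transform = identity)  # transform present

fit!(dag, :source => [1.0, 2.0, 3.0, 4.0])

# Per the model above, :from_raw should see the raw inputs, while
# :from_stats should see the computed value of :source after each update.
println(value(dag, :from_stats))
println(value(dag, :from_raw))
```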
Graph Introspection
OnlineStatsChains.get_nodes - Function
get_nodes(dag::StatDAG)
Get a list of all node IDs in the DAG.
OnlineStatsChains.get_parents - Function
get_parents(dag::StatDAG, id::Symbol)
Get the list of parent node IDs for a given node.
OnlineStatsChains.get_children - Function
get_children(dag::StatDAG, id::Symbol)
Get the list of child node IDs for a given node.
OnlineStatsChains.get_topological_order - Function
get_topological_order(dag::StatDAG)
Get the topological ordering of nodes in the DAG.
OnlineStatsChains.validate - Function
validate(dag::StatDAG)
Validate the DAG structure for consistency. Returns true if valid, throws an error otherwise.
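A short usage sketch for the introspection functions on a three-node chain. It assumes these functions are exported like the others on this page; if not, they would need the OnlineStatsChains. prefix.

```julia
using OnlineStatsChains, OnlineStats

dag = StatDAG()
add_node!(dag, :a, Mean())
add_node!(dag, :b, Mean())
add_node!(dag, :c, Mean())
connect!(dag, :a, :b)
connect!(dag, :b, :c)

nodes    = get_nodes(dag)              # all node IDs
parents  = get_parents(dag, :c)        # IDs with an edge into :c
children = get_children(dag, :a)       # IDs that :a has an edge to
order    = get_topological_order(dag)  # parents precede their children
is_valid = validate(dag)               # true, or throws if inconsistent

println((nodes, parents, children, order, is_valid))
```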
Filter Introspection
OnlineStatsChains.has_filter - Function
has_filter(dag::StatDAG, from_id::Symbol, to_id::Symbol)
Check if an edge has a filter function.
Arguments
- dag: The StatDAG instance
- from_id: Source node identifier
- to_id: Destination node identifier
Returns
- Bool: true if the edge has a filter, false otherwise
Example
if has_filter(dag, :ema1, :ema2)
println("Edge has a filter")
end
OnlineStatsChains.get_filter - Function
get_filter(dag::StatDAG, from_id::Symbol, to_id::Symbol)
Get the filter function for an edge, or nothing if no filter exists.
Arguments
- dag: The StatDAG instance
- from_id: Source node identifier
- to_id: Destination node identifier
Returns
- Union{Function, Nothing}: The filter function or nothing
Example
filter_fn = get_filter(dag, :ema1, :ema2)
Transform Introspection
OnlineStatsChains.has_transform - Function
has_transform(dag::StatDAG, from_id::Symbol, to_id::Symbol)
Check if an edge has a transform function.
Arguments
- dag: The StatDAG instance
- from_id: Source node identifier
- to_id: Destination node identifier
Returns
- Bool: true if the edge has a transform, false otherwise
Example
if has_transform(dag, :raw, :scaled)
println("Edge has a transform")
end
OnlineStatsChains.get_transform - Function
get_transform(dag::StatDAG, from_id::Symbol, to_id::Symbol)
Get the transform function for an edge, or nothing if no transform exists.
Arguments
- dag: The StatDAG instance
- from_id: Source node identifier
- to_id: Destination node identifier
Returns
- Union{Function, Nothing}: The transform function or nothing
Example
transform_fn = get_transform(dag, :raw, :scaled)
Internal Functions
These functions are primarily used internally but are documented for advanced usage and package development.
OnlineStatsChains.propagate_value! - Function
propagate_value!(dag::StatDAG, node_id::Symbol, raw_val)
Propagate from a node to its immediate children.
Hybrid propagation model (backward compatible):
- If the edge has no transform: propagates the computed value (cached_value). This is the original behavior.
- If the edge has a transform: propagates the raw data through the transform. This is the new behavior.
This preserves backward compatibility while enabling transforms.
Arguments
- dag: The StatDAG instance
- node_id: The source node ID
- raw_val: The raw input value that was just passed to fit!() on the source node
OnlineStatsChains.has_cycle - Function
has_cycle(dag::StatDAG)
Check if the DAG contains any cycles using depth-first search. Returns true if a cycle is detected, false otherwise.
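For readers unfamiliar with the technique, here is a self-contained sketch of depth-first cycle detection on a plain adjacency dictionary. The function contains_cycle and the Dict representation are hypothetical illustrations; the package's internal data structures and implementation may differ.

```julia
# Hypothetical stand-in for the DAG's edges: node => list of children.
function contains_cycle(children::Dict{Symbol,Vector{Symbol}})
    # A node is :unvisited (absent), :active (on the current DFS path),
    # or :done (fully explored).
    state = Dict{Symbol,Symbol}()

    function visit(node)
        state[node] = :active
        for child in get(children, node, Symbol[])
            s = get(state, child, :unvisited)
            # Reaching a node that is still on the current path closes a loop.
            s == :active && return true
            s == :unvisited && visit(child) && return true
        end
        state[node] = :done
        return false
    end

    for node in keys(children)
        if get(state, node, :unvisited) == :unvisited && visit(node)
            return true
        end
    end
    return false
end

acyclic = Dict(:a => [:b, :c], :b => [:c], :c => Symbol[])
cyclic  = Dict(:a => [:b], :b => [:c], :c => [:a])

println(contains_cycle(acyclic))  # false
println(contains_cycle(cyclic))   # true
```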
OnlineStatsChains.compute_topological_order! - Function
compute_topological_order!(dag::StatDAG)
Compute and cache the topological ordering of nodes in the DAG. Uses Kahn's algorithm (O(V + E) complexity).
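As an illustration of Kahn's algorithm itself, the following standalone sketch orders a small diamond-shaped graph. The helper kahn_order and the Dict representation are hypothetical and independent of the package; the sort call is only there to make the output deterministic.

```julia
# Hypothetical helper: `children` maps each node to the nodes it points to.
function kahn_order(children::Dict{Symbol,Vector{Symbol}})
    # Count incoming edges for every node.
    indegree = Dict(node => 0 for node in keys(children))
    for targets in values(children), t in targets
        indegree[t] = get(indegree, t, 0) + 1
    end

    # Start from nodes with no parents.
    ready = sort!([n for (n, d) in indegree if d == 0])
    order = Symbol[]
    while !isempty(ready)
        node = popfirst!(ready)
        push!(order, node)
        # Removing `node` frees any child whose last remaining parent it was.
        for child in get(children, node, Symbol[])
            indegree[child] -= 1
            indegree[child] == 0 && push!(ready, child)
        end
    end

    # Any node left unprocessed sits on a cycle.
    length(order) == length(indegree) || error("graph contains a cycle")
    return order
end

graph = Dict(:a => [:b, :c], :b => [:d], :c => [:d], :d => Symbol[])
order = kahn_order(graph)

println(order)  # [:a, :b, :c, :d]
```

Each node is enqueued once and each edge is examined once when its indegree is decremented, which is where the O(V + E) bound comes from.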
OnlineStatsChains.is_ancestor - Function
is_ancestor(dag::StatDAG, potential_ancestor::Symbol, node_id::Symbol)
Check if potential_ancestor is an ancestor of node_id.
Index
OnlineStatsChains.StatDAG
OnlineStatsBase.value
OnlineStatsChains.add_node!
OnlineStatsChains.compute_topological_order!
OnlineStatsChains.connect!
OnlineStatsChains.get_children
OnlineStatsChains.get_filter
OnlineStatsChains.get_nodes
OnlineStatsChains.get_parents
OnlineStatsChains.get_topological_order
OnlineStatsChains.get_transform
OnlineStatsChains.has_cycle
OnlineStatsChains.has_filter
OnlineStatsChains.has_transform
OnlineStatsChains.invalidate!
OnlineStatsChains.is_ancestor
OnlineStatsChains.propagate_value!
OnlineStatsChains.recompute!
OnlineStatsChains.set_strategy!
OnlineStatsChains.validate
OnlineStatsChains.values
StatsAPI.fit!