Examples
Real-world examples demonstrating OnlineStatsChains.jl.
Financial Time Series Analysis
Track multiple technical indicators:
using OnlineStatsChains
using OnlineStatsBase
function create_trading_dag()
dag = StatDAG()
# Price input
add_node!(dag, :price, Mean())
# Technical indicators
add_node!(dag, :sma_5, Mean())
add_node!(dag, :sma_20, Mean())
add_node!(dag, :volatility, Variance())
add_node!(dag, :range, Extrema())
# Build pipeline
connect!(dag, :price, :sma_5)
connect!(dag, :price, :sma_20)
connect!(dag, :price, :volatility)
connect!(dag, :price, :range)
return dag
end
# Use the DAG
dag = create_trading_dag()
# Stream price data
prices = [100.0, 102.0, 101.0, 103.0, 105.0, 104.0, 106.0, 108.0]
fit!(dag, :price => prices)
# Get indicators
println("Price: ", value(dag, :price))
println("SMA(5): ", value(dag, :sma_5))
println("SMA(20): ", value(dag, :sma_20))
println("Volatility: ", value(dag, :volatility))
println("Range: ", value(dag, :range))
Sensor Network Monitoring
Multi-sensor aggregation:
using OnlineStatsChains
using OnlineStatsBase
function create_sensor_network()
dag = StatDAG()
# Individual sensors
add_node!(dag, :temp_1, Mean())
add_node!(dag, :temp_2, Mean())
add_node!(dag, :temp_3, Mean())
# Zone aggregates
add_node!(dag, :zone_avg, Mean())
add_node!(dag, :zone_var, Variance())
# Building-level stats
add_node!(dag, :building, Mean())
# Connect sensors to zones
connect!(dag, [:temp_1, :temp_2, :temp_3], :zone_avg)
connect!(dag, [:temp_1, :temp_2, :temp_3], :zone_var)
# Connect to building
connect!(dag, :zone_avg, :building)
return dag
end
# Monitor sensors
dag = create_sensor_network()
# Update sensors (different sample sizes)
fit!(dag, Dict(
:temp_1 => [20.1, 20.3, 20.2],
:temp_2 => [19.8, 20.0, 19.9],
:temp_3 => [20.5, 20.7]
))
println("Zone average: ", value(dag, :zone_avg))
println("Zone variance: ", value(dag, :zone_var))
println("Building temp: ", value(dag, :building))
Streaming Data Pipeline
Process real-time data with lazy evaluation:
using OnlineStatsChains
using OnlineStatsBase
function create_streaming_pipeline()
# Use lazy for efficiency
dag = StatDAG(strategy=:lazy)
# Raw data input
add_node!(dag, :raw, Mean())
# Processing stages
add_node!(dag, :filtered, Mean())
add_node!(dag, :normalized, Mean())
# Analytics
add_node!(dag, :stats, Variance())
add_node!(dag, :summary, Extrema())
# Build pipeline
connect!(dag, :raw, :filtered)
connect!(dag, :filtered, :normalized)
connect!(dag, :normalized, :stats)
connect!(dag, :normalized, :summary)
return dag
end
# Process stream
dag = create_streaming_pipeline()
# Accumulate data (no propagation in lazy mode)
for batch in data_stream
fit!(dag, :raw => batch)
end
# Trigger computation when needed
stats_value = value(dag, :stats)
summary_value = value(dag, :summary)
Quality Control System
Multi-stage quality checks:
using OnlineStatsChains
using OnlineStatsBase
function create_qc_pipeline()
dag = StatDAG()
# Measurement inputs
add_node!(dag, :measurement, Mean())
# Quality metrics
add_node!(dag, :mean_check, Mean())
add_node!(dag, :variance_check, Variance())
add_node!(dag, :range_check, Extrema())
# Aggregate QC
add_node!(dag, :qc_status, Mean())
# Build pipeline
connect!(dag, :measurement, :mean_check)
connect!(dag, :measurement, :variance_check)
connect!(dag, :measurement, :range_check)
connect!(dag, [:mean_check, :variance_check, :range_check], :qc_status)
return dag
end
# Run quality control
dag = create_qc_pipeline()
measurements = [10.1, 10.2, 10.15, 10.3, 10.25]
fit!(dag, :measurement => measurements)
println("Mean: ", value(dag, :mean_check))
println("Variance: ", value(dag, :variance_check))
println("Range: ", value(dag, :range_check))
println("QC Status: ", value(dag, :qc_status))
Real-Time Dashboard
Live statistics update:
using OnlineStatsChains
using OnlineStatsBase
function create_dashboard()
dag = StatDAG(strategy=:eager)
# Data source
add_node!(dag, :events, Mean())
# Dashboard panels
add_node!(dag, :count_panel, Sum())
add_node!(dag, :avg_panel, Mean())
add_node!(dag, :var_panel, Variance())
add_node!(dag, :hist_panel, Hist(10))
# Connect to all panels
connect!(dag, :events, :count_panel)
connect!(dag, :events, :avg_panel)
connect!(dag, :events, :var_panel)
connect!(dag, :events, :hist_panel)
return dag
end
# Update dashboard
dag = create_dashboard()
# Stream events
function update_dashboard(dag, event_value)
fit!(dag, :events => event_value)
# Dashboard auto-updates (eager mode)
return (
count = value(dag, :count_panel),
average = value(dag, :avg_panel),
variance = value(dag, :var_panel),
histogram = value(dag, :hist_panel)
)
end
# Simulate events
for event in event_stream
dashboard_data = update_dashboard(dag, event)
display_dashboard(dashboard_data)
end
Batch Analytics Pipeline
Process large datasets efficiently:
using OnlineStatsChains
using OnlineStatsBase
function create_analytics_pipeline()
dag = StatDAG(strategy=:lazy)
# Data inputs
add_node!(dag, :dataset_a, Mean())
add_node!(dag, :dataset_b, Mean())
# Derived metrics
add_node!(dag, :metric_1, Mean())
add_node!(dag, :metric_2, Variance())
# Final reports
add_node!(dag, :report_summary, Mean())
# Build pipeline
connect!(dag, :dataset_a, :metric_1)
connect!(dag, :dataset_b, :metric_1)
connect!(dag, [:dataset_a, :dataset_b], :metric_2)
connect!(dag, [:metric_1, :metric_2], :report_summary)
return dag
end
# Process batch
dag = create_analytics_pipeline()
# Load large datasets
data_a = load_dataset("a.csv")
data_b = load_dataset("b.csv")
# Fit (lazy - no propagation)
fit!(dag, Dict(
:dataset_a => data_a,
:dataset_b => data_b
))
# Compute only what's needed
report = value(dag, :report_summary)
Multi-Source Data Fusion
Combine multiple data sources:
using OnlineStatsChains
using OnlineStatsBase
function create_fusion_pipeline()
dag = StatDAG()
# Multiple data sources
add_node!(dag, :sensor_a, Mean())
add_node!(dag, :sensor_b, Mean())
add_node!(dag, :database, Mean())
# Fusion layers
add_node!(dag, :fused_sensors, Mean())
add_node!(dag, :final_estimate, Mean())
# Build fusion
connect!(dag, [:sensor_a, :sensor_b], :fused_sensors)
connect!(dag, [:fused_sensors, :database], :final_estimate)
return dag
end
# Fuse data
dag = create_fusion_pipeline()
fit!(dag, Dict(
:sensor_a => [10.1, 10.2],
:sensor_b => [10.15, 10.25],
:database => [10.0, 10.1, 10.2]
))
println("Fused estimate: ", value(dag, :final_estimate))
Strategy Switching Example
Dynamic strategy changes:
using OnlineStatsChains
using OnlineStatsBase
dag = StatDAG(strategy=:eager)
add_node!(dag, :input, Mean())
add_node!(dag, :output, Mean())
connect!(dag, :input, :output)
# Real-time mode (eager)
println("Real-time processing...")
for x in realtime_stream
fit!(dag, :input => x)
display(value(dag, :output))
end
# Switch to batch mode (lazy)
println("Switching to batch mode...")
set_strategy!(dag, :lazy)
# Process batch (no propagation)
fit!(dag, :input => batch_data)
# Compute when ready
result = value(dag, :output)
Edge Transformations
Temperature Conversion
Convert temperature units on-the-fly:
using OnlineStatsChains
using OnlineStatsBase
dag = StatDAG()
# Sensor outputs in Celsius
add_node!(dag, :celsius, Mean())
add_node!(dag, :fahrenheit, Mean())
add_node!(dag, :kelvin, Mean())
# Convert to different scales
connect!(dag, :celsius, :fahrenheit, transform = c -> c * 9/5 + 32)
connect!(dag, :celsius, :kelvin, transform = c -> c + 273.15)
# Input temperature readings
fit!(dag, :celsius => [0.0, 10.0, 20.0, 30.0, 40.0])
println("Celsius: ", value(dag, :celsius)) # 20.0°C
println("Fahrenheit: ", value(dag, :fahrenheit)) # 68.0°F
println("Kelvin: ", value(dag, :kelvin)) # 293.15K
Data Cleaning Pipeline
Filter and transform data:
using OnlineStatsChains
using OnlineStatsBase
dag = StatDAG()
add_node!(dag, :raw_sensor, Mean())
add_node!(dag, :valid_only, Mean())
add_node!(dag, :calibrated, Mean())
# Filter out missing values
connect!(dag, :raw_sensor, :valid_only, filter = !ismissing)
# Apply calibration after filtering
connect!(dag, :valid_only, :calibrated, transform = x -> x * 1.05 + 0.5)
# Input with some missing values
fit!(dag, :raw_sensor => [10.0, missing, 15.0, missing, 20.0])
println("Valid readings: ", value(dag, :valid_only)) # 15.0
println("Calibrated: ", value(dag, :calibrated)) # 16.25
E-Commerce Analytics
Extract metrics from transaction data:
using OnlineStatsChains
using OnlineStatsBase
# Transaction structure
struct Transaction
price::Float64
quantity::Int
discount::Float64
end
dag = StatDAG()
add_node!(dag, :transactions, Mean())
add_node!(dag, :avg_price, Mean())
add_node!(dag, :avg_quantity, Mean())
add_node!(dag, :total_revenue, Mean())
# Extract different metrics
connect!(dag, :transactions, :avg_price,
transform = t -> t.price)
connect!(dag, :transactions, :avg_quantity,
transform = t -> Float64(t.quantity))
connect!(dag, :transactions, :total_revenue,
transform = t -> t.price * t.quantity * (1 - t.discount))
# Process transactions
transactions = [
Transaction(100.0, 2, 0.1),
Transaction(150.0, 1, 0.0),
Transaction(75.0, 3, 0.2)
]
fit!(dag, :transactions => transactions)
println("Average price: ", value(dag, :avg_price))
println("Average quantity: ", value(dag, :avg_quantity))
println("Average revenue: ", value(dag, :total_revenue))
Conditional Routing with Transforms
Route data based on conditions:
using OnlineStatsChains
using OnlineStatsBase
dag = StatDAG()
add_node!(dag, :temperature, Mean())
add_node!(dag, :high_alert, Mean())
add_node!(dag, :normal, Mean())
add_node!(dag, :low_alert, Mean())
# High temperature alert (>30°C) - convert to Fahrenheit
connect!(dag, :temperature, :high_alert,
filter = t -> t > 30,
transform = t -> t * 9/5 + 32)
# Normal range (10-30°C) - keep as is
connect!(dag, :temperature, :normal,
filter = t -> 10 <= t <= 30)
# Low temperature alert (<10°C) - flag with negative
connect!(dag, :temperature, :low_alert,
filter = t -> t < 10,
transform = t -> -t)
# Stream temperature data
fit!(dag, :temperature => [5.0, 15.0, 25.0, 35.0, 8.0, 32.0])
println("High alerts (°F): ", value(dag, :high_alert)) # Avg of 95°F, 89.6°F
println("Normal temps: ", value(dag, :normal)) # Avg of 15, 25
println("Low alerts: ", value(dag, :low_alert)) # Avg of -5, -8
Multi-Input Feature Engineering
Combine multiple inputs with transformations:
using OnlineStatsChains
using OnlineStatsBase
dag = StatDAG()
add_node!(dag, :price, Mean())
add_node!(dag, :volume, Mean())
add_node!(dag, :momentum, Mean())
# Calculate momentum: price * volume
connect!(dag, [:price, :volume], :momentum,
transform = inputs -> inputs[1] * inputs[2])
# Update with coordinated data
fit!(dag, Dict(
:price => [100.0, 105.0, 103.0],
:volume => [1000.0, 1200.0, 900.0]
))
println("Average price: ", value(dag, :price))
println("Average volume: ", value(dag, :volume))
println("Average momentum: ", value(dag, :momentum))
Log Transform Pipeline
Apply logarithmic transformations:
using OnlineStatsChains
using OnlineStatsBase
dag = StatDAG()
add_node!(dag, :population, Mean())
add_node!(dag, :log_population, Mean())
add_node!(dag, :growth_rate, Mean())
# Log transform
connect!(dag, :population, :log_population,
transform = p -> log10(p))
# Calculate growth rate (only for positive changes)
connect!(dag, :log_population, :growth_rate,
filter = x -> x > 0)
# Population data
fit!(dag, :population => [1000.0, 1100.0, 1210.0, 1331.0])
println("Mean population: ", value(dag, :population))
println("Mean log(pop): ", value(dag, :log_population))
println("Growth rate: ", value(dag, :growth_rate))
See Also
- Basic Tutorial - Fundamental concepts
- Advanced Patterns - Complex structures
- Performance Guide - Optimization
- API Reference - Complete documentation