User Guide

This guide covers installing OnlineResamplers.jl and using it to resample financial market data, from basic OHLC bars through custom numeric types, performance tuning, and OnlineStats integration.

Table of Contents

  1. Installation
  2. Core Concepts
  3. Basic Usage
  4. Advanced Features
  5. Real-World Examples
  6. Performance Optimization
  7. Integration with OnlineStats
  8. Troubleshooting

Installation

using Pkg
Pkg.add(url="https://github.com/femtotrader/OnlineResamplers.jl")

Development Installation

using Pkg
Pkg.develop(url="https://github.com/femtotrader/OnlineResamplers.jl")
Pkg.test("OnlineResamplers")

Core Concepts

Market Data Structure

Market data is represented using the MarketDataPoint{T,P,V} structure, which provides type safety and flexibility:

using OnlineResamplers, Dates

# Basic usage with default types (DateTime, Float64, Float64)
data = MarketDataPoint(DateTime(2024, 1, 1, 9, 30, 0), 100.50, 1000.0)

# Explicit type construction
data_explicit = MarketDataPoint{DateTime, Float64, Float64}(
    DateTime(2024, 1, 1, 9, 30, 0),
    100.50,
    1000.0
)

# Custom types for high precision
using FixedPointDecimals
precise_data = MarketDataPoint{DateTime, FixedDecimal{Int64,4}, FixedDecimal{Int64,2}}(
    DateTime(2024, 1, 1, 9, 30, 0),
    FixedDecimal{Int64,4}(100.5012),
    FixedDecimal{Int64,2}(1000.50)
)

Time Windows

Data is aggregated into time windows, each defined by a start time and a period. A window covers the half-open interval [start_time, start_time + period), so a timestamp that falls exactly on the end belongs to the next window:

using Dates

# Create a 5-minute window
window = TimeWindow{DateTime}(DateTime(2024, 1, 1, 9, 30, 0), Minute(5))

# The window includes data from [start_time, start_time + period)
println("Window start: $(window.start_time)")      # 2024-01-01T09:30:00
println("Window end: $(window_end(window))")       # 2024-01-01T09:35:00

# Check if timestamps belong to window
test_times = [
    DateTime(2024, 1, 1, 9, 29, 59),  # Before window -> false
    DateTime(2024, 1, 1, 9, 30, 0),   # Start of window -> true
    DateTime(2024, 1, 1, 9, 32, 30),  # Middle of window -> true
    DateTime(2024, 1, 1, 9, 35, 0)    # Next window -> false
]

for ts in test_times
    belongs = belongs_to_window(ts, window)
    println("$(ts): $(belongs)")
end
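
The half-open rule can be reproduced with the Dates standard library alone. The sketch below is not the package's implementation; `in_window` is a hypothetical helper that only mirrors the behaviour described for `belongs_to_window` above.

```julia
using Dates

# Hypothetical helper (not part of OnlineResamplers): true when `ts`
# lies in the half-open interval [window_start, window_start + period).
in_window(ts::DateTime, window_start::DateTime, period::Period) =
    window_start <= ts < window_start + period

window_start = DateTime(2024, 1, 1, 9, 30, 0)
period = Minute(5)

checks = [
    in_window(DateTime(2024, 1, 1, 9, 29, 59), window_start, period),  # before the window
    in_window(DateTime(2024, 1, 1, 9, 30, 0), window_start, period),   # inclusive start
    in_window(DateTime(2024, 1, 1, 9, 32, 30), window_start, period),  # inside the window
    in_window(DateTime(2024, 1, 1, 9, 35, 0), window_start, period),   # exclusive end
]
```

The last check is the one that usually surprises: 09:35:00 is the first instant of the following window, not the last instant of this one.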

Basic Usage

OHLC Resampling

OHLC (Open, High, Low, Close) resampling produces the bars used for candlestick charts and technical analysis:

using OnlineResamplers, OnlineStatsBase, Dates

# Create OHLC resampler (this is the default)
ohlc_resampler = MarketResampler(Minute(1), price_method=:ohlc)

# Sample market data within one minute
base_time = DateTime(2024, 1, 1, 14, 30, 0)
market_data = [
    MarketDataPoint(base_time + Second(0), 100.00, 1000.0),   # Open
    MarketDataPoint(base_time + Second(15), 102.50, 800.0),   # High point
    MarketDataPoint(base_time + Second(30), 97.75, 1200.0),   # Low point
    MarketDataPoint(base_time + Second(45), 101.25, 900.0)    # Close
]

# Process all data points
for data in market_data
    fit!(ohlc_resampler, data)
end

# Extract results
result = value(ohlc_resampler)
ohlc = result.price.ohlc

println("Open:  $(ohlc.open)")     # 100.00 (first price)
println("High:  $(ohlc.high)")     # 102.50 (highest price)
println("Low:   $(ohlc.low)")      # 97.75  (lowest price)
println("Close: $(ohlc.close)")    # 101.25 (last price)
println("Volume: $(result.volume)") # 3900.0 (total volume)
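
To make the four fields concrete, here is a minimal sketch of how an OHLC bar can be accumulated one tick at a time. `update_bar` and the named-tuple layout are hypothetical and written for illustration; the package's own OHLC type and update rule may differ.

```julia
# Hypothetical one-tick update: `bar` is `nothing` before the first tick,
# otherwise a named tuple holding the bar so far.
function update_bar(bar, price, volume)
    # The first tick sets every price field at once.
    bar === nothing &&
        return (open = price, high = price, low = price, close = price, volume = volume)
    return (
        open = bar.open,                 # fixed by the first tick
        high = max(bar.high, price),
        low = min(bar.low, price),
        close = price,                   # always the latest tick
        volume = bar.volume + volume,
    )
end

# Same prices and volumes as the example above.
ticks = [(100.00, 1000.0), (102.50, 800.0), (97.75, 1200.0), (101.25, 900.0)]
bar = foldl((b, t) -> update_bar(b, t[1], t[2]), ticks; init = nothing)
```

Only the current bar is kept between ticks, which is why this style of update needs constant memory.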

Mean Price Resampling

Mean price resampling suits applications that need smoothed prices or an average price per time interval:

# Create mean price resampler
mean_resampler = MarketResampler(Minute(5), price_method=:mean)

# Process the same data
for data in market_data
    fit!(mean_resampler, data)
end

result = value(mean_resampler)
mean_price = result.price.mean_price

println("Mean Price: $(mean_price)")  # 100.375 ((100+102.5+97.75+101.25)/4)
println("Volume: $(result.volume)")   # 3900.0
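
A mean can be maintained online without storing past prices. The sketch below uses the standard incremental update, mean += (x - mean) / n; `running_mean` is a hypothetical function, and whether the package uses this exact formula internally is an assumption.

```julia
# Hypothetical helper: incremental mean over a stream of prices.
function running_mean(prices)
    m = 0.0
    for (n, p) in enumerate(prices)
        m += (p - m) / n   # shift the mean toward the new price by 1/n
    end
    return m
end

mean_sketch = running_mean([100.00, 102.50, 97.75, 101.25])
```

For the four prices above this agrees with the batch average (100 + 102.5 + 97.75 + 101.25) / 4 up to floating-point rounding.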

Advanced Features

Custom Numeric Types

OnlineResamplers fully supports custom numeric types commonly used in financial applications:

using FixedPointDecimals, NanoDates

# Define high-precision types
PriceType = FixedDecimal{Int128, 8}    # 8 decimal places for prices
VolumeType = FixedDecimal{Int64, 2}    # 2 decimal places for volumes

# Create high-precision resampler
precision_resampler = MarketResampler{NanoDate, PriceType, VolumeType}(
    Nanosecond(1_000_000_000),  # 1 second intervals
    price_method=:ohlc
)

# Create high-precision market data
nano_data = MarketDataPoint{NanoDate, PriceType, VolumeType}(
    NanoDate(2024, 1, 1, 9, 30, 0, 123456789),
    PriceType(100.12345678),
    VolumeType(1000.50)
)

fit!(precision_resampler, nano_data)
result = value(precision_resampler)

println("High-precision OHLC: $(result.price.ohlc)")
println("High-precision Volume: $(result.volume)")

Parallel Processing

OnlineResamplers supports efficient merging for parallel data processing:

# Function to process a chunk of data
function process_chunk(data_chunk::Vector, period::Period)
    chunk_resampler = OHLCResampler{DateTime, Float64, Float64}(period)
    for data in data_chunk
        fit!(chunk_resampler, data)
    end
    return chunk_resampler
end

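# Generate large dataset (10,000 ticks, one per second)
start_time = DateTime(2024, 1, 1, 9, 0, 0)
large_dataset = [
    # Add Second(i) to a base time: DateTime(2024, 1, 1, 9, 0, i) throws once i exceeds 59.
    # The volume is converted to Float64 to match the resampler's type parameters.
    MarketDataPoint(start_time + Second(i), 100.0 + sin(i/100), Float64(rand(500:1500)))
    for i in 1:10000
]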

# Split into chunks for parallel processing
chunk_size = 2500
chunks = [large_dataset[i:min(i+chunk_size-1, end)] for i in 1:chunk_size:length(large_dataset)]

# Process chunks (in real applications, use @distributed or threading)
chunk_resamplers = [process_chunk(chunk, Minute(1)) for chunk in chunks]

# Merge all results
final_resampler = chunk_resamplers[1]
for i in 2:length(chunk_resamplers)
    merge!(final_resampler, chunk_resamplers[i])
end

merged_result = value(final_resampler)
println("Merged OHLC: $(merged_result.ohlc)")
println("Total observations: $(nobs(final_resampler))")
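
Merging works because OHLC summaries combine associatively. The sketch below shows one plausible merge rule for two partial bars, assuming both belong to the same window and the first precedes the second in time. `merge_bars` is hypothetical and is not the package's `merge!` method.

```julia
# Hypothetical merge of two partial bars, with `a` earlier than `b`.
merge_bars(a, b) = (
    open = a.open,               # open comes from the earlier part
    high = max(a.high, b.high),
    low = min(a.low, b.low),
    close = b.close,             # close comes from the later part
    volume = a.volume + b.volume,
)

first_half = (open = 100.0, high = 102.5, low = 99.0, close = 101.0, volume = 1800.0)
second_half = (open = 101.0, high = 101.5, low = 97.75, close = 101.25, volume = 2100.0)
merged = merge_bars(first_half, second_half)
```

The rule is order-sensitive for open and close, so chunks should be merged in chronological order.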

Individual Resamplers

For specialized use cases, you can use individual resampler types directly:

# Pure OHLC resampler
ohlc_only = OHLCResampler{DateTime, Float64, Float64}(Minute(1))

# Mean price resampler
mean_only = MeanResampler{DateTime, Float64, Float64}(Minute(5))

# Sum resampler (for volume or other additive metrics)
volume_sum = SumResampler{DateTime, Float64, Float64}(Second(30))

# Process sample data
sample_data = MarketDataPoint(DateTime(2024, 1, 1, 10, 0, 0), 100.0, 1000.0)

fit!(ohlc_only, sample_data)
fit!(mean_only, sample_data)
fit!(volume_sum, sample_data)

# Get individual results
ohlc_result = value(ohlc_only)
mean_result = value(mean_only)
volume_result = value(volume_sum)

println("OHLC only: $(ohlc_result)")
println("Mean only: $(mean_result)")
println("Volume sum: $(volume_result)")

Real-World Examples

Processing CSV Market Data

Here's a complete example processing market data from a CSV file:

using OnlineResamplers, OnlineStatsBase, Dates, CSV, DataFrames

# Load tick data from CSV file
tick_data = CSV.read("market_ticks.csv", DataFrame)

# Create 1-minute OHLC resampler
resampler = MarketResampler(Minute(1))

# Storage for completed OHLC bars
ohlc_bars = []

# Process each tick
for row in eachrow(tick_data)
    # Create market data point
    data_point = MarketDataPoint(
        DateTime(row.timestamp),
        row.price,
        row.volume
    )

    # Snapshot the current bar before processing, so it can be saved
    # if this tick opens a new window
    old_result = value(resampler)
    old_window = old_result.window

    # Process the data
    fit!(resampler, data_point)

    # Check if we moved to a new window (completed a bar)
    new_result = value(resampler)
    if new_result.window != old_window && old_window !== nothing
        # We completed a window, save the OHLC bar
        push!(ohlc_bars, (
            timestamp = old_window.start_time,
            open = old_result.price.ohlc.open,
            high = old_result.price.ohlc.high,
            low = old_result.price.ohlc.low,
            close = old_result.price.ohlc.close,
            volume = old_result.volume
        ))
    end
end
# Note: the bar still in progress after the last tick is not saved by this loop

# Convert to DataFrame for analysis
ohlc_df = DataFrame(ohlc_bars)
println("Generated $(nrow(ohlc_df)) OHLC bars from $(nrow(tick_data)) ticks")

# Save results
CSV.write("ohlc_1min.csv", ohlc_df)
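
The bar-completion logic can be checked without a CSV file or the package. The sketch below is a plain-Julia stand-in, assuming ticks arrive in time order as `(timestamp, price, volume)` tuples; `bars_from_ticks` is a hypothetical function, not part of OnlineResamplers. Unlike a loop that only saves a bar when the next window opens, it also flushes the final, still-open bar.

```julia
using Dates

# Hypothetical stand-in: emit one bar per floor-aligned window.
function bars_from_ticks(ticks, period::Period)
    bars = NamedTuple[]
    current = nothing
    for (ts, price, volume) in ticks
        w = floor(ts, period)            # start of the window this tick falls in
        if current !== nothing && w != current.timestamp
            push!(bars, current)         # window changed: the previous bar is complete
            current = nothing
        end
        if current === nothing
            current = (timestamp = w, open = price, high = price,
                       low = price, close = price, volume = volume)
        else
            current = (timestamp = w, open = current.open,
                       high = max(current.high, price),
                       low = min(current.low, price),
                       close = price, volume = current.volume + volume)
        end
    end
    current === nothing || push!(bars, current)  # flush the last bar
    return bars
end

t0 = DateTime(2024, 1, 1, 9, 30, 0)
ticks = [(t0, 100.0, 10.0), (t0 + Second(30), 101.0, 5.0), (t0 + Second(70), 99.0, 7.0)]
bars = bars_from_ticks(ticks, Minute(1))
```

Here the first two ticks share the 09:30 window and the third opens the 09:31 window, so two bars come out.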

Multi-timeframe Analysis

Analyze the same data stream across multiple timeframes simultaneously:

# Create resamplers for different timeframes
timeframes = Dict(
    "1min" => MarketResampler(Minute(1)),
    "5min" => MarketResampler(Minute(5)),
    "15min" => MarketResampler(Minute(15)),
    "1hour" => MarketResampler(Hour(1))
)

# Generate sample data (simulating 1 hour of one-per-minute ticks)
base_time = DateTime(2024, 1, 1, 9, 0, 0)
sample_ticks = []

price = 100.0
for i in 1:60  # 60 minutes
    global price  # required when this loop runs in a script rather than the REPL
    # Add some realistic price movement
    price += randn() * 0.1  # Random walk
    volume = Float64(rand(500:1500))  # Float64 to match the default volume type
    timestamp = base_time + Minute(i)

    push!(sample_ticks, MarketDataPoint(timestamp, price, volume))
end

# Process through all timeframes
for tick in sample_ticks
    for (name, resampler) in timeframes
        fit!(resampler, tick)
    end
end

# Display results
using Printf  # provides @printf

println("Multi-timeframe Analysis:")
println("========================")
# Sort by timeframe name only; comparing whole pairs would also compare the resamplers
for (name, resampler) in sort(collect(timeframes), by = first)
    result = value(resampler)
    if result.price.ohlc !== nothing
        ohlc = result.price.ohlc
        @printf("%-8s: O=%6.2f H=%6.2f L=%6.2f C=%6.2f Vol=%8.0f\n",
                name, ohlc.open, ohlc.high, ohlc.low, ohlc.close, result.volume)
    end
end

Performance Optimization

Memory Efficiency

OnlineResamplers uses constant memory regardless of data volume:

# Memory usage stays constant even with millions of data points
memory_test_resampler = MarketResampler(Minute(1))

println("Processing 1 million data points...")
for i in 1:1_000_000
    timestamp = DateTime(2024, 1, 1, 9, 0, 0) + Millisecond(i)
    data = MarketDataPoint(timestamp, 100.0 + sin(i/1000), 1000.0)
    fit!(memory_test_resampler, data)

    # Memory usage remains constant due to automatic window transitions
end

result = value(memory_test_resampler)
println("Current window has $(nobs(memory_test_resampler)) observations")
println("Total memory usage is O(1) - constant regardless of data volume processed")

Type Stability

For maximum performance, use concrete types and avoid type instabilities:

# Good: Concrete types enable compiler optimizations
function high_performance_processing(
    resampler::MarketResampler{DateTime, Float64, Float64},
    data_stream::Vector{MarketDataPoint{DateTime, Float64, Float64}}
)
    for data in data_stream
        fit!(resampler, data)
    end
    return value(resampler)
end

# Usage
fast_resampler = MarketResampler{DateTime, Float64, Float64}(Minute(1))
typed_data = MarketDataPoint{DateTime, Float64, Float64}[]

# This will be highly optimized by the Julia compiler
result = high_performance_processing(fast_resampler, typed_data)

Batch Processing

Process data in batches for optimal performance:

function batch_process_ticks(resampler, ticks::Vector)
    # Process all ticks without intermediate value() calls
    for tick in ticks
        fit!(resampler, tick)
    end

    # Get result only once at the end
    return value(resampler)
end

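# This approach is faster than calling value() after each fit!()
batch_resampler = MarketResampler(Minute(1))
batch_start = DateTime(2024, 1, 1, 9, 30, 0)
batch_ticks = [
    # Second(i) is added to a base time because the seconds field of DateTime only accepts 0-59
    MarketDataPoint(batch_start + Second(i), 100.0 + randn(), 1000.0)
    for i in 1:1000
]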

result = batch_process_ticks(batch_resampler, batch_ticks)

Performance Benchmarks

Here are typical performance characteristics:

using BenchmarkTools

# Setup
resampler = MarketResampler(Minute(1))
data = MarketDataPoint(DateTime(2024, 1, 1, 9, 30, 0), 100.0, 1000.0)

# Single operation benchmark
@benchmark fit!($resampler, $data)
# Typical: ~50ns per operation

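# Batch processing benchmark
# Timestamps add Second(i) to a base time; prices and volumes are converted to Float64
data_batch = [MarketDataPoint(DateTime(2024, 1, 1, 9, 30, 0) + Second(i), Float64(rand(90:110)), Float64(rand(500:1500))) for i in 1:10000]
batch_resampler = MarketResampler(Minute(1))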

@benchmark begin
    for d in $data_batch
        fit!($batch_resampler, d)
    end
end
# Typical: ~500μs for 10,000 operations (~50ns per operation)

Expected performance characteristics:

  • Single operation: ~50 nanoseconds
  • Memory usage: O(1) constant
  • Throughput: >2 million operations/second on modern hardware
  • Memory allocations: Zero in steady state

Integration with OnlineStats

OnlineResamplers seamlessly integrates with the broader OnlineStats ecosystem:

using OnlineStats

# Combine market resampling with other online statistics
combined_stats = Group(
    MarketResampler(Minute(1)),    # Market data resampling
    Mean(),                        # Overall price mean
    Variance()                     # Price variance
)

# Generate sample data
stream_start = DateTime(2024, 1, 1, 9, 30, 0)
data_stream = [
    MarketDataPoint(stream_start + Second(i), 100.0 + randn(), 1000.0)
    for i in 1:1000
]

# Process all statistics simultaneously
for data in data_stream
    # The Group expects one tuple element per statistic, in the same order
    fit!(combined_stats, (data, data.price, data.price))
end

# Access individual statistics
resampler_result = value(combined_stats[1])  # MarketResampler results
mean_price = value(combined_stats[2])        # Mean price
price_variance = value(combined_stats[3])    # Price variance

println("OHLC: $(resampler_result.price.ohlc)")
println("Mean price: $(mean_price)")
println("Price variance: $(price_variance)")

Custom OnlineStats Integration

You can also create custom statistics that work with market data:

using OnlineStatsBase

# Custom statistic: Price range tracker
mutable struct PriceRange <: OnlineStat{MarketDataPoint}
    min_price::Float64
    max_price::Float64
    n::Int

    PriceRange() = new(Inf, -Inf, 0)
end

function OnlineStatsBase._fit!(stat::PriceRange, data::MarketDataPoint)
    stat.min_price = min(stat.min_price, data.price)
    stat.max_price = max(stat.max_price, data.price)
    stat.n += 1
    return stat
end

function OnlineStatsBase.value(stat::PriceRange)
    return (min=stat.min_price, max=stat.max_price, range=stat.max_price - stat.min_price)
end

OnlineStatsBase.nobs(stat::PriceRange) = stat.n

# Usage
price_range = PriceRange()
market_data = [MarketDataPoint(DateTime(2024, 1, 1, 9, 30, 0) + Second(i), 100.0 + randn() * 5, 1000.0) for i in 1:100]

for data in market_data
    fit!(price_range, data)
end

range_result = value(price_range)
println("Price range: $(range_result.min) to $(range_result.max)")
println("Total range: $(range_result.range)")

Troubleshooting

Common Issues and Solutions

Type Mismatch Errors

# Problem: Type mismatch
resampler = MarketResampler{DateTime, Float64, Float64}(Minute(1))
bad_data = MarketDataPoint{DateTime, Int64, Float64}(DateTime(2024, 1, 1, 9, 30, 0), 100, 1000.0)

# This will fail:
# fit!(resampler, bad_data)  # ERROR: MethodError

# Solution: Ensure consistent types
good_data = MarketDataPoint{DateTime, Float64, Float64}(DateTime(2024, 1, 1, 9, 30, 0), 100.0, 1000.0)
fit!(resampler, good_data)  # Works fine

Window Alignment Issues

# Problem: Unexpected window boundaries
resampler = MarketResampler(Minute(1))

# Data that doesn't align with minute boundaries
misaligned_data = MarketDataPoint(DateTime(2024, 1, 1, 9, 30, 37), 100.0, 1000.0)
fit!(resampler, misaligned_data)

result = value(resampler)
println("Window starts at: $(result.window.start_time)")  # 2024-01-01T09:30:00

# Solution: Understand that windows are floor-aligned
# The window will start at 9:30:00 even though data arrived at 9:30:37

Memory Issues with Large Datasets

# Problem: Processing very large datasets inefficiently
function inefficient_processing(large_dataset)
    results = []
    resampler = MarketResampler(Minute(1))

    for data in large_dataset
        fit!(resampler, data)
        push!(results, value(resampler))  # DON'T DO THIS - stores everything
    end

    return results
end

# Solution: Only store what you need
function efficient_processing(large_dataset)
    completed_bars = []
    resampler = MarketResampler(Minute(1))

    for data in large_dataset
        old_result = value(resampler)
        old_window = old_result.window

        fit!(resampler, data)

        new_result = value(resampler)
        if new_result.window != old_window && old_window !== nothing
            # Only store completed bars
            push!(completed_bars, (
                timestamp = old_window.start_time,
                ohlc = old_result.price.ohlc,
                volume = old_result.volume
            ))
        end
    end

    return completed_bars
end

Performance Debugging

If you're experiencing performance issues:

using Profile

function profile_resampling()
    resampler = MarketResampler(Minute(1))
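    # Second(i) is added to a base time because the seconds field of DateTime only accepts 0-59
    data_stream = [MarketDataPoint(DateTime(2024, 1, 1, 9, 30, 0) + Second(i), 100.0, 1000.0) for i in 1:100000]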

    @profile begin
        for data in data_stream
            fit!(resampler, data)
        end
    end
end

profile_resampling()
Profile.print()  # Analyze where time is spent

Validation and Testing

Always validate your results:

function validate_ohlc(ohlc::OHLC)
    @assert ohlc.high >= ohlc.open "High should be >= Open"
    @assert ohlc.high >= ohlc.close "High should be >= Close"
    @assert ohlc.low <= ohlc.open "Low should be <= Open"
    @assert ohlc.low <= ohlc.close "Low should be <= Close"
    @assert ohlc.high >= ohlc.low "High should be >= Low"
end

# Use in your processing pipeline
resampler = MarketResampler(Minute(1))
# ... process data ...
result = value(resampler)

if result.price.ohlc !== nothing
    validate_ohlc(result.price.ohlc)
    println("OHLC validation passed ✓")
end

This user guide covers the essential aspects of using OnlineResamplers.jl effectively. For more detailed API information, see the API Reference, and for step-by-step learning, check out the Tutorial.