Filters API Reference
This page documents the API for the filters module in Pypulate.
Kalman Filters
Apply a standard Kalman filter to a time series.
The Kalman filter is an optimal estimator that infers parameters of interest from indirect, inaccurate and uncertain observations. It's recursive so new measurements can be processed as they arrive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
process_variance
|
float
|
Process noise variance (Q) |
1e-5
|
measurement_variance
|
float
|
Measurement noise variance (R) |
1e-3
|
initial_state
|
float
|
Initial state estimate. If None, the first data point is used |
None
|
initial_covariance
|
float
|
Initial estimate covariance |
1.0
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Apply an Extended Kalman Filter (EKF) to a time series with non-linear dynamics.
The EKF is a nonlinear version of the Kalman filter that linearizes about the current mean and covariance. It's used when the state transition or observation models are non-linear.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data (observations) |
required |
state_transition_func
|
callable
|
Function that computes the state transition (f) |
required |
observation_func
|
callable
|
Function that computes the observation from state (h) |
required |
process_jacobian_func
|
callable
|
Function that computes the Jacobian of the state transition function |
required |
observation_jacobian_func
|
callable
|
Function that computes the Jacobian of the observation function |
required |
process_covariance
|
ndarray
|
Process noise covariance matrix (Q) |
required |
observation_covariance
|
ndarray
|
Observation noise covariance matrix (R) |
required |
initial_state
|
ndarray
|
Initial state estimate. If None, zeros are used |
None
|
initial_covariance
|
ndarray
|
Initial estimate covariance matrix. If None, identity is used |
None
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series (state estimates) |
Examples:
>>> import numpy as np
>>> from pypulate.filters import extended_kalman_filter
>>> # Define non-linear system
>>> def state_transition(x):
... # Non-linear state transition function
... return np.array([x[0] + x[1], 0.5 * x[1]])
>>> def observation(x):
... # Non-linear observation function
... return np.array([np.sin(x[0])])
>>> def process_jacobian(x):
... # Jacobian of state transition function
... return np.array([[1, 1], [0, 0.5]])
>>> def observation_jacobian(x):
... # Jacobian of observation function
... return np.array([[np.cos(x[0]), 0]])
>>> # Create data
>>> n = 100
>>> true_states = np.zeros((n, 2))
>>> true_states[0] = [0, 1]
>>> for i in range(1, n):
... true_states[i] = state_transition(true_states[i-1])
>>> observations = np.array([observation(x)[0] for x in true_states])
>>> observations += np.random.normal(0, 0.1, n)
>>> # Apply EKF
>>> Q = np.eye(2) * 0.01 # Process noise covariance
>>> R = np.array([[0.1]]) # Observation noise covariance
>>> filtered_states = extended_kalman_filter(
... observations, state_transition, observation,
... process_jacobian, observation_jacobian, Q, R
... )
Apply an Unscented Kalman Filter (UKF) to a time series with non-linear dynamics.
The UKF uses the unscented transform to pick a minimal set of sample points (sigma points) around the mean. These sigma points are then propagated through the non-linear functions, and the mean and covariance of the estimate are recovered.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data (observations) |
required |
state_transition_func
|
callable
|
Function that computes the state transition |
required |
observation_func
|
callable
|
Function that computes the observation from state |
required |
process_covariance
|
ndarray
|
Process noise covariance matrix (Q) |
required |
observation_covariance
|
ndarray
|
Observation noise covariance matrix (R) |
required |
initial_state
|
ndarray
|
Initial state estimate. If None, zeros are used |
None
|
initial_covariance
|
ndarray
|
Initial estimate covariance matrix. If None, identity is used |
None
|
alpha
|
float
|
Spread of sigma points around mean |
1e-3
|
beta
|
float
|
Prior knowledge about distribution (2 is optimal for Gaussian) |
2.0
|
kappa
|
float
|
Secondary scaling parameter |
0.0
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series (state estimates) |
Examples:
>>> import numpy as np
>>> from pypulate.filters import unscented_kalman_filter
>>> # Define non-linear system
>>> def state_transition(x):
... # Non-linear state transition function
... return np.array([x[0] + x[1], 0.5 * x[1]])
>>> def observation(x):
... # Non-linear observation function
... return np.array([np.sin(x[0])])
>>> # Create data
>>> n = 100
>>> true_states = np.zeros((n, 2))
>>> true_states[0] = [0, 1]
>>> for i in range(1, n):
... true_states[i] = state_transition(true_states[i-1])
>>> observations = np.array([observation(x)[0] for x in true_states])
>>> observations += np.random.normal(0, 0.1, n)
>>> # Apply UKF
>>> Q = np.eye(2) * 0.01 # Process noise covariance
>>> R = np.array([[0.1]]) # Observation noise covariance
>>> filtered_states = unscented_kalman_filter(
... observations, state_transition, observation, Q, R
... )
Signal Filters
Apply a Butterworth filter to a time series.
The Butterworth filter is a type of signal processing filter designed to have a frequency response as flat as possible in the passband.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
cutoff
|
float or tuple of float
|
Cutoff frequency. For lowpass and highpass, this is a scalar. For bandpass and bandstop, this is a tuple of (low, high) |
required |
order
|
int
|
Filter order |
4
|
filter_type
|
str
|
Filter type: 'lowpass', 'highpass', 'bandpass', or 'bandstop' |
'lowpass'
|
fs
|
float
|
Sampling frequency |
1.0
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Examples:
>>> import numpy as np
>>> from pypulate.filters import butterworth_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 1000)
>>> signal = np.sin(2 * np.pi * 0.05 * x) + 0.5 * np.sin(2 * np.pi * 0.25 * x)
>>> # Apply lowpass filter to remove high frequency component
>>> filtered = butterworth_filter(signal, cutoff=0.1, filter_type='lowpass', fs=1.0)
Apply a Chebyshev filter to a time series.
The Chebyshev filter is a filter with steeper roll-off than the Butterworth but more passband ripple (type I) or stopband ripple (type II).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
cutoff
|
float or tuple of float
|
Cutoff frequency. For lowpass and highpass, this is a scalar. For bandpass and bandstop, this is a tuple of (low, high) |
required |
order
|
int
|
Filter order |
4
|
ripple
|
float
|
Maximum ripple allowed in the passband (type I) or stopband (type II) |
1.0
|
filter_type
|
str
|
Filter type: 'lowpass', 'highpass', 'bandpass', or 'bandstop' |
'lowpass'
|
fs
|
float
|
Sampling frequency |
1.0
|
type_num
|
int
|
Type of Chebyshev filter: 1 for Type I, 2 for Type II |
1
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Examples:
>>> import numpy as np
>>> from pypulate.filters import chebyshev_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 1000)
>>> signal = np.sin(2 * np.pi * 0.05 * x) + 0.5 * np.sin(2 * np.pi * 0.25 * x)
>>> # Apply Chebyshev type I lowpass filter
>>> filtered = chebyshev_filter(signal, cutoff=0.1, ripple=0.5, type_num=1)
Apply a Savitzky-Golay filter to a time series.
The Savitzky-Golay filter is a digital filter that can be applied to a set of digital data points to smooth the data by increasing the signal-to-noise ratio without greatly distorting the signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
window_length
|
int
|
Length of the filter window (must be odd) |
11
|
polyorder
|
int
|
Order of the polynomial used to fit the samples (must be less than window_length) |
3
|
deriv
|
int
|
Order of the derivative to compute |
0
|
delta
|
float
|
Spacing of the samples to which the filter is applied |
1.0
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Examples:
>>> import numpy as np
>>> from pypulate.filters import savitzky_golay_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x) + np.random.normal(0, 0.1, len(x))
>>> # Apply Savitzky-Golay filter
>>> filtered = savitzky_golay_filter(signal, window_length=11, polyorder=3)
Apply a Wiener filter to a time series.
The Wiener filter is a filter used to produce an estimate of a desired or target signal by linear filtering of an observed noisy signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
mysize
|
int or tuple of int
|
Size of the filter window |
3
|
noise
|
float
|
Estimate of the noise power. If None, it's estimated from the data |
None
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Examples:
Apply a median filter to a time series.
The median filter is a nonlinear digital filtering technique used to remove noise from a signal. It replaces each entry with the median of neighboring entries.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
kernel_size
|
int
|
Size of the filter kernel |
3
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Examples:
>>> import numpy as np
>>> from pypulate.filters import median_filter
>>> # Create noisy data with outliers
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x)
>>> signal[10] = 5 # Add outlier
>>> signal[50] = -5 # Add outlier
>>> # Apply median filter to remove outliers
>>> filtered = median_filter(signal, kernel_size=5)
Apply a Hampel filter to a time series.
The Hampel filter is used to identify and replace outliers in a time series. It uses the median and the median absolute deviation (MAD) to identify outliers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
window_size
|
int
|
Size of the window (number of points on each side of the current point) |
5
|
n_sigmas
|
float
|
Number of standard deviations to use for outlier detection |
3.0
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Examples:
>>> import numpy as np
>>> from pypulate.filters import hampel_filter
>>> # Create noisy data with outliers
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x)
>>> signal[10] = 5 # Add outlier
>>> signal[50] = -5 # Add outlier
>>> # Apply Hampel filter to remove outliers
>>> filtered = hampel_filter(signal, window_size=5, n_sigmas=3.0)
Apply the Hodrick-Prescott filter to decompose a time series into trend and cycle components.
The Hodrick-Prescott filter is a mathematical tool used in macroeconomics to separate the cyclical component of a time series from raw data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
lambda_param
|
float
|
Smoothing parameter. The larger the value, the smoother the trend component |
1600.0
|
Returns:
Type | Description |
---|---|
tuple of np.ndarray
|
Tuple containing (trend, cycle) components |
Examples:
>>> import numpy as np
>>> from pypulate.filters import hodrick_prescott_filter
>>> # Create data with trend and cycle
>>> x = np.linspace(0, 10, 100)
>>> trend = 0.1 * x**2
>>> cycle = np.sin(2 * np.pi * 0.1 * x)
>>> data = trend + cycle
>>> # Apply Hodrick-Prescott filter
>>> trend_component, cycle_component = hodrick_prescott_filter(data, lambda_param=100)
Apply the Baxter-King bandpass filter to extract business cycle components.
The Baxter-King filter is a bandpass filter used to extract business cycle components from macroeconomic time series.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data |
required |
low
|
float
|
Minimum period of oscillations (in number of observations) |
6
|
high
|
float
|
Maximum period of oscillations (in number of observations) |
32
|
K
|
int
|
Number of terms in the approximation (lead/lag) |
12
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series (cycle component) |
Examples:
>>> import numpy as np
>>> from pypulate.filters import baxter_king_filter
>>> # Create data with trend and multiple cycles
>>> x = np.linspace(0, 100, 1000)
>>> trend = 0.01 * x
>>> long_cycle = np.sin(2 * np.pi * x / 100) # Period of 100
>>> business_cycle = np.sin(2 * np.pi * x / 20) # Period of 20
>>> short_cycle = np.sin(2 * np.pi * x / 5) # Period of 5
>>> data = trend + long_cycle + business_cycle + short_cycle
>>> # Extract business cycle component (periods between 8 and 32)
>>> cycle = baxter_king_filter(data, low=8, high=32, K=12)
Adaptive Filters
Apply an adaptive Kalman filter to a time series.
The adaptive Kalman filter automatically adjusts its parameters based on the observed data, making it more robust to changing dynamics.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
ndarray
|
Input time series data |
required |
process_variance_init
|
float
|
Initial process noise variance (Q) |
1e-5
|
measurement_variance_init
|
float
|
Initial measurement noise variance (R) |
1e-3
|
adaptation_rate
|
float
|
Rate at which the filter adapts to changes |
0.01
|
window_size
|
int
|
Size of the window for innovation estimation |
10
|
initial_state
|
float
|
Initial state estimate. If None, the first data point is used |
None
|
initial_covariance
|
float
|
Initial estimate covariance |
1.0
|
Returns:
Type | Description |
---|---|
ndarray
|
Filtered time series |
Apply a Least Mean Squares (LMS) adaptive filter to a time series.
The LMS algorithm is an adaptive filter that adjusts its coefficients to minimize the mean square error between the desired signal and the filter output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
ndarray
|
Input time series data |
required |
desired
|
ndarray
|
Desired signal. If None, a delayed version of the input is used |
None
|
filter_length
|
int
|
Length of the adaptive filter |
5
|
mu
|
float
|
Step size (learning rate) of the adaptation |
0.01
|
initial_weights
|
ndarray
|
Initial filter weights. If None, zeros are used |
None
|
Returns:
Type | Description |
---|---|
tuple of np.ndarray
|
Tuple containing (filtered_data, filter_weights) |
Apply a Recursive Least Squares (RLS) adaptive filter to a time series.
The RLS algorithm is an adaptive filter that recursively finds the filter coefficients that minimize a weighted linear least squares cost function related to the input signals.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
ndarray
|
Input time series data |
required |
desired
|
ndarray
|
Desired signal. If None, a delayed version of the input is used |
None
|
filter_length
|
int
|
Length of the adaptive filter |
5
|
forgetting_factor
|
float
|
Forgetting factor (0 < lambda <= 1) |
0.99
|
delta
|
float
|
Regularization parameter for the initial correlation matrix |
1.0
|
initial_weights
|
ndarray
|
Initial filter weights. If None, zeros are used |
None
|
Returns:
Type | Description |
---|---|
tuple of np.ndarray
|
Tuple containing (filtered_data, filter_weights) |
Particle Filters
Apply a particle filter to a time series.
The particle filter is a sequential Monte Carlo method that uses a set of particles (samples) to represent the posterior distribution of some stochastic process given noisy and/or partial observations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data (observations) |
required |
state_transition_func
|
callable
|
Function that propagates particles through the state transition model |
required |
observation_func
|
callable
|
Function that computes the expected observation from a state |
required |
process_noise_func
|
callable
|
Function that adds process noise to particles |
required |
observation_likelihood_func
|
callable
|
Function that computes the likelihood of an observation given a state |
required |
n_particles
|
int
|
Number of particles |
100
|
initial_state_func
|
callable
|
Function that generates initial particles. If None, a default is used |
None
|
resample_threshold
|
float
|
Threshold for effective sample size ratio below which resampling occurs |
0.5
|
Returns:
Type | Description |
---|---|
tuple of np.ndarray
|
Tuple containing (filtered_states, particle_weights) |
Apply a bootstrap particle filter to a time series.
The bootstrap particle filter is a simplified version of the particle filter that resamples at every step and uses the state transition prior as the proposal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data
|
array_like
|
Input time series data (observations) |
required |
state_transition_func
|
callable
|
Function that propagates particles through the state transition model |
required |
observation_func
|
callable
|
Function that computes the expected observation from a state |
required |
process_noise_std
|
float
|
Standard deviation of the process noise |
0.1
|
observation_noise_std
|
float
|
Standard deviation of the observation noise |
0.1
|
n_particles
|
int
|
Number of particles |
100
|
initial_state_mean
|
float
|
Mean of the initial state distribution. If None, the first observation is used |
None
|
initial_state_std
|
float
|
Standard deviation of the initial state distribution |
1.0
|
Returns:
Type | Description |
---|---|
tuple of np.ndarray
|
Tuple containing (filtered_states, particle_weights) |