Skip to content

Filters API Reference

This page documents the API for the filters module in Pypulate.

Kalman Filters

Apply a standard Kalman filter to a time series.

The Kalman filter is an optimal estimator that infers parameters of interest from indirect, inaccurate and uncertain observations. It's recursive so new measurements can be processed as they arrive.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
process_variance float

Process noise variance (Q)

1e-5
measurement_variance float

Measurement noise variance (R)

1e-3
initial_state float

Initial state estimate. If None, the first data point is used

None
initial_covariance float

Initial estimate covariance

1.0

Returns:

Type Description
ndarray

Filtered time series

Apply an Extended Kalman Filter (EKF) to a time series with non-linear dynamics.

The EKF is a nonlinear version of the Kalman filter that linearizes about the current mean and covariance. It's used when the state transition or observation models are non-linear.

Parameters:

Name Type Description Default
data array_like

Input time series data (observations)

required
state_transition_func callable

Function that computes the state transition (f)

required
observation_func callable

Function that computes the observation from state (h)

required
process_jacobian_func callable

Function that computes the Jacobian of the state transition function

required
observation_jacobian_func callable

Function that computes the Jacobian of the observation function

required
process_covariance ndarray

Process noise covariance matrix (Q)

required
observation_covariance ndarray

Observation noise covariance matrix (R)

required
initial_state ndarray

Initial state estimate. If None, zeros are used

None
initial_covariance ndarray

Initial estimate covariance matrix. If None, identity is used

None

Returns:

Type Description
ndarray

Filtered time series (state estimates)

Examples:

>>> import numpy as np
>>> from pypulate.filters import extended_kalman_filter
>>> # Define non-linear system
>>> def state_transition(x):
...     # Non-linear state transition function
...     return np.array([x[0] + x[1], 0.5 * x[1]])
>>> def observation(x):
...     # Non-linear observation function
...     return np.array([np.sin(x[0])])
>>> def process_jacobian(x):
...     # Jacobian of state transition function
...     return np.array([[1, 1], [0, 0.5]])
>>> def observation_jacobian(x):
...     # Jacobian of observation function
...     return np.array([[np.cos(x[0]), 0]])
>>> # Create data
>>> n = 100
>>> true_states = np.zeros((n, 2))
>>> true_states[0] = [0, 1]
>>> for i in range(1, n):
...     true_states[i] = state_transition(true_states[i-1])
>>> observations = np.array([observation(x)[0] for x in true_states])
>>> observations += np.random.normal(0, 0.1, n)
>>> # Apply EKF
>>> Q = np.eye(2) * 0.01  # Process noise covariance
>>> R = np.array([[0.1]])  # Observation noise covariance
>>> filtered_states = extended_kalman_filter(
...     observations, state_transition, observation,
...     process_jacobian, observation_jacobian, Q, R
... )

Apply an Unscented Kalman Filter (UKF) to a time series with non-linear dynamics.

The UKF uses the unscented transform to pick a minimal set of sample points (sigma points) around the mean. These sigma points are then propagated through the non-linear functions, and the mean and covariance of the estimate are recovered.

Parameters:

Name Type Description Default
data array_like

Input time series data (observations)

required
state_transition_func callable

Function that computes the state transition

required
observation_func callable

Function that computes the observation from state

required
process_covariance ndarray

Process noise covariance matrix (Q)

required
observation_covariance ndarray

Observation noise covariance matrix (R)

required
initial_state ndarray

Initial state estimate. If None, zeros are used

None
initial_covariance ndarray

Initial estimate covariance matrix. If None, identity is used

None
alpha float

Spread of sigma points around mean

1e-3
beta float

Prior knowledge about distribution (2 is optimal for Gaussian)

2.0
kappa float

Secondary scaling parameter

0.0

Returns:

Type Description
ndarray

Filtered time series (state estimates)

Examples:

>>> import numpy as np
>>> from pypulate.filters import unscented_kalman_filter
>>> # Define non-linear system
>>> def state_transition(x):
...     # Non-linear state transition function
...     return np.array([x[0] + x[1], 0.5 * x[1]])
>>> def observation(x):
...     # Non-linear observation function
...     return np.array([np.sin(x[0])])
>>> # Create data
>>> n = 100
>>> true_states = np.zeros((n, 2))
>>> true_states[0] = [0, 1]
>>> for i in range(1, n):
...     true_states[i] = state_transition(true_states[i-1])
>>> observations = np.array([observation(x)[0] for x in true_states])
>>> observations += np.random.normal(0, 0.1, n)
>>> # Apply UKF
>>> Q = np.eye(2) * 0.01  # Process noise covariance
>>> R = np.array([[0.1]])  # Observation noise covariance
>>> filtered_states = unscented_kalman_filter(
...     observations, state_transition, observation, Q, R
... )

Signal Filters

Apply a Butterworth filter to a time series.

The Butterworth filter is a type of signal processing filter designed to have a frequency response as flat as possible in the passband.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
cutoff float or tuple of float

Cutoff frequency. For lowpass and highpass, this is a scalar. For bandpass and bandstop, this is a tuple of (low, high)

required
order int

Filter order

4
filter_type str

Filter type: 'lowpass', 'highpass', 'bandpass', or 'bandstop'

'lowpass'
fs float

Sampling frequency

1.0

Returns:

Type Description
ndarray

Filtered time series

Examples:

>>> import numpy as np
>>> from pypulate.filters import butterworth_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 1000)
>>> signal = np.sin(2 * np.pi * 0.05 * x) + 0.5 * np.sin(2 * np.pi * 0.25 * x)
>>> # Apply lowpass filter to remove high frequency component
>>> filtered = butterworth_filter(signal, cutoff=0.1, filter_type='lowpass', fs=1.0)

Apply a Chebyshev filter to a time series.

The Chebyshev filter is a filter with steeper roll-off than the Butterworth but more passband ripple (type I) or stopband ripple (type II).

Parameters:

Name Type Description Default
data array_like

Input time series data

required
cutoff float or tuple of float

Cutoff frequency. For lowpass and highpass, this is a scalar. For bandpass and bandstop, this is a tuple of (low, high)

required
order int

Filter order

4
ripple float

Maximum ripple allowed in the passband (type I) or stopband (type II)

1.0
filter_type str

Filter type: 'lowpass', 'highpass', 'bandpass', or 'bandstop'

'lowpass'
fs float

Sampling frequency

1.0
type_num int

Type of Chebyshev filter: 1 for Type I, 2 for Type II

1

Returns:

Type Description
ndarray

Filtered time series

Examples:

>>> import numpy as np
>>> from pypulate.filters import chebyshev_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 1000)
>>> signal = np.sin(2 * np.pi * 0.05 * x) + 0.5 * np.sin(2 * np.pi * 0.25 * x)
>>> # Apply Chebyshev type I lowpass filter
>>> filtered = chebyshev_filter(signal, cutoff=0.1, ripple=0.5, type_num=1)

Apply a Savitzky-Golay filter to a time series.

The Savitzky-Golay filter is a digital filter that can be applied to a set of digital data points to smooth the data by increasing the signal-to-noise ratio without greatly distorting the signal.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
window_length int

Length of the filter window (must be odd)

11
polyorder int

Order of the polynomial used to fit the samples (must be less than window_length)

3
deriv int

Order of the derivative to compute

0
delta float

Spacing of the samples to which the filter is applied

1.0

Returns:

Type Description
ndarray

Filtered time series

Examples:

>>> import numpy as np
>>> from pypulate.filters import savitzky_golay_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x) + np.random.normal(0, 0.1, len(x))
>>> # Apply Savitzky-Golay filter
>>> filtered = savitzky_golay_filter(signal, window_length=11, polyorder=3)

Apply a Wiener filter to a time series.

The Wiener filter is a filter used to produce an estimate of a desired or target signal by linear filtering of an observed noisy signal.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
mysize int or tuple of int

Size of the filter window

3
noise float

Estimate of the noise power. If None, it's estimated from the data

None

Returns:

Type Description
ndarray

Filtered time series

Examples:

>>> import numpy as np
>>> from pypulate.filters import wiener_filter
>>> # Create noisy data
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x) + np.random.normal(0, 0.1, len(x))
>>> # Apply Wiener filter
>>> filtered = wiener_filter(signal, mysize=5)

Apply a median filter to a time series.

The median filter is a nonlinear digital filtering technique used to remove noise from a signal. It replaces each entry with the median of neighboring entries.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
kernel_size int

Size of the filter kernel

3

Returns:

Type Description
ndarray

Filtered time series

Examples:

>>> import numpy as np
>>> from pypulate.filters import median_filter
>>> # Create noisy data with outliers
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x)
>>> signal[10] = 5  # Add outlier
>>> signal[50] = -5  # Add outlier
>>> # Apply median filter to remove outliers
>>> filtered = median_filter(signal, kernel_size=5)

Apply a Hampel filter to a time series.

The Hampel filter is used to identify and replace outliers in a time series. It uses the median and the median absolute deviation (MAD) to identify outliers.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
window_size int

Size of the window (number of points on each side of the current point)

5
n_sigmas float

Number of standard deviations to use for outlier detection

3.0

Returns:

Type Description
ndarray

Filtered time series

Examples:

>>> import numpy as np
>>> from pypulate.filters import hampel_filter
>>> # Create noisy data with outliers
>>> x = np.linspace(0, 10, 100)
>>> signal = np.sin(x)
>>> signal[10] = 5  # Add outlier
>>> signal[50] = -5  # Add outlier
>>> # Apply Hampel filter to remove outliers
>>> filtered = hampel_filter(signal, window_size=5, n_sigmas=3.0)

Apply the Hodrick-Prescott filter to decompose a time series into trend and cycle components.

The Hodrick-Prescott filter is a mathematical tool used in macroeconomics to separate the cyclical component of a time series from raw data.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
lambda_param float

Smoothing parameter. The larger the value, the smoother the trend component

1600.0

Returns:

Type Description
tuple of np.ndarray

Tuple containing (trend, cycle) components

Examples:

>>> import numpy as np
>>> from pypulate.filters import hodrick_prescott_filter
>>> # Create data with trend and cycle
>>> x = np.linspace(0, 10, 100)
>>> trend = 0.1 * x**2
>>> cycle = np.sin(2 * np.pi * 0.1 * x)
>>> data = trend + cycle
>>> # Apply Hodrick-Prescott filter
>>> trend_component, cycle_component = hodrick_prescott_filter(data, lambda_param=100)

Apply the Baxter-King bandpass filter to extract business cycle components.

The Baxter-King filter is a bandpass filter used to extract business cycle components from macroeconomic time series.

Parameters:

Name Type Description Default
data array_like

Input time series data

required
low float

Minimum period of oscillations (in number of observations)

6
high float

Maximum period of oscillations (in number of observations)

32
K int

Number of terms in the approximation (lead/lag)

12

Returns:

Type Description
ndarray

Filtered time series (cycle component)

Examples:

>>> import numpy as np
>>> from pypulate.filters import baxter_king_filter
>>> # Create data with trend and multiple cycles
>>> x = np.linspace(0, 100, 1000)
>>> trend = 0.01 * x
>>> long_cycle = np.sin(2 * np.pi * x / 100)  # Period of 100
>>> business_cycle = np.sin(2 * np.pi * x / 20)  # Period of 20
>>> short_cycle = np.sin(2 * np.pi * x / 5)  # Period of 5
>>> data = trend + long_cycle + business_cycle + short_cycle
>>> # Extract business cycle component (periods between 8 and 32)
>>> cycle = baxter_king_filter(data, low=8, high=32, K=12)

Adaptive Filters

Apply an adaptive Kalman filter to a time series.

The adaptive Kalman filter automatically adjusts its parameters based on the observed data, making it more robust to changing dynamics.

Parameters:

Name Type Description Default
data ndarray

Input time series data

required
process_variance_init float

Initial process noise variance (Q)

1e-5
measurement_variance_init float

Initial measurement noise variance (R)

1e-3
adaptation_rate float

Rate at which the filter adapts to changes

0.01
window_size int

Size of the window for innovation estimation

10
initial_state float

Initial state estimate. If None, the first data point is used

None
initial_covariance float

Initial estimate covariance

1.0

Returns:

Type Description
ndarray

Filtered time series

Apply a Least Mean Squares (LMS) adaptive filter to a time series.

The LMS algorithm is an adaptive filter that adjusts its coefficients to minimize the mean square error between the desired signal and the filter output.

Parameters:

Name Type Description Default
data ndarray

Input time series data

required
desired ndarray

Desired signal. If None, a delayed version of the input is used

None
filter_length int

Length of the adaptive filter

5
mu float

Step size (learning rate) of the adaptation

0.01
initial_weights ndarray

Initial filter weights. If None, zeros are used

None

Returns:

Type Description
tuple of np.ndarray

Tuple containing (filtered_data, filter_weights)

Apply a Recursive Least Squares (RLS) adaptive filter to a time series.

The RLS algorithm is an adaptive filter that recursively finds the filter coefficients that minimize a weighted linear least squares cost function related to the input signals.

Parameters:

Name Type Description Default
data ndarray

Input time series data

required
desired ndarray

Desired signal. If None, a delayed version of the input is used

None
filter_length int

Length of the adaptive filter

5
forgetting_factor float

Forgetting factor (0 < lambda <= 1)

0.99
delta float

Regularization parameter for the initial correlation matrix

1.0
initial_weights ndarray

Initial filter weights. If None, zeros are used

None

Returns:

Type Description
tuple of np.ndarray

Tuple containing (filtered_data, filter_weights)

Particle Filters

Apply a particle filter to a time series.

The particle filter is a sequential Monte Carlo method that uses a set of particles (samples) to represent the posterior distribution of some stochastic process given noisy and/or partial observations.

Parameters:

Name Type Description Default
data array_like

Input time series data (observations)

required
state_transition_func callable

Function that propagates particles through the state transition model

required
observation_func callable

Function that computes the expected observation from a state

required
process_noise_func callable

Function that adds process noise to particles

required
observation_likelihood_func callable

Function that computes the likelihood of an observation given a state

required
n_particles int

Number of particles

100
initial_state_func callable

Function that generates initial particles. If None, a default is used

None
resample_threshold float

Threshold for effective sample size ratio below which resampling occurs

0.5

Returns:

Type Description
tuple of np.ndarray

Tuple containing (filtered_states, particle_weights)

Apply a bootstrap particle filter to a time series.

The bootstrap particle filter is a simplified version of the particle filter that resamples at every step and uses the state transition prior as the proposal.

Parameters:

Name Type Description Default
data array_like

Input time series data (observations)

required
state_transition_func callable

Function that propagates particles through the state transition model

required
observation_func callable

Function that computes the expected observation from a state

required
process_noise_std float

Standard deviation of the process noise

0.1
observation_noise_std float

Standard deviation of the observation noise

0.1
n_particles int

Number of particles

100
initial_state_mean float

Mean of the initial state distribution. If None, the first observation is used

None
initial_state_std float

Standard deviation of the initial state distribution

1.0

Returns:

Type Description
tuple of np.ndarray

Tuple containing (filtered_states, particle_weights)