# Source code for torch_ecg.augmenters.baseline_wander

"""Add baseline wander composed of sinusoidal and Gaussian noise to the ECGs."""

import multiprocessing as mp
from itertools import repeat
from numbers import Real
from random import randint
from typing import Any, List, Optional, Sequence, Tuple, Union

import numpy as np
import torch
from torch import Tensor

from ..cfg import DEFAULTS
from ..utils.utils_signal import get_ampl
from .base import Augmenter

__all__ = [
    "BaselineWanderAugmenter",
]


class BaselineWanderAugmenter(Augmenter):
    """Generate baseline wander composed of sinusoidal and Gaussian noise.

    The augmentation probability is not realised by a per-call coin flip.
    Instead, when ``prob > 0`` the candidate tables `ampl_ratio` and
    `gaussian` are each padded with all-zero rows, so that a uniformly drawn
    row is a non-zero candidate with probability of roughly `prob`.

    Parameters
    ----------
    fs : int
        Sampling frequency of the ECGs to be augmented.
    bw_fs : numpy.ndarray, optional
        Frequencies of the sinusoidal noises, of shape ``(n,)``,
        defaults to ``[0.33, 0.1, 0.05, 0.01]``.
    ampl_ratio : numpy.ndarray, optional
        Candidate ratios of noise amplitudes compared to the original ECGs
        for each frequency in `bw_fs`, of shape ``(m, n)``, defaults to

        .. code-block:: python

            np.array(
                [
                    [0.01, 0.01, 0.02, 0.03],  # low
                    [0.01, 0.02, 0.04, 0.05],  # low
                    [0.1, 0.06, 0.04, 0.02],  # low
                    [0.02, 0.04, 0.07, 0.1],  # low
                    [0.05, 0.1, 0.16, 0.25],  # medium
                    [0.1, 0.15, 0.25, 0.3],  # high
                    [0.25, 0.25, 0.3, 0.35],  # extremely high
                ]
            )

    gaussian : numpy.ndarray, optional
        Candidate mean and std of the Gaussian noises,
        of shape ``(k, 2)``, defaults to

        .. code-block:: python

            np.array(
                [  # mean and std, in terms of ratio
                    [0.0, 0.001],
                    [0.0, 0.003],
                    [0.0, 0.01],
                ]
            )

    prob : float, default 0.5
        Probability of performing the augmentation.
    inplace : bool, default True
        If True, ECG signal tensors will be modified inplace.
    kwargs : dict, optional
        Additional keyword arguments. Not used by this class.

    Examples
    --------
    .. code-block:: python

        blw = BaselineWanderAugmenter(300, prob=0.7)
        sig = torch.randn(32, 12, 5000)
        label = torch.ones((32, 20))
        sig, _ = blw(sig, label)

    """

    __name__ = "BaselineWanderAugmenter"

    def __init__(
        self,
        fs: int,
        bw_fs: Optional[np.ndarray] = None,
        ampl_ratio: Optional[np.ndarray] = None,
        gaussian: Optional[np.ndarray] = None,
        prob: float = 0.5,
        inplace: bool = True,
        **kwargs: Any
    ) -> None:
        super().__init__()
        self.fs = fs
        self.prob = prob
        assert 0 <= self.prob <= 1, "Probability must be between 0 and 1"
        self.inplace = inplace

        # ``np.asarray`` leaves ndarrays untouched and lets callers pass lists.
        self.bw_fs = np.asarray(bw_fs) if bw_fs is not None else np.array([0.33, 0.1, 0.05, 0.01])
        if ampl_ratio is not None:
            ampl_ratio = np.asarray(ampl_ratio)
        else:
            ampl_ratio = np.array(
                [  # default ampl_ratio
                    [0.01, 0.01, 0.02, 0.03],  # low
                    [0.01, 0.02, 0.04, 0.05],  # low
                    [0.1, 0.06, 0.04, 0.02],  # low
                    [0.02, 0.04, 0.07, 0.1],  # low
                    [0.05, 0.1, 0.16, 0.25],  # medium
                    [0.1, 0.15, 0.25, 0.3],  # high
                    [0.25, 0.25, 0.3, 0.35],  # extremely high
                ]
            )
        if gaussian is not None:
            gaussian = np.asarray(gaussian)
        else:
            gaussian = np.array(
                [  # default gaussian, mean and std, in terms of ratio
                    [0.0, 0.001],
                    [0.0, 0.003],
                    [0.0, 0.01],
                ]
            )

        # Validate BEFORE padding: the padding indexes ``shape[1]`` and would
        # otherwise fail with an unrelated error on badly shaped input.
        assert (
            self.bw_fs.ndim == 1 and ampl_ratio.ndim == 2 and self.bw_fs.shape[0] == ampl_ratio.shape[1]
        ), "`bw_fs` must be of shape (n,) and `ampl_ratio` of shape (m, n)"

        self.ampl_ratio = self._pad_with_zero_rows(ampl_ratio, self.prob)
        self.gaussian = self._pad_with_zero_rows(gaussian, self.prob)
        self._n_bw_choices = len(self.ampl_ratio)
        self._n_gn_choices = len(self.gaussian)

    @staticmethod
    def _pad_with_zero_rows(candidates: np.ndarray, prob: float) -> np.ndarray:
        """Prepend all-zero rows so a uniformly drawn row is non-zero with probability ~ `prob`.

        With ``m`` candidate rows, ``int((1 - prob) * m / prob)`` zero rows
        are prepended. `candidates` is returned unchanged when ``prob <= 0``.
        """
        if prob <= 0:
            return candidates
        n_zero_rows = int((1 - prob) * candidates.shape[0] / prob)
        return np.concatenate((np.zeros((n_zero_rows, candidates.shape[1])), candidates))

    def forward(
        self, sig: Tensor, label: Optional[Tensor], *extra_tensors: Sequence[Tensor], **kwargs: Any
    ) -> Tuple[Tensor, ...]:
        """Forward function of the :class:`BaselineWanderAugmenter`.

        Parameters
        ----------
        sig : torch.Tensor
            Batched ECGs to be augmented, of shape ``(batch, lead, siglen)``.
        label : torch.Tensor, optional
            Batched label tensor of the ECGs.
            Not used, but kept for consistency with other augmenters.
        extra_tensors : Sequence[torch.Tensor], optional
            Not used, but kept for consistency with other augmenters.
        **kwargs : dict, optional
            Not used, but kept for consistency with other augmenters.

        Returns
        -------
        sig : torch.Tensor
            The augmented ECGs.
        label : torch.Tensor
            Label tensor of the augmented ECGs, unchanged.
        extra_tensors : Sequence[torch.Tensor], optional
            Unchanged extra tensors.

        """
        if not self.inplace:
            sig = sig.clone()
        # With ``prob == 0`` the signal is returned untouched; otherwise noise
        # is always added and the zero-padded candidates realise ``prob``.
        if self.prob > 0:
            sig.add_(gen_baseline_wander(sig, self.fs, self.bw_fs, self.ampl_ratio, self.gaussian))
        return (sig, label, *extra_tensors)

    def extra_repr_keys(self) -> List[str]:
        """Return the attribute names shown in the repr, followed by those of the parent class."""
        return [
            "fs",
            "bw_fs",
            "prob",
            "inplace",
        ] + super().extra_repr_keys()


def _get_ampl(sig: Tensor, fs: int) -> Tensor:
    """Get the amplitude of each lead.

    Parameters
    ----------
    sig : torch.Tensor
        Batched ECG signal tensor, of shape ``(batch, lead, siglen)``.
    fs : int
        Sampling frequency of the ECGs.

    Returns
    -------
    ampl : torch.Tensor
        Amplitude of each lead, of shape ``(batch, lead, 1)``.

    """
    # ``Tensor.numpy`` refuses tensors that require grad, hence ``detach``.
    per_sample_args = [(sample.detach().cpu().numpy(), fs) for sample in sig]
    with mp.Pool(processes=max(1, mp.cpu_count() - 2)) as pool:
        ampl = pool.starmap(get_ampl, iterable=per_sample_args)
    ampl = torch.as_tensor(np.array(ampl), dtype=sig.dtype, device=sig.device).unsqueeze(-1)
    return ampl


def _gen_gaussian_noise(siglen: int, mean: Real = 0, std: Real = 0) -> np.ndarray:
    """Generate 1d Gaussian noise of given length, mean, and standard deviation.

    Parameters
    ----------
    siglen : int
        Length of the noise signal.
    mean : numbers.Real, default 0
        Mean value of the noise.
    std : numbers.Real, default 0
        Standard deviation of the noise.

    Returns
    -------
    gn : numpy.ndarray
        Gaussian noise of given length, mean, and standard deviation.

    """
    gn = DEFAULTS.RNG.normal(mean, std, siglen)
    return gn


def _gen_sinusoidal_noise(
    siglen: int,
    start_phase: Real,
    end_phase: Real,
    amplitude: Real,
    amplitude_mean: Real = 0,
    amplitude_std: Real = 0,
) -> np.ndarray:
    """Generate 1d sinusoidal noise of given length, amplitude, start phase, and end phase.

    Parameters
    ----------
    siglen : int
        Length of the (noise) signal.
    start_phase : numbers.Real
        Start phase, with units in degrees.
    end_phase : numbers.Real
        End phase, with units in degrees.
    amplitude : numbers.Real
        Amplitude of the sinusoidal curve.
    amplitude_mean : numbers.Real, default 0
        Mean amplitude of an extra Gaussian noise.
    amplitude_std : numbers.Real, default 0
        Standard deviation of an extra Gaussian noise.

    Returns
    -------
    sn : numpy.ndarray
        Sinusoidal noise of given length, amplitude, start phase, and end phase.

    """
    # Phase (in degrees) sampled linearly over the signal, then converted to radians.
    sn = np.linspace(start_phase, end_phase, siglen)
    sn = amplitude * np.sin(np.pi * sn / 180)
    sn += _gen_gaussian_noise(siglen, amplitude_mean, amplitude_std)
    return sn


def _gen_baseline_wander(
    siglen: int,
    fs: Real,
    bw_fs: Union[Real, Sequence[Real]],
    amplitude: Union[Real, Sequence[Real]],
    amplitude_gaussian: Sequence[Real] = (0, 0),
) -> np.ndarray:
    """Generate 1d baseline wander of given length, amplitude, and frequency.

    Parameters
    ----------
    siglen : int
        Length of the (noise) signal.
    fs : numbers.Real
        Sampling frequency of the original signal.
    bw_fs : numbers.Real, or list of numbers.Real
        Frequency (Frequencies) of the baseline wander.
    amplitude : numbers.Real, or list of numbers.Real
        Amplitude of the baseline wander (corr. to each frequency band).
        A single number is used for every frequency in `bw_fs`.
    amplitude_gaussian : Sequence[numbers.Real], default (0, 0)
        2-tuple of :class:`~numbers.Real`.
        Mean and std of amplitude of an extra Gaussian noise.

    Returns
    -------
    bw : numpy.ndarray
        Baseline wander of given length, amplitude, frequency.

    Example
    -------
    >>> _gen_baseline_wander(4000, 400, [0.4,0.1,0.05], [0.1,0.2,0.4])

    """
    bw = _gen_gaussian_noise(siglen, amplitude_gaussian[0], amplitude_gaussian[1])
    _bw_fs = [bw_fs] if isinstance(bw_fs, Real) else bw_fs
    if isinstance(amplitude, Real):
        _amplitude = list(repeat(amplitude, len(_bw_fs)))
    else:
        _amplitude = amplitude
    assert len(_bw_fs) == len(_amplitude)
    duration = siglen / fs
    for bf, a in zip(_bw_fs, _amplitude):
        # Random start phase in degrees; the end phase follows from the number
        # of cycles (``duration * bf``) completed over the signal.
        start_phase = DEFAULTS.RNG_randint(0, 360)
        end_phase = duration * bf * 360 + start_phase
        bw += _gen_sinusoidal_noise(siglen, start_phase, end_phase, a, 0, 0)
    return bw


def gen_baseline_wander(
    sig: Tensor,
    fs: Real,
    bw_fs: Union[Real, Sequence[Real]],
    ampl_ratio: np.ndarray,
    gaussian: np.ndarray,
) -> Tensor:
    """Generate baseline wander for a batch of ECGs.

    For every lead of every sample, one row of `ampl_ratio` and one row of
    `gaussian` are drawn uniformly at random, a unit-scale baseline wander is
    generated from them, and the result is scaled by the amplitude of that
    lead so that the candidates act as ratios of the signal amplitude.

    Parameters
    ----------
    sig : torch.Tensor
        Batched ECGs to be augmented, of shape ``(batch, lead, siglen)``.
    fs : numbers.Real
        Sampling frequency of the original signal.
    bw_fs : numbers.Real, or list of numbers.Real
        Frequency (Frequencies) of the baseline wander.
    ampl_ratio : numpy.ndarray
        Candidate ratios of noise amplitudes compared to the original ECGs
        for each frequency in `bw_fs`, of shape ``(m, n)``.
    gaussian : numpy.ndarray
        Candidate mean and std of the Gaussian noises, of shape ``(k, 2)``.

    Returns
    -------
    bw : torch.Tensor
        Baseline wander of shape ``(batch, lead, siglen)``,
        with the same dtype and device as `sig`.

    """
    batch, lead, siglen = sig.shape
    sig_ampl = _get_ampl(sig, fs)
    _n_bw_choices = len(ampl_ratio)
    _n_gn_choices = len(gaussian)

    # One task per (sample, lead) pair; candidates are drawn in this process.
    tasks = [
        (
            siglen,
            fs,
            bw_fs,
            ampl_ratio[randint(0, _n_bw_choices - 1)],
            gaussian[randint(0, _n_gn_choices - 1)],
        )
        for _ in range(batch * lead)
    ]
    # NOTE(review): each worker draws from its own copy of ``DEFAULTS.RNG``;
    # depending on the multiprocessing start method the copies may begin in
    # the same state, giving correlated noise across workers -- confirm.
    with mp.Pool(processes=max(1, mp.cpu_count() - 2)) as pool:
        bw = pool.starmap(_gen_baseline_wander, iterable=tasks)
    bw = torch.as_tensor(np.array(bw), dtype=sig.dtype, device=sig.device).reshape(batch, lead, siglen)
    # The candidates are ratios: scale the unit-scale noise by the amplitude of
    # each lead. The explicit reshape fails loudly if the amplitude tensor is
    # not one value per (sample, lead).
    return bw * sig_ampl.reshape(batch, lead, 1)