Source code for torch_ecg.utils.utils_nn

"""
utilities for nn models

"""

import os
import re
import warnings
from copy import deepcopy
from itertools import chain, repeat
from math import floor
from numbers import Real
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import torch
from torch import Tensor, nn

from ..cfg import CFG, DEFAULTS
from .misc import add_docstring, make_serializable
from .utils_data import cls_to_bin

__all__ = [
    "extend_predictions",
    "compute_output_shape",
    "compute_conv_output_shape",
    "compute_deconv_output_shape",
    "compute_maxpool_output_shape",
    "compute_avgpool_output_shape",
    "compute_sequential_output_shape",
    "compute_sequential_output_shape_docstring",
    "compute_module_size",
    "default_collate_fn",
    "compute_receptive_field",
    "adjust_cnn_filter_lengths",
    "SizeMixin",
    "CkptMixin",
]


def extend_predictions(preds: Sequence, classes: List[str], extended_classes: List[str]) -> np.ndarray:
    """Extend the prediction arrays to prediction arrays in a larger range of classes.

    Parameters
    ----------
    preds : array_like
        Array of predictions (scalar or binary) of shape ``(n_records, n_classes)``,
        or categorical predictions of shape ``(n_records,)``,
        where ``n_classes = len(classes)``.
    classes : List[str]
        Classes of the predictions of `preds`.
    extended_classes : List[str]
        A superset of `classes`.
        The predictions will be extended to this range of classes.

    Returns
    -------
    extended_preds : numpy.ndarray
        The extended array of predictions,
        with indices in `extended_classes`,
        of shape ``(n_records, n_classes)``, or ``(n_records,)``.

    Examples
    --------
    .. code-block:: python

        n_records, n_classes = 10, 3
        classes = ["NSR", "AF", "PVC"]
        extended_classes = ["AF", "RBBB", "PVC", "NSR"]
        scalar_pred = torch.rand(n_records, n_classes)
        extended_pred = extend_predictions(scalar_pred, classes, extended_classes)
        bin_pred = torch.randint(0, 2, (n_records, n_classes))
        extended_pred = extend_predictions(bin_pred, classes, extended_classes)
        cate_pred = torch.randint(0, n_classes, (n_records,))
        extended_pred = extend_predictions(cate_pred, classes, extended_classes)

    """
    assert len(set(classes) - set(extended_classes)) == 0, (
        "`extended_classes` is not a superset of `classes`, "
        f"with {set(classes)-set(extended_classes)} in `classes` but not in `extended_classes`"
    )

    if isinstance(preds, Tensor):
        _preds = preds.numpy()
    else:
        _preds = np.array(preds)

    if np.ndim(_preds) == 1:  # categorical predictions
        extended_preds = cls_to_bin(_preds, len(classes))
        extended_preds = extend_predictions(extended_preds, classes, extended_classes)
        extended_preds = np.where(extended_preds == 1)[1]
        return extended_preds

    assert _preds.shape[1] == len(classes), f"`pred` indicates {_preds.shape[1]} classes, while `classes` has {len(classes)}"

    extended_preds = np.zeros((_preds.shape[0], len(extended_classes)))
    for idx, c in enumerate(classes):
        new_idx = extended_classes.index(c)
        extended_preds[..., new_idx] = _preds[..., idx]
    if np.array(preds).ndim == 1:
        extended_preds = extended_preds[0]

    return extended_preds
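
# Illustrative sanity check (not part of the library): binary predictions for
# ["NSR", "AF", "PVC"] are re-indexed into the larger class set, and the new
# "RBBB" column is filled with zeros.
#
#     >>> bin_pred = np.array([[1, 0, 1]])  # NSR and PVC active
#     >>> extend_predictions(bin_pred, ["NSR", "AF", "PVC"], ["AF", "RBBB", "PVC", "NSR"])
#     array([[0., 0., 1., 1.]])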
# utils for computing output shape

def compute_output_shape(
    layer_type: str,
    input_shape: Sequence[Union[int, None]],
    num_filters: Optional[int] = None,
    kernel_size: Union[Sequence[int], int] = 1,
    stride: Union[Sequence[int], int] = 1,
    padding: Union[Sequence[int], int] = 0,
    output_padding: Union[Sequence[int], int] = 0,
    dilation: Union[Sequence[int], int] = 1,
    channel_last: bool = False,
    asymmetric_padding: Union[Sequence[int], Sequence[Sequence[int]]] = None,
) -> Tuple[Union[int, None]]:
    """Compute the output shape of a (transpose) convolution/maxpool/avgpool layer.

    This function is based on the discussion [#disc]_.

    Parameters
    ----------
    layer_type : str
        Type (conv, maxpool, avgpool, etc.) of the layer.
    input_shape : Sequence[Union[int, None]]
        Shape of an input :class:`~torch.Tensor`.
        The first dimension is the batch dimension,
        which is allowed to be `None`.
    num_filters : int, optional
        Number of filters, also the channel dimension.
    kernel_size : int or Sequence[int], default 1
        Kernel size (filter size) of the layer,
        should be compatible with `input_shape`.
    stride : int or Sequence[int], default 1
        Stride (down-sampling length) of the layer,
        should be compatible with `input_shape`.
    padding : int or Sequence[int], default 0
        Padding length(s) of the layer,
        should be compatible with `input_shape`.
    output_padding : int or Sequence[int], default 0
        Additional size added to one side of the output shape,
        used only for transpose convolution.
    dilation : int or Sequence[int], default 1
        Dilation of the layer,
        should be compatible with `input_shape`.
    channel_last : bool, default False
        Whether the channel dimension is the last dimension,
        or the second dimension (the first is the batch dimension by convention).
    asymmetric_padding : Sequence[int] or Sequence[Sequence[int]], optional
        (2-)sequence of int, or sequence of (2-)sequence of int;
        asymmetric paddings for all dimensions or for each dimension.

    Returns
    -------
    output_shape : tuple
        Shape of the output :class:`~torch.Tensor`.

    References
    ----------
    .. [#disc] https://discuss.pytorch.org/t/utility-function-for-calculating-the-shape-of-a-conv-output/11173/5

    """
    # check validity of arguments
    __TYPES__ = [
        "conv",
        "convolution",
        "deconv",
        "deconvolution",
        "transposeconv",
        "transposeconvolution",
        "maxpool",
        "maxpooling",
        "avgpool",
        "avgpooling",
        "averagepool",
        "averagepooling",
    ]
    lt = "".join(layer_type.lower().split("_"))
    assert lt in __TYPES__, f"Unknown layer type `{layer_type}`, should be one of: {__TYPES__}"

    def assert_positive_integer(num):
        return isinstance(num, int) and num > 0

    def assert_non_negative_integer(num):
        return isinstance(num, int) and num >= 0

    dim = len(input_shape) - 2
    assert dim > 0, (
        "`input_shape` should be a sequence of length at least 3, "
        "to be a valid (with batch and channel) shape of a non-degenerate Tensor"
    )
    assert all(
        [s is None or assert_positive_integer(s) for s in input_shape]
    ), "`input_shape` should be a sequence containing only `None` and positive integers"
    if num_filters is not None:
        assert assert_positive_integer(num_filters), "`num_filters` should be `None` or a positive integer"
    assert all(
        [assert_positive_integer(num) for num in np.asarray(kernel_size).flatten().tolist()]
    ), "`kernel_size` should contain only positive integers"
    assert all(
        [assert_positive_integer(num) for num in np.asarray(stride).flatten().tolist()]
    ), "`stride` should contain only positive integers"
    assert all(
        [assert_non_negative_integer(num) for num in np.asarray(padding).flatten().tolist()]
    ), "`padding` should contain only non-negative integers"
    assert all(
        [assert_non_negative_integer(num) for num in np.asarray(output_padding).flatten().tolist()]
    ), "`output_padding` should contain only non-negative integers"
    assert all(
        [assert_positive_integer(num) for num in np.asarray(dilation).flatten().tolist()]
    ), "`dilation` should contain only positive integers"

    if lt in [
        "conv",
        "convolution",
    ]:
        # as function of dilation, kernel_size
        def minus_term(d, k):
            return d * (k - 1) + 1

        out_channels = num_filters
    elif lt in [
        "maxpool",
        "maxpooling",
    ]:

        def minus_term(d, k):
            return d * (k - 1) + 1

        out_channels = input_shape[-1] if channel_last else input_shape[1]
    elif lt in [
        "avgpool",
        "avgpooling",
        "averagepool",
        "averagepooling",
    ]:

        def minus_term(d, k):
            return k

        out_channels = input_shape[-1] if channel_last else input_shape[1]
    elif lt in [
        "deconv",
        "deconvolution",
        "transposeconv",
        "transposeconvolution",
    ]:
        out_channels = num_filters

    def check_output_validity(shape):
        assert all(p is None or p > 0 for p in shape), f"output shape `{shape}` is illegal, please check input arguments"
        return shape

    none_dim_msg = "spatial dimensions should be all `None`, or all not `None`"
    if channel_last:
        if all([n is None for n in input_shape[1:-1]]):
            if out_channels is None:
                raise ValueError("out channel dimension and spatial dimensions are all `None`")
            output_shape = tuple(list(input_shape[:-1]) + [out_channels])
            return check_output_validity(output_shape)
        elif any([n is None for n in input_shape[1:-1]]):
            raise ValueError(none_dim_msg)
    else:
        if all([n is None for n in input_shape[2:]]):
            if out_channels is None:
                raise ValueError("out channel dimension and spatial dimensions are all `None`")
            output_shape = tuple([input_shape[0], out_channels] + list(input_shape[2:]))
            return check_output_validity(output_shape)
        elif any([n is None for n in input_shape[2:]]):
            raise ValueError(none_dim_msg)

    if isinstance(kernel_size, int):
        _kernel_size = list(repeat(kernel_size, dim))
    elif len(kernel_size) == dim:
        _kernel_size = kernel_size
    else:
        raise ValueError(
            f"input has {dim} dimensions, while `kernel_size` has {len(kernel_size)} dimensions, "
            "both not including the channel dimension"
        )

    if isinstance(stride, int):
        _stride = list(repeat(stride, dim))
    elif len(stride) == dim:
        _stride = stride
    else:
        raise ValueError(
            f"input has {dim} dimensions, while `stride` has {len(stride)} dimensions, "
            "both not including the channel dimension"
        )

    # NOTE: asymmetric padding along one spatial dimension
    # seems not supported yet by PyTorch's builtin Module classes
    if isinstance(padding, int):
        _padding = list(repeat(list(repeat(padding, 2)), dim))
    elif len(padding) == dim:
        _padding = [list(repeat(p, 2)) for p in padding]
    else:
        raise ValueError(
            f"input has {dim} dimensions, while `padding` has {len(padding)} dimensions, "
            "both not including the channel dimension"
        )

    if asymmetric_padding is not None:
        assert hasattr(asymmetric_padding, "__len__"), "Invalid `asymmetric_padding`"
        if isinstance(asymmetric_padding[0], int):
            assert len(asymmetric_padding) == 2 and isinstance(asymmetric_padding[1], int), "Invalid `asymmetric_padding`"
            _asymmetric_padding = list(repeat(asymmetric_padding, dim))
        else:
            assert len(asymmetric_padding) == dim and all(
                len(ap) == 2 and all(isinstance(p, int) for p in ap) for ap in asymmetric_padding
            ), "Invalid `asymmetric_padding`"
            _asymmetric_padding = asymmetric_padding
        for idx in range(dim):
            _padding[idx][0] += _asymmetric_padding[idx][0]
            _padding[idx][1] += _asymmetric_padding[idx][1]

    if isinstance(output_padding, int):
        _output_padding = list(repeat(output_padding, dim))
    elif len(output_padding) == dim:
        _output_padding = output_padding
    else:
        raise ValueError(
            f"input has {dim} dimensions, while `output_padding` has {len(output_padding)} dimensions, "
            "both not including the channel dimension"
        )

    if isinstance(dilation, int):
        _dilation = list(repeat(dilation, dim))
    elif len(dilation) == dim:
        _dilation = dilation
    else:
        raise ValueError(
            f"input has {dim} dimensions, while `dilation` has {len(dilation)} dimensions, "
            "both not including the channel dimension"
        )

    if channel_last:
        _input_shape = list(input_shape[1:-1])
    else:
        _input_shape = list(input_shape[2:])

    if lt in [
        "deconv",
        "deconvolution",
        "transposeconv",
        "transposeconvolution",
    ]:
        output_shape = [
            (i - 1) * s - sum(p) + d * (k - 1) + o + 1
            for i, p, o, d, k, s in zip(
                _input_shape,
                _padding,
                _output_padding,
                _dilation,
                _kernel_size,
                _stride,
            )
        ]
    else:
        output_shape = [
            floor(((i + sum(p) - minus_term(d, k)) / s) + 1)
            for i, p, d, k, s in zip(_input_shape, _padding, _dilation, _kernel_size, _stride)
        ]
    if channel_last:
        output_shape = tuple([input_shape[0]] + output_shape + [out_channels])
    else:
        output_shape = tuple([input_shape[0], out_channels] + output_shape)

    return check_output_validity(output_shape)
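
# Illustrative cross-check (a local sketch, not library code): the computed
# shape should agree with the shape an actual ``nn.Conv1d`` produces.
#
#     >>> x = torch.rand(2, 12, 5000)  # (batch, channels, seq_len)
#     >>> conv = nn.Conv1d(12, 32, kernel_size=7, stride=2, padding=3)
#     >>> tuple(conv(x).shape)
#     (2, 32, 2500)
#     >>> compute_output_shape("conv", (2, 12, 5000), 32, kernel_size=7, stride=2, padding=3)
#     (2, 32, 2500)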

def compute_conv_output_shape(
    input_shape: Sequence[Union[int, None]],
    num_filters: Optional[int] = None,
    kernel_size: Union[Sequence[int], int] = 1,
    stride: Union[Sequence[int], int] = 1,
    padding: Union[Sequence[int], int] = 0,
    dilation: Union[Sequence[int], int] = 1,
    channel_last: bool = False,
    asymmetric_padding: Union[Sequence[int], Sequence[Sequence[int]]] = None,
) -> Tuple[Union[int, None]]:
    """Compute the output shape of a convolution layer.

    Parameters
    ----------
    input_shape : Sequence[Union[int, None]]
        Shape of an input :class:`~torch.Tensor`.
        The first dimension is the batch dimension,
        which is allowed to be `None`.
    num_filters : int, optional
        Number of filters, also the channel dimension.
    kernel_size : int or Sequence[int], default 1
        Kernel size (filter size) of the layer,
        should be compatible with `input_shape`.
    stride : int or Sequence[int], default 1
        Stride (down-sampling length) of the layer,
        should be compatible with `input_shape`.
    padding : int or Sequence[int], default 0
        Padding length(s) of the layer,
        should be compatible with `input_shape`.
    dilation : int or Sequence[int], default 1
        Dilation of the layer,
        should be compatible with `input_shape`.
    channel_last : bool, default False
        Whether the channel dimension is the last dimension,
        or the second dimension (the first is the batch dimension by convention).
    asymmetric_padding : Sequence[int] or Sequence[Sequence[int]], optional
        (2-)sequence of int, or sequence of (2-)sequence of int;
        asymmetric paddings for all dimensions or for each dimension.

    Returns
    -------
    output_shape : tuple
        Shape of the output :class:`~torch.Tensor`.

    """
    output_shape = compute_output_shape(
        "conv",
        input_shape,
        num_filters,
        kernel_size,
        stride,
        padding,
        0,
        dilation,
        channel_last,
        asymmetric_padding,
    )
    return output_shape

def compute_maxpool_output_shape(
    input_shape: Sequence[Union[int, None]],
    kernel_size: Union[Sequence[int], int] = 1,
    stride: Union[Sequence[int], int] = 1,
    padding: Union[Sequence[int], int] = 0,
    dilation: Union[Sequence[int], int] = 1,
    channel_last: bool = False,
) -> Tuple[Union[int, None]]:
    """Compute the output shape of a maxpool layer.

    Parameters
    ----------
    input_shape : Sequence[Union[int, None]]
        Shape of an input :class:`~torch.Tensor`.
        The first dimension is the batch dimension,
        which is allowed to be `None`.
    kernel_size : int or Sequence[int], default 1
        Kernel size (filter size) of the layer,
        should be compatible with `input_shape`.
    stride : int or Sequence[int], default 1
        Stride (down-sampling length) of the layer,
        should be compatible with `input_shape`.
    padding : int or Sequence[int], default 0
        Padding length(s) of the layer,
        should be compatible with `input_shape`.
    dilation : int or Sequence[int], default 1
        Dilation of the layer,
        should be compatible with `input_shape`.
    channel_last : bool, default False
        Whether the channel dimension is the last dimension,
        or the second dimension (the first is the batch dimension by convention).

    Returns
    -------
    output_shape : tuple
        Shape of the output :class:`~torch.Tensor`.

    """
    output_shape = compute_output_shape(
        "maxpool",
        input_shape,
        1,
        kernel_size,
        stride,
        padding,
        0,
        dilation,
        channel_last,
    )
    return output_shape
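
# Hypothetical usage sketch: a length-5000 signal max-pooled with kernel size 3,
# stride 2 and padding 1 shrinks to floor((5000 + 2 - 3) / 2 + 1) = 2500 samples;
# the channel dimension (12) is carried over from the input, and the batch
# dimension may stay ``None``.
#
#     >>> compute_maxpool_output_shape((None, 12, 5000), kernel_size=3, stride=2, padding=1)
#     (None, 12, 2500)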

def compute_avgpool_output_shape(
    input_shape: Sequence[Union[int, None]],
    kernel_size: Union[Sequence[int], int] = 1,
    stride: Union[Sequence[int], int] = 1,
    padding: Union[Sequence[int], int] = 0,
    channel_last: bool = False,
) -> Tuple[Union[int, None]]:
    """Compute the output shape of an avgpool layer.

    Parameters
    ----------
    input_shape : Sequence[Union[int, None]]
        Shape of an input :class:`~torch.Tensor`.
        The first dimension is the batch dimension,
        which is allowed to be `None`.
    kernel_size : int or Sequence[int], default 1
        Kernel size (filter size) of the layer,
        should be compatible with `input_shape`.
    stride : int or Sequence[int], default 1
        Stride (down-sampling length) of the layer,
        should be compatible with `input_shape`.
    padding : int or Sequence[int], default 0
        Padding length(s) of the layer,
        should be compatible with `input_shape`.
    channel_last : bool, default False
        Whether the channel dimension is the last dimension,
        or the second dimension (the first is the batch dimension by convention).

    Returns
    -------
    output_shape : tuple
        Shape of the output :class:`~torch.Tensor`.

    """
    output_shape = compute_output_shape(
        "avgpool",
        input_shape,
        1,
        kernel_size,
        stride,
        padding,
        0,
        1,
        channel_last,
    )
    return output_shape

def compute_deconv_output_shape(
    input_shape: Sequence[Union[int, None]],
    num_filters: Optional[int] = None,
    kernel_size: Union[Sequence[int], int] = 1,
    stride: Union[Sequence[int], int] = 1,
    padding: Union[Sequence[int], int] = 0,
    output_padding: Union[Sequence[int], int] = 0,
    dilation: Union[Sequence[int], int] = 1,
    channel_last: bool = False,
    asymmetric_padding: Union[Sequence[int], Sequence[Sequence[int]]] = None,
) -> Tuple[Union[int, None]]:
    """Compute the output shape of a transpose convolution layer.

    Parameters
    ----------
    input_shape : Sequence[Union[int, None]]
        Shape of an input :class:`~torch.Tensor`.
        The first dimension is the batch dimension,
        which is allowed to be `None`.
    num_filters : int, optional
        Number of filters, also the channel dimension.
    kernel_size : int or Sequence[int], default 1
        Kernel size (filter size) of the layer,
        should be compatible with `input_shape`.
    stride : int or Sequence[int], default 1
        Stride (down-sampling length) of the layer,
        should be compatible with `input_shape`.
    padding : int or Sequence[int], default 0
        Padding length(s) of the layer,
        should be compatible with `input_shape`.
    output_padding : int or Sequence[int], default 0
        Additional size added to one side of the output shape.
    dilation : int or Sequence[int], default 1
        Dilation of the layer,
        should be compatible with `input_shape`.
    channel_last : bool, default False
        Whether the channel dimension is the last dimension,
        or the second dimension (the first is the batch dimension by convention).
    asymmetric_padding : Sequence[int] or Sequence[Sequence[int]], optional
        (2-)sequence of int, or sequence of (2-)sequence of int;
        asymmetric paddings for all dimensions or for each dimension.

    Returns
    -------
    output_shape : tuple
        Shape of the output :class:`~torch.Tensor`.

    """
    output_shape = compute_output_shape(
        "deconv",
        input_shape,
        num_filters,
        kernel_size,
        stride,
        padding,
        output_padding,
        dilation,
        channel_last,
        asymmetric_padding,
    )
    return output_shape
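
# Illustrative round trip (a local sketch): a transpose convolution with the
# same hyper-parameters undoes the length change of the corresponding
# convolution, up to ``output_padding``.
#
#     >>> compute_conv_output_shape((1, 16, 2000), 32, kernel_size=5, stride=2, padding=2)
#     (1, 32, 1000)
#     >>> compute_deconv_output_shape((1, 32, 1000), 16, kernel_size=5, stride=2, padding=2, output_padding=1)
#     (1, 16, 2000)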

compute_sequential_output_shape_docstring = """

    Parameters
    ----------
    seq_len : int, optional
        Length of the input tensors.
    batch_size : int, optional
        Batch size of the input tensors.

    Returns
    -------
    output_shape : sequence
        The output shape of the module.

    """

@add_docstring(compute_sequential_output_shape_docstring, mode="append")
def compute_sequential_output_shape(
    model: nn.Sequential,
    seq_len: Optional[int] = None,
    batch_size: Optional[int] = None,
) -> Sequence[Union[int, None]]:
    """Compute the output shape of a sequential model."""
    assert issubclass(type(model), nn.Sequential), f"model should be nn.Sequential, but got {type(model)}"
    _seq_len = seq_len
    for module in model:
        output_shape = module.compute_output_shape(_seq_len, batch_size)
        _, _, _seq_len = output_shape
    return output_shape
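
# Minimal sketch of the contract `compute_sequential_output_shape` relies on.
# The toy block below is hypothetical (not part of torch_ecg): each sub-module
# must itself expose a ``compute_output_shape(seq_len, batch_size)`` method
# returning a ``(batch, channels, seq_len)`` triple.


class _ToyBlock(nn.Module):
    """Toy sub-module: a convolution that halves the sequence length."""

    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        self.conv = nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=2, padding=1)

    def forward(self, x: Tensor) -> Tensor:
        return self.conv(x)

    def compute_output_shape(self, seq_len=None, batch_size=None):
        return compute_conv_output_shape(
            (batch_size, self.conv.in_channels, seq_len),
            num_filters=self.conv.out_channels,
            kernel_size=3,
            stride=2,
            padding=1,
        )


# >>> compute_sequential_output_shape(nn.Sequential(_ToyBlock(12, 32), _ToyBlock(32, 64)), seq_len=4000)
# (None, 64, 1000)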

def compute_module_size(
    module: nn.Module,
    requires_grad: bool = True,
    include_buffers: bool = False,
    human: bool = False,
) -> Union[int, str]:
    """Compute the size (number of parameters) of a :class:`~torch.nn.Module`.

    Parameters
    ----------
    module : torch.nn.Module
        The :class:`~torch.nn.Module` to compute the size.
    requires_grad : bool, default True
        Whether to only count the parameters that require gradients.
    include_buffers : bool, default False
        Whether to include the buffers.
        If `requires_grad` is True, then `include_buffers` is ignored.
    human : bool, default False
        Size is returned in a way that is easy to read by a human,
        by appending a suffix corresponding to the unit (B, K, M, G, T, P).

    Returns
    -------
    n_params : int or str
        Size (number of parameters) of this :class:`~torch.nn.Module`,
        or a string representing the memory size.

    Examples
    --------
    >>> import torch
    >>> class Model(torch.nn.Sequential):
    ...     def __init__(self):
    ...         super().__init__()
    ...         self.add_module("linear", torch.nn.Linear(10, 20, dtype=torch.float16))
    ...         self.register_buffer("hehe", torch.ones(20, 2, dtype=torch.float64))
    >>> model = Model()
    >>> model.linear.weight.requires_grad_(False)
    >>> compute_module_size(model)
    20
    >>> compute_module_size(model, requires_grad=False)
    220
    >>> compute_module_size(model, requires_grad=False, include_buffers=True)
    260
    >>> compute_module_size(model, requires_grad=False, include_buffers=True, human=True)
    '0.7K'
    >>> compute_module_size(model, requires_grad=False, include_buffers=False, human=True)
    '0.4K'
    >>> compute_module_size(model, human=True)
    '40.0B'

    """
    if requires_grad:
        tensor_containers = filter(lambda p: p.requires_grad, module.parameters())
        if include_buffers:
            warnings.warn(
                "`include_buffers` is ignored when `requires_grad` is True",
                RuntimeWarning,
            )
    elif include_buffers:
        tensor_containers = chain(module.parameters(), module.buffers())
    else:
        tensor_containers = module.parameters()
    if human:
        size_dict = {
            "torch.float16": 2,
            "torch.float32": 4,
            "torch.float64": 8,
            "torch.int8": 1,
            "torch.int16": 2,
            "torch.int32": 4,
            "torch.int64": 8,
            "torch.uint8": 1,
        }
        n_params = sum([np.prod(item.size()) * size_dict[str(item.dtype)] for item in tensor_containers])
        div_count = 0
        while n_params >= 1024 * 0.1:
            n_params /= 1024
            div_count += 1
        cvt_dict = {c: u for c, u in enumerate(list("BKMGTP"))}
        n_params = f"{n_params:.1f}{cvt_dict[div_count]}"
    else:
        n_params = int(sum([np.prod(item.size()) for item in tensor_containers]))
    return n_params

def compute_receptive_field(
    kernel_sizes: Union[Sequence[int], int] = 1,
    strides: Union[Sequence[int], int] = 1,
    dilations: Union[Sequence[int], int] = 1,
    input_len: Optional[int] = None,
    fs: Optional[Real] = None,
) -> Union[int, float]:
    """Compute the receptive field of several types of :class:`~torch.nn.Module`.

    Computes the (generic) receptive field of the feature map of a certain channel,
    from a certain flow (if not merged, different flows, e.g. shortcut,
    must be computed separately), for convolutions and (non-global) poolings.
    "Generic" refers to a general position, rather than specific positions,
    like on the edges, whose receptive field is definitely different.

    In CNNs, for any element of some layer, its receptive field refers to
    all the elements (from all the previous layers) that may affect its
    calculation during the forward propagation [:footcite:ct:`zhang2023dive`].
    (See `this url <https://d2l.ai/chapter_convolutional-neural-networks/conv-layer.html#feature-map-and-receptive-field>`_)

    The receptive field is computed as follows. Let the layers have kernel size,
    stride and dilation :math:`(k_n, s_n, d_n)` respectively. Let each feature map
    have receptive field length :math:`r_n`, and let the difference of the
    receptive fields of adjacent positions be :math:`f_n`.
    By convention, :math:`(r_0, f_0) = (1, 1)`. Then one has

    .. math::

        \\begin{eqnarray}
        r_{n+1} & = & r_n + d_n(k_n-1)f_n, \\\\
        f_{n+1} & = & s_n f_n.
        \\end{eqnarray}

    Hence

    .. math::

        \\begin{eqnarray}
        f_{n} & = & \\prod\\limits_{i=0}^{n-1} s_i, \\\\
        r_{n} & = & 1 + \\sum\\limits_{i=0}^{n-1} d_i(k_i-1) \\prod\\limits_{j=0}^{i-1} s_j,
        \\end{eqnarray}

    with empty products equaling 1 by convention.

    Parameters
    ----------
    kernel_sizes : int or Sequence[int], default 1
        The sequence of kernel sizes for all the layers in the flow.
    strides : int or Sequence[int], default 1
        The sequence of strides for all the layers in the flow.
    dilations : int or Sequence[int], default 1
        The sequence of dilations for all the layers in the flow.
    input_len : int, optional
        Length of the first feature map in the flow.
    fs : numbers.Real, optional
        Sampling frequency of the input signal.
        If not ``None``, then the receptive field is returned in seconds.

    Returns
    -------
    receptive_field : int or float
        (Length of) the receptive field,
        in samples if `fs` is ``None``, otherwise in seconds.

    Examples
    --------
    >>> compute_receptive_field([11, 2, 7, 7, 2, 5, 5, 5, 2], [1, 2, 1, 1, 2, 1, 1, 1, 2])
    90
    >>> compute_receptive_field([11, 2, 7, 7, 2, 5, 5, 5, 2], [1, 2, 1, 1, 2, 1, 1, 1, 2], fs=500)
    0.18
    >>> compute_receptive_field([11, 2, 7, 7, 2, 5, 5, 5, 2], [1, 2, 1, 1, 2, 1, 1, 1, 2], [2, 1, 2, 4, 1, 8, 8, 8, 1])
    484
    >>> compute_receptive_field([11, 2, 7, 7, 2, 5, 5, 5, 2], [1, 2, 1, 1, 2, 1, 1, 1, 2], [4, 1, 4, 8, 1, 16, 32, 64, 1])
    1984

    The above examples exhibit the receptive fields of the output feature maps
    of the 3 branches of the multi-scopic net, using its original hyper-parameters
    (note the 3 max pooling layers).


    .. footbibliography::

    """
    _kernel_sizes = [kernel_sizes] if isinstance(kernel_sizes, int) else list(kernel_sizes)
    num_layers = len(_kernel_sizes)
    _strides = list(repeat(strides, num_layers)) if isinstance(strides, int) else list(strides)
    _dilations = list(repeat(dilations, num_layers)) if isinstance(dilations, int) else list(dilations)
    assert num_layers == len(_strides) == len(_dilations)
    receptive_field = 1
    for idx, (k, d) in enumerate(zip(_kernel_sizes, _dilations)):
        s = np.prod(_strides[:idx]) if idx > 0 else 1
        receptive_field += d * (k - 1) * s
    if input_len is not None:
        receptive_field = min(receptive_field, input_len)
    if fs is not None:
        receptive_field /= fs
    return make_serializable(receptive_field)
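
# Cross-check of the closed-form formula above (a local sketch): with unit
# dilations, r_n = 1 + sum_i (k_i - 1) * prod_{j<i} s_j reproduces the first
# doctest value.
#
#     >>> ks = [11, 2, 7, 7, 2, 5, 5, 5, 2]
#     >>> ss = [1, 2, 1, 1, 2, 1, 1, 1, 2]
#     >>> 1 + sum((k - 1) * int(np.prod(ss[:i])) for i, k in enumerate(ks))
#     90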

def default_collate_fn(
    batch: Sequence[Union[Tuple[np.ndarray, ...], Dict[str, np.ndarray]]]
) -> Union[Tuple[Tensor, ...], Dict[str, Tensor]]:
    """Default collate function for model training.

    The data generator (:class:`~torch.utils.data.Dataset`) should generate
    (`__getitem__`) n-tuples ``signals, labels, ...``,
    or dictionaries of tensors.

    Parameters
    ----------
    batch : sequence
        Sequence of n-tuples,
        in which the first element is the signal, the second is the label, ...;
        or sequence of dictionaries of tensors.

    Returns
    -------
    tuple or dict
        Tuple or dict of :class:`~torch.Tensor`,
        which are the concatenated values to feed into neural networks.

    """
    if isinstance(batch[0], dict):
        keys = batch[0].keys()
        collated = _default_collate_fn([tuple(b[k] for k in keys) for b in batch])
        return {k: collated[i] for i, k in enumerate(keys)}
    else:
        return _default_collate_fn(batch)
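
# Illustrative use (a local sketch): a batch of dataset items, each a dict of
# numpy arrays, is collated into a dict of stacked tensors with a leading
# batch dimension.
#
#     >>> batch = [{"signals": np.zeros((12, 500)), "labels": np.array([0.0, 1.0])} for _ in range(4)]
#     >>> collated = default_collate_fn(batch)
#     >>> collated["signals"].shape, collated["labels"].shape
#     (torch.Size([4, 12, 500]), torch.Size([4, 2]))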

def _default_collate_fn(batch: Sequence[Tuple[np.ndarray, ...]]) -> Tuple[Tensor, ...]:
    """Collate function for tuples of tensors.

    Parameters
    ----------
    batch : sequence
        Sequence of n-tuples,
        in which the first element is the signal, the second is the label, ...

    Returns
    -------
    Tuple[torch.Tensor]
        The concatenated values to feed into neural networks.

    """
    try:
        n_fields = len(batch[0])
    except Exception:
        raise ValueError("Invalid batch")
    if n_fields == 0:
        raise ValueError("No data")
    ret = []
    for i in range(n_fields):
        values = [[item[i]] for item in batch]
        values = np.concatenate(values, axis=0)
        values = torch.from_numpy(values)
        ret.append(values)
    return tuple(ret)


def _adjust_cnn_filter_lengths(
    config: dict,
    fs: int,
    ensure_odd: bool = True,
    pattern: str = "filter_length|filter_size",
) -> dict:
    """Adjust the filter lengths (kernel sizes) in the config
    for convolutional neural networks,
    according to the new sampling frequency.

    Parameters
    ----------
    config : dict
        The config dictionary.
        This `dict` is **NOT** modified.
    fs : int
        The new sampling frequency.
    ensure_odd : bool, default True
        If True, the new filter lengths are ensured to be odd.
    pattern : str, default "filter_length|filter_size"
        The pattern to search for in the config items related to filter lengths.

    Returns
    -------
    config : dict
        The adjusted config dictionary.

    """
    assert "fs" in config
    config = deepcopy(config)
    for k, v in config.items():
        if isinstance(v, dict):
            tmp_config = deepcopy(v)
            original_fs = tmp_config.get("fs", None)
            tmp_config.update({"fs": config["fs"]})
            config[k] = _adjust_cnn_filter_lengths(tmp_config, fs, ensure_odd, pattern)
            if original_fs is None:
                config[k].pop("fs", None)
            else:
                config[k]["fs"] = fs
        elif re.findall(pattern, k):
            if isinstance(v, (Sequence, np.ndarray)):  # DO NOT use `Iterable`
                config[k] = [
                    _adjust_cnn_filter_lengths({"filter_length": fl, "fs": config["fs"]}, fs, ensure_odd)["filter_length"]
                    for fl in v
                ]
            elif isinstance(v, Real):  # DO NOT use `int`, which might not work for numpy array elements
                if v > 1:
                    config[k] = int(round(v * fs / config["fs"]))
                    if ensure_odd:
                        config[k] = config[k] - config[k] % 2 + 1
        elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
            tmp_configs = [_adjust_cnn_filter_lengths({k: item, "fs": config["fs"]}, fs, ensure_odd, pattern) for item in v]
            config[k] = [item[k] for item in tmp_configs]
    return config

def adjust_cnn_filter_lengths(
    config: dict,
    fs: int,
    ensure_odd: bool = True,
    pattern: str = "filter_length|filter_size",
) -> dict:
    """Adjust the filter lengths in the config
    for convolutional neural networks,
    according to the new sampling frequency.

    Parameters
    ----------
    config : dict
        The config dictionary.
        This `dict` is **NOT** modified.
    fs : int
        The new sampling frequency.
    ensure_odd : bool, default True
        If True, the new filter lengths are ensured to be odd.
    pattern : str, default "filter_length|filter_size"
        The pattern to search for in the config items related to filter lengths.

    Returns
    -------
    config : dict
        The adjusted config dictionary.

    """
    config = _adjust_cnn_filter_lengths(config, fs, ensure_odd, pattern)
    config["fs"] = fs
    return config
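
# Hypothetical usage sketch (the config keys below are made up for illustration):
# halving the sampling frequency roughly halves every matched filter length,
# keeping it odd; nested dicts and sequences are handled recursively.
#
#     >>> cfg = {"fs": 500, "filter_length": 25, "branch": {"filter_lengths": [11, 7]}}
#     >>> adjust_cnn_filter_lengths(cfg, fs=250)
#     {'fs': 250, 'filter_length': 13, 'branch': {'filter_lengths': [7, 5]}}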

class SizeMixin(object):
    """Mixin class for size-related methods."""

    @property
    def module_size(self) -> int:
        return compute_module_size(self)

    @property
    def module_size_(self) -> str:
        return compute_module_size(self, human=True)

    @property
    def sizeof(self) -> int:
        return compute_module_size(self, requires_grad=False, include_buffers=True, human=False)

    @property
    def sizeof_(self) -> str:
        return compute_module_size(self, requires_grad=False, include_buffers=True, human=True)

    @property
    def dtype(self) -> torch.dtype:
        try:
            return next(self.parameters()).dtype
        except StopIteration:
            return torch.float32
        except Exception as err:
            raise err  # unknown error

    @property
    def device(self) -> torch.device:
        try:
            return next(self.parameters()).device
        except StopIteration:
            return torch.device("cpu")
        except Exception as err:
            raise err  # unknown error

    @property
    def dtype_(self) -> str:
        return str(self.dtype).replace("torch.", "")

    @property
    def device_(self) -> str:
        return str(self.device)
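
# Hypothetical usage sketch: mixing `SizeMixin` into a model exposes the size,
# dtype and device helpers as properties.
#
#     >>> class _TinyModel(nn.Sequential, SizeMixin):
#     ...     def __init__(self):
#     ...         super().__init__()
#     ...         self.add_module("linear", nn.Linear(10, 20))
#     >>> _TinyModel().module_size  # 10 * 20 weights + 20 biases
#     220
#     >>> _TinyModel().dtype_, _TinyModel().device_
#     ('float32', 'cpu')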

class CkptMixin(object):
    """Mixin class for checkpoint loading and saving methods."""

    @classmethod
    def from_checkpoint(
        cls, path: Union[str, bytes, os.PathLike], device: Optional[torch.device] = None
    ) -> Tuple[nn.Module, dict]:
        """Load a model from a checkpoint.

        Parameters
        ----------
        path : `path-like`
            Path of the checkpoint.
        device : torch.device, optional
            Map location of the model parameters,
            defaults to "cuda" if available, otherwise "cpu".

        Returns
        -------
        model : torch.nn.Module
            The model loaded from a checkpoint.
        aux_config : dict
            Auxiliary configs that are needed for data preprocessing, etc.

        """
        _device = device or DEFAULTS.device
        ckpt = torch.load(path, map_location=_device)
        aux_config = ckpt.get("train_config", None) or ckpt.get("config", None)
        assert aux_config is not None, "input checkpoint has no sufficient data to recover a model"
        kwargs = dict(
            config=ckpt["model_config"],
        )
        if "classes" in aux_config:
            kwargs["classes"] = aux_config["classes"]
        if "n_leads" in aux_config:
            kwargs["n_leads"] = aux_config["n_leads"]
        model = cls(**kwargs)
        model.load_state_dict(ckpt["model_state_dict"])
        return model, aux_config

    def save(self, path: Union[str, bytes, os.PathLike], train_config: CFG) -> None:
        """Save the model to disk.

        Parameters
        ----------
        path : `path-like`
            Path to save the model.
        train_config : CFG
            Config for training the model,
            used when one restores the model.

        Returns
        -------
        None

        """
        path = Path(path)
        if not path.parent.exists():
            path.parent.mkdir(parents=True)
        torch.save(
            {
                "model_state_dict": self.state_dict(),
                "model_config": self.config,
                "train_config": train_config,
            },
            path,
        )
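
# Hypothetical end-to-end sketch of the checkpoint round trip. ``SomeModel``
# stands in for any torch_ecg model that inherits `CkptMixin` and accepts
# ``config`` (and possibly ``classes`` / ``n_leads``) keyword arguments;
# the path is made up.
#
#     >>> model.save("/tmp/ckpt.pth.tar", train_config)
#     >>> restored, aux_config = SomeModel.from_checkpoint("/tmp/ckpt.pth.tar", device=torch.device("cpu"))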