# Source code for torch_ecg.models.cnn.densenet

"""
The core part of the SOTA model (framework) of CPSC2020

Its key points:

1. ECG signal pre-processing: filters out the high frequency noise and baseline drift in the ECG signal
2. QRS complex detection
3. noisy heartbeat recognition: transient noise and artifacts
4. atrial fibrillation recognition: remove SPB and carefully distinguish PVC and AF beats with aberrant ventricular conduction in episodes with atrial fibrillation
5. PVC and SPB model detection: DenseNet
6. post-processing with clinical rules: a set of clinical experiences and rules including rhythm and morphological rules to suppress false positives and search for false negatives of PVC and SPB detection

"""

import math
import textwrap
from copy import deepcopy
from itertools import repeat
from typing import List, Optional, Sequence, Union

import torch
from torch import Tensor, nn

from ...cfg import CFG
from ...models._nets import Conv_Bn_Activation, DownSample
from ...utils.misc import CitationMixin, add_docstring, list_sum
from ...utils.utils_nn import SizeMixin, compute_sequential_output_shape, compute_sequential_output_shape_docstring

# Names exported by `from <this module> import *`.
__all__ = [
    "DenseNet",
    "DenseBasicBlock",
    "DenseBottleNeck",
    "DenseMacroBlock",
    "DenseTransition",
]


# Compatibility shim: when `torch.nn` has no `Dropout1d` attribute, alias it to
# `nn.Dropout` so that the blocks below can reference `nn.Dropout1d`
# unconditionally. Note that this assigns the attribute on the `torch.nn`
# module object itself, so the alias is visible to any other code using it.
if not hasattr(nn, "Dropout1d"):
    nn.Dropout1d = nn.Dropout  # added in pytorch 1.12


class DenseBasicBlock(nn.Module, SizeMixin):
    """Elementary dense layer of DenseNet.

    The main stream applies
    normalization -> activation -> convolution (-> optional dropout),
    and the features it produces are concatenated with the input
    along the channel dimension.

    Parameters
    ----------
    in_channels : int
        Number of channels (features) of the input.
    growth_rate : int
        Number of channels produced by the main stream.
        They are concatenated to the shortcut (the input),
        so the output has ``in_channels + growth_rate`` channels.
    filter_length : int
        Kernel size of the convolution.
    groups : int, default 1
        Connection pattern between input and output channels,
        ref. :class:`torch.nn.Conv1d`.
        Both `in_channels` and `growth_rate` must be divisible by it.
    bias : bool, default False
        Whether the convolution uses a bias term.
    dropout : float or dict, default 0.0
        Dropout applied to the new features.
        A float is the rate of a :class:`torch.nn.Dropout`.
        A dict must provide the keys ``"p"`` (the rate) and ``"type"``,
        the latter being ``"1d"`` (:class:`torch.nn.Dropout1d`) or
        ``None`` (:class:`torch.nn.Dropout`).
        A non-positive rate, or any other ``"type"``, disables dropout.
    config : dict, optional
        Extra hyper-parameters overriding ``__DEFAULT_CONFIG__``,
        e.g. ``activation`` and ``kw_activation``.

    """

    __name__ = "DenseBasicBlock"
    __DEFAULT_CONFIG__ = CFG(
        activation="relu",
        kw_activation={"inplace": True},
        memory_efficient=False,
    )

    def __init__(
        self,
        in_channels: int,
        growth_rate: int,
        filter_length: int,
        groups: int = 1,
        bias: bool = False,
        dropout: Union[float, dict] = 0.0,
        **config,
    ) -> None:
        super().__init__()
        self.__in_channels = in_channels
        self.__growth_rate = growth_rate
        self.__kernel_size = filter_length
        self.__groups = groups

        # start from the class defaults, then overlay the caller's options
        merged_config = CFG(deepcopy(self.__DEFAULT_CONFIG__))
        merged_config.update(deepcopy(config))
        self.config = merged_config

        assert in_channels % groups == 0, f"`in_channels` (= `{in_channels}`) must be divisible by `groups` (= `{groups}`)"
        assert growth_rate % groups == 0, f"`growth_rate` (= `{growth_rate}`) must be divisible by `groups` (= `{groups}`)"

        # main stream, in "bac" ordering
        self.bac = Conv_Bn_Activation(
            in_channels=in_channels,
            out_channels=growth_rate,
            kernel_size=filter_length,
            stride=1,
            dilation=1,
            groups=groups,
            norm=True,
            activation=self.config.activation.lower(),
            kw_activation=self.config.kw_activation,
            bias=bias,
            ordering="bac",
        )
        self.dropout = self._build_dropout(dropout)

    @staticmethod
    def _build_dropout(dropout: Union[float, dict]) -> nn.Module:
        """Create the dropout layer (or an identity) described by `dropout`."""
        if not isinstance(dropout, dict):
            return nn.Dropout(dropout) if dropout > 0 else nn.Identity()
        kind = dropout["type"]
        if kind == "1d":
            layer_cls = nn.Dropout1d
        elif kind is None:
            layer_cls = nn.Dropout
        else:
            # unrecognized dropout type: no dropout at all
            return nn.Identity()
        if dropout["p"] > 0:
            return layer_cls(dropout["p"])
        return nn.Identity()

    def forward(self, input: Tensor) -> Tensor:
        """Run the block on a batch of sequences.

        Parameters
        ----------
        input : torch.Tensor
            Input tensor,
            of shape ``(batch_size, n_channels, seq_len)``.

        Returns
        -------
        output : torch.Tensor
            Input channels concatenated with the new features,
            of shape ``(batch_size, n_channels, seq_len)``.

        """
        fresh = self.dropout(self.bac(input))
        if self.__groups == 1:
            return torch.cat([input, fresh], dim=1)

        # grouped case (see TODO of `DenseNet`): interleave, group by group,
        # the slice of old channels with the slice of new channels
        old_width = self.__in_channels // self.__groups
        new_width = self.__growth_rate // self.__groups
        pieces = []
        for g in range(self.__groups):
            pieces.append(input[:, old_width * g : old_width * (g + 1), :])
            pieces.append(fresh[:, new_width * g : new_width * (g + 1), :])
        return torch.cat(pieces, 1)

    def compute_output_shape(
        self, seq_len: Optional[int] = None, batch_size: Optional[int] = None
    ) -> Sequence[Union[int, None]]:
        """Compute the shape of the output of the block.

        Parameters
        ----------
        seq_len : int, optional
            Length of the 1d sequence input.
        batch_size : int, optional
            The batch size.

        Returns
        -------
        output_shape : sequence
            ``(batch_size, in_channels + growth_rate, seq_len)``;
            the sequence length is passed through unchanged.

        """
        return (batch_size, self.__in_channels + self.__growth_rate, seq_len)


class DenseBottleNeck(nn.Module, SizeMixin):
    """Bottleneck variant of :class:`DenseBasicBlock`.

    The main convolution is preceded by an extra
    (normalization -> activation -> convolution of kernel size 1)
    stage, the "neck", which outputs ``bn_size * growth_rate`` channels.

    Parameters
    ----------
    in_channels : int
        Number of channels (features) of the input.
    growth_rate : int
        Number of channels produced by the main stream.
        They are concatenated to the shortcut (the input),
        so the output has ``in_channels + growth_rate`` channels.
    bn_size : int
        Base width of the bottleneck: the neck outputs
        ``bn_size * growth_rate`` channels.
    filter_length : int
        Kernel size of the second (main) convolution.
    groups : int, default 1
        Connection pattern between input and output channels,
        ref. :class:`torch.nn.Conv1d`.
    bias : bool, default False
        Whether the convolutions use a bias term.
    dropout : float or dict, default 0.0
        Dropout applied to the new features.
        A float is the rate of a :class:`torch.nn.Dropout`.
        A dict must provide the keys ``"p"`` (the rate) and ``"type"``,
        the latter being ``"1d"`` (:class:`torch.nn.Dropout1d`) or
        ``None`` (:class:`torch.nn.Dropout`).
        A non-positive rate, or any other ``"type"``, disables dropout.
    config : dict, optional
        Extra hyper-parameters overriding ``__DEFAULT_CONFIG__``,
        e.g. ``activation``, ``kw_activation`` and ``memory_efficient``.
        A truthy ``memory_efficient`` makes :meth:`forward` raise
        ``NotImplementedError``.

    """

    __name__ = "DenseBottleNeck"
    __DEFAULT_CONFIG__ = CFG(
        activation="relu",
        kw_activation={"inplace": True},
        memory_efficient=False,
    )

    def __init__(
        self,
        in_channels: int,
        growth_rate: int,
        bn_size: int,
        filter_length: int,
        groups: int = 1,
        bias: bool = False,
        dropout: Union[float, dict] = 0.0,
        **config,
    ) -> None:
        super().__init__()
        self.__in_channels = in_channels
        self.__growth_rate = growth_rate
        self.__bn_size = bn_size
        self.__kernel_size = filter_length
        self.__groups = groups

        # start from the class defaults, then overlay the caller's options
        merged_config = CFG(deepcopy(self.__DEFAULT_CONFIG__))
        merged_config.update(deepcopy(config))
        self.config = merged_config

        neck_width = bn_size * growth_rate

        # 1x1 convolution stage mapping the input to the bottleneck width
        self.neck_conv = Conv_Bn_Activation(
            in_channels=in_channels,
            out_channels=neck_width,
            kernel_size=1,
            stride=1,
            dilation=1,
            groups=groups,
            norm=True,
            activation=self.config.activation.lower(),
            kw_activation=self.config.kw_activation,
            bias=bias,
            ordering="bac",
        )
        # main convolution producing the `growth_rate` new channels
        self.main_conv = Conv_Bn_Activation(
            in_channels=neck_width,
            out_channels=growth_rate,
            kernel_size=filter_length,
            stride=1,
            dilation=1,
            groups=groups,
            norm=True,
            activation=self.config.activation.lower(),
            kw_activation=self.config.kw_activation,
            bias=bias,
            ordering="bac",
        )
        self.dropout = self._build_dropout(dropout)

    @staticmethod
    def _build_dropout(dropout: Union[float, dict]) -> nn.Module:
        """Create the dropout layer (or an identity) described by `dropout`."""
        if not isinstance(dropout, dict):
            return nn.Dropout(dropout) if dropout > 0.0 else nn.Identity()
        kind = dropout["type"]
        if kind == "1d":
            layer_cls = nn.Dropout1d
        elif kind is None:
            layer_cls = nn.Dropout
        else:
            # unrecognized dropout type: no dropout at all
            return nn.Identity()
        if dropout["p"] > 0.0:
            return layer_cls(dropout["p"])
        return nn.Identity()

    def bn_function(self, input: Tensor) -> Tensor:
        """Apply the bottleneck (neck) stage.

        This is the plain, i.e. not memory-efficient, implementation.

        Parameters
        ----------
        input : torch.Tensor
            Input tensor,
            of shape ``(batch_size, n_channels, seq_len)``.

        Returns
        -------
        bottleneck_output : torch.Tensor
            of shape ``(batch_size, n_channels, seq_len)``.

        """
        return self.neck_conv(input)

    def forward(self, input: Tensor) -> Tensor:
        """Run the block on a batch of sequences.

        Parameters
        ----------
        input : torch.Tensor
            Input tensor,
            of shape ``(batch_size, n_channels, seq_len)``.

        Returns
        -------
        output : torch.Tensor
            Input channels concatenated with the new features,
            of shape ``(batch_size, n_channels, seq_len)``.

        """
        # the memory-efficient path has no implementation yet
        if self.config.memory_efficient:
            raise NotImplementedError

        fresh = self.dropout(self.main_conv(self.bn_function(input)))
        if self.__groups == 1:
            return torch.cat([input, fresh], dim=1)

        # grouped case (see TODO of `DenseNet`): interleave, group by group,
        # the slice of old channels with the slice of new channels
        old_width = self.__in_channels // self.__groups
        new_width = self.__growth_rate // self.__groups
        pieces = []
        for g in range(self.__groups):
            pieces.append(input[:, old_width * g : old_width * (g + 1), :])
            pieces.append(fresh[:, new_width * g : new_width * (g + 1), :])
        return torch.cat(pieces, 1)

    def compute_output_shape(
        self, seq_len: Optional[int] = None, batch_size: Optional[int] = None
    ) -> Sequence[Union[int, None]]:
        """Compute the shape of the output of the block.

        Parameters
        ----------
        seq_len : int, optional
            Length of the 1d sequence input.
        batch_size : int, optional
            The batch size.

        Returns
        -------
        output_shape : sequence
            ``(batch_size, in_channels + growth_rate, seq_len)``;
            the sequence length is passed through unchanged.

        """
        return (batch_size, self.__in_channels + self.__growth_rate, seq_len)


class DenseMacroBlock(nn.Sequential, SizeMixin):
    """Macro blocks for :class:`DenseNet`.

    Composed of a stacked sequence
    of building blocks of similar pattern.
    Every building block concatenates its new features to its input,
    hence the number of input channels of a building block is the number
    of input channels of the macro block plus the sum of the growth rates
    of all preceding building blocks.

    Parameters
    ----------
    in_channels : int
        Number of features (channels) of the input.
    num_layers : int
        Number of building block layers.
    growth_rates: int or Sequence[int]
        Growth rate(s) for each building block layers.
        If is sequence of int, should have length equal to `num_layers`.
    bn_size : int
        Base width of intermediate layers for :class:`DenseBottleNeck`,
        not used for :class:`DenseBasicBlock`.
    filter_lengths: int or Sequence[int]
        Filter lengths(s) (kernel size(s)) for each building block layers.
        If is sequence of int, should have length equal to `num_layers`.
    groups : int, default 1
        Pattern of connections between inputs and outputs.
        For more details, ref. :class:`torch.nn.Conv1d`.
    bias : bool, default False
        Whether to use bias in convolutional layers.
    dropout : float or dict, default 0.0
        Dropout setting of the new features produced from the main stream,
        forwarded unchanged to every building block
        (ref. the `dropout` parameter of :class:`DenseBottleNeck`).
    config : dict, optional
        Other hyper-parameters, including
        extra kw for :class:`DenseBottleNeck`, and
        activation choices, memory_efficient choices, etc.
        If ``config["building_block"]`` is ``"basic"`` or ``"basic_block"``
        (case-insensitive), :class:`DenseBasicBlock` is used
        instead of the default :class:`DenseBottleNeck`.

    Raises
    ------
    AssertionError
        If `growth_rates` or `filter_lengths` is a sequence
        whose length is not `num_layers`.

    """

    __name__ = "DenseMacroBlock"
    building_block = DenseBottleNeck

    def __init__(
        self,
        in_channels: int,
        num_layers: int,
        growth_rates: Union[Sequence[int], int],
        bn_size: int,
        filter_lengths: Union[Sequence[int], int],
        groups: int = 1,
        bias: bool = False,
        dropout: Union[float, dict] = 0.0,
        **config,
    ) -> None:
        super().__init__()
        self.__in_channels = in_channels
        self.__num_layers = num_layers
        # broadcast scalar settings to one value per building block
        if isinstance(growth_rates, int):
            self.__growth_rates = list(repeat(growth_rates, num_layers))
        else:
            self.__growth_rates = list(growth_rates)
        assert len(self.__growth_rates) == self.__num_layers, (
            f"`growth_rates` has length {len(self.__growth_rates)}, "
            f"while `num_layers` is {self.__num_layers}"
        )
        self.__bn_size = bn_size
        if isinstance(filter_lengths, int):
            self.__filter_lengths = list(repeat(filter_lengths, num_layers))
        else:
            self.__filter_lengths = list(filter_lengths)
        assert len(self.__filter_lengths) == self.__num_layers, (
            f"`filter_lengths` has length {len(self.__filter_lengths)}, "
            f"while `num_layers` is {self.__num_layers}"
        )
        self.__groups = groups
        self.config = deepcopy(config)
        if self.config.get("building_block", "").lower() in [
            "basic",
            "basic_block",
        ]:
            self.building_block = DenseBasicBlock

        # Each building block outputs (its input channels + its growth rate)
        # channels, so the input width of block `idx` is the macro block input
        # width plus the SUM of the growth rates of blocks 0..idx-1.
        # (`idx * growth_rates[idx]` is only correct for uniform growth rates.)
        block_in_channels = self.__in_channels
        for idx, (growth_rate, filter_length) in enumerate(zip(self.__growth_rates, self.__filter_lengths)):
            self.add_module(
                f"dense_building_block_{idx}",
                self.building_block(
                    in_channels=block_in_channels,
                    growth_rate=growth_rate,
                    bn_size=self.__bn_size,
                    filter_length=filter_length,
                    groups=self.__groups,
                    bias=bias,
                    dropout=dropout,
                    **(self.config),
                ),
            )
            block_in_channels += growth_rate

    @add_docstring(
        textwrap.indent(compute_sequential_output_shape_docstring, " " * 4),
        mode="append",
    )
    def compute_output_shape(
        self, seq_len: Optional[int] = None, batch_size: Optional[int] = None
    ) -> Sequence[Union[int, None]]:
        """Compute the output shape of the network."""
        return compute_sequential_output_shape(self, seq_len, batch_size)


class DenseTransition(nn.Sequential, SizeMixin):
    """Transition layer placed between two :class:`DenseMacroBlock`.

    It consists of a
    (normalization -> activation -> convolution of kernel size 1)
    stage, which optionally compresses the number of channels,
    followed by a sub-sampling layer.

    Parameters
    ----------
    in_channels : int
        Number of channels (features) of the input.
        Must be divisible by `groups`.
    compression : float, default 1.0
        Compression factor in the interval ``(0, 1]``:
        the ratio of the number of output channels
        to the number of input channels (rounded down per group).
    subsample_length : int, default 2
        Sub-sampling length (down-scaling factor).
    groups : int, default 1
        Connection pattern between input and output channels,
        ref. :class:`torch.nn.Conv1d`.
    bias : bool, default False
        Whether the convolution uses a bias term.
    config : dict, optional
        Extra hyper-parameters overriding ``__DEFAULT_CONFIG__``,
        e.g. ``activation``, ``kw_activation`` and ``subsample_mode``.

    """

    __name__ = "DenseTransition"
    __DEFAULT_CONFIG__ = CFG(
        activation="relu",
        kw_activation={"inplace": True},
        subsample_mode="avg",
    )

    def __init__(
        self,
        in_channels: int,
        compression: float = 1.0,
        subsample_length: int = 2,
        groups: int = 1,
        bias: bool = False,
        **config,
    ) -> None:
        super().__init__()
        self.__in_channels = in_channels
        self.__compression = compression
        self.__subsample_length = subsample_length
        self.__groups = groups
        assert 0 < compression <= 1.0
        assert in_channels % groups == 0

        # start from the class defaults, then overlay the caller's options
        merged_config = CFG(deepcopy(self.__DEFAULT_CONFIG__))
        merged_config.update(deepcopy(config))
        self.config = merged_config

        # compress within each group, so that the number of output channels
        # remains a multiple of `groups`
        channels_per_group = in_channels // groups
        compressed_per_group = math.floor(channels_per_group * compression)
        self.__out_channels = compressed_per_group * groups

        channel_mixer = Conv_Bn_Activation(
            in_channels=in_channels,
            out_channels=self.__out_channels,
            kernel_size=1,
            stride=1,
            dilation=1,
            groups=groups,
            norm=True,
            activation=self.config.activation.lower(),
            kw_activation=self.config.kw_activation,
            bias=bias,
            ordering="bac",
        )
        self.add_module("bac", channel_mixer)

        subsampler = DownSample(
            down_scale=subsample_length,
            in_channels=self.__out_channels,
            mode=self.config.subsample_mode.lower(),
        )
        self.add_module("down", subsampler)

    @add_docstring(
        textwrap.indent(compute_sequential_output_shape_docstring, " " * 4),
        mode="append",
    )
    def compute_output_shape(
        self, seq_len: Optional[int] = None, batch_size: Optional[int] = None
    ) -> Sequence[Union[int, None]]:
        """Compute the output shape of the network."""
        return compute_sequential_output_shape(self, seq_len, batch_size)


class DenseNet(nn.Sequential, SizeMixin, CitationMixin):
    """The core part of the SOTA model (framework) of CPSC2020.

    DenseNet is originally proposed in [1]_, [2]_ (journal version).
    The original implementation is available at [3]_ and [4]_.
    [5]_ is an unofficial implementation of DenseNet in PyTorch.
    Torchvision also provides an implementation of DenseNet [6]_.

    DenseNet is not only successful in image classification,
    but also in various ECG-related tasks, and is the core part
    of the SOTA model (framework) of CPSC2020.

    Parameters
    ----------
    in_channels : int
        Number of features (channels) of the input.
    config : dict
        Other hyper-parameters of the Module, ref. corresponding config file.
        Keyword arguments that must be set are as follows:

        - num_layers: sequence of int,
          number of building block layers of each dense (macro) block
        - init_num_filters: int,
          number of filters of the first convolutional layer
        - init_filter_length: sequence of int,
          filter length (kernel size) of the first convolutional layer
        - init_conv_stride: int,
          stride of the first convolutional layer
          (NOTE: currently not read by this class,
          the stride of the first convolution is fixed to 1)
        - init_pool_size: int,
          pooling kernel size of the first pooling layer
        - init_pool_stride: int,
          pooling stride of the first pooling layer
        - growth_rates: int or sequence of int or sequence of sequences of int,
          growth rates of the building blocks,
          with granularity to the whole network, or to each dense (macro) block,
          or to each building block
        - filter_lengths: int or sequence of int or sequence of sequences of int,
          filter length(s) (kernel size(s)) of the convolutions,
          with granularity to the whole network, or to each macro block,
          or to each building block
        - subsample_lengths: int or sequence of int,
          subsampling length(s) (ratio(s)) of the transition blocks
        - compression: float,
          compression factor of the transition blocks
        - bn_size: int,
          bottleneck base width, used only when building block is :class:`DenseBottleNeck`
        - dropout: float or dict,
          dropout ratio of each building block
        - groups: int,
          connection pattern (of channels) of the inputs and outputs
        - block: dict,
          other parameters that can be set for the building blocks
        - transition: dict,
          other parameters that can be set for the transition blocks

        For a full list of configurable parameters, ref. corr. config file

    NOTE
    ----
    The forward output of [5]_ differs from that of the other implementations;
    note also that [5]_ does not support dropout.

    TODO
    ----
    1. For `groups` > 1, the concatenated output should be
       re-organized in the channel dimension?
    2. memory-efficient mode, i.e. storing the `new_features`
       in a shared memory instead of stacking in newly created
       :class:`~torch.Tensor` after each mini-block.

    References
    ----------
    .. [1] G. Huang, Z. Liu, L. Van Der Maaten and K. Q. Weinberger,
           "Densely Connected Convolutional Networks,"
           2017 IEEE Conference on Computer Vision and Pattern Recognition (CVPR),
           Honolulu, HI, 2017, pp. 2261-2269, doi: 10.1109/CVPR.2017.243.
    .. [2] G. Huang, Z. Liu, G. Pleiss, L. Van Der Maaten and K. Weinberger,
           "Convolutional Networks with Dense Connectivity,"
           in IEEE Transactions on Pattern Analysis and Machine Intelligence,
           doi: 10.1109/TPAMI.2019.2918284.
    .. [3] https://github.com/liuzhuang13/DenseNet/tree/master/models
    .. [4] https://github.com/gpleiss/efficient_densenet_pytorch/blob/master/models/densenet.py
    .. [5] https://github.com/bamos/densenet.pytorch/blob/master/densenet.py
    .. [6] https://github.com/pytorch/vision/blob/main/torchvision/models/densenet.py

    """

    __name__ = "DenseNet"
    __DEFAULT_CONFIG__ = CFG(
        bias=False,
        activation="relu",
        kw_activation={"inplace": True},
        kernel_initializer="he_normal",
        kw_initializer={},
        init_subsample_mode="avg",
    )

    def __init__(self, in_channels: int, **config) -> None:
        """Build the stem, the dense (macro) blocks and the transitions.

        Modules are registered in forward order: ``init_cba``, ``init_pool``,
        then ``dense_macro_block_{idx}`` followed by ``transition_{idx}``
        (no transition after the last macro block).
        """
        super().__init__()
        self.__in_channels = in_channels
        self.config = CFG(deepcopy(self.__DEFAULT_CONFIG__))
        self.config.update(deepcopy(config))
        self.__num_blocks = len(self.config.num_layers)

        # NOTE(review): `config.init_conv_stride` is documented as the stride
        # of this convolution, but the stride is hard-coded to 1 here.
        # Left unchanged to keep the behavior of existing configs/models;
        # confirm which one is intended.
        self.add_module(
            "init_cba",
            Conv_Bn_Activation(
                in_channels=self.__in_channels,
                out_channels=self.config.init_num_filters,
                kernel_size=self.config.init_filter_length,
                stride=1,
                dilation=1,
                groups=self.config.groups,
                norm=True,
                activation=self.config.activation.lower(),
                kw_activation=self.config.kw_activation,
                bias=self.config.bias,
            ),
        )
        self.add_module(
            "init_pool",
            DownSample(
                down_scale=self.config.init_pool_stride,
                in_channels=self.config.init_num_filters,
                kernel_size=self.config.init_pool_size,
                padding=(self.config.init_pool_size - 1) // 2,
                mode=self.config.init_subsample_mode.lower(),
            ),
        )

        # broadcast network-level scalar settings to one entry per macro block
        if isinstance(self.config.growth_rates, int):
            self.__growth_rates = list(repeat(self.config.growth_rates, self.__num_blocks))
        else:
            self.__growth_rates = list(self.config.growth_rates)
        assert len(self.__growth_rates) == self.__num_blocks, (
            f"`config.growth_rates` indicates {len(self.__growth_rates)} macro blocks, "
            f"while `config.num_layers` indicates {self.__num_blocks}"
        )
        if isinstance(self.config.filter_lengths, int):
            self.__filter_lengths = list(repeat(self.config.filter_lengths, self.__num_blocks))
        else:
            self.__filter_lengths = list(self.config.filter_lengths)
        assert len(self.__filter_lengths) == self.__num_blocks, (
            f"`config.filter_lengths` indicates {len(self.__filter_lengths)} macro blocks, "
            f"while `config.num_layers` indicates {self.__num_blocks}"
        )
        # one transition between each pair of consecutive macro blocks
        if isinstance(self.config.subsample_lengths, int):
            self.__subsample_lengths = list(repeat(self.config.subsample_lengths, self.__num_blocks - 1))
        else:
            self.__subsample_lengths = list(self.config.subsample_lengths)
        assert len(self.__subsample_lengths) == self.__num_blocks - 1, (
            f"`config.subsample_lengths` indicates {len(self.__subsample_lengths)+1} macro blocks, "
            f"while `config.num_layers` indicates {self.__num_blocks}"
        )

        macro_in_channels = self.config.init_num_filters
        for idx, macro_num_layers in enumerate(self.config.num_layers):
            dmb = DenseMacroBlock(
                in_channels=macro_in_channels,
                num_layers=macro_num_layers,
                growth_rates=self.__growth_rates[idx],
                bn_size=self.config.bn_size,
                filter_lengths=self.__filter_lengths[idx],
                groups=self.config.groups,
                bias=self.config.bias,
                dropout=self.config.dropout,
                **(self.config.block),
            )
            # channel width leaving the macro block feeds the transition
            _, transition_in_channels, _ = dmb.compute_output_shape()
            self.add_module(f"dense_macro_block_{idx}", dmb)
            if idx < self.__num_blocks - 1:
                dt = DenseTransition(
                    in_channels=transition_in_channels,
                    compression=self.config.compression,
                    subsample_length=self.__subsample_lengths[idx],
                    groups=self.config.groups,
                    bias=self.config.bias,
                    **(self.config.transition),
                )
                # channel width leaving the transition feeds the next macro block
                _, macro_in_channels, _ = dt.compute_output_shape()
                self.add_module(f"transition_{idx}", dt)

    @add_docstring(
        textwrap.indent(compute_sequential_output_shape_docstring, " " * 4),
        mode="append",
    )
    def compute_output_shape(
        self, seq_len: Optional[int] = None, batch_size: Optional[int] = None
    ) -> Sequence[Union[int, None]]:
        """Compute the output shape of the network."""
        return compute_sequential_output_shape(self, seq_len, batch_size)

    @property
    def in_channels(self) -> int:
        """Number of input channels the network was built with."""
        return self.__in_channels

    @property
    def doi(self) -> List[str]:
        """DOIs from ``config["doi"]`` (if any) plus the DOI of the DenseNet paper.

        Duplicates are removed while keeping first-occurrence order,
        so that the result is the same from run to run.
        """
        return list(dict.fromkeys(self.config.get("doi", []) + ["10.1109/cvpr.2017.243"]))