Source code for torch_ecg.databases.nsrr_databases.shhs

# -*- coding: utf-8 -*-

import itertools
import os
import re
import warnings
from datetime import datetime
from numbers import Real
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import scipy.signal as SS
import xmltodict as xtd
from tqdm.auto import tqdm

from ...cfg import DEFAULTS
from ...utils.misc import add_docstring
from ...utils.utils_interval import intervals_union
from ..base import DataBaseInfo, NSRRDataBase, PSGDataBaseMixin

__all__ = [
    "SHHS",
]


_SHHS_INFO = DataBaseInfo(
    title="""
    Sleep Heart Health Study
    """,
    about=r"""
    **ABOUT the dataset** (Main webpage [1]_):

    1. shhs1 (Visit 1):

        - the baseline clinic visit and polysomnogram performed between November 1, 1995 and January 31, 1998
        - in all, 6,441 men and women aged 40 years and older were enrolled
        - 5,804 rows, down from the original 6,441 due to data sharing rules on certain cohorts and subjects

    2. shhs-interim-followup (Interim Follow-up):

        - an interim clinic visit or phone call 2-3 years after baseline (shhs1)
        - 5,804 rows, despite some subjects not having complete data, all original subjects are present in the dataset

    3. shhs2 (Visit 2):

        - the follow-up clinic visit and polysomnogram performed between January 2001 and June 2003
        - during this exam cycle 3, a second polysomnogram was obtained in 3,295 of the participants
        - 4,080 rows, not all cohorts and subjects took part

    4. shhs-cvd (CVD Outcomes):

        - the tracking of adjudicated heart health outcomes (e.g. stroke, heart attack) between baseline (shhs1) and 2008-2011 (varies by parent cohort)
        - 5,802 rows, outcomes data were not provided on all subjects

    5. shhs-cvd-events (CVD Outcome Events):

        - event-level details for the tracking of heart health outcomes (shhs-cvd)
        - 4,839 rows, representing individual events

    6. ECG was sampled at 125 Hz in shhs1 and 250/256 Hz in shhs2
    7. `annotations-events-nsrr` and `annotations-events-profusion`:
       annotation files both contain xml files, the former processed in the EDF Editor and Translator tool,
       the latter exported from Compumedics Profusion
    8. about 10% of the records have HRV (including sleep stages and sleep events) annotations

    **DATA Analysis Tips**:

    1. Respiratory Disturbance Index (RDI):

        - A number of RDI variables exist in the data set. These variables are highly skewed.
        - log-transformation is recommended, among which the following transformation performed best, at least in some subsets:

          .. math::

            NEWVA = \log(OLDVAR + 0.1)

    2. Obstructive Apnea Index (OAI):

        - There is one OAI index in the data set. It reflects obstructive events associated with a 4% desaturation or arousal. Nearly 30% of the cohort has a zero value for this variable
        - Dichotomization is suggested (e.g. >=3 or >=4 events per hour indicates positive)

    3. Central Apnea Index (CAI):

        - Several variables describe central breathing events, with different thresholds for desaturation and requirement/non-requirement of arousals. ~58% of the cohort have zero values
        - Dichotomization is suggested (e.g. >=3 or >=4 events per hour indicates positive)

    4. Sleep Stages:

        - Stage 1 and stage 3-4 are not normally distributed, but stage 2 and REM sleep are.
        - To use these data as continuous dependent variables, stages 1 and 3-4 must be transformed. The following formula is suggested:

          .. math::

            –\log(-\log(val/100+0.001))

    5. Sleep time below 90% O2:

        - Percent of total sleep time with oxygen levels below 75%, 80%, 85% and 90% were recorded
        - Dichotomization is suggested (e.g. >5% and >10% of sleep time with oxygen levels below a specific O2 level indicates positive)

    **ABOUT signals**: (ref. [9]_)

    1. C3/A2 and C4/A1 EEGs, sampled at 125 Hz
    2. right and left electrooculograms (EOGs), sampled at 50 Hz
    3. a bipolar submental electromyogram (EMG), sampled at 125 Hz
    4. thoracic and abdominal excursions (THOR and ABDO), recorded by inductive plethysmography bands and sampled at 10 Hz
    5. "AIRFLOW" detected by a nasal-oral thermocouple, sampled at 10 Hz
    6. finger-tip pulse oximetry sampled at 1 Hz
    7. ECG from a bipolar lead, sampled at 125 Hz for most SHHS-1 studies and 250 (and 256?) Hz for SHHS-2 studies
    8. Heart rate (PR) derived from the ECG and sampled at 1 Hz
    9. body position (using a mercury gauge sensor)
    10. ambient light (on/off, by a light sensor secured to the recording garment)

    **ABOUT annotations** (NOT including "nsrrid", "visitnumber", "pptid" etc.):

    1. hrv annotations: (in csv files, ref. [2]_)

        +-------------+------------------------------------------------------------------+
        | Start__sec_ | 5 minute window start time                                       |
        +-------------+------------------------------------------------------------------+
        | NN_RR       | Ratio of consecutive normal sinus beats (NN)                     |
        |             | over all cardiac inter-beat (RR) intervals (NN/RR)               |
        +-------------+------------------------------------------------------------------+
        | AVNN        | Mean of all normal sinus to normal sinus interbeat               |
        |             | intervals (NN)                                                   |
        +-------------+------------------------------------------------------------------+
        | IHR         | Instantaneous heart rate                                         |
        +-------------+------------------------------------------------------------------+
        | SDNN        | Standard deviation of all normal sinus                           |
        |             | to normal sinus interbeat (NN) intervals                         |
        +-------------+------------------------------------------------------------------+
        | SDANN       | Standard deviation of the averages of normal sinus to normal     |
        |             | sinus interbeat (NN) intervals in all 5-minute segments          |
        +-------------+------------------------------------------------------------------+
        | SDNNIDX     | Mean of the standard deviations of normal sinus to normal        |
        |             | sinus interbeat (NN) intervals in all 5-minute segments          |
        +-------------+------------------------------------------------------------------+
        | rMSSD       | Square root of the mean of the squares of difference between     |
        |             | adjacent normal sinus to normal sinus interbeat (NN) intervals   |
        +-------------+------------------------------------------------------------------+
        | pNN10       | Percentage of differences between adjacent normal sinus to       |
        |             | normal sinus interbeat (NN) intervals that are >10 ms            |
        +-------------+------------------------------------------------------------------+
        | pNN20       | Percentage of differences between adjacent normal sinus to       |
        |             | normal sinus interbeat (NN) intervals that are >20 ms            |
        +-------------+------------------------------------------------------------------+
        | pNN30       | Percentage of differences between adjacent normal sinus to       |
        |             | normal sinus interbeat (NN) intervals that are >30 ms            |
        +-------------+------------------------------------------------------------------+
        | pNN40       | Percentage of differences between adjacent normal sinus to       |
        |             | normal sinus interbeat (NN) intervals that are >40 ms            |
        +-------------+------------------------------------------------------------------+
        | pNN50       | Percentage of differences between adjacent normal sinus to       |
        |             | normal sinus interbeat (NN) intervals that are >50 ms            |
        +-------------+------------------------------------------------------------------+
        | tot_pwr     | Total normal sinus to normal sinus interbeat (NN) interval       |
        |             | spectral power up to 0.4 Hz                                      |
        +-------------+------------------------------------------------------------------+
        | ULF         | Ultra-low frequency power, the normal sinus to normal sinus      |
        |             | interbeat (NN) interval spectral power between 0 and 0.003 Hz    |
        +-------------+------------------------------------------------------------------+
        | VLF         | Very low frequency power, the normal sinus to normal sinus       |
        |             | interbeat (NN) interval spectral power between 0.003 and 0.04 Hz |
        +-------------+------------------------------------------------------------------+
        | LF          | Low frequency power, the normal sinus to normal sinus interbeat  |
        |             | (NN) interval spectral power between 0.04 and 0.15 Hz            |
        +-------------+------------------------------------------------------------------+
        | HF          | High frequency power, the normal sinus to normal sinus interbeat |
        |             | (NN) interval spectral power between 0.15 and 0.4 Hz             |
        +-------------+------------------------------------------------------------------+
        | LF_HF       | The ratio of low to high frequency                               |
        +-------------+------------------------------------------------------------------+
        | LF_n        | Low frequency power (normalized)                                 |
        +-------------+------------------------------------------------------------------+
        | HF_n        | High frequency power (normalized)                                |
        +-------------+------------------------------------------------------------------+

    2. wave delineation annotations: (in csv files, NOTE: see "CAUTION" by the end of this part, ref. [2]_)

        +--------------+------------------------------------------------------------------------------------------------+
        | RPoint       | Sample Number indicating R Point (peak of QRS)                                                 |
        +--------------+------------------------------------------------------------------------------------------------+
        | Start        | Sample Number indicating start of beat                                                         |
        +--------------+------------------------------------------------------------------------------------------------+
        | End          | Sample Number indicating end of beat                                                           |
        +--------------+------------------------------------------------------------------------------------------------+
        | STLevel1     | Level of ECG 1 in Raw data ( 65536 peak to peak rawdata = 10mV peak to peak)                   |
        +--------------+------------------------------------------------------------------------------------------------+
        | STSlope1     | Slope of ECG 1 stored as int and to convert to a double divide raw value by 1000.0             |
        +--------------+------------------------------------------------------------------------------------------------+
        | STLevel2     | Level of ECG 2 in Raw data ( 65536 peak to peak rawdata = 10mV peak to peak)                   |
        +--------------+------------------------------------------------------------------------------------------------+
        | STSlope2     | Slope of ECG 2 stored as int and to convert to a double divide raw value by 1000.0             |
        +--------------+------------------------------------------------------------------------------------------------+
        | Manual       | (True / False) True if record was manually inserted                                            |
        +--------------+------------------------------------------------------------------------------------------------+
        | Type         | Type of beat (0 = Artifact / 1 = Normal Sinus Beat / 2 = VE / 3 = SVE)                         |
        +--------------+------------------------------------------------------------------------------------------------+
        | Class        | no longer used                                                                                 |
        +--------------+------------------------------------------------------------------------------------------------+
        | PPoint       | Sample Number indicating peak of the P wave (-1 if no P wave detected)                         |
        +--------------+------------------------------------------------------------------------------------------------+
        | PStart       | Sample Number indicating start of the P wave                                                   |
        +--------------+------------------------------------------------------------------------------------------------+
        | PEnd         | Sample Number indicating end of the P wave                                                     |
        +--------------+------------------------------------------------------------------------------------------------+
        | TPoint       | Sample Number indicating peak of the T wave (-1 if no T wave detected)                         |
        +--------------+------------------------------------------------------------------------------------------------+
        | TStart       | Sample Number indicating start of the T wave                                                   |
        +--------------+------------------------------------------------------------------------------------------------+
        | TEnd         | Sample Number indicating end of the T wave                                                     |
        +--------------+------------------------------------------------------------------------------------------------+
        | TemplateID   | The ID of the template to which this beat has been assigned (-1 if not assigned to a template) |
        +--------------+------------------------------------------------------------------------------------------------+
        | nsrrid       | nsrrid of this record                                                                          |
        +--------------+------------------------------------------------------------------------------------------------+
        | samplingrate | frequency of the ECG signal of this record                                                     |
        +--------------+------------------------------------------------------------------------------------------------+
        | seconds      | Number of seconds from beginning of recording to R-point (Rpoint / sampling rate)              |
        +--------------+------------------------------------------------------------------------------------------------+
        | epoch        | Epoch (30 second) number                                                                       |
        +--------------+------------------------------------------------------------------------------------------------+
        | rpointadj    | R Point adjusted sample number (RPoint * (samplingrate/256))                                   |
        +--------------+------------------------------------------------------------------------------------------------+

      CAUTION that all the above sampling numbers except for rpointadj assume 256 Hz,
      while the rpointadj column has been added to provide an adjusted sample number based on the actual sampling rate.

    3. event annotations: (in xml files)
       TODO
    4. event_profusion annotations: (in xml files)
       TODO

    **DEFINITION of concepts in sleep study** (mainly apnea and arousal, ref. [8]_ for corresponding knowledge):

    1. Arousal: (ref. [3]_, [4]_)

        - interruptions of sleep lasting 3 to 15 seconds
        - can occur spontaneously or as a result of sleep-disordered breathing or other sleep disorders
        - sends you back to a lighter stage of sleep
        - if the arousal last more than 15 seconds, it becomes an awakening
        - the higher the arousal index (occurrences per hour), the more tired you are likely to feel, though people vary in their tolerance of sleep disruptions

    2. Central Sleep Apnea (CSA): (ref. [3]_, [5]_, [6]_)

        - breathing repeatedly stops and starts during sleep
        - occurs because your brain (central nervous system) doesn't send proper signals to the muscles that control your breathing, which is point that differs from obstructive sleep apnea
        - may occur as a result of other conditions, such as heart failure, stroke, high altitude, etc.

    3. Obstructive Sleep Apnea (OSA): (ref. [3]_, [7]_)

        - occurs when throat muscles intermittently relax and block upper airway during sleep
        - a noticeable sign of obstructive sleep apnea is snoring

    4. Complex (Mixed) Sleep Apnea: (ref. [3]_)

        - combination of both CSA and OSA
        - exact mechanism of the loss of central respiratory drive during sleep in OSA is unknown but is most likely related to incorrect settings of the CPAP (Continuous Positive Airway Pressure) treatment and other medical conditions the person has

    5. Hypopnea:
       overly shallow breathing or an abnormally low respiratory rate. Hypopnea is defined by some to be less severe than apnea (the complete cessation of breathing)
    6. Apnea Hypopnea Index (AHI): to write

        - used to indicate the severity of OSA
        - number of apneas or hypopneas recorded during the study per hour of sleep
        - based on the AHI, the severity of OSA is classified as follows

            - none/minimal: AHI < 5 per hour
            - mild: AHI ≥ 5, but < 15 per hour
            - moderate: AHI ≥ 15, but < 30 per hour
            - severe: AHI ≥ 30 per hour

    7. Oxygen Desaturation:

        - used to indicate the severity of OSA
        - reductions in blood oxygen levels (desaturation)
        - at sea level, a normal blood oxygen level (saturation) is usually 96 - 97%
        - (no generally accepted classifications for severity of oxygen desaturation)

            - mild: >= 90%
            - moderate: 80% - 89%
            - severe: < 80%

    """,
    usage=[
        "Sleep stage",
        "Sleep apnea",
    ],
    issues="""
    1. `Start__sec_` might not be the start time, but rather the end time, of the 5 minute windows in some records
    2. the current version "0.15.0" removed EEG spectral summary variables
    """,
    references=[
        "https://sleepdata.org/datasets/shhs/pages/",
        "https://sleepdata.org/datasets/shhs/pages/13-hrv-analysis.md",
        "https://en.wikipedia.org/wiki/Sleep_apnea",
        "https://www.sleepapnea.org/diagnosis/sleep-studies/",
        "https://www.mayoclinic.org/diseases-conditions/central-sleep-apnea/symptoms-causes/syc-20352109",
        "Eckert DJ, Jordan AS, Merchia P, Malhotra A. Central sleep apnea: Pathophysiology and treatment. Chest. 2007 Feb;131(2):595-607. doi: 10.1378/chest.06.2287. PMID: 17296668; PMCID: PMC2287191.",
        "https://www.mayoclinic.org/diseases-conditions/obstructive-sleep-apnea/symptoms-causes/syc-20352090",
        "https://en.wikipedia.org/wiki/Hypopnea",
        # "http://healthysleep.med.harvard.edu/sleep-apnea/diagnosing-osa/understanding-results",  # broken link
        "https://sleepdata.org/datasets/shhs/pages/full-description.md",
    ],
    doi=[
        "10.1093/jamia/ocy064",
    ],  # PMID: 9493915 not added
)


[docs]@add_docstring(_SHHS_INFO.format_database_docstring(), mode="prepend") class SHHS(NSRRDataBase, PSGDataBaseMixin): """ Parameters ---------- db_dir : `path-like`, optional Storage path of the database. If not specified, data will be fetched from Physionet. working_dir : `path-like`, optional Working directory, to store intermediate files and log files. verbose : int, default 1 Level of logging verbosity. kwargs : dict, optional Auxilliary key word arguments """ __name__ = "SHHS" def __init__( self, db_dir: Optional[Union[str, bytes, os.PathLike]] = None, working_dir: Optional[Union[str, bytes, os.PathLike]] = None, verbose: int = 1, **kwargs: Any, ) -> None: super().__init__( db_name="SHHS", db_dir=db_dir, working_dir=working_dir, verbose=verbose, **kwargs, ) self.__create_constants(**kwargs) # `current_version` will be when calling `_ls_rec` self.current_version = kwargs.get("current_version", "0.19.0") self.version_pattern = "\\d+\\.\\d+\\.\\d+" self.rec_name_pattern = "^shhs[12]\\-\\d{6}$" self.psg_data_path = None self.ann_path = None self.hrv_ann_path = None self.eeg_ann_path = None self.wave_deli_path = None self.event_ann_path = None self.event_profusion_ann_path = None self.form_paths() self._df_records = pd.DataFrame() self._all_records = [] self.rec_with_hrv_summary_ann = [] self.rec_with_hrv_detailed_ann = [] self.rec_with_event_ann = [] self.rec_with_event_profusion_ann = [] self.rec_with_rpeaks_ann = [] self._tables = {} self._ls_rec() self.fs = None self.file_opened = None
[docs] def form_paths(self) -> None: """Form paths to the database files.""" self.psg_data_path = self.db_dir / "polysomnography" / "edfs" self.ann_path = self.db_dir / "datasets" self.hrv_ann_path = self.ann_path / "hrv-analysis" self.eeg_ann_path = self.ann_path / "eeg-spectral-analysis" self.wave_deli_path = self.db_dir / "polysomnography" / "annotations-rpoints" self.event_ann_path = self.db_dir / "polysomnography" / "annotations-events-nsrr" self.event_profusion_ann_path = self.db_dir / "polysomnography" / "annotations-events-profusion"
def _ls_rec(self) -> None: """Find all records in the database directory and store them (path, metadata, etc.) in some private attributes. """ self.logger.info("Finding `edf` records....") self._df_records = pd.DataFrame() self._df_records["path"] = sorted(self.db_dir.rglob("*.edf")) if self._subsample is not None: size = min( len(self._df_records), max(1, int(round(self._subsample * len(self._df_records)))), ) self._df_records = self._df_records.sample(n=size, random_state=DEFAULTS.SEED, replace=False) # if self._df_records is non-empty, call `form_paths` again if necessary # typically path for a record is like: # self.db_dir / "polysomnography/edfs/shhs1/shhs1-200001.edf" if len(self._df_records) > 0 and self._df_records.iloc[0]["path"].parents[3] != self.db_dir: self.db_dir = self._df_records.iloc[0]["path"].parents[3] self.form_paths() # get other columns self._df_records["record"] = self._df_records["path"].apply(lambda x: x.stem) self._df_records["tranche"] = self._df_records["record"].apply(lambda x: x.split("-")[0]) self._df_records["visitnumber"] = self._df_records["record"].apply(lambda x: int(x.split("-")[0][4:])) self._df_records["nsrrid"] = self._df_records["record"].apply(lambda x: int(x.split("-")[1])) # auxiliary and annotation files if not self._df_records.empty: for key in self.extension: self._df_records[key] = self._df_records.apply( lambda row: self.folder_or_file[key] / row["tranche"] / (row["record"] + self.extension[key]), axis=1, ) self._df_records.set_index("record", inplace=True) self._all_records = self._df_records.index.tolist() # update `current_version` if self.ann_path.is_dir(): for file in self.ann_path.iterdir(): if file.is_file() and len(re.findall(self.version_pattern, file.name)) > 0: self.current_version = re.findall(self.version_pattern, file.name)[0] break self.logger.info("Loading tables....") # gather tables in self.ann_path and in self.hrv_ann_path for file in itertools.chain(self.ann_path.glob("*.csv"), self.hrv_ann_path.glob("*.csv")): if not file.suffix == ".csv": continue table_name = file.stem.replace(f"-{self.current_version}", "") try: self._tables[table_name] = pd.read_csv(file, low_memory=False) except UnicodeDecodeError: self._tables[table_name] = pd.read_csv(file, low_memory=False, encoding="latin-1") self.logger.info("Finding records with HRV annotations....") # find records with hrv annotations self.rec_with_hrv_summary_ann = [] for table_name in ["shhs1-hrv-summary", "shhs2-hrv-summary"]: if table_name in self._tables: self.rec_with_hrv_summary_ann.extend( [f"shhs{int(row['visitnumber'])}-{int(row['nsrrid'])}" for _, row in self._tables[table_name].iterrows()] ) self.rec_with_hrv_summary_ann = sorted(list(set(self.rec_with_hrv_summary_ann))) self.rec_with_hrv_detailed_ann = [] for table_name in ["shhs1-hrv-5min", "shhs2-hrv-5min"]: if table_name in self._tables: self.rec_with_hrv_detailed_ann.extend( [f"shhs{int(row['visitnumber'])}-{int(row['nsrrid'])}" for _, row in self._tables[table_name].iterrows()] ) self.rec_with_hrv_detailed_ann = sorted(list(set(self.rec_with_hrv_detailed_ann))) self.logger.info("Finding records with rpeaks annotations....") # find available rpeak annotation files self.rec_with_rpeaks_ann = sorted( [f.stem.replace("-rpoint", "") for f in self.wave_deli_path.rglob("shhs*-rpoint.csv")] ) self.logger.info("Finding records with event annotations....") # find available event annotation files self.rec_with_event_ann = sorted([f.stem.replace("-nsrr", "") for f in self.event_ann_path.rglob("shhs*-nsrr.xml")]) self.rec_with_event_profusion_ann = sorted( [f.stem.replace("-profusion", "") for f in self.event_profusion_ann_path.rglob("shhs*-profusion.xml")] ) self._df_records["available_signals"] = None if not self.lazy: self.get_available_signals(None) # END OF `_ls_rec`
[docs] def list_table_names(self) -> List[str]: """List available table names.""" return list(self._tables.keys())
[docs] def get_table(self, table_name: str) -> pd.DataFrame: """Get table by name. Parameters ---------- table_name : str Table name. For available table names, call method :meth:`list_table_names`. Returns ------- table : pandas.DataFrame The loaded table. """ return self._tables[table_name]
[docs] def update_sleep_stage_names(self) -> None: """Update :attr:`self.sleep_stage_names` according to :attr:`self.sleep_stage_protocol`. """ if self.sleep_stage_protocol == "aasm": nb_stages = 5 elif self.sleep_stage_protocol == "simplified": nb_stages = 4 elif self.sleep_stage_protocol == "shhs": nb_stages = 6 else: raise ValueError(f"No stage protocol named `{self.sleep_stage_protocol}`") self.sleep_stage_names = self.all_sleep_stage_names[:nb_stages]
[docs] def get_subject_id(self, rec: Union[str, int]) -> int: """Attach a unique subject ID for the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. Returns ------- pid : int Subject ID derived from (attached to) the record. """ if isinstance(rec, int): rec = self[rec] head_shhs1, head_shhs2v3, head_shhs2v4 = "30000", "30001", "30002" tranche, nsrrid, visitnumber = [self.split_rec_name(rec)[k] for k in ["tranche", "nsrrid", "visitnumber"]] if visitnumber == "2": raise ValueError( "SHHS2 has two different sampling frequencies, " "currently could not be distinguished using only `rec`" ) pid = int(head_shhs1 + str(visitnumber) + str(nsrrid)) return pid
[docs] def get_available_signals(self, rec: Union[str, int, None]) -> Union[List[str], None]: """Get available signals for a record. If input `rec` is None, this function finds available signals for all records, and assign to :attr:`self._df_records['available_signals']`. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. Returns ------- available_signals : List[str] Names of available signals for `rec`. """ if rec is None: # iterrows with tqdm for _, row in tqdm( self._df_records.iterrows(), total=len(self._df_records), desc="Finding available signals", unit="record", dynamic_ncols=True, mininterval=1.0, disable=(self.verbose < 1), ): rec = row.name if self._df_records.loc[rec, "available_signals"] is not None: continue available_signals = self.get_available_signals(rec) self._df_records.at[rec, "available_signals"] = available_signals return if isinstance(rec, int): rec = self[rec] if rec in self._df_records.index: available_signals = self._df_records.loc[rec, "available_signals"] if available_signals is not None and len(available_signals) > 0: return available_signals frp = self.get_absolute_path(rec) try: # perhaps broken file # or the downloading is not finished self.safe_edf_file_operation("open", frp) except OSError: return None available_signals = [s.lower() for s in self.file_opened.getSignalLabels()] self.safe_edf_file_operation("close") self._df_records.at[rec, "available_signals"] = available_signals self.all_signals = self.all_signals.union(set(available_signals)) else: available_signals = [] return available_signals
[docs] def split_rec_name(self, rec: Union[str, int]) -> Dict[str, Union[str, int]]: """Split `rec` into `tranche`, `visitnumber`, `nsrrid` Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in attr:`self.all_records`. Returns ------- dict Keys: "tranche", "visitnumber", "nsrrid". """ if isinstance(rec, int): rec = self[rec] assert isinstance(rec, str) and re.match(self.rec_name_pattern, rec), f"Invalid record name: `{rec}`" tranche, nsrrid = rec.split("-") visitnumber = tranche[-1] return { "tranche": tranche, "visitnumber": int(visitnumber), "nsrrid": int(nsrrid), }
[docs] def get_visitnumber(self, rec: Union[str, int]) -> int: """Get ``visitnumber`` from `rec`. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. Returns ------- int Visit number extracted from `rec`. """ return self.split_rec_name(rec)["visitnumber"]
[docs] def get_tranche(self, rec: Union[str, int]) -> str: """Get ``tranche`` ("shhs1" or "shhs2") from `rec`. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. Returns ------- str Tranche extracted from `rec`. """ return self.split_rec_name(rec)["tranche"]
[docs] def get_nsrrid(self, rec: Union[str, int]) -> int: """Get ``nsrrid`` from `rec`. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. Returns ------- int ``nsrrid`` extracted from `rec`. """ return self.split_rec_name(rec)["nsrrid"]
[docs] def get_fs( self, rec: Union[str, int], sig: str = "ECG", rec_path: Optional[Union[str, bytes, os.PathLike]] = None, ) -> Real: """Get the sampling frequency of a signal of a record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. sig : str, default "ECG" Signal name or annotation name (e.g. "rpeak"). Some annotation files (\\*-rpeak.csv) have a sampling frequency column. rec_path : `path-like`, optional Path of the file which contains the PSG data. If is None, default path will be used. Returns ------- fs : numbers.Real Sampling frequency of the signal `sig` of the record `rec`. If corresponding signal (.edf) file is not available, or the signal file does not contain the signal `sig`, -1 will be returned. """ if isinstance(rec, int): rec = self[rec] sig = self.match_channel(sig, raise_error=False) assert sig in self.all_signals.union({"rpeak"}), f"Invalid signal name: `{sig}`" if sig.lower() == "rpeak": df_rpeaks_with_type_info = self.load_wave_delineation_ann(rec) if df_rpeaks_with_type_info.empty: self.logger.info(f"Rpeak annotation file corresponding to `{rec}` is not available.") return -1 return df_rpeaks_with_type_info.iloc[0]["samplingrate"] frp = self.get_absolute_path(rec, rec_path) if not frp.exists(): self.logger.info(f"Signal (.edf) file corresponding to `{rec}` is not available.") return -1 self.safe_edf_file_operation("open", frp) sig = self.match_channel(sig) available_signals = [s.lower() for s in self.file_opened.getSignalLabels()] if sig not in available_signals: self.logger.info(f"Signal `{sig}` is not available in signal file corresponding to `{rec}`.") return -1 chn_num = available_signals.index(sig) fs = self.file_opened.getSampleFrequency(chn_num) self.safe_edf_file_operation("close") return fs
[docs] def get_chn_num( self, rec: Union[str, int], sig: str = "ECG", rec_path: Optional[Union[str, bytes, os.PathLike]] = None, ) -> int: """Get the index of the channel of the signal in the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. sig : str, default "ECG" Signal name. rec_path : `path-like`, optional Path of the file which contains the PSG data. If is None, default path will be used. Returns ------- chn_num : int Index of channel of the signal `sig` of the record `rec`. Returns -1 if corresponding signal (.edf) file is not available, or the signal file does not contain the signal `sig`. """ sig = self.match_channel(sig) available_signals = self.get_available_signals(rec) if sig not in available_signals: if isinstance(rec, int): rec = self[rec] self.logger.info( f"Signal (.edf) file corresponding to `{rec}` is not available, or" f"signal `{sig}` is not available in signal file corresponding to `{rec}`." ) return -1 chn_num = available_signals.index(self.match_channel(sig)) return chn_num
[docs] def match_channel(self, channel: str, raise_error: bool = True) -> str: """Match the channel name to the standard channel name. Parameters ---------- channel : str Channel name. raise_error : bool, default True Whether to raise error if no match is found. If False, returns the input `channel` directly. Returns ------- sig : str Standard channel name in SHHS. If no match is found, and `raise_error` is False, returns the input `channel` directly. """ if channel.lower() in self.all_signals: return channel.lower() if raise_error: raise ValueError(f"No channel named `{channel}`") return channel
[docs] def get_absolute_path( self, rec: Union[str, int], rec_path: Optional[Union[str, bytes, os.PathLike]] = None, rec_type: str = "psg", ) -> Path: """Get the absolute path of specific type of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. rec_path : `path-like`, optional Path of the file which contains the desired data. If is None, default path will be used. rec_type : str, default "psg" Record type, either data (psg, etc.) or annotations. Returns ------- rp : pathlib.Path Absolute path of the record `rec` with type `rec_type`. """ if rec_path is not None: rp = Path(rec_path) return rp assert rec_type in self.folder_or_file, ( "`rec_type` should be one of " f"`{list(self.folder_or_file.keys())}`, but got `{rec_type}`" ) if isinstance(rec, int): rec = self[rec] tranche, nsrrid = [self.split_rec_name(rec)[k] for k in ["tranche", "nsrrid"]] # rp = self._df_records.loc[rec, rec_type] rp = self.folder_or_file[rec_type] / tranche / f"{rec}{self.extension[rec_type]}" return rp
def database_stats(self) -> None: raise NotImplementedError
[docs] def show_rec_stats(self, rec: Union[str, int], rec_path: Optional[Union[str, bytes, os.PathLike]] = None) -> None: """Print the statistics of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. rec_path : `path-like`, optional Path of the file which contains the PSG data. If is None, default path will be used. """ frp = self.get_absolute_path(rec, rec_path, rec_type="psg") self.safe_edf_file_operation("open", frp) for chn, lb in enumerate(self.file_opened.getSignalLabels()): print("SignalLabel:", lb) print("Prefilter:", self.file_opened.getPrefilter(chn)) print("Transducer:", self.file_opened.getTransducer(chn)) print("PhysicalDimension:", self.file_opened.getPhysicalDimension(chn)) print("SampleFrequency:", self.file_opened.getSampleFrequency(chn)) print("*" * 40) self.safe_edf_file_operation("close")
[docs] def load_psg_data( self, rec: Union[str, int], channel: str = "all", rec_path: Optional[Union[str, bytes, os.PathLike]] = None, sampfrom: Optional[Real] = None, sampto: Optional[Real] = None, fs: Optional[int] = None, physical: bool = True, ) -> Union[Dict[str, Tuple[np.ndarray, Real]], Tuple[np.ndarray, Real]]: """Load PSG data of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. channel : str, default "all" Name of the channel of PSG. If is "all", then all channels will be returned. rec_path : `path-like`, optional Path of the file which contains the PSG data. If is None, default path will be used. sampfrom : numbers.Real, optional Start time (units in seconds) of the data to be loaded, valid only when `channel` is some specific channel. sampto : numbers.Real, optional End time (units in seconds) of the data to be loaded, valid only when `channel` is some specific channel fs : numbers.Real, optional Sampling frequency of the loaded data. If not None, the loaded data will be resampled to this frequency, otherwise, the original sampling frequency will be used. Valid only when `channel` is some specific channel. physical : bool, default True If True, then the data will be converted to physical units, otherwise, the data will be in digital units. Returns ------- dict or tuple If `channel` is "all", then a dictionary will be returned: - keys: PSG channel names; - values: PSG data and sampling frequency Otherwise, a 2-tuple will be returned: (:class:`numpy.ndarray`, :class:`numbers.Real`), which is the PSG data of the channel `channel` and its sampling frequency. """ chn = self.match_channel(channel) if channel.lower() != "all" else "all" frp = self.get_absolute_path(rec, rec_path, rec_type="psg") self.safe_edf_file_operation("open", frp) if chn == "all": ret_data = { k: ( self.file_opened.readSignal(idx, digital=not physical), self.file_opened.getSampleFrequency(idx), ) for idx, k in enumerate(self.file_opened.getSignalLabels()) } else: all_signals = [s.lower() for s in self.file_opened.getSignalLabels()] assert chn in all_signals, f"`channel` should be one of `{self.file_opened.getSignalLabels()}`, but got `{chn}`" idx = all_signals.index(chn) data_fs = self.file_opened.getSampleFrequency(idx) data = self.file_opened.readSignal(idx, digital=not physical) # the `readSignal` method of `EdfReader` does NOT treat # the parameters `start` and `n` correctly # so we have to do it manually if sampfrom is not None: idx_from = int(round(sampfrom * data_fs)) else: idx_from = 0 if sampto is not None: idx_to = int(round(sampto * data_fs)) else: idx_to = len(data) data = data[idx_from:idx_to] if fs is not None and fs != data_fs: data = SS.resample_poly(data, fs, data_fs).astype(data.dtype) data_fs = fs ret_data = (data, data_fs) self.safe_edf_file_operation("close") return ret_data
[docs] def load_ecg_data( self, rec: Union[str, int], rec_path: Optional[Union[str, bytes, os.PathLike]] = None, sampfrom: Optional[int] = None, sampto: Optional[int] = None, data_format: str = "channel_first", units: Union[str, type(None)] = "mV", fs: Optional[int] = None, return_fs: bool = True, ) -> Union[np.ndarray, Tuple[np.ndarray, Real]]: """Load ECG data of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. rec_path : `path-like`, optional Path of the file which contains the ECG data. If is None, default path will be used. sampfrom : int, optional Start index of the data to be loaded. sampto : int, optional End index of the data to be loaded. data_format : str, default "channel_first" Format of the ECG data, "channel_last" (alias "lead_last"), or "channel_first" (alias "lead_first"), or "flat" (alias "plain") which is valid only when `leads` is a single lead. units : str or None, default "mV" Units of the output signal, can also be "μV" (aliases "uV", "muV"). None for digital data, without digital-to-physical conversion. fs : numbers.Real, optional Sampling frequency of the loaded data. If not None, the loaded data will be resampled to this frequency, otherwise, the original sampling frequency will be used. return_fs : bool, default True Whether to return the sampling frequency of the output signal. Returns ------- data : numpy.ndarray The loaded ECG data. data_fs : numbers.Real Sampling frequency of the loaded ECG data. Returned if `return_fs` is True. """ allowed_data_format = [ "channel_first", "lead_first", "channel_last", "lead_last", "flat", "plain", ] assert ( data_format.lower() in allowed_data_format ), f"`data_format` should be one of `{allowed_data_format}`, but got `{data_format}`" allowed_units = ["mv", "uv", "μv", "muv"] assert ( units is None or units.lower() in allowed_units ), f"`units` should be one of `{allowed_units}` or None, but got `{units}`" data, data_fs = self.load_psg_data( rec=rec, channel="ecg", rec_path=rec_path, sampfrom=sampfrom, sampto=sampto, fs=fs, physical=units is not None, ) data = data.astype(DEFAULTS.DTYPE.NP) if units is not None and units.lower() in ["μv", "uv", "muv"]: data *= 1e3 if data_format.lower() in ["channel_first", "lead_first"]: data = data[np.newaxis, :] elif data_format.lower() in ["channel_last", "lead_last"]: data = data[:, np.newaxis] if return_fs: return data, data_fs return data
[docs] @add_docstring( " " * 8 + "NOTE: one should call `load_psg_data` to load other channels.", mode="append", ) @add_docstring(load_ecg_data.__doc__) def load_data( self, rec: Union[str, int], rec_path: Optional[Union[str, bytes, os.PathLike]] = None, sampfrom: Optional[int] = None, sampto: Optional[int] = None, data_format: str = "channel_first", units: Union[str, type(None)] = "mV", fs: Optional[int] = None, return_fs: bool = True, ) -> Union[np.ndarray, Tuple[np.ndarray, Real]]: """alias of `load_ecg_data`""" return self.load_ecg_data( rec=rec, rec_path=rec_path, sampfrom=sampfrom, sampto=sampto, data_format=data_format, units=units, fs=fs, return_fs=return_fs, )
[docs] def load_ann( self, rec: Union[str, int], ann_type: str, ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> Union[np.ndarray, pd.DataFrame, dict]: """Load annotations of specific type of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. ann_type : str, Type of the annotation, can be "event", "event_profusion", "hrv_summary", "hrv_detailed", "sleep", "sleep_stage", "sleep_event", "apnea" (alias "sleep_apnea"), "wave_delineation", "rpeak", "rr", "nn". ann_path : `path-like`, optional Path of the file which contains the annotations. If is None, default path will be used. kwargs : dict, optional Other arguments for specific annotation type. Returns ------- annotations : numpy.ndarray or pandas.DataFrame or dict The loaded annotations. """ if ann_type.lower() == "event": return self.load_event_ann(rec=rec, event_ann_path=ann_path, **kwargs) elif ann_type.lower() == "event_profusion": return self.load_event_profusion_ann(rec=rec, event_profusion_ann_path=ann_path, **kwargs) elif ann_type.lower() == "hrv_summary": return self.load_hrv_summary_ann(rec=rec, hrv_ann_path=ann_path, **kwargs) elif ann_type.lower() == "hrv_detailed": return self.load_hrv_detailed_ann(rec=rec, hrv_ann_path=ann_path, **kwargs) elif ann_type.lower() == "sleep": return self.load_sleep_ann(rec=rec, sleep_ann_path=ann_path, **kwargs) elif ann_type.lower() == "sleep_stage": return self.load_sleep_stage_ann(rec=rec, sleep_stage_ann_path=ann_path, **kwargs) elif ann_type.lower() == "sleep_event": return self.load_sleep_event_ann(rec=rec, sleep_event_ann_path=ann_path, **kwargs) elif ann_type.lower() in ["sleep_apnea", "apnea"]: return self.load_apnea_ann(rec=rec, apnea_ann_path=ann_path, **kwargs) elif ann_type.lower() == "wave_delineation": return self.load_wave_delineation_ann(rec=rec, wave_deli_path=ann_path, **kwargs) elif ann_type.lower() == "rpeak": return self.load_rpeak_ann(rec=rec, rpeak_ann_path=ann_path, **kwargs) elif ann_type.lower() in ["rr", "rr_interval"]: return self.load_rr_ann(rec=rec, rpeak_ann_path=ann_path, **kwargs) elif ann_type.lower() in ["nn", "nn_interval"]: return self.load_nn_ann(rec=rec, rpeak_ann_path=ann_path, **kwargs)
[docs] def load_event_ann( self, rec: Union[str, int], event_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, simplify: bool = False, **kwargs: Any, ) -> pd.DataFrame: """Load event annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. event_ann_path : `path-like`, optional Path of the file which contains the events-nsrr annotations. If is None, default path will be used. Returns ------- df_events : pandas.DataFrame Event annotations of the record. """ file_path = self.get_absolute_path(rec, event_ann_path, rec_type="event") if not file_path.exists(): # rec not in `self.rec_with_event_ann` return pd.DataFrame() doc = xtd.parse(file_path.read_text()) df_events = pd.DataFrame(doc["PSGAnnotation"]["ScoredEvents"]["ScoredEvent"][1:]) if simplify: df_events["EventType"] = df_events["EventType"].apply(lambda s: s.split("|")[1]) df_events["EventConcept"] = df_events["EventConcept"].apply(lambda s: s.split("|")[1]) for c in ["Start", "Duration", "SpO2Nadir", "SpO2Baseline"]: df_events[c] = df_events[c].apply(self.str_to_real_number) return df_events
[docs] def load_event_profusion_ann( self, rec: Union[str, int], event_profusion_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> dict: """Load events-profusion annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. event_profusion_ann_path : `path-like`, optional Path of the file which contains the events-profusion annotations. If is None, default path will be used. Returns ------- dict Event-profusions annotations of the record, with items "sleep_stage_list", "df_events". TODO ---- Merge "sleep_stage_list" and "df_events" into one :class:`~pandas.DataFrame`. """ file_path = self.get_absolute_path(rec, event_profusion_ann_path, rec_type="event_profusion") if not file_path.exists(): # rec not in `self.rec_with_event_profusion_ann` return {"sleep_stage_list": [], "df_events": pd.DataFrame()} doc = xtd.parse(file_path.read_text()) sleep_stage_list = [int(ss) for ss in doc["CMPStudyConfig"]["SleepStages"]["SleepStage"]] df_events = pd.DataFrame(doc["CMPStudyConfig"]["ScoredEvents"]["ScoredEvent"]) for c in ["Start", "Duration", "LowestSpO2", "Desaturation"]: df_events[c] = df_events[c].apply(self.str_to_real_number) ret = {"sleep_stage_list": sleep_stage_list, "df_events": df_events} return ret
[docs] def load_hrv_summary_ann( self, rec: Optional[Union[str, int]] = None, hrv_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> pd.DataFrame: """Load summary HRV annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. hrv_ann_path : `path-like`, optional Path of the summary HRV annotation file. If is None, default path will be used. Returns ------- df_hrv_ann : pandas.DataFrame If `rec` is not None, `df_hrv_ann` is the summary HRV annotations of `rec`; if `rec` is None, `df_hrv_ann` is the summary HRV annotations of all records that had HRV annotations (about 10% of all the records in SHHS). """ if rec is None: df_hrv_ann = pd.concat( [ self._tables[table_name] for table_name in ["shhs1-hrv-summary", "shhs2-hrv-summary"] if table_name in self._tables ], ignore_index=True, ) return df_hrv_ann if isinstance(rec, int): rec = self[rec] if rec not in self.rec_with_hrv_summary_ann: return pd.DataFrame() tranche, nsrrid = [self.split_rec_name(rec)[k] for k in ["tranche", "nsrrid"]] table_name = f"{tranche}-hrv-summary" df_hrv_ann = self._tables[table_name][self._tables[table_name].nsrrid == int(nsrrid)].reset_index(drop=True) return df_hrv_ann
[docs] def load_hrv_detailed_ann( self, rec: Union[str, int], hrv_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> pd.DataFrame: """Load detailed HRV annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. hrv_ann_path : `path-like`, optional Path of the detailed HRV annotation file. If is None, default path will be used. Returns ------- df_hrv_ann : pandas.DataFrame. Detailed HRV annotations of the record. """ if isinstance(rec, int): rec = self[rec] if rec not in self.rec_with_hrv_detailed_ann: return pd.DataFrame() tranche, nsrrid = [self.split_rec_name(rec)[k] for k in ["tranche", "nsrrid"]] table_name = f"{tranche}-hrv-5min" df_hrv_ann = self._tables[table_name][self._tables[table_name].nsrrid == int(nsrrid)].reset_index(drop=True) return df_hrv_ann
[docs] def load_sleep_ann( self, rec: Union[str, int], source: str = "event", sleep_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> Union[pd.DataFrame, dict]: """Load sleep annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. source : {"hrv", "event", "event_profusion"}, optional Source of the annotations, case insensitive, by default "event" sleep_ann_path : `path-like`, optional Path of the file which contains the sleep annotations. If is None, default path will be used. Returns ------- df_sleep_ann : pandas.DataFrame or dict All sleep annotations of the record. """ if isinstance(rec, int): rec = self[rec] if source.lower() == "hrv": df_hrv_ann = self.load_hrv_detailed_ann(rec=rec, hrv_ann_path=sleep_ann_path) if not df_hrv_ann.empty: df_sleep_ann = df_hrv_ann[self.sleep_ann_keys_from_hrv].reset_index(drop=True) else: df_sleep_ann = pd.DataFrame(columns=self.sleep_ann_keys_from_hrv) self.logger.debug( f"record `{rec}` has `{len(df_sleep_ann)}` sleep annotations from corresponding " f"hrv-5min (detailed) annotation file, with `{len(self.sleep_ann_keys_from_hrv)}` column(s)" ) elif source.lower() == "event": df_event_ann = self.load_event_ann(rec, event_ann_path=sleep_ann_path, simplify=False) _cols = ["EventType", "EventConcept", "Start", "Duration", "SignalLocation"] if not df_event_ann.empty: df_sleep_ann = df_event_ann[_cols] else: df_sleep_ann = pd.DataFrame(columns=_cols) self.logger.debug( f"record `{rec}` has `{len(df_sleep_ann)}` sleep annotations from corresponding " f"event-nsrr annotation file, with `{len(_cols)}` column(s)" ) elif source.lower() == "event_profusion": dict_event_ann = self.load_event_profusion_ann(rec) # temporarily finished # latter to make imporvements df_sleep_ann = dict_event_ann self.logger.debug( f"record `{rec}` has `{len(df_sleep_ann['df_events'])}` sleep event annotations " "from corresponding event-profusion annotation file, " f"with `{len(df_sleep_ann['df_events'].columns)}` column(s)" ) else: raise ValueError(f"Source `{source}` not supported, " "only `hrv`, `event`, `event_profusion` are supported") return df_sleep_ann
[docs] def load_sleep_stage_ann( self, rec: Union[str, int], source: str = "event", sleep_stage_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, sleep_stage_protocol: str = "aasm", with_stage_names: bool = True, **kwargs: Any, ) -> pd.DataFrame: """Load sleep stage annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. source : {"hrv", "event", "event_profusion"}, optional Source of the annotations, case insensitive, by default "event". sleep_stage_ann_path : `path-like`, optional Path of the file which contains the sleep stage annotations. If is None, default path will be used. sleep_stage_protocol : str, default "aasm" The protocol to classify sleep stages. Currently can be "aasm", "simplified", "shhs". The only difference lies in the number of different stages of the NREM periods. with_stage_names : bool, default True If True, an additional column "sleep_stage_name" will be added to the returned :class:`~pandas.DataFrame`. Returns ------- df_sleep_stage_ann : pandas.DataFrame Sleep stage annotations of the record. """ if isinstance(rec, int): rec = self[rec] self.sleep_stage_protocol = sleep_stage_protocol self.update_sleep_stage_names() df_sleep_ann = self.load_sleep_ann(rec=rec, source=source, sleep_ann_path=sleep_stage_ann_path) df_sleep_stage_ann = pd.DataFrame(columns=self.sleep_stage_keys) if source.lower() == "hrv": df_tmp = df_sleep_ann[self.sleep_stage_ann_keys_from_hrv].reset_index(drop=True) for _, row in df_tmp.iterrows(): start_sec = row["Start__sec_"] l_start_sec = np.arange( start_sec, start_sec + self.hrv_ann_epoch_len_sec, self.sleep_epoch_len_sec, ) l_sleep_stage = np.array( [ row[self.sleep_stage_ann_keys_from_hrv[i]] for i in range( 1, 1 + self.hrv_ann_epoch_len_sec // self.sleep_epoch_len_sec, ) ] ) df_to_concat = pd.DataFrame({"start_sec": l_start_sec, "sleep_stage": l_sleep_stage}) df_sleep_stage_ann = pd.concat([df_sleep_stage_ann, df_to_concat], axis=0, ignore_index=True) elif source.lower() == "event": df_tmp = df_sleep_ann[df_sleep_ann["EventType"] == "Stages|Stages"][ ["EventConcept", "Start", "Duration"] ].reset_index(drop=True) df_tmp["EventConcept"] = df_tmp["EventConcept"].apply(lambda s: int(s.split("|")[1])) for _, row in df_tmp.iterrows(): start_sec = int(row["Start"]) duration = int(row["Duration"]) l_start_sec = np.arange(start_sec, start_sec + duration, self.sleep_epoch_len_sec) l_sleep_stage = np.full(shape=len(l_start_sec), fill_value=int(row["EventConcept"])) df_to_concat = pd.DataFrame({"start_sec": l_start_sec, "sleep_stage": l_sleep_stage}) df_sleep_stage_ann = pd.concat([df_sleep_stage_ann, df_to_concat], axis=0, ignore_index=True) elif source.lower() == "event_profusion": df_sleep_stage_ann = pd.DataFrame( { "start_sec": 30 * np.arange(len(df_sleep_ann["sleep_stage_list"])), "sleep_stage": df_sleep_ann["sleep_stage_list"], } ) else: raise ValueError(f"Source `{source}` not supported, " "only `hrv`, `event`, `event_profusion` are supported") df_sleep_stage_ann = df_sleep_stage_ann[self.sleep_stage_keys] if self.sleep_stage_protocol == "aasm": df_sleep_stage_ann["sleep_stage"] = df_sleep_stage_ann["sleep_stage"].apply(lambda a: self._to_aasm_states[a]) elif self.sleep_stage_protocol == "simplified": df_sleep_stage_ann["sleep_stage"] = df_sleep_stage_ann["sleep_stage"].apply(lambda a: self._to_simplified_states[a]) elif self.sleep_stage_protocol == "shhs": df_sleep_stage_ann["sleep_stage"] = df_sleep_stage_ann["sleep_stage"].apply(lambda a: self._to_shhs_states[a]) if with_stage_names: df_sleep_stage_ann["sleep_stage_name"] = df_sleep_stage_ann["sleep_stage"].apply( lambda a: self.sleep_stage_names[a] ) if source.lower() != "event_profusion": self.logger.debug( f"record `{rec}` has `{len(df_tmp)}` raw (epoch_len = 5min) sleep stage annotations, " f"with `{len(self.sleep_stage_ann_keys_from_hrv)}` column(s)" ) self.logger.debug( f"after being transformed (epoch_len = 30sec), record `{rec}` has {len(df_sleep_stage_ann)} " f"sleep stage annotations, with `{len(self.sleep_stage_keys)}` column(s)" ) return df_sleep_stage_ann
[docs] def load_sleep_event_ann( self, rec: Union[str, int], source: str = "event", event_types: Optional[List[str]] = None, sleep_event_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, ) -> pd.DataFrame: """Load sleep event annotations of a record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. source : {"hrv", "event", "event_profusion"}, optional Source of the annotations, case insensitive, by default "event". event_types : List[str], optional List of event types to be loaded, by default None. The event types are: "Respiratory" (including "Apnea", "SpO2"), "Arousal", "Apnea" (including "CSA", "OSA", "MSA", "Hypopnea"), "SpO2", "CSA", "OSA", "MSA", "Hypopnea". Used only when `source` is "event" or "event_profusion". sleep_event_ann_path : `path-like`, optional Path of the file which contains the sleep event annotations. If is None, default path will be used. Returns ------- df_sleep_event_ann : pandas.DataFrame Sleep event annotations of the record. """ if isinstance(rec, int): rec = self[rec] df_sleep_ann = self.load_sleep_ann(rec=rec, source=source, sleep_ann_path=sleep_event_ann_path) if isinstance(df_sleep_ann, pd.DataFrame) and df_sleep_ann.empty: return df_sleep_ann elif isinstance(df_sleep_ann, dict) and df_sleep_ann["df_events"].empty: return df_sleep_ann["df_events"] df_sleep_event_ann = pd.DataFrame(columns=self.sleep_event_keys) if source.lower() == "hrv": df_sleep_ann = df_sleep_ann[self.sleep_event_ann_keys_from_hrv].reset_index(drop=True) df_sleep_event_ann = pd.DataFrame(columns=self.sleep_event_keys[1:3]) for _, row in df_sleep_ann.iterrows(): if row["hasrespevent"] == 0: continue l_events = row[self.sleep_event_ann_keys_from_hrv[1:-1]].values.reshape( (len(self.sleep_event_ann_keys_from_hrv) // 2 - 1, 2) ) l_events = l_events[~np.isnan(l_events[:, 0])] df_to_concat = pd.DataFrame(l_events, columns=self.sleep_event_keys[1:3]) df_sleep_event_ann = pd.concat([df_sleep_event_ann, df_to_concat], axis=0, ignore_index=True) df_sleep_event_ann["event_name"] = None df_sleep_event_ann["event_duration"] = df_sleep_event_ann.apply( lambda row: row["event_end"] - row["event_start"], axis=1 ) df_sleep_event_ann = df_sleep_event_ann[self.sleep_event_keys] self.logger.debug( f"record `{rec}` has `{len(df_sleep_ann)}` raw (epoch_len = 5min) sleep event " f"annotations from hrv, with `{len(self.sleep_event_ann_keys_from_hrv)}` column(s)" ) self.logger.debug(f"after being transformed, record `{rec}` has `{len(df_sleep_event_ann)}` sleep event(s)") elif source.lower() == "event": if event_types is None: event_types = ["respiratory", "arousal"] else: event_types = [e.lower() for e in event_types] assert ( set() < set(event_types) <= set( [ "respiratory", "arousal", "apnea", "spo2", "csa", "osa", "msa", "hypopnea", ] ) ), ( "`event_types` should be a subset of " "'respiratory', 'arousal', 'apnea', 'spo2', 'csa', 'osa', 'msa', 'hypopnea'", f"but got `{event_types}`", ) _cols = set() if "respiratory" in event_types: _cols = _cols | set(self.long_event_names_from_event[:6]) if "arousal" in event_types: _cols = _cols | set(self.long_event_names_from_event[6:11]) if "apnea" in event_types: _cols = _cols | set(self.long_event_names_from_event[:4]) if "spo2" in event_types: _cols = _cols | set(self.long_event_names_from_event[4:6]) if "csa" in event_types: _cols = _cols | set(self.long_event_names_from_event[0:1]) if "osa" in event_types: _cols = _cols | set(self.long_event_names_from_event[1:2]) if "msa" in event_types: _cols = _cols | set(self.long_event_names_from_event[2:3]) if "hypopnea" in event_types: _cols = _cols | set(self.long_event_names_from_event[3:4]) _cols = list(_cols) self.logger.debug(f"for record `{rec}`, _cols = `{_cols}`") df_sleep_event_ann = df_sleep_ann[df_sleep_ann["EventConcept"].isin(_cols)].reset_index(drop=True) df_sleep_event_ann = df_sleep_event_ann.rename( { "EventConcept": "event_name", "Start": "event_start", "Duration": "event_duration", }, axis=1, ) df_sleep_event_ann["event_name"] = df_sleep_event_ann["event_name"].apply(lambda s: s.split("|")[1]) df_sleep_event_ann["event_end"] = df_sleep_event_ann.apply( lambda row: row["event_start"] + row["event_duration"], axis=1 ) df_sleep_event_ann = df_sleep_event_ann[self.sleep_event_keys] elif source.lower() == "event_profusion": df_sleep_ann = df_sleep_ann["df_events"] _cols = set() if event_types is None: event_types = ["respiratory", "arousal"] else: event_types = [e.lower() for e in event_types] assert ( set() < set(event_types) <= set( [ "respiratory", "arousal", "apnea", "spo2", "csa", "osa", "msa", "hypopnea", ] ) ), ( "`event_types` should be a subset of " "'respiratory', 'arousal', 'apnea', 'spo2', 'csa', 'osa', 'msa', 'hypopnea', " f"but got `{event_types}`" ) if "respiratory" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[:6]) if "arousal" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[6:8]) if "apnea" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[:4]) if "spo2" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[4:6]) if "csa" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[0:1]) if "osa" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[1:2]) if "msa" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[2:3]) if "hypopnea" in event_types: _cols = _cols | set(self.event_names_from_event_profusion[3:4]) _cols = list(_cols) self.logger.debug(f"for record `{rec}`, _cols = `{_cols}`") df_sleep_event_ann = df_sleep_ann[df_sleep_ann["Name"].isin(_cols)].reset_index(drop=True) df_sleep_event_ann = df_sleep_event_ann.rename( { "Name": "event_name", "Start": "event_start", "Duration": "event_duration", }, axis=1, ) df_sleep_event_ann["event_end"] = df_sleep_event_ann.apply( lambda row: row["event_start"] + row["event_duration"], axis=1 ) df_sleep_event_ann = df_sleep_event_ann[self.sleep_event_keys] else: raise ValueError(f"Source `{source}` not supported, " "only `hrv`, `event`, `event_profusion` are supported") return df_sleep_event_ann
[docs] def load_apnea_ann( self, rec: Union[str, int], source: str = "event", apnea_types: Optional[List[str]] = None, apnea_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> pd.DataFrame: """Load annotations on apnea events of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. source : {"event", "event_profusion"}, optional Source of the annotations, case insensitive, by default "event". apnea_types : List[str], optional Types of apnea events to load, should be a subset of "CSA", "OSA", "MSA", "Hypopnea". If is None, then all types of apnea will be loaded. apnea_ann_path : `path-like`, optional Path of the file which contains the apnea event annotations. If is None, default path will be used. Returns ------- df_apnea_ann : pandas.DataFrame Apnea event annotations of the record. """ event_types = ["apnea"] if apnea_types is None else apnea_types if source.lower() not in ["event", "event_profusion"]: raise ValueError(f"Source `{source}` contains no apnea annotations, " "should be one of 'event', 'event_profusion'") df_apnea_ann = self.load_sleep_event_ann( rec=rec, source=source, event_types=event_types, sleep_event_ann_path=apnea_ann_path, ) return df_apnea_ann
[docs] def load_wave_delineation_ann( self, rec: Union[str, int], wave_deli_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> pd.DataFrame: """Load annotations on wave delineations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. wave_deli_path : `path-like`, optional Path of the file which contains wave delineation annotations. If is None, default path will be used. Returns ------- df_wave_delineation : pandas.DataFrame Wave delineation annotations of the record. NOTE ---- See the part describing wave delineation annotations of the docstring of the class, or call ``self.database_info(detailed=True)``. """ if isinstance(rec, int): rec = self[rec] file_path = self.get_absolute_path(rec, wave_deli_path, rec_type="wave_delineation") if not file_path.is_file(): self.logger.debug( f"The annotation file of wave delineation of record `{rec}` has not been downloaded yet. " f"Or the path `{str(file_path)}` is not correct. " f"Or `{rec}` does not have `rpeak.csv` annotation file. Please check!" ) return pd.DataFrame() df_wave_delineation = pd.read_csv(file_path, engine="python") df_wave_delineation = df_wave_delineation[self.wave_deli_keys].reset_index(drop=True) return df_wave_delineation
[docs] def load_rpeak_ann( self, rec: Union[str, int], rpeak_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, exclude_artifacts: bool = True, exclude_abnormal_beats: bool = True, units: Optional[str] = None, **kwargs: Any, ) -> np.ndarray: """Load annotations on R peaks of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. rpeak_ann_path : `path-like`, optional Path of the file which contains R peak annotations. If is None, default path will be used. exclude_artifacts : bool, default True Whether exlcude those beats (R peaks) that are labelled artifact or not. exclude_abnormal_beats : bool, default True Whether exlcude those beats (R peaks) that are labelled abnormal ("VE" and "SVE") or not. units : {None, "s", "ms"}, optional Units of the returned R peak locations, case insensitive. None for no conversion, using indices of samples. Returns ------- numpy.ndarray Locations of R peaks of the record, of shape ``(n_rpeaks, )``. """ info_items = ["Type", "rpointadj", "samplingrate"] df_rpeaks_with_type_info = self.load_wave_delineation_ann(rec, rpeak_ann_path) if df_rpeaks_with_type_info.empty: return np.array([], dtype=int) df_rpeaks_with_type_info = df_rpeaks_with_type_info[info_items] exclude_beat_types = [] # 0 = Artifact, 1 = Normal Sinus Beat, 2 = VE, 3 = SVE if exclude_artifacts: exclude_beat_types.append(0) if exclude_abnormal_beats: exclude_beat_types += [2, 3] rpeaks = df_rpeaks_with_type_info[~df_rpeaks_with_type_info["Type"].isin(exclude_beat_types)]["rpointadj"].values if units is None: rpeaks = (np.round(rpeaks)).astype(int) elif units.lower() == "s": fs = df_rpeaks_with_type_info.iloc[0]["samplingrate"] rpeaks = rpeaks / fs elif units.lower() == "ms": fs = df_rpeaks_with_type_info.iloc[0]["samplingrate"] rpeaks = rpeaks / fs * 1000 rpeaks = (np.round(rpeaks)).astype(int) else: raise ValueError( "`units` should be one of 's', 'ms', case insensitive, " "or None for no conversion, using indices of samples, " f"but got `{units}`" ) return rpeaks
[docs] def load_rr_ann( self, rec: Union[str, int], rpeak_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, units: Union[str, None] = "s", **kwargs: Any, ) -> np.ndarray: """Load annotations on RR intervals of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. rpeak_ann_path : `path-like`, optional Path of the file which contains R peak annotations. If is None, default path will be used. units : {None, "s", "ms"}, optional units of the returned R peak locations, by default "s", case insensitive. None for no conversion, using indices of samples. Returns ------- rr : numpy.ndarray. Array of RR intervals, of shape ``(n_rpeaks - 1, 2)``. Each row is a RR interval, and the first column is the location of the R peak. """ rpeaks_ts = self.load_rpeak_ann( rec=rec, rpeak_ann_path=rpeak_ann_path, exclude_artifacts=True, exclude_abnormal_beats=True, units=units, ) rr = np.diff(rpeaks_ts) rr = np.column_stack((rpeaks_ts[:-1], rr)) return rr
[docs] def load_nn_ann( self, rec: Union[str, int], rpeak_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, units: Union[str, None] = "s", **kwargs: Any, ) -> np.ndarray: """Load annotations on NN intervals of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. rpeak_ann_path: os.PathLike, optional Path of the file which contains R peak annotations. If is None, default path will be used. units: {None, "s", "ms"}, optional Units of the returned R peak locations, by default "s", case insensitive. None for no conversion, using indices of samples. Returns ------- nn : numpy.ndarray Array of nn intervals, of shape (n, 2). Each row is a nn interval, and the first column is the location of the R peak. """ info_items = ["Type", "rpointadj", "samplingrate"] df_rpeaks_with_type_info = self.load_wave_delineation_ann(rec, rpeak_ann_path) if df_rpeaks_with_type_info.empty: return np.array([]).reshape(0, 2) df_rpeaks_with_type_info = df_rpeaks_with_type_info[info_items] fs = df_rpeaks_with_type_info.iloc[0]["samplingrate"] rpeaks = df_rpeaks_with_type_info["rpointadj"] if units is None: rpeaks = (np.round(rpeaks)).astype(int) elif units.lower() == "s": rpeaks = rpeaks / fs elif units.lower() == "ms": rpeaks = rpeaks / fs * 1000 rpeaks = (np.round(rpeaks)).astype(int) else: raise ValueError( "`units` should be one of 's', 'ms', case insensitive, " "or None for no conversion, using indices of samples, " f"but got `{units}`" ) rr = np.diff(rpeaks) rr = np.column_stack((rpeaks[:-1], rr)) normal_sinus_rpeak_indices = np.where(df_rpeaks_with_type_info["Type"].values == 1)[0] # 1 = Normal Sinus Beat keep_indices = np.where(np.diff(normal_sinus_rpeak_indices) == 1)[0].tolist() nn = rr[normal_sinus_rpeak_indices[keep_indices]] return nn.reshape(-1, 2)
[docs] def locate_artifacts( self, rec: Union[str, int], wave_deli_path: Optional[Union[str, bytes, os.PathLike]] = None, units: Optional[str] = None, ) -> np.ndarray: """Locate "artifacts" in the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. wave_deli_path : `path-like`, optional Path of the file which contains wave delineation annotations. If is None, default path will be used. units : {None, "s", "ms"}, optional Units of the returned artifact locations, can be one of "s", "ms", case insensitive, None for no conversion, using indices of samples. Returns ------- artifacts : numpy.ndarray Array of indices (or time) of artifacts locations, of shape ``(n_artifacts,)``. """ df_rpeaks_with_type_info = self.load_wave_delineation_ann(rec, wave_deli_path) if df_rpeaks_with_type_info.empty: dtype = int if units is None or units.lower() != "s" else float return np.array([], dtype=dtype) # df_rpeaks_with_type_info = df_rpeaks_with_type_info[["Type", "rpointadj"]] artifacts = (np.round(df_rpeaks_with_type_info[df_rpeaks_with_type_info["Type"] == 0]["rpointadj"].values)).astype(int) if units is not None: fs = df_rpeaks_with_type_info.iloc[0]["samplingrate"] if units.lower() == "s": artifacts = artifacts / fs elif units.lower() == "ms": artifacts = artifacts / fs * 1000 artifacts = (np.round(artifacts)).astype(int) else: raise ValueError( "`units` should be one of 's', 'ms', case insensitive, " "or None for no conversion, using indices of samples, " f"but got `{units}`" ) return artifacts
[docs] def locate_abnormal_beats( self, rec: Union[str, int], wave_deli_path: Optional[Union[str, bytes, os.PathLike]] = None, abnormal_type: Optional[str] = None, units: Optional[str] = None, ) -> Union[Dict[str, np.ndarray], np.ndarray]: """Locate "abnormal beats" in the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. wave_deli_path : `path-like`, optional Path of the file which contains wave delineation annotations. If is None, default path will be used. abnormal_type : {"VE", "SVE"}, optional Type of abnormal beat type to locate. If is None, both "VE" and "SVE" will be located. units : {None, "s", "ms"}, optional Units of the returned R peak locations, by default None, case insensitive. None for no conversion, using indices of samples. Returns ------- abnormal_rpeaks : dict or numpy.ndarray If `abnormal_type` is None, return a dictionary of abnormal beat locations, which contains two keys "VE" and/or "SVE", and values are indices (or time) of abnormal beats, of shape ``(n,)``. If `abnormal_type` is not None, return a :class:`~numpy.ndarray` of abnormal beat locations, of shape ``(n,)``. """ if abnormal_type is not None and abnormal_type not in ["VE", "SVE"]: raise ValueError(f"No abnormal type of `{abnormal_type}` in " "wave delineation annotation (*-rpeak.csv) files") df_rpeaks_with_type_info = self.load_wave_delineation_ann(rec, wave_deli_path) if not df_rpeaks_with_type_info.empty: # df_rpeaks_with_type_info = df_rpeaks_with_type_info[["Type", "rpointadj"]] # 2 = VE, 3 = SVE ve = (np.round(df_rpeaks_with_type_info[df_rpeaks_with_type_info["Type"] == 2]["rpointadj"].values)).astype(int) sve = (np.round(df_rpeaks_with_type_info[df_rpeaks_with_type_info["Type"] == 3]["rpointadj"].values)).astype(int) abnormal_rpeaks = {"VE": ve, "SVE": sve} else: dtype = int if units is None or units.lower() != "s" else float abnormal_rpeaks = { "VE": np.array([], dtype=dtype), "SVE": np.array([], dtype=dtype), } if units is not None and not df_rpeaks_with_type_info.empty: fs = df_rpeaks_with_type_info.iloc[0]["samplingrate"] if units.lower() == "s": abnormal_rpeaks = {abnormal_type: abnormal_rpeaks[abnormal_type] / fs for abnormal_type in abnormal_rpeaks} elif units.lower() == "ms": abnormal_rpeaks = { abnormal_type: abnormal_rpeaks[abnormal_type] / fs * 1000 for abnormal_type in abnormal_rpeaks } abnormal_rpeaks = { abnormal_type: (np.round(abnormal_rpeaks[abnormal_type])).astype(int) for abnormal_type in abnormal_rpeaks } else: raise ValueError( "`units` should be one of 's', 'ms', case insensitive, " "or None for no conversion, using indices of samples, " f"but got `{units}`" ) if abnormal_type is None: return abnormal_rpeaks elif abnormal_type in ["VE", "SVE"]: return abnormal_rpeaks[abnormal_type]
[docs] def load_eeg_band_ann( self, rec: Union[str, int], eeg_band_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> pd.DataFrame: """Load annotations on EEG bands of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. eeg_band_ann_path : `path-like`, optional Path of the file which contains EEG band annotations. if is None, default path will be used. Returns ------- pandas.DataFrame A :class:`~pandas.DataFrame` of EEG band annotations. """ if self.current_version >= "0.15.0": self.logger.info(f"EEG spectral summary variables are removed in version {self.current_version}") else: raise NotImplementedError
[docs] def load_eeg_spectral_ann( self, rec: Union[str, int], eeg_spectral_ann_path: Optional[Union[str, bytes, os.PathLike]] = None, **kwargs: Any, ) -> pd.DataFrame: """Load annotations on EEG spectral summary of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. eeg_spectral_ann_path : `path-like`, optional Path of the file which contains EEG spectral summary annotations. If is None, default path will be used. Returns ------- pandas.DataFrame A :class:`~pandas.DataFrame` of EEG spectral summary annotations. """ if self.current_version >= "0.15.0": self.logger.info(f"EEG spectral summary variables are removed in version {self.current_version}") else: raise NotImplementedError
# TODO: add more functions for annotation reading # TODO: add plotting functions
[docs] def plot_ann( self, rec: Union[str, int], stage_source: Optional[str] = None, stage_kw: dict = {}, event_source: Optional[str] = None, event_kw: dict = {}, plot_format: str = "span", ) -> None: """Plot annotations of the record. Plot the sleep stage annotations and sleep event annotations of the record. Parameters ---------- rec : str or int Record name, typically in the form "shhs1-200001", or index of the record in :attr:`all_records`. stage_source : {"hrv", "event", "event_profusion"}, optional Source of the sleep stage annotations, case in-sensitive. If is None, then annotations of sleep stages of `rec` won't be plotted. stage_kw : dict, optional Key word arguments to the function :meth:`load_sleep_stage_ann`. event_source : {"hrv", "event", "event_profusion"}, optional Source of the sleep event annotations, case in-sensitive. If is None, then annotations of sleep events of `rec` won't be plotted. event_kw : dict, optional Key word arguments to the function :meth:`load_sleep_event_ann`. plot_format : {"span", "hypnogram"}, optional Format of the plot, case insensitive, by default "span". TODO ---- 1. ~~Implement the "hypnogram" format.~~ 2. Implement plotting of sleep events. """ if all([stage_source is None, event_source is None]): raise ValueError("`stage_source` and `event_source` cannot be both `None`") if stage_source is not None: df_sleep_stage = self.load_sleep_stage_ann(rec, source=stage_source, **stage_kw) if df_sleep_stage.empty: if isinstance(rec, int): rec = self[rec] raise ValueError(f"No sleep stage annotations found for record `{rec}` " f"with source `{stage_source}`") else: df_sleep_stage = None if event_source is not None: df_sleep_event = self.load_sleep_event_ann(rec, source=event_source, **event_kw) if df_sleep_event.empty: if isinstance(rec, int): rec = self[rec] raise ValueError(f"No sleep event annotations found for record `{rec}` " f"with source `{event_source}`") else: df_sleep_event = None self._plot_ann( df_sleep_stage=df_sleep_stage, df_sleep_event=df_sleep_event, plot_format=plot_format, )
def _plot_ann( self, df_sleep_stage: Optional[pd.DataFrame] = None, df_sleep_event: Optional[pd.DataFrame] = None, plot_format: str = "span", ) -> None: """Internal function to plot annotations. Parameters ---------- df_sleep_stage : pandas.DataFrame, optional Sleep stage annotations. df_sleep_event : pandas.DataFrame, optional Sleep event annotations. plot_format : {"span", "hypnogram"}, optional Format of the plot, case insensitive, by default "span". """ import matplotlib.patches as mpatches import matplotlib.pyplot as plt check = [df_sleep_stage is None, df_sleep_event is None] nb_axes = len(check) - np.sum(check) if nb_axes == 0: raise ValueError("No input data!") if plot_format.lower() not in ["span", "hypnogram"]: raise ValueError(f"Unknown plot format `{plot_format}`! " f"`plot_format` can only be one of `span`, `hypnogram`") if df_sleep_stage is not None: sleep_stages = {} for k in self.sleep_stage_names: sleep_stages[k] = intervals_union( interval_list=[ [sec, sec + self.sleep_epoch_len_sec] for sec in df_sleep_stage[df_sleep_stage["sleep_stage"] == self.sleep_stage_name_value_mapping[k]][ "start_sec" ].values ], join_book_endeds=True, ) if df_sleep_event is not None: current_legal_events = [ "Central Apnea", "Obstructive Apnea", "Mixed Apnea", "Hypopnea", ] if len(current_legal_events) != len(set(current_legal_events) | set(df_sleep_event["event_name"])): raise NotImplementedError("Plotting of some type of events in `df_sleep_event` has not been implemented yet!") if plot_format.lower() == "hypnogram": stage_mask = df_sleep_stage["sleep_stage"].values stage_mask = len(self.sleep_stage_names) - 1 - stage_mask fig, ax = self.plot_hypnogram(stage_mask, granularity=30) return patches = {k: mpatches.Patch(color=c, label=k) for k, c in self.palette.items()} _, axes = plt.subplots(nb_axes, 1, figsize=(20, 4 * nb_axes), sharex=True) plt.subplots_adjust(hspace=0) plot_alpha = 0.5 ax_stages, ax_events = None, None if nb_axes == 1 and df_sleep_stage is not None: ax_stages = axes ax_stages.set_title("Sleep Stages", fontsize=24) ax_stages.set_xlabel("Time", fontsize=16) # ax_stages.set_ylabel("Stages", fontsize=16) elif nb_axes == 1 and df_sleep_event is not None: ax_events = axes ax_events.set_title("Sleep Events", fontsize=24) ax_events.set_xlabel("Time", fontsize=16) # ax_events.set_ylabel("Events", fontsize=16) else: ax_stages, ax_events = axes ax_stages.set_title("Sleep Stages and Events", fontsize=24) ax_events.set_xlabel("Time", fontsize=16) if ax_stages is not None: for k, v in sleep_stages.items(): for itv in v: ax_stages.axvspan( datetime.fromtimestamp(itv[0]), datetime.fromtimestamp(itv[1]), color=self.palette[k], alpha=plot_alpha, ) ax_stages.legend( handles=[patches[k] for k in self.all_sleep_stage_names if k in sleep_stages.keys()], loc="best", ) # keep ordering plt.setp(ax_stages.get_yticklabels(), visible=False) ax_stages.tick_params(axis="y", which="both", length=0) if ax_events is not None: for _, row in df_sleep_event.iterrows(): ax_events.axvspan( datetime.fromtimestamp(row["event_start"]), datetime.fromtimestamp(row["event_end"]), color=self.palette[row["event_name"]], alpha=plot_alpha, ) ax_events.legend( handles=[patches[k] for k in current_legal_events if k in set(df_sleep_event["event_name"])], loc="best", ) # keep ordering plt.setp(ax_events.get_yticklabels(), visible=False) ax_events.tick_params(axis="y", which="both", length=0)
[docs] def str_to_real_number(self, s: Union[str, Real]) -> Real: """Convert a string to a real number. Some columns in the annotations might incorrectly been converted from numbers.Real to string, using ``xmltodict``. Parameters ---------- s : str or numbers.Real The string to be converted. Returns ------- numbers.Real The converted number. """ if isinstance(s, str): if "." in s: return float(s) else: return int(s) else: # NaN case return s
def __create_constants(self, **kwargs) -> None: """Create constants for the class.""" self.lazy = kwargs.get("lazy", False) self.extension = { "psg": ".edf", "wave_delineation": "-rpoint.csv", "event": "-nsrr.xml", "event_profusion": "-profusion.xml", } # fmt: off self.all_signals = [ "EEG(sec)", "ECG", "EMG", "EOG(L)", "EOG(R)", "EEG", "AIRFLOW", "THOR RES", "ABDO RES", "NEW AIR", "OX stat", "SaO2", "H.R.", "POSITION", "SOUND", "LIGHT", "AUX", "CPAP", "EPMS", "OX STAT", "PR", ] self.all_signals = set([s.lower() for s in self.all_signals]) # annotations regarding sleep analysis self.hrv_ann_summary_keys = [ "nsrrid", "visitnumber", "NN_RR", "AVNN", "IHR", "SDNN", "SDANN", "SDNNIDX", "rMSSD", "pNN10", "pNN20", "pNN30", "pNN40", "pNN50", "tot_pwr", "ULF", "VLF", "LF", "HF", "LF_HF", "LF_n", "HF_n", ] self.hrv_ann_detailed_keys = [ "nsrrid", "visitnumber", "Start__sec_", "ihr", "hasrespevent", "NN_RR", "AVNN", "SDNN", "rMSSD", "PNN10", "PNN20", "PNN30", "PNN40", "PNN50", "TOT_PWR", "VLF", "LF", "LF_n", "HF", "HF_n", "LF_HF", "sleepstage01", "sleepstage02", "sleepstage03", "sleepstage04", "sleepstage05", "sleepstage06", "sleepstage07", "sleepstage08", "sleepstage09", "sleepstage10", "event01start", "event01end", "event02start", "event02end", "event03start", "event03end", "event04start", "event04end", "event05start", "event05end", "event06start", "event06end", "event07start", "event07end", "event08start", "event08end", "event09start", "event09end", "event10start", "event10end", "event11start", "event11end", "event12start", "event12end", "event13start", "event13end", "event14start", "event14end", "event15start", "event15end", "event16start", "event16end", "event17start", "event17end", "event18start", "event18end", ] self.hrv_ann_epoch_len_sec = 300 # 5min self.sleep_ann_keys_from_hrv = [ "Start__sec_", "hasrespevent", "sleepstage01", "sleepstage02", "sleepstage03", "sleepstage04", "sleepstage05", "sleepstage06", "sleepstage07", "sleepstage08", "sleepstage09", "sleepstage10", "event01start", "event01end", "event02start", "event02end", "event03start", "event03end", "event04start", "event04end", "event05start", "event05end", "event06start", "event06end", "event07start", "event07end", "event08start", "event08end", "event09start", "event09end", "event10start", "event10end", "event11start", "event11end", "event12start", "event12end", "event13start", "event13end", "event14start", "event14end", "event15start", "event15end", "event16start", "event16end", "event17start", "event17end", "event18start", "event18end", ] self.sleep_stage_ann_keys_from_hrv = [ "Start__sec_", "sleepstage01", "sleepstage02", "sleepstage03", "sleepstage04", "sleepstage05", "sleepstage06", "sleepstage07", "sleepstage08", "sleepstage09", "sleepstage10", ] self.sleep_event_ann_keys_from_hrv = [ "Start__sec_", "hasrespevent", "event01start", "event01end", "event02start", "event02end", "event03start", "event03end", "event04start", "event04end", "event05start", "event05end", "event06start", "event06end", "event07start", "event07end", "event08start", "event08end", "event09start", "event09end", "event10start", "event10end", "event11start", "event11end", "event12start", "event12end", "event13start", "event13end", "event14start", "event14end", "event15start", "event15end", "event16start", "event16end", "event17start", "event17end", "event18start", "event18end", ] # annotations from events-nsrr and events-profusion folders self.event_keys = [ "EventType", "EventConcept", "Start", "Duration", "SignalLocation", "SpO2Nadir", "SpO2Baseline", ] # NOTE: the union of names from shhs1-200001 to shhs1-200399 # NOT a full search self.short_event_types_from_event = [ "Respiratory", "Stages", "Arousals", ] self.long_event_types_from_event = [ "Respiratory|Respiratory", "Stages|Stages", "Arousals|Arousals", ] # NOTE: the union of names from shhs1-200001 to shhs1-200399 # NOT a full search # NOT including sleep stages self.short_event_names_from_event = [ "Central Apnea", "Obstructive Apnea", "Mixed Apnea", "Hypopnea", "SpO2 artifact", "SpO2 desaturation", "Arousal ()", "Arousal (Standard)", "Arousal (STANDARD)", "Arousal (CHESHIRE)", "Arousal (ASDA)", "Unsure", ] self.long_event_names_from_event = [ "Central apnea|Central Apnea", "Obstructive apnea|Obstructive Apnea", "Mixed apnea|Mixed Apnea", "Hypopnea|Hypopnea", "SpO2 artifact|SpO2 artifact", "SpO2 desaturation|SpO2 desaturation", "Arousal|Arousal ()", "Arousal|Arousal (Standard)", "Arousal|Arousal (STANDARD)", "Arousal resulting from Chin EMG|Arousal (CHESHIRE)", "ASDA arousal|Arousal (ASDA)", "Unsure|Unsure", ] self.event_profusion_keys = [ "Name", "Start", "Duration", "Input", "LowestSpO2", "Desaturation", ] # NOTE: currently the union of names from shhs1-200001 to shhs1-200099, # NOT a full search self.event_names_from_event_profusion = [ "Central Apnea", "Obstructive Apnea", "Mixed Apnea", "Hypopnea", "SpO2 artifact", "SpO2 desaturation", "Arousal ()", "Arousal (ASDA)", "Unsure", ] self.apnea_types = [ "Central Apnea", "Obstructive Apnea", "Mixed Apnea", "Hypopnea", ] # annotations regarding wave delineation self.wave_deli_keys = [ "RPoint", "Start", "End", "STLevel1", "STSlope1", "STLevel2", "STSlope2", "Manual", "Type", "rpointadj", "PPoint", "PStart", "PEnd", "TPoint", "TStart", "TEnd", "TemplateID", "nsrrid", "samplingrate", "seconds", "epoch", ] self.wave_deli_samp_num_keys = [ "RPoint", "Start", "End", "PPoint", "PStart", "PEnd", "TPoint", "TStart", "TEnd", ] # TODO: other annotation files: EEG # self-defined items self.sleep_stage_keys = ["start_sec", "sleep_stage"] self.sleep_event_keys = [ "event_name", "event_start", "event_end", "event_duration", ] self.sleep_epoch_len_sec = 30 self.ann_sleep_stages = [0, 1, 2, 3, 4, 5, 9] """ 0 --- Wake 1 --- sleep stage 1 2 --- sleep stage 2 3 --- sleep stage 3/4 4 --- sleep stage 3/4 5 --- REM stage 9 --- Movement/Wake or Unscored? """ self.sleep_stage_protocol = kwargs.get("sleep_stage_protocol", "aasm") self.all_sleep_stage_names = ["W", "R", "N1", "N2", "N3", "N4"] self.sleep_stage_name_value_mapping = { "W": 0, "R": 1, "N1": 2, "N2": 3, "N3": 4, "N4": 5, } self.sleep_stage_names = [] self.update_sleep_stage_names() self._to_simplified_states = {9: 0, 0: 0, 5: 1, 1: 2, 2: 2, 3: 3, 4: 3} """9 to nan? 0 --- awake 1 --- REM 2 --- N1 (NREM1/2), shallow sleep 3 --- N2 (NREM3/4), deep sleep """ self._to_aasm_states = {9: 0, 0: 0, 5: 1, 1: 2, 2: 3, 3: 4, 4: 4} """9 to nan? 0 --- awake 1 --- REM 2 --- N1 (NREM1) 3 --- N2 (NREM2) 4 --- N3 (NREM3/4) """ self._to_shhs_states = {9: 0, 0: 0, 5: 1, 1: 2, 2: 3, 3: 4, 4: 5} # for plotting self.palette = { "W": "orange", "R": "yellow", "N1": "green", "N2": "cyan", "N3": "blue", "N4": "purple", "Central Apnea": "red", "Obstructive Apnea": "yellow", "Mixed Apnea": "cyan", "Hypopnea": "purple", } # TODO: add more # fmt: on @property def folder_or_file(self) -> Dict[str, Path]: return { "psg": self.psg_data_path, "wave_delineation": self.wave_deli_path, "event": self.event_ann_path, "event_profusion": self.event_profusion_ann_path, } @property def url(self) -> str: warnings.warn( "one has to apply for a token from `sleepdata.org` " "and uses `nsrr` to download the data", RuntimeWarning, ) return "" @property def database_info(self) -> DataBaseInfo: return _SHHS_INFO