Source code for torch_ecg.utils.utils_interval

# -*- coding: utf-8 -*-
"""Remarks: commonly used functions related to intervals.

NOTE
----
1. `Interval` refers to interval of the form ``[a,b]``
2. `GeneralizedInterval` refers to some (finite) union of `Interval`

TODO
----
1. Unify `Interval` and `GeneralizedInterval`, by letting `Interval` be of the form ``[[a,b]]``.
2. Distinguish openness and closedness.

"""

import time
import warnings
from copy import deepcopy
from numbers import Real
from typing import Any, List, Sequence, Tuple, Union

import numpy as np

__all__ = [
    "overlaps",
    "validate_interval",
    "in_interval",
    "in_generalized_interval",
    "intervals_union",
    "generalized_intervals_union",
    "intervals_intersection",
    "generalized_intervals_intersection",
    "generalized_interval_complement",
    "get_optimal_covering",
    "find_max_cont_len",
    "interval_len",
    "generalized_interval_len",
    "find_extrema",
    "is_intersect",
    "max_disjoint_covering",
]


EMPTY_SET = []
Interval = Union[Sequence[Real], type(EMPTY_SET)]
GeneralizedInterval = Union[Sequence[Interval], type(EMPTY_SET)]


[docs]def overlaps(interval: Interval, another: Interval) -> int: """Find the overlap between two intervals. The amount of overlap, in bp between interval and anohter, is returned. - If > 0, the number of bp of overlap - If 0, they are book-ended - If < 0, the distance in bp between them Parameters ---------- interval, another : Interval The two intervals to compute their overlap. Returns ------- int Overlap length of two intervals; if < 0, the distance of two intervals. Examples -------- >>> overlaps([1,2], [2,3]) 0 >>> overlaps([1,2], [3,4]) -1 >>> overlaps([1,2], [0,3]) 1 """ # in case a or b is not in ascending order interval.sort() another.sort() return min(interval[-1], another[-1]) - max(interval[0], another[0])
[docs]def validate_interval( interval: Union[Interval, GeneralizedInterval], join_book_endeds: bool = True ) -> Tuple[bool, Union[Interval, GeneralizedInterval]]: """Check whether `interval` is an `Interval` or a `GeneralizedInterval`. If true, return True, and validated (of the form [a, b] with a <= b) interval, return ``False, []`` otherwise. NOTE: if `interval` is empty, return ``False, []``. Parameters ---------- interval : Interval or GeneralizedInterval The interval to be validated. join_book_endeds : bool, default True If True, two book-ended intervals will be joined into one. Returns ------- tuple 2-tuple consisting of - bool: indicating whether `interval` is a valid interval - an interval (can be empty) Examples -------- >>> validate_interval([1, 2, 3]) (False, []) >>> validate_interval([2, 1]) (True, [1, 2]) >>> validate_interval([[1, 4], [4, 8]]) (True, [[1, 8]]) >>> validate_interval([[1, 4], [4, 8]], join_book_endeds=False) (True, [[1, 4], [4, 8]]) >>> validate_interval([]) (False, []) """ if (not isinstance(interval, (list, tuple))) or (len(interval) == 0): return False, [] if isinstance(interval[0], (list, tuple)): info = [validate_interval(itv, join_book_endeds) for itv in interval] if all([item[0] for item in info]): return True, intervals_union(interval, join_book_endeds) else: return False, [] if len(interval) == 2: return True, [min(interval), max(interval)] else: return False, []
[docs]def in_interval(val: Real, interval: Interval, left_closed: bool = True, right_closed: bool = False) -> bool: """Check whether val is inside interval or not. Parameters ---------- val : numbers.Real The value to be checked. interval : Interval The interval to be checked. left_closed : bool, default True Whether the left end of `interval` is closed. right_closed : bool, default False Whether the right end of `interval` is closed. Returns ------- bool Whether `val` is inside `generalized_interval` or not. Examples -------- >>> in_interval(-1.3, [0, 2]) False >>> in_interval(1.5, [1, 2]) True >>> in_interval(1, [1, 2]) True >>> in_interval(1, [1, 2], left_closed=False) False >>> in_interval(2, [1, 2]) False >>> in_interval(2, [1, 2], right_closed=True) True """ itv = sorted(interval) if left_closed: is_in = itv[0] <= val else: is_in = itv[0] < val if right_closed: is_in = is_in and (val <= itv[-1]) else: is_in = is_in and (val < itv[-1]) return is_in
[docs]def in_generalized_interval( val: Real, generalized_interval: GeneralizedInterval, left_closed: bool = True, right_closed: bool = False, ) -> bool: """Check whether val is inside generalized_interval or not. Parameters ---------- val : numbers.Real The value to be checked whether it is inside `generalized_interval` or not. generalized_interval : GeneralizedInterval The interval to be checked. left_closed : bool, default True Whether the left end of `generalized_interval` is closed or not. right_closed : bool, default False Whether the right end of `generalized_interval` is closed or not. Returns ------- bool Whether `val` is inside `generalized_interval` or not. Examples -------- >>> in_generalized_interval(1.5, [[1, 2], [3, 4]]) True >>> in_generalized_interval(2.5, [[1, 3], [2, 4]]) True >>> in_generalized_interval(3.45, [[1, 3], [4, 6.9]]) False >>> in_generalized_interval(0, [[0, 1], [3, 4]]) True >>> in_generalized_interval(0, [[0, 1], [3, 4]], left_closed=False) False >>> in_generalized_interval(1, [[0, 1], [3, 4]]) False >>> in_generalized_interval(1, [[0, 1], [3, 4]], right_closed=True) True """ is_in = False for interval in generalized_interval: if in_interval(val, interval, left_closed, right_closed): is_in = True break return is_in
[docs]def intervals_union(interval_list: GeneralizedInterval, join_book_endeds: bool = True) -> GeneralizedInterval: """Find the union of intervals. This function is used to find the union (ordered and non-intersecting) of all the intervals in `interval_list`, which is a list of intervals in the form ``[a, b]``, where a, b need not be ordered. Parameters ---------- interval_list : GeneralizedInterval The list of intervals to calculate their union join_book_endeds : bool, default True Whether join the book-ended intervals into one (e.g. ``[[1, 2],[2, 3]]`` into ``[1, 3]``) or not. Returns ------- GeneralizedInterval The union of the intervals in `interval_list` Examples -------- >>> intervals_union([[1, 2], [3, 4]]) [[1, 2], [3, 4]] >>> intervals_union([[1, 2], [2, 3]]) [[1, 3]] >>> intervals_union([[1, 2], [2, 3]], join_book_endeds=False) [[1, 2], [2, 3]] >>> intervals_union([[1, 2.1], [1.6, 4], [3.1, 10.9]]) [[1, 10.9]] """ # list_add = lambda list1, list2: list1+list2 processed = [item for item in interval_list if len(item) > 0] for item in processed: item.sort() processed.sort(key=lambda i: i[0]) # end_points = reduce(list_add, processed) merge_flag = True while merge_flag: merge_flag = False new_intervals = [] if len(processed) == 1: return processed for idx, interval in enumerate(processed[:-1]): this_start, this_end = interval next_start, next_end = processed[idx + 1] # it is certain that this_start <= next_start if this_end < next_start: # the case where two consecutive intervals are disjoint new_intervals.append([this_start, this_end]) if idx == len(processed) - 2: new_intervals.append([next_start, next_end]) elif this_end == next_start: # the case where two consecutive intervals are book-ended # concatenate if `join_book_endeds` is True, # or one interval degenerates (to a single point) if (this_start == this_end or next_start == next_end) or join_book_endeds: new_intervals.append([this_start, max(this_end, next_end)]) new_intervals += processed[idx + 2 :] merge_flag = True processed = new_intervals break else: new_intervals.append([this_start, this_end]) if idx == len(processed) - 2: new_intervals.append([next_start, next_end]) else: new_intervals.append([this_start, max(this_end, next_end)]) new_intervals += processed[idx + 2 :] merge_flag = True processed = new_intervals break processed = new_intervals return processed
[docs]def generalized_intervals_union( interval_list: Union[List[GeneralizedInterval], Tuple[GeneralizedInterval]], join_book_endeds: bool = True, ) -> GeneralizedInterval: """Calculate the union of a list (or tuple) of ``GeneralizedInterval``. Parameters ---------- interval_list : list or tuple A list (or tuple) of `GeneralizedInterval`. join_book_endeds : bool, default True Whether join the book-ended intervals into one (e.g. [[1,2],[2,3]] into [1,3]) or not Returns ------- GeneralizedInterval The union of `interval_list`. Examples -------- >>> generalized_intervals_union(([[1, 2], [3, 7]], [[40,90], [-30, -10]])) [[-30, -10], [1, 2], [3, 7], [40, 90]] >>> generalized_intervals_union(([[1, 2], [3, 7]], [[4,9], [-3, 1]])) [[-3, 2], [3, 9]] >>> generalized_intervals_union(([[1, 2], [3, 7]], [[4,9], [-3, 1]]), join_book_endeds=False) [[-3, 1], [1, 2], [3, 9]] """ all_intervals = [itv for gnr_itv in interval_list for itv in gnr_itv] iu = intervals_union(interval_list=all_intervals, join_book_endeds=join_book_endeds) return iu
[docs]def intervals_intersection(interval_list: GeneralizedInterval, drop_degenerate: bool = True) -> Interval: """Calculate the intersection of all intervals in `interval_list`. Parameters ---------- interval_list : GeneralizedInterval The list of intervals to yield intersection. drop_degenerate : bool, default True Whether or not drop the degenerate intervals, i.e. intervals with length 0. Returns ------- Interval The intersection of all intervals in `interval_list`. Examples -------- >>> intervals_intersection([[1, 2], [3, 4]]) [] >>> intervals_intersection([[1, 2], [2, 3]]) [] >>> intervals_intersection([[1, 2], [2, 3]], drop_degenerate=False) [[2, 2]] >>> intervals_intersection([[1, 2.1], [1.6, 4], [3.1, 10.9]]) [] >>> intervals_intersection([[1, 2.1], [1.6, 4], [0.7, 1.9]]) [1.6, 1.9] """ if [] in interval_list: return [] for item in interval_list: item.sort() potential_start = max([item[0] for item in interval_list]) potential_end = min([item[-1] for item in interval_list]) if (potential_end > potential_start) or (potential_end == potential_start and not drop_degenerate): its = [potential_start, potential_end] else: its = [] return its
[docs]def generalized_intervals_intersection( generalized_interval: GeneralizedInterval, another_generalized_interval: GeneralizedInterval, drop_degenerate: bool = True, ) -> GeneralizedInterval: """calculate the intersection of intervals. Parameters ---------- generalized_interval, another_generalized_interval : GeneralizedInterval The 2 `GeneralizedInterval` to yield intersection. drop_degenerate : bool, default True Whether or not drop the degenerate intervals, i.e. intervals with length 0. Returns ------- GeneralizedInterval The intersection of `generalized_interval` and `another_generalized_interval`. Examples -------- >>> generalized_intervals_intersection([[1, 2], [3, 7]], [[40,90], [-30, -10]]) [] >>> generalized_intervals_intersection([[1, 5], [12, 33]], [[4, 9], [-3, 3], [33, 99]]) [[1, 3], [4, 5]] >>> generalized_intervals_intersection([[1, 5], [12, 33]], [[4, 9], [-3, 3], [33, 99]], drop_degenerate=False) [[1, 3], [4, 5], [33, 33]] """ this = intervals_union(generalized_interval) another = intervals_union(another_generalized_interval) # NOTE: from now on, `this`, `another` are in ascending ordering # and are disjoint unions of intervals its = [] # TODO: optimize the following process cut_idx = 0 for item in this: another = another[cut_idx:] intersected_indices = [] for idx, item_prime in enumerate(another): tmp = intervals_intersection([item, item_prime], drop_degenerate=drop_degenerate) if len(tmp) > 0: its.append(tmp) intersected_indices.append(idx) if len(intersected_indices) > 0: cut_idx = intersected_indices[-1] return its
[docs]def generalized_interval_complement(total_interval: Interval, generalized_interval: GeneralizedInterval) -> GeneralizedInterval: """Calculate the complement of an interval in another interval. Parameters ---------- total_interval : Interval The total interval. generalized_interval : GeneralizedInterval The interval to be complemented. Returns ------- cpl : GeneralizedInterval The complement of `generalized_interval` in `total_interval`. TODO: the case `total_interval` is a `GeneralizedInterval`. Examples -------- >>> generalized_interval_complement([1, 100], [[5, 33], [40, 50], [60, 140]]) [[1, 5], [33, 40], [50, 60]] >>> generalized_interval_complement([1, 10], [[40, 66], [111, 300]]) [[1, 10]] >>> generalized_interval_complement([150, 200], [[40, 66], [111, 300]]) [] """ rearranged_intervals = intervals_union(generalized_interval) total_interval.sort() tot_start, tot_end = total_interval[0], total_interval[-1] rearranged_intervals = [ [max(tot_start, item[0]), min(tot_end, item[-1])] for item in rearranged_intervals if overlaps(item, total_interval) > 0 ] slice_points = [tot_start] for item in rearranged_intervals: slice_points += item slice_points.append(tot_end) cpl = [] for i in range(len(slice_points) // 2): if slice_points[2 * i + 1] - slice_points[2 * i] > 0: cpl.append([slice_points[2 * i], slice_points[2 * i + 1]]) return cpl
[docs]def get_optimal_covering( total_interval: Interval, to_cover: List[Union[Real, Interval]], min_len: Real, split_threshold: Real, isolated_point_dist_threshold: Real = 0, traceback: bool = False, **kwargs: Any, ) -> Union[GeneralizedInterval, Tuple[GeneralizedInterval, list]]: """Compute an optimal covering of `to_cover` by intervals. This function tries to find an optimal covering (disjoint union of intervals) that covers `to_cover` such that each interval in the covering is of length at least `min_len`, and any two intervals in the covering are at least `split_threshold` distance apart. Parameters ---------- total_interval : Interval The total interval that the covering is picked from. to_cover : list A list of intervals or points to cover. min_len : numbers.Real Minimun length (positive) of the intervals of the covering. split_threshold : numbers.Real Minumun distance (positive) of intervals of the covering. isolated_point_dist_threshold : numbers.Real, default 0.0 The minimum distance (non-negative) of isolated points in `to_cover` to the interval boundaries of the interval containing the point in the covering. If one wants the isolated points to be centered in the interval containing the point, set `isolated_point_dist_threshold` to be ``min_len / 2``. traceback : bool, default False if True, a list containing the list of indices of the intervals in the original `to_cover`, that each interval in the covering covers. Raises ------ If any of the intervals in `to_cover` exceeds the range of `total_interval`, ValueError will be raised Returns ------- covering : GeneralizedInterval The covering that satisfies the given conditions ret_traceback : list, optional Contains the list of indices of the intervals in the original `to_cover`, that each interval in the covering covers. If `traceback` is False, this will not be returned. TODO ---- make positions of isolated points in the final covering as close as possible to the center of the interval that contains the point Examples -------- >>> total_interval = [0, 100] >>> to_cover = [[7,33], 66, [82, 89]] >>> get_optimal_covering(total_interval, to_cover, 10, 5) [[7, 33], [56, 66], [82, 92]] >>> get_optimal_covering(total_interval, to_cover, 10, 5, traceback=True) ([[7, 33], [56, 66], [82, 92]], [[0], [1], [2]]) >>> get_optimal_covering(total_interval, to_cover, 20, 5, traceback=True) ([[7, 33], [46, 66], [80, 100]], [[0], [1], [2]]) >>> get_optimal_covering(total_interval, to_cover, 20, 13, traceback=True) ([[7, 33], [46, 66], [80, 100]], [[0], [1], [2]]) >>> get_optimal_covering(total_interval, to_cover, 20, 14, traceback=True) ([[7, 33], [66, 89]], [[0], [1, 2]]) >>> get_optimal_covering(total_interval, to_cover, 20, 13, isolated_point_dist_threshold=1) [[7, 33], [47, 67], [80, 100]] >>> get_optimal_covering(total_interval, to_cover, 20, 13, isolated_point_dist_threshold=2) [[7, 33], [64, 89]] >>> get_optimal_covering(total_interval, to_cover, 30, 3) [[3, 33], [36, 66], [70, 100]] >>> get_optimal_covering(total_interval, to_cover, 30, 4) [[3, 33], [59, 89]] >>> get_optimal_covering(total_interval, to_cover, 40, 5) [[0, 40], [60, 100]] >>> get_optimal_covering(total_interval, to_cover, 1000, 1, traceback=True) ([[0, 100]], [[0, 1, 2]]) """ assert validate_interval(total_interval)[0], "`total_interval` must be a valid interval (a sequence of two real numbers)" assert min_len > 0, "`min_len` must be positive" assert split_threshold > 0, "`split_threshold` must be positive" assert isolated_point_dist_threshold >= 0, "`isolated_point_dist_threshold` must be non-negative" if len(to_cover) == 0: return [] if not traceback else ([], []) start_time = time.time() verbose = kwargs.get("verbose", 0) tmp = sorted(total_interval) tot_start, tot_end = tmp[0], tmp[-1] if (tot_start > min([item if isinstance(item, Real) else item[0] for item in to_cover])) or ( tot_end < max([item if isinstance(item, Real) else item[-1] for item in to_cover]) ): raise ValueError("some of the elements in `to_cover` exceeds the range of `total_interval`") if verbose >= 1: print(f"total_interval = {total_interval}, with_length = {tot_end-tot_start}") if tot_end - tot_start < min_len: covering = [[tot_start, tot_end]] ret_traceback = [list(range(len(to_cover)))] if traceback else [] return (covering, ret_traceback) if traceback else covering to_cover_intervals = [] isolated_points = [] if isolated_point_dist_threshold > min_len / 2: isolated_point_dist_threshold = min_len / 2 warnings.warn( "`isolated_point_dist_threshold` should be smaller than `min_len`/2, " f"hence is set to {isolated_point_dist_threshold}", RuntimeWarning, ) for item in to_cover: if isinstance(item, list): to_cover_intervals.append(item.copy()) else: to_cover_intervals.append( [ max(tot_start, item - isolated_point_dist_threshold), min(tot_end, item + isolated_point_dist_threshold), ] ) isolated_points.append(item) if traceback: replica_for_traceback = deepcopy(to_cover_intervals) if verbose >= 2: print(f"to_cover_intervals after all converted to intervals = {to_cover_intervals}") for interval in to_cover_intervals: interval.sort() to_cover_intervals.sort(key=lambda i: i[0]) to_cover_intervals = intervals_union(to_cover_intervals) if verbose >= 2: print(f"to_cover_intervals after sorted = {to_cover_intervals}") # if to_cover_intervals[0][0] < tot_start or to_cover_intervals[-1][-1] > tot_end: # raise IndexError("some item in to_cover list exceeds the range of total_interval") # these cases now seen normal, and treated as follows: for item in to_cover_intervals: item[0] = max(item[0], tot_start) item[-1] = min(item[-1], tot_end) # to_cover_intervals = [item for item in to_cover_intervals if item[-1] > item[0]] # ensure that the distance from the first interval to `tot_start` is at least `min_len` to_cover_intervals[0][-1] = max(to_cover_intervals[0][-1], tot_start + min_len) # ensure that the distance from the last interval to `tot_end` is at least `min_len` to_cover_intervals[-1][0] = min(to_cover_intervals[-1][0], tot_end - min_len) to_cover_intervals = intervals_union(to_cover_intervals) if verbose >= 2: print(f"`to_cover_intervals` after two tails adjusted to {to_cover_intervals}") # merge intervals whose distances (might be negative) are less than `split_threshold` merge_flag = True while merge_flag: merge_flag = False new_intervals = [] if len(to_cover_intervals) == 1: break for idx, item in enumerate(to_cover_intervals[:-1]): this_start, this_end = item next_start, next_end = to_cover_intervals[idx + 1] if next_start - this_end >= split_threshold: if split_threshold == (next_start - next_end) == 0 or split_threshold == (this_start - this_end) == 0: # the case where split_threshold ==0 and the degenerate case should be dealth with separately new_intervals.append([this_start, max(this_end, next_end)]) new_intervals += to_cover_intervals[idx + 2 :] merge_flag = True to_cover_intervals = new_intervals break else: new_intervals.append([this_start, this_end]) if idx == len(to_cover_intervals) - 2: new_intervals.append([next_start, next_end]) else: new_intervals.append([this_start, max(this_end, next_end)]) new_intervals += to_cover_intervals[idx + 2 :] merge_flag = True to_cover_intervals = new_intervals break if verbose >= 2: print(f"`to_cover_intervals` after merging intervals whose gaps < split_threshold are {to_cover_intervals}") # currently, distance between any two intervals in `to_cover_intervals` are larger than `split_threshold` # but any interval except the head and tail might has length less than `min_len` covering = [] if len(to_cover_intervals) == 1: # NOTE: here, there's only one `to_cover_intervals`, # whose length should be at least `min_len` mid_pt = (to_cover_intervals[0][0] + to_cover_intervals[0][-1]) // 2 half_len = min_len // 2 if mid_pt - tot_start < half_len: ret_start = tot_start ret_end = min(tot_end, max(tot_start + min_len, to_cover_intervals[0][-1])) covering = [[ret_start, ret_end]] else: ret_start = max(tot_start, min(to_cover_intervals[0][0], mid_pt - half_len)) ret_end = min(tot_end, max(mid_pt - half_len + min_len, to_cover_intervals[0][-1])) covering = [[ret_start, ret_end]] start = min(to_cover_intervals[0][0], to_cover_intervals[0][-1] - min_len) start = [start, start] for idx, item in enumerate(to_cover_intervals[:-1]): this_start, this_end = item next_start, next_end = to_cover_intervals[idx + 1] potential_end = max(this_end, start[0] + min_len) # if distance from `potential_end` to `next_start` is not enough # and has not reached the end of `to_cover_intervals` # continue to the next loop if next_start - potential_end < split_threshold: if idx < len(to_cover_intervals) - 2: continue else: # now, idx equals len(to_cover_intervals)-2 # distance from `next_start` (hence `start`) to `tot_end` is at least `min_len` potential_end = max(start[0] + min_len, next_end) covering.append( [ max(start[0], min(start[1], potential_end - min_len)), potential_end, ] ) else: covering.append([max(start[0], min(start[1], potential_end - min_len)), potential_end]) start = [ max(potential_end + split_threshold, min(next_start, next_end - min_len)), next_start, ] if idx == len(to_cover_intervals) - 2: covering.append([next_start, max(next_start + min_len, next_end)]) ret_traceback = [] if traceback: for item in covering: record = [] for idx, item_prime in enumerate(replica_for_traceback): itc = intervals_intersection([item, item_prime], drop_degenerate=False) len_itc = itc[-1] - itc[0] if len(itc) > 0 else -1 if len_itc > 0 or (len_itc == 0 and item_prime[-1] - item_prime[0] == 0): record.append(idx) ret_traceback.append(record) if verbose >= 1: print( f"the final result of `get_optimal_covering` is {covering}, " f"ret_traceback = {ret_traceback}, " f"the whole process used {time.time()-start_time} second(s)." ) if traceback: return covering, ret_traceback return covering
def find_max_cont_len(sublist: Interval, tot_rng: Real) -> dict: """Compute the maximum length of continuous (consecutive) sublists. This function computes the maximum length of continuous (consecutive) sublists of `sublist`, whose element are integers within the range from 0 to `tot_rng`, along with the position of this sublist and the sublist itself. Parameters ---------- sublist : Interval The sublist. tot_rng : numbers.Real The total range. Returns ------- dict A dictionary containing the following keys: - "max_cont_len" - "max_cont_sublist_start" - "max_cont_sublist" Examples -------- >>> tot_rng = 10 >>> sublist = [0, 2, 3, 4, 7, 9] >>> find_max_cont_len(sublist, tot_rng) {'max_cont_len': 3, 'max_cont_sublist_start': 1, 'max_cont_sublist': [2, 3, 4]} """ complementary_sublist = [-1] + [i for i in range(tot_rng) if i not in sublist] + [tot_rng] diff_list = np.diff(np.array(complementary_sublist)) max_cont_len = np.max(diff_list) - 1 max_cont_sublist_start = np.argmax(diff_list) max_cont_sublist = sublist[max_cont_sublist_start : max_cont_sublist_start + max_cont_len] ret = { "max_cont_len": max_cont_len, "max_cont_sublist_start": max_cont_sublist_start, "max_cont_sublist": max_cont_sublist, } return ret
[docs]def interval_len(interval: Interval) -> Real: """Compute the length of an interval. Parameters ---------- interval : Interval The interval to compute the length of. Returns ------- numbers.Real The "length" of `interval`, 0 for the empty interval []. Examples -------- >>> interval_len([0, 10]) 10 >>> interval_len([10, 10]) 0 >>> interval_len([10, 0]) 10 >>> interval_len([]) 0 """ interval.sort() itv_len = interval[-1] - interval[0] if len(interval) > 0 else 0 return itv_len
[docs]def generalized_interval_len(generalized_interval: GeneralizedInterval) -> Real: """Compute the length of an interval. Parameters ---------- generalized_interval : GeneralizedInterval The generalized interval to compute the length of. Returns ------- numbers.Real The "length" of `generalized_interval`, 0 for the empty interval []. Examples -------- >>> generalized_interval_len([[0, 10], [20, 30]]) 20 >>> generalized_interval_len([[10, 10], [20, 30]]) 10 >>> generalized_interval_len([[10, 0], [20, 30]]) 20 >>> generalized_interval_len([[0, 20], [10, 30]]) 30 >>> generalized_interval_len([]) 0 """ gi_len = sum([interval_len(item) for item in intervals_union(generalized_interval)]) return gi_len
[docs]def find_extrema(signal: Union[np.ndarray, Sequence], mode: str = "both") -> np.ndarray: """Locate local extrema points in a 1D signal. This function is based on Fermat's Theorem. Parameters ---------- signal : array_like 1D input signal. mode : {"max", "min", "both"}, optional Whether to find maxima ("max"), minima ("min"), or both ("both"), by default "both", case insensitive. Returns ------- extrema : numpy.ndarray Indices of the extrama points. Examples -------- >>> x = np.linspace(0, 2 * np.pi, 100) >>> y = np.sin(x) >>> find_extrema(y, mode="max") array([25]) >>> find_extrema(y, mode="min") array([74]) >>> find_extrema(y, mode="both") array([25, 74]) """ # check inputs if np.ndim(signal) != 1: raise ValueError(f"`signal` must be 1D, but got {np.ndim(signal)}D") mode = mode.lower() if mode not in ["max", "min", "both"]: raise ValueError(f"Unknwon `{mode}`, must be one of `max`, `min`, `both`") aux = np.diff(np.sign(np.diff(signal))) if mode == "both": aux = np.abs(aux) extrema = np.nonzero(aux > 0)[0] + 1 elif mode == "max": extrema = np.nonzero(aux < 0)[0] + 1 elif mode == "min": extrema = np.nonzero(aux > 0)[0] + 1 return extrema
[docs]def is_intersect( interval: Union[GeneralizedInterval, Interval], another_interval: Union[GeneralizedInterval, Interval], ) -> bool: """Determines if two (generalized) intervals intersect or not. Parameters ---------- interval, another_interval : GeneralizedInterval or Interval The two intervals to check if they intersect. Returns ------- bool True if `interval` intersects with another_interval, False otherwise. Examples -------- >>> is_intersect([0, 10], [5, 15]) True >>> is_intersect([0, 10], [10, 15]) False >>> is_intersect([0, 10], []) False >>> is_intersect([0, 10], [[5, 20], [25, 30]]) True """ if interval is None or another_interval is None or len(interval) * len(another_interval) == 0: # the case of empty set return False # check if is GeneralizedInterval is_generalized = isinstance(interval[0], (list, tuple)) is_another_generalized = isinstance(another_interval[0], (list, tuple)) if is_generalized and is_another_generalized: return any([is_intersect(interval, itv) for itv in another_interval]) elif not is_generalized and is_another_generalized: return is_intersect(another_interval, interval) elif is_generalized: # and not is_another_generalized return any([is_intersect(itv, another_interval) for itv in interval]) else: # not is_generalized and not is_another_generalized return any([overlaps(interval, another_interval) > 0])
[docs]def max_disjoint_covering( intervals: GeneralizedInterval, allow_book_endeds: bool = True, traceback: bool = True, verbose: int = 0, ) -> Tuple[GeneralizedInterval, List[int]]: """Find the largest (the largest interval length) covering of a sequence of intervals. Parameters ---------- intervals : GeneralizedInterval A sequence of intervals. allow_book_endeds : bool, default True If True, book-ended intervals will be considered valid (disjoint). traceback : bool, default True If True, the indices of the intervals in the input `intervals` of the output covering will also be returned. Returns ------- covering : GeneralizedInterval The maximum non-overlapping (disjoint) subset of `intervals`. covering_inds : List[int] Indices in `intervals` of the intervals of `covering_inds`. Examples -------- >>> max_disjoint_covering([]) ([], []) >>> max_disjoint_covering([[0, 10]]) ([[0, 10]], [0]) >>> max_disjoint_covering([[1, 4], [2, 3], [4, 6], [8, 9]]) ([[1, 4], [4, 6], [8, 9]], [0, 2, 3]) >>> max_disjoint_covering([[1, 4], [2, 3], [4, 6], [8, 9]], allow_book_endeds=False, traceback=False) ([[2, 3], [4, 6], [8, 9]], []) NOTE ---- 1. The problem seems slightly different from the problem discussed in reference [#mdc1]_ and [#mdc2]_. 2. Intervals with non-positive length will be ignored References ---------- .. [#mdc1] https://en.wikipedia.org/wiki/Maximum_disjoint_set .. [#mdc2] https://www.geeksforgeeks.org/maximal-disjoint-intervals/ """ if len(intervals) <= 1: covering = deepcopy(intervals) return covering, list(range(len(covering))) l_itv = [sorted(itv) for itv in intervals] ordering = np.argsort([itv[-1] for itv in l_itv]) l_itv = [l_itv[idx] for idx in ordering] if verbose >= 1: print(f"the sorted intervals are {l_itv}, " f"whose indices in the original input `intervals` are {ordering}") if allow_book_endeds: candidates_inds = [[idx] for idx, itv in enumerate(l_itv) if overlaps(itv, l_itv[0]) > 0] else: candidates_inds = [[idx] for idx, itv in enumerate(l_itv) if overlaps(itv, l_itv[0]) >= 0] candidates = [[l_itv[inds[0]]] for inds in candidates_inds] if verbose >= 1: print( f"candidates heads = {candidates}, with corresponding indices " f"in the sorted list of input intervals = {candidates_inds}" ) for c_idx, (cl, ci) in enumerate(zip(candidates, candidates_inds)): if interval_len(cl[0]) == 0: continue if allow_book_endeds: tmp_inds = [idx for idx, itv in enumerate(l_itv) if itv[0] >= cl[0][-1] and interval_len(itv) > 0] else: tmp_inds = [idx for idx, itv in enumerate(l_itv) if itv[0] > cl[0][-1] and interval_len(itv) > 0] if verbose >= 2: print(f"for the {c_idx}-th candidate, tmp_inds = {tmp_inds}") if len(tmp_inds) > 0: tmp = [l_itv[idx] for idx in tmp_inds] tmp_candidates, tmp_candidates_inds = max_disjoint_covering( intervals=tmp, allow_book_endeds=allow_book_endeds, traceback=traceback, # verbose=verbose, ) candidates[c_idx] = cl + tmp_candidates candidates_inds[c_idx] = ci + [tmp_inds[i] for i in tmp_candidates_inds] if verbose >= 1: print( f"the processed candidates are {candidates}, with corresponding indices " f"in the sorted list of input intervals = {candidates_inds}" ) # covering = max(candidates, key=generalized_interval_len) max_idx = np.argmax([generalized_interval_len(c) for c in candidates]) covering = candidates[max_idx] if traceback: covering_inds = candidates_inds[max_idx] covering_inds = [ordering[i] for i in covering_inds] # map to the original indices else: covering_inds = [] return covering, covering_inds