Source code for fastdfe.parametrization

"""
DFE parametrizations.
"""

__author__ = "Janek Sendrowski"
__contact__ = "sendrowski.janek@gmail.com"
__date__ = "2023-02-26"

import logging
import warnings
from abc import abstractmethod, ABC
from functools import wraps
from typing import Callable, List, Union, Dict, Tuple, Literal, Sequence, Optional

import numpy as np
import pandas as pd
from scipy.stats import gamma, expon

from fastdfe.utils import Serializable

# get logger
logger = logging.getLogger('fastdfe')


def _from_string(model: Union['Parametrization', str]) -> 'Parametrization':
    """
    Return Parametrization from class name string.

    :param model: Class name string or parametrization object
    :return: Parametrization object
    """
    if isinstance(model, Parametrization):
        return model

    if isinstance(model, str):
        return globals()[model]()

    raise ValueError(f'Unknown parametrization: {model}')


def _to_string(model: Union['Parametrization', str]) -> str:
    """
    Return class name string from Parametrization.

    :param model: Class name string or Parametrization object
    :return: Class name string
    """
    if isinstance(model, Parametrization):
        return model.__class__.__name__

    return model


[docs] class Parametrization(Serializable, ABC): """ Base class for DFE parametrizations. Note that :func:`get_pdf` is not required to be implemented, provided that the linearized mode of fastDFE is used (which is highly recommended). """ #: Default initial parameters x0: Dict[str, float] = {} #: Default parameter bounds bounds: Dict[str, Tuple[float, float]] = {} #: Scales over which to optimize the parameters, either 'log' or 'lin' scales: Dict[str, Literal['lin', 'log']] = {} #: The kind of submodels supported by holding some parameters fixed submodels: Dict[str, Dict[str, float]] = dict( full=dict(), dele=dict() )
[docs] def __init__( self, bounds: Optional[Dict[str, Tuple[float, float]]] = None, scales: Optional[Dict[str, Literal['lin', 'log']]] = None, x0: Optional[Dict[str, float]] = None ): """ Initialize parametrization. :param bounds: Custom parameter bounds merging with defaults :param scales: Custom scales merging with defaults :param x0: Custom initial parameters merging with defaults """ self._logger = logger.getChild(self.__class__.__name__) # merge custom settings if bounds is not None: self.bounds = self.bounds | bounds if scales is not None: self.scales = self.scales | scales if x0 is not None: self.x0 = self.x0 | x0 #: argument names self.param_names: List = list(self.x0.keys())
@staticmethod def _accepts_scalars(func: Callable) -> Callable[[np.ndarray | float], np.ndarray | float]: """ Make func accept scalar values. :return: Function that accepts scalars """ @wraps(func) def wrapper(S: np.ndarray, *args, **kwargs) -> np.ndarray: """ Wrapper function. :param S: Array of selection coefficients :param args: Positional arguments :param kwargs: Keyword arguments :return: Output of func """ if isinstance(S, (int, float)): return func(np.array([S]), *args, **kwargs)[0] else: return func(S, *args, **kwargs) return wrapper
[docs] @abstractmethod def get_pdf(self, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get probability distribution function of DFE. :return: Function that accepts an array of selection coefficients and returns their probability density """ pass
[docs] @abstractmethod def get_cdf(self, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get probability distribution function of DFE. :return: Function that accepts an array of selection coefficients and returns their cumulative probability """ pass
def _discretize( self, params, bins: np.ndarray, warn_mass: bool = True ) -> np.ndarray: """ Discretize by using the CDF. :param params: Parameters of the parametrization :param bins: Bins to use for discretization :param warn_mass: Whether to warn if the total mass is not near 1. :return: Histogram """ x = self.get_cdf(**params)(bins) # issue warning if values at bounds are outside [0, 1] if warn_mass and (not -1e-5 < x[0] < 1e-5 or not 1 - 1e-5 < x[-1] < 1 + 1e-5): self._logger.warning(f'CDF evaluates to {(x[0], x[-1])} at the lower and upper ' f'bounds, which is a bit off from the expected (0, 1). ' f'Used parameters: {params}.') # compute histogram hist = x[1:] - x[:-1] return hist def _normalize(self, params: dict) -> dict: """ Normalize parameters. :param params: Parameters of the parametrization :return: Normalized parameters """ # do nothing by default return params
[docs] def plot( self, params: dict, intervals: Sequence = np.array([-np.inf, -100, -10, -1, 0, 1, np.inf]), file: str = None, show: bool = True, title: str = 'discretized DFE', ax: 'plt.Axes' = None ) -> 'plt.Axes': """ Plot the discretized DFE. :param params: Parameters of the parametrization :param intervals: Intervals to use for discretization. :param file: File to save the plot to. :param show: Whether to show the plot. :param title: Title of the plot. :param ax: Axes to use for the plot. :return: Axes """ from .visualization import Visualization values = self._discretize(params, np.array(intervals), warn_mass=False) # check for nan values if np.isnan(values).any(): self._logger.warning(f'NaN values in discretized DFE. Are the parameters valid?') return Visualization.plot_discretized( values=[values], file=file, show=show, intervals=np.array(intervals), title=title, ax=ax )
[docs] def freeze( self, params: Dict[str, float] ) -> 'DFE': """ Freeze parameters to create a DFE object. :param params: Parameters of the parametrization :return: DFE object """ return DFE(model=self, params=params)
[docs] class GammaExpParametrization(Parametrization): r""" Parametrization for mixture of a gamma and exponential distribution. This corresponds to model C in polyDFE. We have the following probability density function: .. math:: \phi(S; S_d, b, p_b, S_b) = (1 - p_b) f_\Gamma(-S; S_d, b) \cdot \mathbf{1}_{\{S < 0\}} + p_b f_e(S; S_b) \cdot \mathbf{1}_{\{S \geq 0\}} where: * :math:`S_d` is the mean of the DFE for :math:`S < 0` * :math:`b` is the shape of the gamma distribution * :math:`p_b` is the probability that :math:`S \geq 0` * :math:`S_b` is the mean of the DFE for :math:`S \geq 0` * :math:`f_\Gamma(x; m, b)` is the density of the gamma distribution with mean :math:`m` and shape :math:`b` * :math:`f_e(x; m)` is the density of the exponential distribution with mean :math:`m` * :math:`\mathbf{1}_{\{A\}}` denotes the indicator function, which is 1 if :math:`A` is true, and 0 otherwise. The DFE has often been observed to be multi-modal for negative selection coefficients. A gamma distribution provides a good amount of flexibility to accommodate this. Note that all :math:`S` parameters are population-scaled selection coefficients, i.e. :math:`S = 4N_es`. """ #: Default initial parameters x0: Dict[str, float] = dict( S_d=-1000, b=0.4, p_b=0.05, S_b=1 ) #: Default parameter bounds, using non-zero lower bounds for S_d and S_b due to log-scaled scales bounds: Dict[str, Tuple[float, float]] = dict( S_d=(-1e5, -1e-2), b=(0.01, 10), p_b=(0, 0.5), S_b=(1e-4, 100) ) #: Scales over which to optimize the parameters scales: Dict[str, Literal['lin', 'log']] = dict( S_d='log', b='log', p_b='lin', S_b='log' ) #: The kind of submodels supported by holding some parameters fixed submodels: Dict[str, Dict[str, float]] = dict( full=dict(), dele=dict( p_b=0, S_b=1 ) )
[docs] @staticmethod def get_pdf(S_d: float, b: float, p_b: float, S_b: float, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get PDF. :param S_d: Mean selection coefficient for deleterious mutations :param b: Shape parameter for gamma distribution :param p_b: Probability of a beneficial mutation :param S_b: Mean selection coefficient for beneficial mutations :return: Function that accepts an array of selection coefficients and returns their probability density """ @Parametrization._accepts_scalars def pdf(S: np.ndarray) -> np.ndarray: """ The PDF. :param S: Selection coefficients :return: Probability density """ x = np.zeros_like(S, dtype=float) # The gamma distribution may approach infinity as S -> 0 # so in order to evaluate this function at s = 0, # we use--unlike polyDFE--the exponential mixture # distribution at this point. negative = S < 0 # positive S x[negative] = (1 - p_b) * gamma.pdf(-S[negative], b, scale=-S_d / b) # Allow for S_b == 0 when p_b == 0 as well # which would produce an error otherwise. if S_b == 0 and p_b == 0: x[~negative] = 0 else: # non-negative S x[~negative] = p_b * expon.pdf(S[~negative], scale=S_b) return x return pdf
[docs] @staticmethod def get_cdf(S_d: float, b: float, p_b: float, S_b: float, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get CDF. :param S_d: Mean selection coefficient for deleterious mutations :param b: Shape parameter for gamma distribution :param p_b: Probability of a beneficial mutation :param S_b: Mean selection coefficient for beneficial mutations :return: Function that accepts an array of selection coefficients and returns their cumulative probability """ @Parametrization._accepts_scalars def cdf(S: np.ndarray) -> np.ndarray: """ The CDF. :param S: Selection coefficients :return: Cumulative probability """ x = np.zeros_like(S, dtype=float) negative = S < 0 # positive S x[negative] = (1 - p_b) - ((1 - p_b) * gamma.cdf(-S[negative], b, scale=-S_d / b)) # Allow for S_b = 0 when p_b = 0 as well # which would produce an error otherwise. if S_b == 0 and p_b == 0: x[~negative] = 1 else: # non-negative S x[~negative] = (1 - p_b) + p_b * expon.cdf(S[~negative], scale=S_b) return x return cdf
[docs] class DisplacedGammaParametrization(Parametrization): r""" Parametrization for a reflected displaced gamma distribution. We have the following probability density function: .. math:: \phi(S; \hat{S}, b, S_{max}) = f_\Gamma(S_{max} - S; S_{max} - \hat{S}, b) \cdot \mathbf{1}_{\{S \leq S_{max}\}} where: * :math:`\hat{S}` is the mean of the DFE * :math:`b` is the shape of the gamma distribution * :math:`S_{max}` is the maximum value that :math:`S` can take * :math:`f_\Gamma(x; m, b)` is the density of the gamma distribution with mean :math:`m` and shape :math:`b` * :math:`\mathbf{1}_{\{A\}}` denotes the indicator function, which is 1 if :math:`A` is true, and 0 otherwise. This parametrization uses a single gamma distribution for both positive and negative selection coefficients. This is a less flexible parametrization, which may produce results similar to the other models while requiring fewer parameters. Note that all :math:`S` parameters are population-scaled selection coefficients, i.e. :math:`S = 4N_es`. .. warning:: This model does not allow for a purely deleterious sub-parametrization, so :meth:`Inference.compare_nested_models` won't work as expected. """ #: Default initial parameters x0: Dict[str, float] = dict( S_mean=-100, b=1, S_max=1 ) #: Default parameter bounds bounds: Dict[str, Tuple[float, float]] = dict( S_mean=(-100000, -0.01), b=(0.01, 10), S_max=(0.001, 100) ) #: Scales over which to optimize the parameters scales: Dict[str, Literal['lin', 'log']] = dict( S_mean='log', b='lin', S_max='log' ) #: The kind of submodels supported by holding some parameters fixed submodels: Dict[str, Dict[str, float]] = dict( full=dict(), dele=dict() )
[docs] @staticmethod def get_pdf(S_mean: float, b: float, S_max: float, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get PDF. :return: Function that accepts an array of selection coefficients and returns their probability density """ @Parametrization._accepts_scalars def pdf(S: np.ndarray) -> np.ndarray: """ The PDF. :param S: Selection coefficients :return: Probability density """ x = np.zeros_like(S, dtype=float) # calculate the probability density for S <= S_max is_lower = S <= S_max x[is_lower] = gamma.pdf(S_max - S[is_lower], b, scale=max(S_max - S_mean, 1e-16) / b) return x return pdf
[docs] @staticmethod def get_cdf(S_mean: float, b: float, S_max: float, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get CDF. :return: Function that accepts an array of selection coefficients and returns their cumulative probability """ @Parametrization._accepts_scalars def cdf(S: np.ndarray) -> np.ndarray: """ The CDF. :param S: Selection coefficients :return: Cumulative probability """ x = np.zeros_like(S, dtype=float) # calculate the cumulative probability for S <= S_max is_lower = S <= S_max x[is_lower] = 1 - gamma.cdf(S_max - S[is_lower], b, scale=max(S_max - S_mean, 1e-16) / b) # set the cumulative probability to 1 for S > S_max x[~is_lower] = 1 return x return cdf
[docs] class GammaDiscreteParametrization(Parametrization): r""" Parametrization for a mixture of a gamma and discrete distribution. This corresponds to polyDFE's model B. We have the following probability density function: .. math:: \phi(S; S_d, b, p_b, S_b) = (1 - p_b) f_{\Gamma}(S; S_d, b) \cdot \mathbf{1}_{\{S < 0\}} + p_b \cdot S_b \cdot \mathbf{1}_{\{0 \leq S \leq 1 / S_b\}} where: * :math:`S_d` is the mean of the DFE for :math:`S < 0` * :math:`b` is the shape of the gamma distribution * :math:`p_b` is the probability that :math:`S \geq 0` * :math:`S_b` is the shared selection coefficient of all positively selected mutations up to :math:`1/S_b` * :math:`f_\Gamma(x; m, b)` is the density of the gamma distribution with mean :math:`m` and shape :math:`b` This parametrization is similar to :class:`GammaExpParametrization`, but uses a constant mass for positive selection coefficients. The results should be rather similar in most cases. Note that all :math:`S` parameters are population-scaled selection coefficients, i.e. :math:`S = 4N_es`. """ #: Default initial parameters x0: Dict[str, float] = dict( S_d=-1000, b=0.4, p_b=0.05, S_b=1 ) #: default parameter bounds bounds: Dict[str, Tuple[float, float]] = dict( S_d=(-1e5, -1e-2), b=(0.01, 10), p_b=(0, 0.5), S_b=(1e-4, 100) ) #: scales over which to optimize the parameters scales: Dict[str, Literal['lin', 'log']] = dict( S_d='log', b='log', p_b='lin', S_b='log' ) #: The kind of submodels supported by holding some parameters fixed submodels: Dict[str, Dict[str, float]] = dict( full=dict(), dele=dict( p_b=0, S_b=1 ) )
[docs] @staticmethod def get_pdf(S_d: float, b: float, p_b: float, S_b: float, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get PDF. :param S_d: Mean of the DFE for S < 0 :param b: Shape of the gamma distribution :param p_b: Probability that S > 0 :param S_b: Shared selection coefficient of all positively selected mutations up to S_b :return: Function that accepts an array of selection coefficients and returns their probability density """ @Parametrization._accepts_scalars def pdf(S: np.ndarray) -> np.ndarray: """ The PDF. :param S: Selection coefficients :return: Probability density """ x = np.zeros_like(S, dtype=float) negative = S < 0 x[negative] = (1 - p_b) * gamma.pdf(-S[negative], b, scale=-S_d / b) equal_to_S_b = (0 <= S) & (S <= 1 / S_b) x[equal_to_S_b] = p_b * S_b # the density is 0 for all other cases, no need to explicitly set it return x return pdf
[docs] @staticmethod def get_cdf(S_d: float, b: float, p_b: float, S_b: float, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get CDF. :param S_d: Mean of the DFE for S < 0 :param b: Shape of the gamma distribution :param p_b: Probability that S > 0 :param S_b: Shared selection coefficient of all positively selected mutations up to S_b :return: Function that accepts an array of selection coefficients and returns their cumulative probability """ @Parametrization._accepts_scalars def cdf(S: np.ndarray) -> np.ndarray: """ The CDF. :param S: Selection coefficients :return: Cumulative probability """ x = np.zeros_like(S, dtype=float) negative = S < 0 x[negative] = (1 - p_b) - ((1 - p_b) * gamma.cdf(-S[negative], b, scale=-S_d / b)) within_S_b = (0 <= S) & (S <= 1 / S_b) x[within_S_b] = (1 - p_b) + p_b * S_b * S[within_S_b] greater_than_S_b = S > 1 / S_b x[greater_than_S_b] = 1 return x return cdf
[docs] class DiscreteParametrization(Parametrization): r""" Parametrization for a discrete distribution. This corresponds to polyDFE's model D. By default we use 6 bins, but this can be changed by passing a different array to the constructor. The resulting parameter names are :math:`S_1, S_2, \dots, S_k`, where :math:`k` is the number of bins. That is, the probability density function is given by: :math:`\phi(S; S_1, S_2, \dots, S_k) = \sum_{i=1}^{k} S_i/c_i \cdot \mathbf{1}_{\{S \in B_i\}}` such that :math:`\sum_{i=1}^{k} S_i = 1,` where :math:`B_i` and :math:`c_i` are interval :math:`i` and the width of interval :math:`i`, respectively. This parametrization has the advantage of not imposing a shape on the DFE. For a reasonably fine parametrization, the number of parameters is larger than those of the other models, however. We generally also observe larger confidence intervals for this parametrization, and the optimization procedure may well be less efficient as we have to re-normalize the parameters to make sure they sum up to 1. Note that all :math:`S` parameters are population-scaled selection coefficients, i.e. :math:`S = 4N_es`. """
[docs] def __init__( self, intervals: Sequence = np.array([-100000, -100, -10, -1, 0, 1, 1000]), bounds: Optional[Dict[str, Tuple[float, float]]] = None, scales: Optional[Dict[str, Literal['lin', 'log']]] = None, x0: Optional[Dict[str, float]] = None ): """ Constructor. :param intervals: The intervals of the discrete distribution. :param bounds: Custom parameter bounds merging with defaults :param scales: Custom scales merging with defaults :param x0: Custom initial parameters merging with defaults """ super().__init__() # deprecation warning self._logger.warning( 'DiscreteParametrization is deprecated. Use DiscreteFractionalParametrization instead, which ' 'has better optimization properties.' ) #: Intervals self.intervals: np.ndarray = np.concatenate(([-np.inf], intervals, [np.inf])) #: Interval sizes self.interval_sizes: np.ndarray = self.intervals[1:] - self.intervals[:-1] #: Number of intervals, including the two infinite ones self.k: int = self.intervals.shape[0] - 1 #: All parameter names, including the fixed ones self.params: np.ndarray = np.array([f"S{i}" for i in range(self.k)]) #: Parameter names that are not fixed self.param_names: List[str] = self.params[1:-1].tolist() #: Fixed parameters self.fixed_params: Dict[str, int] = {self.params[0]: 0, self.params[-1]: 0} #: Default initial parameters self.x0: Dict[str, float] = dict((p, 1 / (self.k - 2)) for p in self.param_names) | (x0 or {}) #: Default parameter bounds self.bounds: Dict[str, Tuple[float, float]] = dict((p, (0, 1)) for p in self.param_names) | (bounds or {}) #: Scales # noinspection all self.scales: Dict[str, Literal['lin', 'log']] = dict((p, 'lin') for p in self.param_names) | (scales or {}) #: Submodels self.submodels: Dict[str, Dict[str, float]] = dict( full=dict(), dele=dict((p, 0) for p in self.params[1:-1][self.intervals[1:-2] >= 0]) )
def _normalize(self, params: dict) -> dict: """ Add params for boundaries and normalize so that the parameters sum up to 1. :param params: Parameter values :return: Dict of normalized values plus remaining parameters """ # make sure we only include parameter used for this parametrization filtered = dict((k, v) for k, v in params.items() if k in self.param_names) # sum up s = np.sum(list(filtered.values())) # normalize and include remaining parameters return params | dict((k, v / s) for k, v in filtered.items())
[docs] def get_pdf(self, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get PDF. :param kwargs: Parameter values S1, S2, ..., Sk :return: Function that computes the PDF for given S values """ # normalize and add boundary parameters values = self._normalize(kwargs) | self.fixed_params @Parametrization._accepts_scalars def pdf(S: np.ndarray) -> np.ndarray: """ The PDF. :param S: Selection coefficients :return: Probability density """ x = np.zeros_like(S, dtype=float) # iterate over parameters and assign values for i, p in enumerate(self.params): x[(self.intervals[i] < S) & (S <= self.intervals[i + 1])] = values[p] / self.interval_sizes[i] return x return pdf
[docs] def get_cdf(self, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get CDF. :param kwargs: Parameter values S1, S2, ..., Sk :return: Function that computes the CDF for given S values """ # normalize and add boundary parameters values = self._normalize(kwargs) | self.fixed_params # convert to numpy arrays intervals = np.array(self.intervals) interval_sizes = np.array(self.interval_sizes) # convert to ordered array values vals = np.array([values[self.params[i]] for i in range(self.k)]) @Parametrization._accepts_scalars def cdf(S: np.ndarray) -> np.ndarray: """ The CDF. :param S: Selection coefficients :return: Cumulative probability """ # iterate over parameters and assign values cum = np.cumsum([values[self.params[i]] for i in range(self.k)]) # obtain bin indices i = np.sum(self.intervals[:, None] <= S[None, :], axis=0) - 1 # make sure we don't go out of bounds which can happen if S is np.inf i[i >= self.k] = self.k - 1 # cumulative probability up to previous bin cum_prev = cum[np.maximum(i - 1, np.zeros_like(S, dtype=int))] with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) # probability in current bin cum_within = np.abs(intervals[i] - S) / interval_sizes[i] * vals[i] cum_within[np.isnan(cum_within)] = 0 # return cumulative probability return cum_prev + cum_within return cdf
[docs] class DiscreteFractionalParametrization(Parametrization): r""" Same model as :class:`DiscreteParametrization`, but re-parametrized by :math:`\hat{S}_1, \hat{S}_2, \dots, \hat{S}_{k-1}`, so that the mass in the ith interval :math:`S_i` is determined by the sum of masses to the left, i.e. :math:`S_i = \hat{S}_i \sum_{j<i} S_j, i = 1, \dots, k - 1`, :math:`S_k = 1 - \sum_{j=1}^{k-1} S_j`. This parametrization has the advantage of not imposing a shape on the DFE. For a reasonably fine parametrization, the number of parameters is larger than those of the other models, however. It is more easily optimized than :class:`DiscreteParametrization` as it has one parameter less but its parameters are more difficult to interpret. One disadvantage with discrete parametrizations is that there may be `gaps` in the estimated DFE. Note that all :math:`S` parameters are population-scaled selection coefficients, i.e. :math:`S = 4N_es`. """
[docs] def __init__( self, intervals: Sequence = np.array([-100000, -100, -10, -1, 0, 1, 1000]), bounds: Optional[Dict[str, Tuple[float, float]]] = None, scales: Optional[Dict[str, Literal['lin', 'log']]] = None, x0: Optional[Dict[str, float]] = None ): """ Constructor. :param intervals: The intervals of the discrete distribution. :param bounds: Custom parameter bounds merging with defaults :param scales: Custom scales merging with defaults :param x0: Custom initial parameters merging with defaults """ super().__init__() #: Intervals self.intervals: np.ndarray = np.concatenate(([-np.inf], intervals, [np.inf])) #: Interval sizes self.interval_sizes: np.ndarray = self.intervals[1:] - self.intervals[:-1] #: Number of intervals, including the two infinite ones self.k: int = self.intervals.shape[0] - 1 #: All parameter names, including fixed parameters self.params: np.ndarray = np.array([f"S{i}" for i in range(self.k)]) #: Parameter names that are not fixed self.param_names: List[str] = self.params[1:-2].tolist() #: Fixed parameters self.fixed_params = {self.params[0]: 0, self.params[-2]: 1, self.params[-1]: 0} #: Default initial parameters self.x0: Dict[str, float] = self.to_fractional(dict((p, 1 / (self.k - 2)) for p in self.param_names)) | ( x0 or {}) #: Default parameter bounds self.bounds: Dict[str, Tuple[float, float]] = dict((p, (0, 1)) for p in self.param_names) | (bounds or {}) #: Scales # noinspection all self.scales: Dict[str, Literal['lin', 'log']] = dict((p, 'lin') for p in self.param_names) | (scales or {}) #: Submodels self.submodels: Dict[str, Dict[str, float]] = dict( full=dict(), dele=dict((p, 1) for p in self.params[1:-2][self.intervals[2:-2] >= 0]) )
[docs] def to_nominal(self, params: Dict[str, float]) -> Dict[str, float]: """ Convert representation of fraction of total mass to the left to representation of fractions which sum to 1. :param params: Parameter values S1, S2, ..., Sk :return: Converted parameters """ # converted parameters converted = params.copy() # cumulative sum up to the previous parameter mass = 0 # take mass of current bin to be fraction of mass assigned to the previous bins for p in self.params[1:-2]: converted[p] = params[p] * (1 - mass) mass += converted[p] # assign remaining mass to last bin converted[self.params[-2]] = 1 - mass return converted
[docs] def to_fractional(self, params: Dict[str, float]) -> Dict[str, float]: """ Invert the to_nominal operation: Convert representation of fractions which sum to 1 back to representation of fraction of total mass to the left. :param params: Converted parameters (nominal space) :return: Original parameters (parameter space) """ # converted parameters converted = params.copy() # cumulative sum up to the previous parameter mass = 0 # Iterate over parameters for p in self.params[1:-2]: # Calculate original parameter value converted[p] = params[p] / (1 - mass) # update cumulative sum mass += params[p] # last parameter is simply what's left converted[self.params[-2]] = 1 - mass return converted
[docs] def get_pdf(self, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get PDF. :param kwargs: Parameter values S1, S2, ..., Sk-1 :return: Function that computes the PDF for given S values """ # normalize and add boundary parameters values = self.to_nominal(kwargs | self.fixed_params) @Parametrization._accepts_scalars def pdf(S: np.ndarray) -> np.ndarray: """ The PDF. :param S: Selection coefficients :return: Probability density """ x = np.zeros_like(S, dtype=float) # iterate over parameters and assign values for i, p in enumerate(self.params): x[(self.intervals[i] < S) & (S <= self.intervals[i + 1])] = values[p] / self.interval_sizes[i] return x return pdf
[docs] def get_cdf(self, **kwargs) -> Callable[[np.ndarray], np.ndarray]: """ Get CDF. :param kwargs: Parameter values S1, S2, ..., Sk-1 :return: Function that computes the CDF for given S values """ # normalize and add boundary parameters values = self.to_nominal(kwargs | self.fixed_params) # convert to numpy arrays intervals = np.array(self.intervals) interval_sizes = np.array(self.interval_sizes) # convert to ordered array values vals = np.array([values[self.params[i]] for i in range(self.k)]) @Parametrization._accepts_scalars def cdf(S: np.ndarray) -> np.ndarray: """ The CDF. :param S: Selection coefficients :return: Cumulative probability """ # iterate over parameters and assign values cum = np.cumsum([values[self.params[i]] for i in range(self.k)]) # obtain bin indices i = np.sum(self.intervals[:, None] <= S[None, :], axis=0) - 1 # make sure we don't go out of bounds which can happen if S is np.inf i[i >= self.k] = self.k - 1 # cumulative probability up into previous bin cum_prev = cum[np.maximum(i - 1, np.zeros_like(S, dtype=int))] with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) # probability in current bin cum_within = np.abs(intervals[i] - S) / interval_sizes[i] * vals[i] cum_within[np.isnan(cum_within)] = 0 # return cumulative probability return cum_prev + cum_within return cdf
[docs] class DFE(Serializable): """ Frozen parametrization of a DFE. Wraps a :class:`Parametrization` instance together with fixed parameter values. """
[docs] def __init__( self, params: dict, model: Parametrization = None, bootstraps: Optional[pd.DataFrame] = None ): """ :param params: Frozen parameter values. :param model: Parametrization instance used to generate the DFE. Defaults to :class:`GammaExpParametrization`. :param bootstraps: Optional bootstrap DataFrame (same format as AbstractInference.bootstraps). """ #: Underlying parametrization model self.model: Parametrization = model or GammaExpParametrization() #: Frozen parameters self.params: Dict[str, float] = params #: Optional bootstrap samples self.bootstraps: Optional[pd.DataFrame] = bootstraps
[docs] def get_bootstrap_dfes(self) -> List['DFE']: """ Get DFEs for all bootstrap samples. :return: List of DFE instances for each bootstrap sample. """ if self.bootstraps is None: return [] dfes = [] for _, row in self.bootstraps.iterrows(): params = row.to_dict() dfes.append(DFE(model=self.model, params=params)) return dfes
[docs] def pdf(self, S: np.ndarray | float) -> np.ndarray | float: """ Evaluate the frozen PDF. :param S: Selection coefficient(s) :return: Probability density """ return self.model.get_pdf(**self.params)(S)
[docs] def cdf(self, S: np.ndarray | float) -> np.ndarray | float: """ Evaluate the frozen CDF. :param S: Selection coefficient(s) :return: Cumulative probability """ return self.model.get_cdf(**self.params)(S)
[docs] def discretize( self, bins: np.ndarray, warn_mass: bool = True, confidence_intervals: bool = True, ci_level: float = 0.05, bootstrap_type: Literal['percentile', 'bca'] = 'percentile', point_estimate: Literal['original', 'mean', 'median'] = 'mean' ) -> Tuple[np.ndarray, Optional[np.ndarray]]: """ Discretize the DFE with optional bootstrap confidence intervals. :param bins: Intervals to use for discretization. :param warn_mass: Whether to warn if mass is lost during discretization. :param confidence_intervals: Whether to compute confidence intervals. :param ci_level: Confidence interval level (e.g., 0.05 for 95% CI). :param bootstrap_type: Type of bootstrap confidence intervals ('percentile' or 'bca'). :param point_estimate: Whether to use 'original' MLE values, 'mean' or 'median' of bootstraps as point estimate. :return: Center values and (optionally) errors for each bin. """ from .abstract_inference import Inference # no bootstraps -> plain histogram if not confidence_intervals or self.bootstraps is None: return self.model._discretize(self.params, bins, warn_mass=warn_mass), None return Inference.get_stats_discretized( params=self.params, bootstraps=self.bootstraps, model=self.model, ci_level=ci_level, intervals=bins, bootstrap_type=bootstrap_type, point_estimate=point_estimate )[:2]
[docs] @staticmethod def plot_many( dfes: Sequence['DFE'], labels: Optional[Sequence[str]] = None, intervals=np.array([-np.inf, -100, -10, -1, 0, 1, np.inf]), file=None, show=True, title="discretized DFE", ax=None, confidence_intervals: bool = True, ci_level: float = 0.05, bootstrap_type: Literal['percentile', 'bca'] = 'percentile', point_estimate: Literal['original', 'mean', 'median'] = 'mean', kwargs_legend: dict = dict(prop=dict(size=8)), ): """ Plot several DFE instances in a single discretized plot. :param dfes: Sequence of DFE instances to plot. :param labels: Labels for the DFEs. :param intervals: Intervals to use for discretization. :param file: File to save the plot to. :param show: Whether to show the plot. :param title: Title of the plot. :param ax: Axes to use for the plot. :param confidence_intervals: Whether to plot confidence intervals. :param ci_level: Confidence interval level (e.g., 0.05 for 95% CI). :param bootstrap_type: Type of bootstrap confidence intervals ('percentile' or 'bca'). :param point_estimate: Whether to use 'original' MLE values, 'mean' or 'median' of bootstraps as point estimate. :param kwargs_legend: Additional keyword arguments for the legend. :return: Matplotlib Axes object. """ from .visualization import Visualization intervals = np.array(intervals) values = [] errors = [] for d in dfes: vals, errs = d.discretize( bins=intervals, confidence_intervals=confidence_intervals, ci_level=ci_level, bootstrap_type=bootstrap_type, point_estimate=point_estimate ) values.append(vals) errors.append(errs) return Visualization.plot_discretized( ax=ax, values=values, errors=errors, labels=labels, file=file, show=show, intervals=intervals, title=title, kwargs_legend=kwargs_legend )
[docs] def plot( self, intervals=np.array([-np.inf, -100, -10, -1, 0, 1, np.inf]), file=None, show=True, title="discretized DFE", ax=None, confidence_intervals: bool = True, ci_level: float = 0.05, bootstrap_type: Literal['percentile', 'bca'] = 'percentile', point_estimate: Literal['original', 'mean', 'median'] = 'mean', kwargs_legend: dict = dict(prop=dict(size=8)) ): """ Plot this DFE instance in a discretized plot. :param intervals: Intervals to use for discretization. :param file: File to save the plot to. :param show: Whether to show the plot. :param title: Title of the plot. :param ax: Axes to use for the plot. :param confidence_intervals: Whether to plot confidence intervals. :param ci_level: Confidence interval level (e.g., 0.05 for 95% CI). :param bootstrap_type: Type of bootstrap confidence intervals ('percentile' or 'bca'). :param point_estimate: Whether to use 'original' MLE values, 'mean' or 'median' of bootstraps as point estimate. :param kwargs_legend: Additional keyword arguments for the legend. :return: Matplotlib Axes object. """ return DFE.plot_many( dfes=[self], intervals=intervals, file=file, show=show, title=title, ax=ax, confidence_intervals=confidence_intervals, ci_level=ci_level, bootstrap_type=bootstrap_type, point_estimate=point_estimate, kwargs_legend=kwargs_legend )