Source code for fastdfe.simulation

"""
Simulate an SFS given a DFE.
"""

__author__ = "Janek Sendrowski"
__contact__ = "sendrowski.janek@gmail.com"
__date__ = "2024-09-06"

import logging
from typing import Tuple, Literal, Sequence, Callable

import numpy as np
from scipy.stats import hypergeom
from tqdm import tqdm

from .base_inference import BaseInference
from .discretization import Discretization
from .parametrization import Parametrization, DFE
from .parametrization import _from_string
from .spectrum import Spectrum, Spectra

logger = logging.getLogger('fastdfe')


[docs] class Simulation: """ Simulate an SFS under selection given a DFE and a neutral SFS. Example usage: :: import fastdfe as fd # create simulation object by specifying neutral SFS and DFE sim = fd.Simulation( sfs_neut=fd.Simulation.get_neutral_sfs(n=20, n_sites=1e8, theta=1e-4), params=dict(S_d=-300, b=0.3, p_b=0.1, S_b=0.1), model=fd.GammaExpParametrization() ) # perform the simulation sim.run() # plot spectra sim.get_spectra().plot() """
[docs] def __init__( self, sfs_neut: Spectrum, params: dict = None, model: Parametrization | str = 'GammaExpParametrization', intervals_del: Tuple[float, float, int] = (-1.0e+8, -1.0e-5, 1000), intervals_ben: Tuple[float, float, int] = (1.0e-5, 1.0e4, 1000), h_callback: Callable[[np.ndarray], np.ndarray] = None, integration_mode: Literal['midpoint', 'quad'] = 'midpoint', linearized: bool = True, discretization: Discretization = None, parallelize: bool = True ): """ Create a simulation object. :param sfs_neut: Neutral SFS. This sfs is informative on the population sample size, population mutation rate, the number of sites, and demography. :func:`get_neutral_sfs` can be used to obtain a neutral SFS. :param params: Parameters of the DFE parametrization (see model) in addition to ancestral misidentification params ``eps`` and dominance ``h``. By default, `eps=0`, `h=0.5`, the default parameters of ``model`` will be used. :param model: DFE parametrization model :param intervals_del: ``(start, stop, n_interval)`` for deleterious population-scaled selection coefficients. The intervals will be log10-spaced. Decreasing the number of intervals to ``100`` provides nearly identical results while increasing speed, especially when precomputing across dominance coefficients. :param intervals_ben: Same as ``intervals_del`` but for positive selection coefficients. Decreasing the number of intervals to ``100`` provides nearly identical results while increasing speed, especially when precomputing across dominance coefficients. :param h_callback: A function mapping the scalar parameter ``h`` and the array of selection coefficients ``S`` to dominance coefficients of the same shape, allowing models where ``h`` depends on ``S``. The default is ``lambda h, S: np.full_like(S, h)``, keeping ``h`` constant. Expected allele counts for a given dominance value are obtained by linear interpolation between precomputed values in ``intervals_h``. The inferred parameter is still named ``h``, even if transformed by ``h_callback``, and its bounds, scales, and initial values can be set via ``bounds``, ``scales``, and ``x0``. The fitness of heterozygotes and mutation homozygotes is defined as ``1 + 2hs`` and ``1 + 2s``, respectively. :param integration_mode: Integration mode when computing expected SFS under semidominance. ``quad`` is not recommended. :param linearized: Whether to discretize and cache the linearized integral mapping DFE to SFS or use ``scipy.integrate.quad`` in each call. ``False`` not recommended. :param parallelize: Whether to parallelize computations """ #: The DFE parametrization self.model: Parametrization = _from_string(model) #: Parameters for the DFE parametrization self.params: dict = self.model.x0 if params is None else params if 'eps' not in self.params: self.params['eps'] = 0 if 'h' not in self.params: self.params['h'] = 0.5 # check if parameters are within bounds self._check_bounds() #: Neutral SFS self.sfs_neut: Spectrum = sfs_neut #: Modelled SFS self.sfs_sel: Spectrum = None #: DFE instance self.dfe: DFE = DFE(model=self.model, params=self.params) #: SFS sample size self.n: int = self.sfs_neut.n #: Population mutation rate self.theta: float = self.sfs_neut.theta #: Number of sites self.n_sites: float = self.sfs_neut.n_sites if discretization is None: # create discretization instance #: Discretization instance self.discretization: Discretization = Discretization( n=self.n, h=self.params['h'], intervals_del=intervals_del, intervals_ben=intervals_ben, h_callback=h_callback, integration_mode=integration_mode, linearized=linearized, parallelize=parallelize ) else: # otherwise assign instance self.discretization: Discretization = discretization #: Logger self._logger = logger.getChild('Simulation')
[docs] def run(self) -> Spectrum: """ Simulate an SFS given a DFE. :return: Simulated SFS """ self._logger.info("Simulating SFS under selection using parameters: " + ', '.join( [f"{k}={v:.4g}" for k, v in self.params.items()] )) # obtain modelled selected SFS counts_modelled = self.discretization.model_selection_sfs(self.model, self.params) # adjust for mutation rate and mutational target size counts_modelled *= self.theta * self.n_sites # add contribution of demography and adjust polarization error counts_modelled = BaseInference._add_demography(self.sfs_neut, counts_modelled, eps=self.params['eps']) self.sfs_sel = Spectrum([self.n_sites - sum(counts_modelled)] + list(counts_modelled) + [0]) return self.sfs_sel
def _check_bounds(self): """ Check if the specified parameters are within the bounds of the DFE parametrization. """ for k, v in self.params.items(): if k in self.model.bounds and not self.model.bounds[k][0] <= v <= self.model.bounds[k][1]: raise ValueError( f"Parameter {k}={v} is out of bounds {self.model.bounds[k]} " f"for model {self.model.__class__.__name__}." ) if not 0 <= self.params['eps'] <= 1: raise ValueError(f"Ancestral misidentification parameter eps={self.params['eps']} is out of bounds [0, 1].") if not 0 <= self.params['h'] <= 1: raise ValueError(f"Dominance parameter h={self.params['h']} is out of bounds [0, 1].")
[docs] def get_spectra(self) -> Spectra: """ Get the spectra used in the simulation. :return: Tuple of neutral SFS and selected SFS """ return Spectra.from_spectra({ "neutral": self.sfs_neut, "selected": self.sfs_sel })
[docs] def get_alpha(self) -> float: """ Get the proportion of adaptive substitutions (alpha) under the specified DFE and parameters. :return: Proportion of adaptive substitutions """ return self.discretization.get_alpha(self.model, self.params)
[docs] @staticmethod def get_neutral_sfs( theta: float, n_sites: float, n: int, r: Sequence[float] = None ) -> Spectrum: """ Obtain a standard neutral SFS for a given theta and number of sites. :param theta: Population mutation rate :param n_sites: Number of total sites :param n: Number of frequency classes :param r: Nuisance parameters that account for demography. An array of length ``n-1`` whose elements are multiplied element-wise with the polymorphic counts of the Kingman SFS. By default, no demography effects are considered which is equivalent to ``r = [1] * (n-1)``. Note that non-default values of ``r`` will also affect estimates of the population mutation rate. :return: Neutral SFS """ return Spectrum.get_neutral( theta=theta, n_sites=n_sites, n=n, r=r )
[docs] def get_wright_fisher( self, pop_size: int, generations: int ) -> 'WrightFisherSimulation': """ Get a Wright-Fisher simulation object. :param pop_size: Effective population size :param generations: Number of generations to simulate :return: Wright-Fisher simulation object """ return WrightFisherSimulation( params=self.params, model=self.model, sfs_neut=self.sfs_neut, pop_size=pop_size, n_generations=generations, )
class WrightFisherSimulation: # pragma: no cover """ Simulate an SFS under selection given a DFE under the Wright-Fisher model. Example usage: :: import fastdfe as fd # create simulation object by specifying neutral SFS and DFE sim = fd.simulation.WrightFisherSimulation( sfs_neut=fd.Simulation.get_neutral_sfs(n=20, n_sites=1e7, theta=1e-4), params=dict(S_d=-300, b=0.3, p_b=0.1, S_b=0.1), model=fd.GammaExpParametrization(), pop_size=100, n_generations=500 ) # perform the simulation sfs_sel = sim.run() # plot SFS sfs_sel.plot() """ def __init__( self, params: dict = None, model: Parametrization | str = 'GammaExpParametrization', sfs_neut: Spectrum = None, pop_size: int = 1000, n_generations: int = 100, n_sites: int = None, theta: float = None, parallelize: bool = True, n_threads: int = 100 ): """ Create a Wright-Fisher simulation object. :param params: Parameters for the DFE parametrization (see model). By default, the default parameters of ``model`` will be used. :param model: DFE parametrization model :param sfs_neut: Neutral SFS. This sfs is informative on the population sample size, population mutation rate, the number of sites, and demography. :func:`get_neutral_sfs` can be used to obtain a neutral SFS. :param pop_size: Effective population size :param n_generations: Number of generations to simulate :param n_sites: Number of sites in the simulated SFS. If not provided, the number of sites in ``sfs_neut`` will be used. :param theta: Population mutation rate. If not provided, the population mutation rate in ``sfs_neut`` will be used. :param parallelize: Whether to parallelize computations :param n_threads: Number of threads to use for parallelization """ #: The DFE parametrization self.model: Parametrization = _from_string(model) #: Parameters for the DFE parametrization self.params: dict = self.model.x0 if params is None else params #: Neutral SFS self.sfs_neut: Spectrum = sfs_neut #: SFS sample size self.n: int = self.sfs_neut.n #: Population mutation rate self.theta: float = self.sfs_neut.theta if theta is None else theta #: Number of sites self.n_sites: int = self.sfs_neut.n_sites if n_sites is None else n_sites #: Effective population size self.pop_size: int = pop_size #: Number of generations to simulate self.n_generations: int = n_generations #: CDF of the DFE self.cdf: Callable[[np.ndarray], np.ndarray] = self.model.get_cdf(**self.params) #: Logger self._logger = logger.getChild('WrightFisherSimulation') #: Whether to parallelize computations self.parallelize: bool = parallelize #: Number of threads to use for parallelization self.n_threads: int = n_threads #: Tolerance for CDF bounds self.tol_bounds = 1e-6 #: Tolerance for bisection self.tol_bisect = 1e-6 #: Bounds for the CDF self.lower, self.upper = self._determine_cdf_bounds() def _determine_cdf_bounds(self) -> Tuple[float, float]: """ Determine bounds for the CDF by identifying where the CDF approaches 0 and 1. :return: Lower and upper bounds for the CDF. """ lower = -1e-4 upper = 1e-4 # Expand lower bound until CDF is close to 0 while self.cdf(lower) > self.tol_bounds: lower *= 2 # Expand upper bound until CDF is close to 1 while self.cdf(upper) < 1 - self.tol_bounds: upper *= 2 self._logger.debug(f"Bounded CDF between {lower} and {upper}.") return lower, upper def run(self) -> Spectrum: """ Simulate an SFS using the Wright-Fisher model with per-individual, per-locus mutations and selection. :return: Simulated SFS. """ rng = np.random.default_rng() # population mutation rate for all loci theta = int(self.sfs_neut.theta * self.n_sites / 2) # fixed total of mutations n_mut = int(self.n_generations * theta) self._logger.info(f'Pre-sampling selection coefficients for {n_mut} mutations.') # pre-sample selection coefficients for mutations selection_coefficients = self._sample_cdf( n=n_mut, desc="Number of bisections" ) / self.pop_size # log number of selection coefficients lower than -1 if (n_low := (selection_coefficients < -1).sum()) > 0: self._logger.debug( f"Number of selection coefficients lower than -1: {n_low}." ) # selection coefficients per individual and locus s = np.zeros((self.pop_size, n_mut)) # initial allele frequencies per individual and locus freqs = np.zeros((self.pop_size, n_mut), dtype=np.int8) # pre-sample individuals for mutations across all generations individuals = rng.choice(self.pop_size, size=(self.n_generations, theta), replace=True) for g in tqdm(range(self.n_generations), desc="Simulating generations"): # indices for mutations i_muts = np.arange(g * theta, (g + 1) * theta) # introduce mutations under assumption of infinite sites freqs[individuals[g], i_muts] = 1 s[individuals[g], i_muts] = selection_coefficients[i_muts] segregating = (freqs.sum(axis=0) > 0) & (freqs.sum(axis=0) < self.pop_size) n_segregating = segregating.sum() # selection w = 1 + s[:, segregating] probs = w / w.sum(axis=0) # selection probabilities u = rng.random((self.pop_size, n_segregating)) # Compute cumulative sums of probabilities along rows cumsum_probs = np.cumsum(probs, axis=0) idx = (cumsum_probs[:, None, :] < u).sum(axis=0) if g % 100 == 0: self._logger.debug(f"Number of mutations at generation {g}: {freqs.any(axis=0).sum()}") freqs[:, segregating] = freqs[:, segregating][idx, np.arange(n_segregating)] # vectorize the hypergeometric PMF calculation values = hypergeom.pmf(k=np.arange(self.n + 1)[:, None], M=self.pop_size, n=freqs.sum(axis=0), N=self.n) sfs = Spectrum(values.sum(axis=1)) sfs.data[0] = self.n_sites - sfs.n_polymorphic sfs.data[-1] = 0 return sfs def _sample_cdf( self, n: int, seed: int = None, tolerance: float = 1e-13, desc: str = "Bisection" ) -> np.ndarray: """ Sample the cdf of the dfe with a fully vectorized bisection method. :param n: Number of samples to draw :param seed: Random seed :param tolerance: Tolerance for the bisection method :return: Array of values x such that cdf(x) = u """ # generate uniform samples samples = np.random.default_rng(seed=seed).uniform(0, 1, n) # initialize bounds lower = np.full(n, self.lower, dtype=float) upper = np.full(n, self.upper, dtype=float) pbar = tqdm(desc=desc) while np.mean(upper - lower) > tolerance: # compute midpoint mid = (lower + upper) / 2.0 # compute cdf at midpoint for all samples cdf_mid = self.cdf(mid) # update bounds vectorized lower_update = cdf_mid < samples lower = np.where(lower_update, mid, lower) upper = np.where(~lower_update, mid, upper) pbar.update(1) pbar.close() # return final approximated x values return (lower + upper) / 2.0