Source code for moments.LD.LDstats_mod

import logging

logging.basicConfig()
logger = logging.getLogger("LDstats_mod")

import os, sys
import numpy, numpy as np
import copy
import demes
import warnings

from . import Numerics, Util
import moments.Demes.Demes


[docs] class LDstats(list): """ Represents linkage disequilibrium statistics as a list of arrays, where each entry in the list is an array of statistics for a corresponding recombination rate. The final entry in the list is always the heterozygosity statistics. Thus, if we have an LDstats object for 3 recombination rate values, the list will have length 4. LDstats are represented as a list of statistics over two locus pairs for a given recombination distance. :param data: A list of LD and heterozygosity stats. :type data: list of arrays :param num_pops: Number of populations. For one population, higher order statistics may be computed. :type num_pops: int :param pop_ids: Population IDs in order that statistics are represented here. :type pop_ids: list of strings, optional """ def __new__(self, data, num_pops=None, pop_ids=None): if num_pops == None: raise ValueError("Specify number of populations as num_pops=.") my_list = super(LDstats, self).__new__(self, data, num_pops=None, pop_ids=None) if hasattr(data, "num_pops"): my_list.num_pops = data.num_pops else: my_list.num_pops = num_pops if hasattr(data, "pop_ids"): my_list.pop_ids = data.pop_ids else: my_list.pop_ids = pop_ids return my_list def __init__(self, *args, **kwargs): if len(args) == 1 and hasattr(args[0], "__iter__"): list.__init__(self, args[0]) else: list.__init__(self, args) self.__dict__.update(kwargs) def __call__(self, **kwargs): self.__dict__.update(kwargs) return self # See http://www.scipy.org/Subclasses for information on the # __array_finalize__ and __array_wrap__ methods. I had to do some debugging # myself to discover that I also needed _update_from. # Also, see http://docs.scipy.org/doc/numpy/reference/arrays.classes.html # Also, see http://docs.scipy.org/doc/numpy/user/basics.subclassing.html # # We need these methods to ensure extra attributes get copied along when # we do arithmetic on the LD stats. def __array_finalize__(self, obj): if obj is None: return np.ma.masked_array.__array_finalize__(self, obj) self.num_pops = getattr(obj, "num_pops", "unspecified") self.pop_ids = getattr(obj, "pop_ids", "unspecified") def __array_wrap__(self, obj, context=None): result = obj.view(type(self)) result = np.ma.masked_array.__array_wrap__(self, obj, context=context) result.num_pops = self.num_pops result.pop_ids = self.pop_ids return result def _update_from(self, obj): np.ma.masked_array._update_from(self, obj) if hasattr(obj, "num_pops"): self.num_pops = obj.num_pops if hasattr(obj, "pop_ids"): self.pop_ids = obj.pop_ids # masked_array has priority 15. __array_priority__ = 20 def __repr__(self): ys = np.asanyarray(self[:-1]) h = self[-1] return "LDstats(%s, %s, num_pops=%s, pop_ids=%s)" % ( str(ys), str(h), str(self.num_pops), str(self.pop_ids), )
[docs] def names(self): """ Returns the set of LD and heterozygosity statistics names for the number of populations represented by the LDstats. Note that this will always return the full set of statistics, """ if self.num_pops == None: raise ValueError( "Number of populations must be specified (as stats.num_pops)" ) return Util.moment_names( self.num_pops ) # returns (ld_stat_names, het_stat_names)
[docs] def LD(self, pops=None): """ Returns LD stats for populations given (if None, returns all). :param pops: The indexes of populations to return stats for. :type pops: list of ints, optional """ if len(self) <= 1: raise ValueError("No LD statistics present") else: if pops is not None: to_marginalize = list(set(range(self.num_pops)) - set(pops)) Y_new = self.marginalize(to_marginalize) return np.array(Y_new[:-1]) else: return np.array(self[:-1])
[docs] def H(self, pops=None): """ Returns heterozygosity statistics for the populations given. :param pops: The indexes of populations to return stats for. :type pops: list of ints, optional """ if pops is not None: to_marginalize = list(set(range(self.num_pops)) - set(pops)) Y_new = self.marginalize(to_marginalize) return Y_new[-1] else: return self[-1]
[docs] def f2(self, X, Y): """ Returns :math:`f_2(X, Y) = (X-Y)^2`. X, and Y can be specified as population ID strings, or as indexes (but these cannot be mixed). :param X: One of the populations, as index or population ID. :param Y: The other population, as index or population ID. """ if type(X) is str: if type(Y) is not str: raise ValueError("X and Y must both be strings or both be indexes") if X not in self.pop_ids: raise ValueError(f"Population {X} not in pop_ids: {self.pop_ids}") if Y not in self.pop_ids: raise ValueError(f"Population {X} not in pop_ids: {self.pop_ids}") X = self.pop_ids.index(X) Y = self.pop_ids.index(Y) elif type(X) is int: if type(Y) is not int: raise ValueError("X and Y must both be strings or both be indexes") if X < 0 or Y < 0 or X >= self.num_pops or Y >= self.num_pops: raise ValueError("Population indexes out of bounds") else: raise ValueError("X and Y must both be strings or both be indexes") stats = self.names()[1] if X > Y: X, Y = Y, X H_X = self.H()[stats.index(f"H_{X}_{X}")] H_Y = self.H()[stats.index(f"H_{Y}_{Y}")] H_XY = self.H()[stats.index(f"H_{X}_{Y}")] return H_XY - H_X / 2 - H_Y / 2
[docs] def f3(self, X, Y, Z): """ Returns :math:`f_3(X; Y, Z) = (X-Y)(X-Z)`. A significantly negative :math:`f_3` of this form suggests that population X is the result of admixture between ancient populations related to Y and Z. A positive value suggests that X is an outgroup to Y and Z. X, Y, and Z can be specified as population ID strings, or as indexes (but these cannot be mixed). :param X: The "test" population, as index or population ID. :param Y: The first reference population, as index or population ID. :param Z: The second reference population, as index or population ID. """ if type(X) is str: if type(Y) is not str or type(Z) is not str: raise ValueError("Inputs must be all strings or all indexes") for _ in [X, Y, Z]: if _ not in self.pop_ids: raise ValueError(f"Population {_} not in pop_ids: {self.pop_ids}") X = self.pop_ids.index(X) Y = self.pop_ids.index(Y) Z = self.pop_ids.index(Z) elif type(X) is int: if type(Y) is not int or type(Z) is not int: raise ValueError("Inputs must be all strings or all indexes") if ( X < 0 or Y < 0 or Z < 0 or X >= self.num_pops or Y >= self.num_pops or Z >= self.num_pops ): raise ValueError("Population indexes out of bounds") else: raise ValueError("Inputs must be all strings or all indexes") stats = self.names()[1] H_XX = self.H()[stats.index(Util.map_moment(f"H_{X}_{X}"))] H_XY = self.H()[stats.index(Util.map_moment(f"H_{X}_{Y}"))] H_XZ = self.H()[stats.index(Util.map_moment(f"H_{X}_{Z}"))] H_YZ = self.H()[stats.index(Util.map_moment(f"H_{Y}_{Z}"))] return (H_XY + H_XZ - H_XX - H_YZ) / 2
[docs] def f4(self, X, Y, Z, W): """ Returns :math:`f_4(X, Y; Z, W) = (X-Y)(Z-W)`. X, Y, Z and W can be specified as population ID strings, or as indexes (but these cannot be mixed). :param X: A population index or ID. :param Y: A population index or ID. :param Z: A population index or ID. :param W: A population index or ID. :returns: Patterson's f4 statistic (pX-pY)*(pZ-pW). """ if type(X) is str: if type(Y) is not str or type(Z) is not str or type(W) is not str: raise ValueError("Inputs must be all strings or all indexes") for _ in [X, Y, Z, W]: if _ not in self.pop_ids: raise ValueError(f"Population {_} not in pop_ids: {self.pop_ids}") X = self.pop_ids.index(X) Y = self.pop_ids.index(Y) Z = self.pop_ids.index(Z) W = self.pop_ids.index(W) elif type(X) is int: if type(Y) is not int or type(Z) is not int or type(W) is not int: raise ValueError("Inputs must be all strings or all indexes") if ( X < 0 or Y < 0 or Z < 0 or W < 0 or X >= self.num_pops or Y >= self.num_pops or Z >= self.num_pops or W >= self.num_pops ): raise ValueError("Population indexes out of bounds") else: raise ValueError("Inputs must be all strings or all indexes") stats = self.names()[1] H_XW = self.H()[stats.index(Util.map_moment(f"H_{X}_{W}"))] H_XZ = self.H()[stats.index(Util.map_moment(f"H_{X}_{Z}"))] H_YW = self.H()[stats.index(Util.map_moment(f"H_{Y}_{W}"))] H_YZ = self.H()[stats.index(Util.map_moment(f"H_{Y}_{Z}"))] return (H_XW - H_XZ - H_YW + H_YZ) / 2
[docs] def H2(self, X, Y=None, phased=True): """ Note: the H2 name may change! This is sometimes called "D+". This is the statistics E[2*fAB*fab + 2*fAb*faB], which measures the probability of polymorphism between two sampled genome copies at two loci. This is closely related to pi2=p(1-p)q(1-q), which is inherently a four-haplotype statistic. Instead, H2 is a two-haplotype statistic, which can be measured with just a single diploid. In the one population case, it equals 4D^2+2Dz+4pi2. In the two population case, it equals 4D1*D2+Dz(1,2,2)+Dz(2,1,1)+4pi2(1,2,1,2). At steady state, the solution is theta**2 * (36 + 14*rho + rho**2) / (18 + 13*rho + rho**2). To compare to unphased data estimated assuming that double heterozygotes have equal probability of being in coupling or repulsion LD, the statistic equals D1*D2+1/2(Dz(1,2,2)+Dz(2,1,1)+4pi2. Note: This unphased case (setting phased=False) is only relevant to comparing to data where a single diploid individual exists from each population, and it is only needed for cross-population H2! """ if type(X) is str: if X not in self.pop_ids: raise ValueError(f"Population {X} not in pop_ids") X = self.pop_ids.index(X) if Y is None: Y = X else: if type(Y) is str: if Y not in self.pop_ids: raise ValueError(f"Population {Y} not in pop_ids") Y = self.pop_ids.index(Y) if X < 0 or X >= self.num_pops or Y < 0 or Y >= self.num_pops: raise ValueError("Population indexes out of bounds") if Y < X: X, Y = Y, X DD = self.names()[0].index(f"DD_{X}_{Y}") Dz0 = self.names()[0].index(f"Dz_{X}_{Y}_{Y}") Dz1 = self.names()[0].index(f"Dz_{Y}_{X}_{X}") pi2 = self.names()[0].index(f"pi2_{X}_{Y}_{X}_{Y}") data = self.LD() if phased: Dplus = 4 * data[:, DD] + data[:, Dz0] + data[:, Dz1] + 4 * data[:, pi2] else: Dplus = ( data[:, DD] + 1 / 2 * data[:, Dz0] + 1 / 2 * data[:, Dz1] + 4 * data[:, pi2] ) return Dplus
# demographic and manipulation functions
[docs] def split(self, pop_to_split, new_ids=None): """ Splits the population given into two child populations. One child population keeps the same index and the second child population is placed at the end of the list of present populations. If new_ids is given, we can set the population IDs of the child populations, but only if the input LDstats have population IDs available. :param pop_to_split: The index of the population to split. :type pop_to_split: int :param new_ids: List of child population names, of length two. :type new_ids: list of strings, optional """ if type(pop_to_split) is not int: raise ValueError("population to split must be an integer index") if pop_to_split < 0: raise ValueError("population to split must be a nonnegative index") if pop_to_split >= self.num_pops: raise ValueError("population to split is greater than maximum index") h = self[-1] ys = self[:-1] h_new = Numerics.split_h(h, pop_to_split, self.num_pops) ys_new = [] for y in ys: ys_new.append(Numerics.split_ld(y, pop_to_split, self.num_pops)) if self.pop_ids is not None and new_ids is not None: new_pop_ids = copy.copy(self.pop_ids) new_pop_ids[pop_to_split] = new_ids[0] new_pop_ids.append(new_ids[1]) else: new_pop_ids = None return LDstats( ys_new + [h_new], num_pops=self.num_pops + 1, pop_ids=new_pop_ids )
[docs] def swap_pops(self, pop0, pop1): """ Swaps pop0 and pop1 in the order of the population in the LDstats. :param pop0: The index of the first population to swap. :type pop0: int :param pop1: The index of the second population to swap. :type pop1: int """ if pop0 >= self.num_pops or pop1 >= self.num_pops or pop0 < 0 or pop1 < 0: raise ValueError("Invalid population number specified.") if pop0 == pop1: return self mom_list = Util.moment_names(self.num_pops) h_new = np.zeros(len(mom_list[-1])) if len(self) == 2: y_new = np.zeros(len(mom_list[0])) if len(self) > 2: ys_new = [np.zeros(len(mom_list[0])) for i in range(len(self) - 1)] pops_old = list(range(self.num_pops)) pops_new = list(range(self.num_pops)) pops_new[pop0] = pop1 pops_new[pop1] = pop0 d = dict(zip(pops_old, pops_new)) # swap heterozygosity statistics for ii, mom in enumerate(mom_list[-1]): pops_mom = [int(p) for p in mom.split("_")[1:]] pops_mom_new = [d.get(p) for p in pops_mom] mom_new = mom.split("_")[0] + "_" + "_".join([str(p) for p in pops_mom_new]) h_new[ii] = self[-1][mom_list[-1].index(Util.map_moment(mom_new))] # swap LD statistics if len(self) > 1: for ii, mom in enumerate(mom_list[0]): pops_mom = [int(p) for p in mom.split("_")[1:]] pops_mom_new = [d.get(p) for p in pops_mom] mom_new = ( mom.split("_")[0] + "_" + "_".join([str(p) for p in pops_mom_new]) ) if len(self) == 2: y_new[ii] = self[0][mom_list[0].index(Util.map_moment(mom_new))] elif len(self) > 2: for jj in range(len(self) - 1): ys_new[jj][ii] = self[jj][ mom_list[0].index(Util.map_moment(mom_new)) ] if self.pop_ids is not None: current_order = self.pop_ids new_order = [self.pop_ids[d[ii]] for ii in range(self.num_pops)] else: new_order = None if len(self) == 1: return LDstats([h_new], num_pops=self.num_pops, pop_ids=new_order) elif len(self) == 2: return LDstats([y_new, h_new], num_pops=self.num_pops, pop_ids=new_order) else: return LDstats(ys_new + [h_new], num_pops=self.num_pops, pop_ids=new_order)
[docs] def marginalize(self, pops): """ Marginalize over the LDstats, removing moments for given populations. :param pops: The index or list of indexes of populations to marginalize. :type pops: int or list of ints """ if hasattr(pops, "__len__") == False: pops = [pops] if self.num_pops == len(pops): raise ValueError("Marginalization would remove all populations.") # multiple pop indices to marginalize over names_from_ld, names_from_h = Util.moment_names(self.num_pops) names_to_ld, names_to_h = Util.moment_names(self.num_pops - len(pops)) y_new = [np.zeros(len(names_to_ld)) for i in range(len(self) - 1)] + [ np.zeros(len(names_to_h)) ] count = 0 for mom in names_from_ld: mom_pops = [int(p) for p in mom.split("_")[1:]] if len(np.intersect1d(pops, mom_pops)) == 0: for ii in range(len(y_new) - 1): y_new[ii][count] = self[ii][names_from_ld.index(mom)] count += 1 count = 0 for mom in names_from_h: mom_pops = [int(p) for p in mom.split("_")[1:]] if len(np.intersect1d(pops, mom_pops)) == 0: y_new[-1][count] = self[-1][names_from_h.index(mom)] count += 1 if self.pop_ids == None: return LDstats(y_new, num_pops=self.num_pops - len(pops)) else: new_ids = copy.copy(self.pop_ids) for ii in sorted(pops)[::-1]: new_ids.pop(ii) return LDstats(y_new, num_pops=self.num_pops - len(pops), pop_ids=new_ids)
## Admix takes two populations, creates new population with fractions f, 1-f ## Merge takes two populations (pop1, pop2) and merges together with fraction ## f from population pop1, and (1-f) from pop2 ## Pulse migrate again takes two populations, pop1 and pop2, and replaces fraction ## f from pop2 with pop1 ## In each case, we use the admix function, which appends a new population on the end ## In the case of merge, we use admix and then marginalize pop1 and pop2 ## in the case of pulse migrate, we use admix, and then swap new pop with pop2, and ## then marginalize pop2, so the new population takes the same position in pop_ids ## that pop2 was previously in
[docs] def admix(self, pop0, pop1, f, new_id="Adm"): """ Admixture between pop0 and pop1, given by indexes. f is the fraction contributed by pop0, so pop1 contributes 1-f. If new_id is not specified, the admixed population's name is 'Adm'. Otherwise, we can set it with new_id=new_pop_id. :param pop0: First population to admix. :type pop0: int :param pop1: Second population to admix. :type pop1: int :param f: The fraction of ancestry contributed by pop0, so pop1 contributes 1 - f. :type f: float :param new_id: The name of the admixed population. :type new_id: str, optional """ if ( self.num_pops < 2 or pop0 >= self.num_pops or pop1 >= self.num_pops or pop0 < 0 or pop1 < 0 ): raise ValueError("Improper usage of admix (wrong indices?).") if pop0 == pop1: raise ValueError("pop0 cannot equal pop1") if f < 0 or f > 1: raise ValueError("Admixture fraction must be between 0 and 1") Y_new = Numerics.admix(self, self.num_pops, pop0, pop1, f) if self.pop_ids is not None: new_pop_ids = self.pop_ids + [new_id] else: new_pop_ids = None return LDstats(Y_new, num_pops=self.num_pops + 1, pop_ids=new_pop_ids)
[docs] def merge(self, pop0, pop1, f, new_id="Merged"): """ Merger of populations pop0 and pop1, with fraction f from pop0 and 1-f from pop1. Places new population at the end, then marginalizes pop0 and pop1. To admix two populations and keep one or both, use pulse migrate or admix, respectively. :param pop0: First population to merge. :type pop0: int :param pop1: Second population to merge. :type pop1: int :param f: The fraction of ancestry contributed by pop0, so pop1 contributes 1 - f. :type f: float :param new_id: The name of the merged population. :type new_id: str, optional """ Y_new = self.admix(pop0, pop1, f, new_id=new_id) Y_new = Y_new.marginalize([pop0, pop1]) return Y_new
[docs] def pulse_migrate(self, pop0, pop1, f): """ Pulse migration/admixure event from pop0 to pop1, with fraction f replacement. We use the admix function above. We want to keep the original population names the same, if they are given in the LDstats object, so we use new_pop=self.pop_ids[pop1]. We admix pop0 and pop1 with fraction f and 1-f, then swap the new admixed population with pop1, then marginalize the original pop1. :param pop0: The index of the source population. :type pop0: int :param pop1: The index of the target population. :type pop1: int :param f: The fraction of ancestry contributed by the source population. :type f: float """ if ( self.num_pops < 2 or pop0 >= self.num_pops or pop1 >= self.num_pops or pop0 < 0 or pop1 < 0 ): raise ValueError("Improper usage of admix (wrong indices?).") if pop0 == pop1: raise ValueError("pop0 cannot equal pop1") if f < 0 or f > 1: raise ValueError("Admixture fraction must be between 0 and 1") if self.pop_ids is not None: Y_new = self.admix(pop0, pop1, f, new_id=self.pop_ids[pop1]) else: Y_new = self.admix(pop0, pop1, f) Y_new = Y_new.swap_pops(pop1, Y_new.num_pops - 1) Y_new = Y_new.marginalize(Y_new.num_pops - 1) return Y_new
# Make from_file a static method, so we can use it without an instance.
[docs] @staticmethod def from_file(fid, return_statistics=False, return_comments=False): """ Read LD statistics from file. :param fid: The file name to read from or an open file object. :type fid: str :param return_statistics: If true, returns statistics writen to file. :type return_statistics: bool, optional :param return_comments: If true, the return value is (y, comments), where comments is a list of strings containing the comments from the file (without #'s). :type return_comments: bool, optional """ newfile = False # Try to read from fid. If we can't, assume it's something that we can # use to open a file. if not hasattr(fid, "read"): newfile = True fid = open(fid, "r") line = fid.readline() # Strip out the comments comments = [] while line.startswith("#"): comments.append(line[1:].strip()) line = fid.readline() # Read the num pops and pop_ids, if given line_spl = line.split() num_pops = int(line_spl[0]) if len(line_spl) > 1: # get the pop_ids pop_ids = line_spl[1:] if num_pops != len(pop_ids): print("Warning: num_pops does not match number of pop_ids.") else: pop_ids = None # Get the statistic names ld_stats = fid.readline().split() het_stats = fid.readline().split() if ld_stats == ["ALL"]: ld_stats = Util.moment_names(num_pops)[0] if het_stats == ["ALL"]: het_stats = Util.moment_names(num_pops)[1] statistics = [ld_stats, het_stats] # Get the number of LD statistic rows and read LD data num_ld_rows = int(fid.readline().strip()) data = [] for r in range(num_ld_rows): data.append(numpy.fromstring(fid.readline().strip(), sep=" ")) # Read heterozygosity data data.append(numpy.fromstring(fid.readline().strip(), sep=" ")) # If we opened a new file, clean it up. if newfile: fid.close() y = LDstats(data, num_pops=num_pops, pop_ids=pop_ids) if return_statistics: if not return_comments: return y, statistics else: return y, statistics, comments else: if not return_comments: return y else: return y, comments
[docs] def to_file(self, fid, precision=16, statistics="ALL", comment_lines=[]): """ Write LD statistics to file. The file format is: - # Any number of comment lines beginning with a '#' - A single line containing an integer giving the number of populations. - On the *same line*, optional, the names of those populations. If names are given, there needs to be the same number of pop_ids as the integer number of populations. For example, the line could be '3 YRI CEU CHB'. - A single line giving the names of the *LD* statistics, in the order they appear for each recombination rate distance or bin. Optionally, this line could read ALL, indicating that every statistic in the basis is given, and in the 'correct' order. - A single line giving the names of the *heterozygosity* statistics, in the order they appear in the final row of data. Optionally, this line could read ALL. - A line giving the number of recombination rate bins/distances we have data for (so we know how many to read) - One line for each row of LD statistics. - A single line for the heterozygosity statistics. :param fid: The file name to write to or an open file object. :type fid: str :param precision: The precision with which to write out entries of the LD stats. (They are formated via %.<p>g, where <p> is the precision.) :type precision: int :param statistics: Defaults to 'ALL', meaning all statistics are given in the LDstats object. Otherwise, list of two lists, first giving present LD stats, and the second giving present het stats. :type statistics: list of list of strings :param comment_lines: List of strings to be used as comment lines in the header of the output file. I use comment lines mainly to record the recombination bins or distances given in the LDstats (something like "'edges = ' + str(r_edges)". :type comment_lines: list of srtings """ # if statistics is ALL, check to make sure the lengths are correct if statistics != "ALL": ld_stat_names, het_stat_names = statistics else: ld_stat_names, het_stat_names = Util.moment_names(self.num_pops) all_correct_length = 1 for ld_stats in self.LD(): if len(ld_stats) != len(ld_stat_names): all_correct_length = 0 break if len(self.H()) != len(het_stat_names): all_correct_length = 0 if all_correct_length == 0: raise ValueError( "Length of data arrays does not match expected \ length of statistics." ) # Open the file object. newfile = False if not hasattr(fid, "write"): newfile = True fid = open(fid, "w") # Write comments for line in comment_lines: fid.write("# ") fid.write(line.strip()) fid.write(os.linesep) # Write out the number of populations and pop_ids if given fid.write("%i" % self.num_pops) if self.pop_ids is not None: for pop in self.pop_ids: fid.write(" %s" % pop) fid.write(os.linesep) # Write out LD statistics if statistics == "ALL": fid.write(statistics) else: for stat in statistics[0]: fid.write("%s " % stat) fid.write(os.linesep) # Write out het statistics if statistics == "ALL": fid.write(statistics) else: for stat in statistics[1]: fid.write("%s " % stat) fid.write(os.linesep) # Write the LD data to the file fid.write("%i" % len(self.LD())) fid.write(os.linesep) for ld_stats in self.LD(): for stat in ld_stats: fid.write("%%.%ig " % precision % stat) fid.write(os.linesep) # Write the het data to the file for stat in self.H(): fid.write("%%.%ig " % precision % stat) # Close file if newfile: fid.close()
[docs] @staticmethod def from_demes( g, sampled_demes, sample_times=None, rho=None, theta=0.001, r=None, u=None, ): """ Takes a deme graph and computes the LD stats. ``demes`` is a package for specifying demographic models in a user-friendly, human-readable YAML format. This function automatically parses the demographic description and returns a LD for the specified populations and recombination and mutation rates. :param g: A ``demes`` DemeGraph from which to compute the LD. :type g: :class:`demes.DemeGraph` :param sampled_demes: A list of deme IDs to take samples from. We can repeat demes, as long as the sampling of repeated deme IDs occurs at distinct times. :type sampled_demes: list of strings :param sample_times: If None, assumes all sampling occurs at the end of the existence of the sampled deme. If there are ancient samples, ``sample_times`` must be a list of same length as ``sampled_demes``, giving the sampling times for each sampled deme. Sampling times are given in time units of the original deme graph, so might not necessarily be generations (e.g. if ``g.time_units`` is years) :type sample_times: list of floats, optional :param rho: The population-size scaled recombination rate(s). Can be None, a non-negative float, or a list of values. :param theta: The population-size scaled mutation rate. This defaults to ``theta=0.001``, which is very roughly what is observed in humans. :param r: The raw recombination rate. Can be None, a non-negative float, or a list of values. :param u: The raw per-base mutation rate. ``theta`` is set to ``4 * Ne * u``, where ``Ne`` is the reference population size from the root deme. :return: A ``moments.LD`` LD statistics object, with number of populations equal to the length of ``sampled_demes``. :rtype: :class:`moments.LD.LDstats` """ if isinstance(g, str): dg = demes.load(g) else: dg = g y = moments.Demes.Demes.LD( dg, sampled_demes, sample_times=sample_times, rho=rho, theta=theta, r=r, u=u, ) return y
[docs] @staticmethod def steady_state( nus, m=None, rho=None, theta=0.001, selfing_rate=None, pop_ids=None ): """ Computes the steady state solution for any number of populations. The number of populations is determined by the length of ``nus``, which is a list with relative population sizes (often, these will be set to 1, meaning sizes are equal to some reference or ancestral population size). If the solution for more than one population is desired, we must provide ``m``, an n-by-n migration matrix, and there must be migrations that connect all populations so that a steady state solution exists. This corresponds to an island model with potentially asymmetric migration and allows for unequal population sizes. :param nus: The relative population sizes, with one or two entries, corresponding to a steady state solution with one or two populations, resp. :type nus: list-like :param m: A migration matrix, only provided when the length of `nus` is 2 or more. :type m: array-like :param rho: The population-size scaled recombination rate(s). Can be None, a non-negative float, or a list of values. :param theta: The population-size scaled mutation rate :type theta: float :param selfing_rate: Self-fertilization rate(s), given as a number (for a single population, or list of numbers (for two populations). Selfing rates must be between 0 and 1. :type selfing_rate: number or list of numbers :param pop_ids: The population IDs. :type pop_ids: list of strings :return: A ``moments.LD`` LD statistics object. :rtype: :class:`moments.LD.LDstats` """ num_pops = len(nus) if num_pops == 1: if m is not None: raise ValueError( "Migration matrix cannot be provided for one population." ) elif num_pops >= 2: if m is None: raise ValueError( "Migration matrix must be provided for the steady state solution " "with two or more populations" ) if pop_ids is not None and len(pop_ids) != num_pops: raise ValueError("pop_ids must be a list of length equal to nus.") data = Numerics.steady_state( nus, m=m, rho=rho, theta=theta, selfing_rate=selfing_rate ) y = LDstats(data, num_pops=num_pops, pop_ids=pop_ids) return y
# Ensures that when arithmetic is done with LDstats objects, # attributes are preserved. For details, see similar code in # moments.Spectrum_mod for method in [ "__add__", "__radd__", "__sub__", "__rsub__", "__mul__", "__rmul__", "__div__", "__rdiv__", "__truediv__", "__rtruediv__", "__floordiv__", "__rfloordiv__", "__rpow__", "__pow__", ]: exec( """ def %(method)s(self, other): if isinstance(other, np.ma.masked_array): newdata = self.%(method)s (other.data) newmask = np.ma.mask_or(self.mask, other.mask) else: newdata = self.%(method)s (other) newmask = self.mask outLDstats = self.__class__.__new__(self.__class__, newdata, newmask, num_pops=self.num_pops, pop_ids=self.pop_ids) return outLDstats """ % {"method": method} ) # Methods that modify the Spectrum in-place. for method in [ "__iadd__", "__isub__", "__imul__", "__idiv__", "__itruediv__", "__ifloordiv__", "__ipow__", ]: exec( """ def %(method)s(self, other): if isinstance(other, np.ma.masked_array): self.%(method)s (other.data) self.mask = np.ma.mask_or(self.mask, other.mask) else: self.%(method)s (other) return self """ % {"method": method} )
[docs] def integrate( self, nu, tf, dt=None, dt_fac=0.02, rho=None, theta=0.001, m=None, selfing=None, selfing_rate=None, frozen=None, ): """ Integrates the LD statistics forward in time. When integrating LD statistics for a list of recombination rates and mutation rate, they must be passed as keywork arguments to this function. We can integrate either single-population LD statistics up to order 10, or multi-population LD statistics but only for order 2 (which includes :math:`D^2`, :math:`Dz`, and :math:`\\pi_2`). :param nu: The relative population size, may be a function of time, given as a list [nu1, nu2, ...] :type nu: list or function :param tf: Total time to integrate :type tf: float :param dt: Integration timestep. This is deprecated! Use `dt_fac` instead. :type dt: float :param dt_fac: The integration time step factor, so that dt is determined by `tf * dt_fac`. Note: Should also build in adaptive time-stepping... this is to come. :type dt_fac: float :param rho: Can be a single recombination rate or list of recombination rates (in which case we are integrating a list of LD stats for each rate) :type rho: float or list of floats :param theta: The per base population-scaled mutation rate (4N*mu) if we pass [theta1, theta2], differing mutation rates at left and right locus, implemented in the ISM=True model :param m: The migration matrix (num_pops x num_pops, storing m_ij migration rates where m_ij is probability that a lineage in i had parent in j m_ii is unused, and found by summing off diag elements in the ith row :type m: array :param selfing: A list of selfing probabilities, same length as nu. :type selfing: list of floats :param selfing_rate: Alias for selfing. :type selfing_rate: list of floats :param frozen: A list of True and False same length as nu. True implies that a lineage is frozen (as in ancient samples). False integrates as normal. :type frozen: list of bools """ num_pops = self.num_pops if tf == 0.0: return if tf < 0 or np.isinf(tf): raise ValueError("Integration time must be positive and finite.") if rho is None and len(self) > 1: raise ValueError("There are LD statistics, but rho is None.") elif rho is not None and np.isscalar(rho) and len(self) != 2: raise ValueError( "Single rho passed but LD object does have correct number of entries." ) elif ( rho is not None and np.isscalar(rho) == False and len(rho) != len(self) - 1 ): raise ValueError("Mismatch length of input rho and size of LD object.") if rho is not None and np.isscalar(rho) == False and len(rho) == 1: rho = rho[0] if callable(nu): if len(nu(0)) != num_pops: raise ValueError("len of pop size function must equal number of pops.") else: if len(nu) != num_pops: raise ValueError("len of pop sizes must equal number of pops.") if m is not None and num_pops > 1: if np.shape(m) != (num_pops, num_pops): raise ValueError( "migration matrix incorrectly defined for number of pops." ) if frozen is not None: if len(frozen) != num_pops: raise ValueError("frozen must have same length as number of pops.") if selfing_rate is not None: if selfing is not None: raise ValueError("Cannot specify both selfing and selfing_rate") selfing = selfing_rate if selfing is not None: if len(selfing) != num_pops: raise ValueError("selfing must have same length as number of pops.") # enforce minimum 10 time steps per integration # if tf < dt * 10: # dt_adj = tf / 10 # else: # dt_adj = dt * 1.0 if dt is not None: warnings.warn( "dt is deprecated, use dt_fac to control integration time steps", DeprecationWarning, ) if dt_fac <= 0 or dt_fac > 1: raise ValueError("dt_fac must be between 0 and 1") self[:] = Numerics.integrate( self[:], nu, tf, dt=None, dt_fac=dt_fac, rho=rho, theta=theta, m=m, num_pops=num_pops, selfing=selfing, frozen=frozen, )
# Allow LDstats objects to be pickled. try: import copy_reg except: import copyreg def LDstats_pickler(y): return LDstats_unpickler, (y[:], y.num_pops, y.pop_ids) def LDstats_unpickler(data, num_pops, pop_ids): return LDstats(data[:], num_pops=num_pops, pop_ids=pop_ids) try: copy_reg.pickle(LDstats, LDstats_pickler, LDstats_unpickler) except: copyreg.pickle(LDstats, LDstats_pickler, LDstats_unpickler)