# NOTE: scraper artifact (Snyk advertising banner), not part of the original module.
def test_zero_rate_end(self):
    """Conversions for a map whose recombination rate is zero past position 50.

    With rate 1 on [0, 50) and rate 0 on [50, 100), the genetic map
    saturates at 50: every physical position at or beyond 50 has genetic
    position 50, and genetic position 50 maps back to the sequence end (100).
    Checks the C implementation and the Python reference implementation
    agree on this behaviour.
    """
    positions = [0, 50, 100]
    rates = [1, 0, 0]
    num_loci = 50
    maps = [
        msprime.RecombinationMap(positions, rates, num_loci),
        PythonRecombinationMap(positions, rates, num_loci)]
    for rm in maps:
        # Rate is 1 on [0, 50), so physical and genetic coordinates
        # coincide there and the two conversions are mutual inverses.
        for x in [0, 10, 49]:
            self.assertEqual(x, rm.physical_to_genetic(x))
            self.assertEqual(x, rm.genetic_to_physical(x))
        # Zero rate beyond 50: all physical positions >= 50 collapse
        # to genetic position 50.
        for x in [50, 51, 55, 99, 100]:
            self.assertEqual(50, rm.physical_to_genetic(x))
        # The inverse mapping does not depend on x, so assert it once
        # per map (the original repeated this inside the loop above).
        self.assertEqual(100, rm.genetic_to_physical(50))
def verify_coordinate_conversion(self, positions, rates, num_loci=10):
    """
    Verifies coordinate conversions by the specified RecombinationMap
    instance.

    Builds both the C implementation (msprime.RecombinationMap) and the
    pure-Python reference (PythonRecombinationMap) from the same
    positions/rates/num_loci and checks they agree on the total map
    length.
    NOTE(review): this block appears truncated in this view -- the
    random-trials loop implied by ``num_random_trials`` is not visible
    here.
    """
    # Physical sequence length is the last position in the map.
    L = positions[-1]
    rm = msprime.RecombinationMap(positions, rates, num_loci)
    other_rm = PythonRecombinationMap(positions, rates, num_loci)
    if rm.get_size() == 2:
        # When we have very large numbers of loci, this calculations for
        # max distance is off by very small amounts, probably because of
        # machine precision. But if the expected diff is less than say, 10^-10
        # anyway, there's no point in worrying about it.
        max_discretisation_distance = max(1e-10, L / (2 * num_loci))
    else:
        # The above calculation works for a uniform map, but I couldn't
        # figure out how to generalise it. Cop out:
        max_discretisation_distance = L
    # Both implementations must report the same total recombination rate.
    self.assertEqual(
        rm.get_total_recombination_rate(),
        other_rm.get_total_recombination_rate())
    num_random_trials = 10
def test_mean_recombination_rate(self):
    """Sanity checks for the mean_recombination_rate property."""
    # Each case pairs (positions, rates) with the expected
    # length-weighted mean recombination rate.
    cases = [
        (([0, 1], [1, 0]), 1.0),
        (([0, 1, 2], [1, 0, 0]), 0.5),
        (([0, 1, 2], [0, 0, 0]), 0.0),
    ]
    for (positions, rates), expected in cases:
        recomb_map = msprime.RecombinationMap(positions, rates)
        mean_rr = recomb_map.mean_recombination_rate
        self.assertEqual(mean_rr, expected)
def simulate_from_example():
    """Run a Wright-Fisher simulation, complete it with the coalescent,
    and draw the trees at each stage.

    Writes SVG renderings of the initial WF trees and the coalesced
    trees to ``_static/``, then prints the simplified trees as unicode.
    """
    num_loci = 2
    wf_ts = wright_fisher(10, 5, L=num_loci, random_seed=3)
    for t in wf_ts.trees():
        t.draw(path="_static/simulate_from_wf_{}.svg".format(t.index))
    # Complete the (possibly un-coalesced) WF history under the coalescent.
    recomb_map = msprime.RecombinationMap.uniform_map(num_loci, 1, num_loci)
    coalesced_ts = msprime.simulate(
        from_ts=wf_ts, recombination_map=recomb_map, random_seed=5)
    for t in coalesced_ts.trees():
        t.draw(path="_static/simulate_from_coalesced_{}.svg".format(t.index))
    # Simplify and show the final trees on stdout.
    final_ts = coalesced_ts.simplify()
    for t in final_ts.trees():
        print("interval = ", t.interval)
        print(t.draw(format="unicode"))
# NOTE(review): the region below is an elided concatenation of at least two
# different function bodies; indentation has also been stripped. Code is kept
# byte-identical -- only review comments are added.
# Registers a randomized DTWF-vs-coalescent comparison test instance.
def add_dtwf_vs_coalescent_random_instance(
self, key, num_populations=1, num_replicates=200, num_demographic_events=0):
N = num_populations
# Random sequence length between 1e5 and 1e7 loci, uniform rate rho.
num_loci = np.random.randint(1e5, 1e7)
rho = 1e-8
recombination_map = msprime.RecombinationMap(
[0, num_loci], [rho, 0], num_loci=num_loci)
population_configurations = []
for i in range(N):
population_configurations.append(
msprime.PopulationConfiguration(
sample_size=np.random.randint(1, 10),
initial_size=int(1000 / N)))
# Random off-diagonal migration rates; diagonal entries forced to 0
# by the (j != i) factor.
migration_matrix = []
for i in range(N):
migration_matrix.append(
[random.uniform(0.05, 0.25) * (j != i) for j in range(N)])
# Add demographic events and some migration rate changes
t_max = 1000
# NOTE(review): apparent elision here -- the following lines (an orphan
# ``else:`` branch using growth_rates/initial_sizes/sample_sizes) belong
# to a different function not fully visible in this view.
growth_rate = growth_rates[i]
else:
# Enforce zero growth rate for small populations
print("Warning - setting growth rate to zero for small",
"population of size", initial_sizes[i])
growth_rate = 0
population_configurations.append(
msprime.PopulationConfiguration(
sample_size=sample_sizes[i],
initial_size=initial_sizes[i],
growth_rate=growth_rate
)
)
recombination_map = msprime.RecombinationMap(
[0, num_loci], [recombination_rate, 0], num_loci=num_loci)
# Default: symmetric migration at 0.05 between all distinct pairs.
if migration_matrix is None:
default_mig_rate = 0.05
migration_matrix = []
for i in range(num_pops):
row = [default_mig_rate] * num_pops
row[i] = 0
migration_matrix.append(row)
# Deferred comparison runner; captures the configuration built above.
# NOTE(review): truncated -- the call below is cut off mid-argument-list.
def f():
self.run_dtwf_coalescent_comparison(
key,
population_configurations=population_configurations,
migration_matrix=migration_matrix,
num_replicates=num_replicates,
# NOTE(review): fragment -- references ``args``, so this is presumably the
# body of a CLI driver whose ``def`` is outside this view; the tail is also
# cut off (``num_bins``/``hotspot_breakpoints`` are assigned but unused here).
MB = 10 ** 6
# Sequence length in bases; args.length is given in megabases.
L = args.length * MB
rng = random.Random()
if args.random_seed is not None:
rng.seed(args.random_seed)
# Evenly spaced breakpoints, then a second breakpoint a fixed fraction of L
# to the right of each interior one, forming the hotspot intervals.
breakpoints = np.linspace(0, L, args.num_hotspots + 2)
end = breakpoints[1:-1] + L * args.hotspot_width
breakpoints = np.hstack([breakpoints, end])
breakpoints.sort()
# Uniform background rate everywhere except the (required) trailing zero.
rates = np.zeros_like(breakpoints)
rates[:-1] = args.recombination_rate
# Set the odd elements of the array to be hotspots.
rates[1::2] *= args.hotspot_intensity
recomb_map = msprime.RecombinationMap(list(breakpoints), list(rates))
sim_args = {
"sample_size": args.sample_size,
"recombination_map": recomb_map,
"mutation_rate": args.mutation_rate,
"Ne": args.Ne,
"random_seed": rng.randint(1, 2 ** 30),
}
ts = msprime.simulate(**sim_args)
print("simulated ", ts.num_trees, "trees and", ts.num_sites, "sites")
inferred_ts = run_infer(ts)
num_bins = 100
hotspot_breakpoints = breakpoints