Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import shutil
import re
import csv
n_fastARG_replicates = 1 #should be the same each time
fa_mut_params = []
fa_rep_params = []
aw_mut_params = []
aw_rep_params = []
aw_samp_params = []
aw_likelihood = []
random.seed(rand_seed)
mutseeds = seed_set(len(mut_rates), random)
ts = msprime.simulate(sample_size=sample_size, Ne=Ne, mutation_rate=None, recombination_rate=recombination_rate, length=length, random_seed=rand_seed)
for mu, mut_seed in zip(mut_rates, mutseeds):
print("Throwing mutations onto the ARG at rate {} ".format(mu), file=sys.stderr)
rng = msprime.RandomGenerator(mut_seed)
ts.generate_mutations(mu, rng)
simname = msprime_filename(sample_size, Ne, length, recombination_rate, mu, rand_seed, mut_seed)
with open(os.path.join(nexus_dir,simname+".nex"), "w+") as nex, \
open(os.path.join(nexus_dir, simname + ".vpos"), "w+") as v_pos:
write_nexus_trees(ts, nex, index_trees_by_variants=False, zero_based_tip_numbers=False)
write_variant_positions(ts, v_pos)
#create seeds for inference params
for inference_seed in seed_set(n_fastARG_replicates, random):
with TemporaryDirectory() as tmp:
fastarg_base = construct_fastarg_basename(inference_seed, simname)
with open(os.path.join(tmp, fastarg_base + ".fava"), "w+") as fa_in, \
open(os.path.join(tmp, fastarg_base + ".farg"), "w+") as fa_out, \
open(os.path.join(tmp, fastarg_base + ".mscr"), "w+") as tree, \
open(os.path.join(tmp, fastarg_base + ".new.fava"), "w+") as fa_revised, \
open(os.path.join(tmp, fastarg_base + ".msmu"), "w+") as mutation_file, \
open(os.path.join(nexus_dir, fastarg_base + ".nex"), "w+") as nex_g:
def test_smc_variants(self):
for model in ["smc", "smc_prime"]:
threshold = 20
sim = msprime.simulator_factory(
sample_size=10, recombination_rate=5, model=model)
sim.random_generator = msprime.RandomGenerator(3)
sim.run()
self.assertGreater(sim.num_common_ancestor_events, threshold)
self.assertGreater(sim.num_recombination_events, threshold)
self.assertGreater(sim.num_rejected_common_ancestor_events, 0)
def test_default_random_seed(self):
sim = msprime.simulator_factory(10)
rng = sim.random_generator
self.assertIsInstance(rng, msprime.RandomGenerator)
self.assertNotEqual(rng.get_seed(), 0)
def verify_simulation(self, n, m, r):
"""
Verifies a simulation for the specified parameters.
"""
recomb_map = msprime.RecombinationMap.uniform_map(m, r, num_loci=m)
rng = msprime.RandomGenerator(1)
sim = msprime.simulator_factory(
n, recombination_map=recomb_map, random_generator=rng)
self.assertEqual(sim.random_generator, rng)
sim.run()
self.assertEqual(sim.num_breakpoints, len(sim.breakpoints))
self.assertGreater(sim.time, 0)
self.assertGreater(sim.num_avl_node_blocks, 0)
self.assertGreater(sim.num_segment_blocks, 0)
self.assertGreater(sim.num_node_mapping_blocks, 0)
tree_sequence = sim.get_tree_sequence()
t = 0.0
for record in tree_sequence.nodes():
if record.time > t:
t = record.time
self.assertEqual(sim.time, t)
self.assertGreater(sim.num_common_ancestor_events, 0)
def _run_msprime_coalescent_stats(self, args):
print("\t msprime:", args)
runner = cli.get_mspms_runner(args.split())
sim = runner.get_simulator()
rng = msprime.RandomGenerator(random.randint(1, 2**32 - 1))
sim.random_generator = rng
num_populations = sim.num_populations
replicates = runner.get_num_replicates()
num_trees = [0 for j in range(replicates)]
time = [0 for j in range(replicates)]
ca_events = [0 for j in range(replicates)]
re_events = [0 for j in range(replicates)]
mig_events = [None for j in range(replicates)]
for j in range(replicates):
sim.reset()
sim.run()
num_trees[j] = sim.num_breakpoints + 1
time[j] = sim.time
ca_events[j] = sim.num_common_ancestor_events
re_events[j] = sim.num_recombination_events
mig_events[j] = [r for row in sim.num_migration_events for r in row]
Ne_used = 1
n_used = None
else:
Ne_used = Ne
n_used = sample_size
if mut_seed is None:
ts = msprime.simulate(
n_used, Ne_used, recombination_map=recombination_map, mutation_rate=mutation_rate,
random_seed=sim_seed, **kwargs)
else:
#run with no mutations (should give same result regardless of mutation rate)
ts = msprime.simulate(
n_used, Ne_used, recombination_map=recombination_map, mutation_rate=0,
random_seed=sim_seed, **kwargs)
#now add the mutations
rng2 = msprime.RandomGenerator(mut_seed)
nodes = msprime.NodeTable()
edges = msprime.EdgeTable()
sites = msprime.SiteTable()
muts = msprime.MutationTable()
ts.dump_tables(nodes=nodes, edges=edges)
mutgen = msprime.MutationGenerator(rng2, mutation_rate)
mutgen.generate(nodes, edges, sites, muts)
msprime.sort_tables(nodes=nodes, edges=edges, sites=sites, mutations=muts)
ts = msprime.load_tables(nodes=nodes, edges=edges, sites=sites, mutations=muts)
logging.info(
"Neutral simulation done; {} sites, {} trees".format(ts.num_sites, ts.num_trees))
sim_fn = mk_sim_name(sample_size, Ne, length, recombination_rate, mutation_rate, seed, mut_seed, self.simulations_dir)
logging.debug("writing {}.hdf5".format(sim_fn))
ts.dump(sim_fn+".hdf5")
# we therefore need to use an Ne value of 1/4.
self._simulator = msprime.simulator_factory(
Ne=0.25,
sample_size=sample_size,
recombination_map=recomb_map,
population_configurations=population_configurations,
migration_matrix=migration_matrix,
demographic_events=demographic_events)
self._precision = precision
self._print_trees = print_trees
# sort out the random seeds
ms_seeds = random_seeds
if random_seeds is None:
ms_seeds = generate_seeds()
seed = get_single_seed(ms_seeds)
self._random_generator = msprime.RandomGenerator(seed)
self._ms_random_seeds = ms_seeds
self._simulator.random_generator = self._random_generator
self._mutation_generator = msprime.MutationGenerator(
self._random_generator, self._mutation_rate)
Ne_used = 1
n_used = None
else:
Ne_used = Ne
n_used = sample_size
if mut_seed is None:
ts = msprime.simulate(
n_used, Ne_used, recombination_map=recombination_map, mutation_rate=mutation_rate,
random_seed=sim_seed, **kwargs)
else:
#run with no mutations (should give same result regardless of mutation rate)
ts = msprime.simulate(
n_used, Ne_used, recombination_map=recombination_map, mutation_rate=0,
random_seed=sim_seed, **kwargs)
#now add the mutations
rng2 = msprime.RandomGenerator(mut_seed)
muts = msprime.MutationTable()
tables = ts.dump_tables()
mutgen = msprime.MutationGenerator(rng2, mutation_rate)
mutgen.generate(tables.nodes, tables.edges, tables.sites, muts)
msprime.sort_tables(
nodes=tables.nodes, edges=tables.edges, sites=tables.sites, mutations=muts)
ts = msprime.load_tables(nodes=nodes, edges=edges, sites=sites, mutations=muts)
logging.info(
"Neutral simulation done; {} sites, {} trees".format(ts.num_sites, ts.num_trees))
sim_fn = mk_sim_name(sample_size, Ne, length, recombination_rate, mutation_rate, seed, mut_seed, self.simulations_dir)
logging.debug("writing {}.trees".format(sim_fn))
ts.dump(sim_fn+".trees")
# Make sure that there is *some* information in this simulation that can be used
# to infer a ts, otherwise it's pointless