Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
mutations = six.StringIO("""\
site node derived_state parent
0 1 1 -1
0 2 1 -1
0 3 2 1
1 0 1 -1
1 1 1 3
1 3 2 4
1 2 1 3
1 4 2 6
2 0 1 -1
2 1 1 8
2 2 1 8
2 4 1 8
""")
ts = msprime.load_text(
nodes=nodes, edges=edges, sites=sites, mutations=mutations, strict=False)
self.verify_parents(ts)
0.2 0.8 8 9,10
0.0 0.2 8 9
0.7 1.0 7 8
0.0 0.2 7 10
0.8 1.0 6 9
0.0 0.7 6 8
0.4 1.0 5 2,7
0.1 0.4 5 7
0.6 0.9 4 6
0.0 0.6 4 0,6
0.9 1.0 3 4,5,6
0.1 0.9 3 4,5
0.0 0.1 3 4,5,7
""")
ts = msprime.load_text(nodes, edgesets)
tree_dicts = [t.parent_dict for t in ts.trees()]
self.assertEqual(ts.sample_size, 3)
self.assertEqual(ts.num_trees, len(true_trees))
self.assertEqual(ts.num_nodes, 11)
self.assertEqual(len(list(ts.diffs())), ts.num_trees)
# check topologies agree:
for a, t in zip(true_trees, tree_dicts):
for k in a.keys():
if k in t.keys():
self.assertEqual(t[k], a[k])
else:
self.assertEqual(a[k], msprime.NULL_NODE)
self.verify_simplify_topology(ts, [0, 1])
self.verify_simplify_topology(ts, [1, 2])
self.verify_simplify_topology(ts, [2, 0])
is_sample time
1 0
1 0
"""
edges = """\
left right parent child
0 1 2 0
"""
for j in range(num_unary_nodes + 2):
nodes += "0 {}\n".format(j + 2)
for j in range(num_unary_nodes):
edges += "0 1 {} {}\n".format(n + j + 1, n + j)
root = num_unary_nodes + 3
root_time = num_unary_nodes + 3
edges += "0 1 {} 1,{}\n".format(root, num_unary_nodes + 2)
ts = msprime.load_text(six.StringIO(nodes), six.StringIO(edges), strict=False)
t = next(ts.trees())
self.assertEqual(t.mrca(0, 1), root)
self.assertEqual(t.tmrca(0, 1), root_time)
ts_simplified, node_map = ts.simplify(map_nodes=True)
test_map = [msprime.NULL_NODE for _ in range(ts.num_nodes)]
test_map[0] = 0
test_map[1] = 1
test_map[root] = 2
self.assertEqual(list(node_map), test_map)
self.assertEqual(ts_simplified.num_edges, 2)
t = next(ts_simplified.trees())
self.assertEqual(t.mrca(0, 1), 2)
self.assertEqual(t.tmrca(0, 1), root_time)
id position ancestral_state
0 0.1 0
1 0.2 0
2 0.3 0
3 0.4 0
4 0.5 0
""")
mutations = six.StringIO("""\
site node derived_state
0 0 1
1 1 1
2 2 1
3 3 1
4 8 1
""")
ts = msprime.load_text(
nodes=nodes, edges=edges, sites=sites, mutations=mutations, strict=False)
self.assertEqual(ts.num_nodes, 9)
self.assertEqual(ts.num_trees, 1)
self.assertEqual(ts.num_sites, 5)
self.assertEqual(ts.num_mutations, 5)
t = next(ts.trees())
self.assertEqual(t.parent_dict, {0: 4, 1: 5, 2: 7, 3: 7, 4: 6, 5: 6, 8: 7})
self.assertEqual(list(ts.haplotypes()), ["10000", "01000", "00100", "00010"])
self.assertEqual(
[v.genotypes for v in ts.variants(as_bytes=True)],
[b"1000", b"0100", b"0010", b"0001", b"0000"])
self.assertEqual(t.mrca(0, 1), 6)
self.assertEqual(t.mrca(2, 3), 7)
self.assertEqual(t.mrca(2, 8), 7)
self.assertEqual(t.mrca(0, 2), msprime.NULL_NODE)
self.assertEqual(t.mrca(0, 3), msprime.NULL_NODE)
def test_small_tree_internal_samples(self):
ts = msprime.load_text(
nodes=six.StringIO(self.small_tree_ex_nodes),
edges=six.StringIO(self.small_tree_ex_edges), strict=False)
tables = ts.dump_tables()
nodes = tables.nodes
flags = nodes.flags
# The parent of samples 0 and 1 is 5. Change this to an internal sample
# and set 0 and 1 to be unsampled.
flags[0] = 0
flags[1] = 0
flags[5] = msprime.NODE_IS_SAMPLE
nodes.set_columns(flags=flags, time=nodes.time)
ts = msprime.load_tables(nodes=nodes, edges=tables.edges)
self.assertEqual(ts.sample_size, 4)
tss, node_map = self.do_simplify(ts, [3, 5])
self.assertEqual(node_map[3], 0)
self.assertEqual(node_map[5], 1)
5 0 3
""")
edges = six.StringIO("""\
left right parent child
0 1 2 0
0 1 3 1
0 1 4 2,3
0 1 5 4
""")
sites = "position ancestral_state\n"
mutations = "site node derived_state\n"
for j in range(5):
position = j * 1 / 5
sites += "{} 0\n".format(position)
mutations += "{} {} 1\n".format(j, j)
ts = msprime.load_text(
nodes=nodes, edges=edges, sites=six.StringIO(sites),
mutations=six.StringIO(mutations), strict=False)
self.assertEqual(ts.sample_size, 2)
self.assertEqual(ts.num_nodes, 6)
self.assertEqual(ts.num_trees, 1)
self.assertEqual(ts.num_sites, 5)
self.assertEqual(ts.num_mutations, 5)
self.assertEqual(len(list(ts.edge_diffs())), ts.num_trees)
t = next(ts.trees())
self.assertEqual(
t.parent_dict, {0: 2, 1: 3, 2: 4, 3: 4, 4: 5})
self.assertEqual(t.mrca(0, 1), 4)
self.assertEqual(t.mrca(0, 2), 2)
self.assertEqual(t.mrca(0, 4), 4)
self.assertEqual(t.mrca(0, 5), 5)
def test_overlapping_unary_edges_internal_samples(self):
nodes = six.StringIO("""\
id is_sample time
0 1 0
1 1 0
2 1 1
""")
edges = six.StringIO("""\
left right parent child
0 2 2 0
1 3 2 1
""")
ts = msprime.load_text(nodes, edges, strict=False)
self.assertEqual(ts.sample_size, 3)
self.assertEqual(ts.num_trees, 3)
trees = [{0: 2}, {0: 2, 1: 2}, {1: 2}]
for t in ts.trees():
self.assertEqual(t.parent_dict, trees[t.index])
tss, node_map = self.do_simplify(ts)
self.assertEqual(list(node_map), [0, 1, 2])
""")
sites = six.StringIO("""\
id position ancestral_state
0 0.1 0
1 0.2 0
2 0.3 0
3 0.4 0
""")
mutations = six.StringIO("""\
site node derived_state
0 0 1
1 1 1
2 2 1
3 3 1
""")
ts = msprime.load_text(
nodes=nodes, edges=edges, sites=sites, mutations=mutations, strict=False)
self.assertEqual(ts.num_nodes, 6)
self.assertEqual(ts.num_trees, 1)
self.assertEqual(ts.num_sites, 4)
self.assertEqual(ts.num_mutations, 4)
t = next(ts.trees())
self.assertEqual(t.parent_dict, {0: 4, 1: 4, 2: 5, 3: 5})
self.assertEqual(list(ts.haplotypes()), ["1000", "0100", "0010", "0001"])
self.assertEqual(
[v.genotypes for v in ts.variants(as_bytes=True)],
[b"1000", b"0100", b"0010", b"0001"])
self.assertEqual(t.mrca(0, 1), 4)
self.assertEqual(t.mrca(0, 4), 4)
self.assertEqual(t.mrca(2, 3), 5)
self.assertEqual(t.mrca(0, 2), msprime.NULL_NODE)
self.assertEqual(t.mrca(0, 3), msprime.NULL_NODE)
def main(ts, fastARG_executable, fa_in, fa_out, nodes_fh, edges_fh, sites_fh, muts_fh):
"""
This is just to test if fastarg produces the same haplotypes
"""
import subprocess
seq_len = ts.get_sequence_length()
ts_to_fastARG_in(ts, fa_in)
subprocess.call([fastARG_executable, 'build', fa_in.name], stdout=fa_out)
fastARG_out_to_ts_txts(fa_out, variant_positions_from_fastARGin(fa_in),
nodes_fh, edges_fh, sites_fh, muts_fh, seq_len=seq_len)
new_ts = msprime.load_text(nodes=nodes_fh, edges=edges_fh, sites=sites_fh, mutations=muts_fh)
simple_ts = new_ts.simplify()
logging.debug("Simplified num_records should always be < unsimplified num_records.\n"
"For low mutationRate:recombinationRate ratio,"
" the initial num records will probably be higher than the"
" fastarg num_records, as the original simulation will have records"
" which leave no mutational trace. As the mutation rate increases,"
" we expect the fastarg num_records to equal, then exceed the original"
" as fastarg starts inferring the wrong (less parsimonious) set of trees")
logging.debug(
"Initial num records = {}, fastARG (simplified) = {}, fastARG (unsimplified) = {}".format(
ts.get_num_records(), simple_ts.get_num_records(), new_ts.get_num_records()))
def ts_txts_to_trees(ts_nodes, ts_edges, trees_outname=None):
import shutil
import msprime
logging.info("== Converting new ts ARG to .trees ===")
try:
ts = msprime.load_text(nodes=ts_nodes, edges=ts_edges)
except:
logging.warning("Can't load the texts file properly. Saved copied to 'bad.nodes' & 'bad.edges' for inspection")
shutil.copyfile(ts_nodes.name, "bad.nodes")
shutil.copyfile(ts_edges.name, "bad.edges")
raise
logging.info("== loaded {}, {}===".format(ts_nodes.name, ts_edges.name))
try:
simple_ts = ts.simplify()
except:
ts.dump("bad.trees")
logging.warning("Can't simplify. .trees file dumped to 'bad.trees'")
raise
if trees_outname:
simple_ts.dump(trees_outname)
return(simple_ts)