Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def verify(self, sample_data, position_subset):
    """
    Check that matching site-subsetted samples against site-subsetted
    ancestors reproduces the genotypes of the subsetted full inference.
    """
    # Reference: run the full inference, then restrict to the chosen positions.
    expected_ts = self.subset_sites(tsinfer.infer(sample_data), position_subset)

    # Ancestors tree sequence, restricted to the same positions, then
    # minimised and simplified.
    ancestors = tsinfer.generate_ancestors(sample_data)
    all_sites_ancestors_ts = tsinfer.match_ancestors(sample_data, ancestors)
    reduced_ancestors_ts = tsinfer.minimise(
        self.subset_sites(all_sites_ancestors_ts, position_subset)
    ).simplify()

    # Re-match samples rebuilt from the reference against the reduced ancestors.
    reduced_samples = tsinfer.SampleData.from_tree_sequence(expected_ts)
    matched_ts = tsinfer.match_samples(reduced_samples, reduced_ancestors_ts)
    genotypes_agree = np.array_equal(
        matched_ts.genotype_matrix(), expected_ts.genotype_matrix()
    )
    self.assertTrue(genotypes_agree)
def verify(self, sample_data):
    """
    Match ancestors for ``sample_data`` with this test's engine and extended
    checks switched on, then hand the result to ``verify_tree_sequence``.
    """
    ancestors = tsinfer.generate_ancestors(sample_data)
    match_options = {"engine": self.engine, "extended_checks": True}
    ancestors_ts = tsinfer.match_ancestors(sample_data, ancestors, **match_options)
    self.verify_tree_sequence(ancestors_ts)
def verify(self, samples):
    """
    Check that ``tsinfer.extract_ancestors`` applied to a sample-matched tree
    sequence yields tables equal to the ancestors tree sequence's tables,
    once provenances (and the extracted populations) are cleared.
    """
    ancestor_data = tsinfer.generate_ancestors(samples)
    # This ancestors tree sequence has positions mapped only to inference sites.
    ancestors_ts = tsinfer.match_ancestors(samples, ancestor_data)
    inferred_ts = tsinfer.match_samples(
        samples, ancestors_ts, path_compression=False, simplify=False
    )

    expected = ancestors_ts.dump_tables()
    extracted, _node_id_map = tsinfer.extract_ancestors(samples, inferred_ts)

    # The extracted tables must carry exactly two more provenance rows.
    self.assertEqual(len(extracted.provenances), len(expected.provenances) + 2)

    # Provenances differ by construction, so drop them before comparing.
    for tables in (expected, extracted):
        tables.provenances.clear()
    # Population data isn't carried through in ancestors tree sequences
    # for now.
    extracted.populations.clear()
    self.assertEqual(expected, extracted)
def test_bad_exclude_sites(self):
    """
    ``exclude_positions`` values that cannot be read as a 1D array of
    doubles must make ``generate_ancestors`` raise ``ValueError``.
    """
    # NOTE(review): nesting reconstructed from flattened source; the checks
    # are placed after the builder context has closed -- confirm.
    with tsinfer.SampleData(1.0) as sample_data:
        sample_data.add_site(0.5, [1, 1])

    invalid_exclusions = ([[None]], ["not", 1.1])
    for bad_positions in invalid_exclusions:
        with self.assertRaises(ValueError):
            tsinfer.generate_ancestors(sample_data, exclude_positions=bad_positions)
def test_generate_ancestors(self):
    """
    ``generate_ancestors`` must raise ``ValueError`` on the first call and
    run without error on the second call with the same sample data.
    """
    # NOTE(review): nesting reconstructed from flattened source; the failing
    # call is assumed to sit inside the SampleData context -- confirm.
    with tsinfer.SampleData(sequence_length=2) as sample_data:
        sample_data.add_site(1, genotypes=[0, 1, 1, 0], alleles=["G", "C"])
        with self.assertRaises(ValueError):
            tsinfer.generate_ancestors(sample_data)
    # After the context has exited the same call is expected to succeed.
    tsinfer.generate_ancestors(sample_data)
def test_zero_node_times(self):
    """
    ``match_samples`` must raise ``ValueError`` when the ancestors tree
    sequence has an extra node added with time 0 and flags 0.
    """
    simulated = msprime.simulate(sample_size=6, random_seed=1, mutation_rate=6)
    samples = tsinfer.SampleData.from_tree_sequence(simulated)
    ancestors = tsinfer.generate_ancestors(samples)
    ancestor_tables = tsinfer.match_ancestors(samples, ancestors).dump_tables()

    # Append one extra node at time zero with no flags set.
    ancestor_tables.nodes.add_row(time=0, flags=0)

    with self.assertRaises(ValueError):
        tsinfer.match_samples(samples, ancestor_tables.tree_sequence())
# NOTE(review): this is a fragment of a larger function whose header is not
# visible here. `samples`, `recombination_rate`, `engine`, `num_threads` and
# `precision` are presumably parameters or locals defined above -- confirm.
# for variant in samples.variants():
# print(variant)
# Short aliases for the two rates handed to match_ancestors below.
rho = recombination_rate
mu = 1e-3 # 1e-15
# Disabled code kept from the original: it builds an AncestorData by hand
# instead of calling generate_ancestors.
# num_alleles = samples.num_alleles(inference_sites=True)
# num_sites = samples.num_inference_sites
# with tsinfer.AncestorData(samples) as ancestor_data:
# t = np.sum(num_alleles) + 1
# for j in range(num_sites):
# for allele in range(num_alleles[j]):
# ancestor_data.add_ancestor(j, j + 1, t, [j], [allele])
# t -= 1
# Generate ancestors with the selected engine and thread count.
ancestor_data = tsinfer.generate_ancestors(
samples, engine=engine, num_threads=num_threads
)
# Debug output: prints the generated ancestor data to stdout.
print(ancestor_data)
# Match the ancestors with path compression on and extended checks off,
# passing the precision, recombination rate and mismatch rate set above.
ancestors_ts = tsinfer.match_ancestors(
samples,
ancestor_data,
engine=engine,
path_compression=True,
extended_checks=False,
precision=precision,
recombination_rate=rho,
mismatch_rate=mu,
)
# print(ancestors_ts.tables)
def run_generate_ancestors(args):
    """
    Load the sample data named by ``args.samples`` and run
    ``tsinfer.generate_ancestors`` on it, passing the resolved ancestors
    path, a progress monitor and the thread counts taken from ``args``.
    """
    setup_logging(args)
    output_path = get_ancestors_path(args.ancestors, args.samples)
    monitor = ProgressMonitor(enabled=args.progress, generate_ancestors=True)
    samples = tsinfer.SampleData.load(args.samples)

    generation_options = {
        "progress_monitor": monitor,
        "path": output_path,
        "num_flush_threads": args.num_flush_threads,
        "num_threads": args.num_threads,
    }
    tsinfer.generate_ancestors(samples, **generation_options)

    # Runs last, after ancestor generation has returned.
    summarise_usage()
def ancestor_properties_worker(args):
simulation_args, compute_exact = args
ts = msprime.simulate(**simulation_args)
sample_data = tsinfer.SampleData.from_tree_sequence(ts)
estimated_anc = tsinfer.generate_ancestors(sample_data)
# Show lengths as a fraction of the total.
estimated_anc_length = estimated_anc.ancestors_length / ts.sequence_length
focal_sites = estimated_anc.ancestors_focal_sites[:]
estimated_anc_focal_distance = np.zeros(estimated_anc.num_ancestors)
pos = np.hstack([estimated_anc.sites_position[:] / ts.sequence_length] + [1])
for j in range(estimated_anc.num_ancestors):
focal = focal_sites[j]
if len(focal) > 0:
estimated_anc_focal_distance[j] = pos[focal[-1]] - pos[focal[0]]
results = {
"num_sites": ts.num_sites,
"num_trees": ts.num_trees,
"estimated_anc_num": estimated_anc.num_ancestors,
"estimated_anc_mean_len": np.mean(estimated_anc_length),
"estimated_anc_mean_focal_distance": np.mean(estimated_anc_focal_distance),
box_size=8,
perfect_ancestors=False,
path_compression=False,
time_chunking=False,
):
sample_data = tsinfer.SampleData.from_tree_sequence(ts)
if perfect_ancestors:
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(
sample_data, ancestor_data, ts, time_chunking=time_chunking
)
ancestor_data.finalise()
else:
ancestor_data = tsinfer.generate_ancestors(sample_data, engine=engine)
ancestors_ts = tsinfer.match_ancestors(
sample_data,
ancestor_data,
engine=engine,
path_compression=path_compression,
extended_checks=True,
)
inferred_ts = tsinfer.match_samples(
sample_data,
ancestors_ts,
engine=engine,
simplify=False,
path_compression=path_compression,
extended_checks=True,
)