Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def verify(self, sample_data):
    """
    Run the full inference pipeline on ``sample_data`` and hand the
    resulting tree sequence to ``self.verify_tree_sequence``.

    Samples are matched with path compression on, using ``self.engine``
    and with ``extended_checks`` enabled.
    """
    ancestors = tsinfer.generate_ancestors(sample_data)
    # Path compression is deliberately left off for the ancestors so that
    # the sample-matching step below is as difficult as possible.
    uncompressed_ancestors_ts = tsinfer.match_ancestors(
        sample_data, ancestors, path_compression=False
    )
    sample_match_options = dict(
        path_compression=True,
        engine=self.engine,
        extended_checks=True,
    )
    inferred_ts = tsinfer.match_samples(
        sample_data, uncompressed_ancestors_ts, **sample_match_options
    )
    self.verify_tree_sequence(inferred_ts)
def test_index_errors(self):
    """
    ``augment_ancestors`` must raise ValueError for each invalid subset
    tried here: an empty list, a negative index, and a list containing
    the index 6 (the simulation is asked for 5 samples).
    """
    simulated_ts = msprime.simulate(
        5, mutation_rate=5, random_seed=8, recombination_rate=1
    )
    samples = tsinfer.SampleData.from_tree_sequence(simulated_ts, use_times=False)
    ancestors_ts = tsinfer.match_ancestors(
        samples, tsinfer.generate_ancestors(samples)
    )
    invalid_subsets = ([], [-1], [0, 6])
    for invalid in invalid_subsets:
        self.assertRaises(
            ValueError, tsinfer.augment_ancestors, samples, ancestors_ts, invalid
        )
def verify(self, sample_data, position_subset):
    """
    Check that matching samples against a site-subsetted ancestors tree
    sequence reproduces the genotypes of the site-subsetted full inference.

    The expected result is ``tsinfer.infer(sample_data)`` restricted to
    ``position_subset`` via ``self.subset_sites``. The result under test is
    built by subsetting, minimising and simplifying the ancestors tree
    sequence, then matching samples derived from the expected tree
    sequence against it.
    """
    # Reference: infer on everything, then keep only the requested sites.
    expected_ts = self.subset_sites(tsinfer.infer(sample_data), position_subset)

    # Ancestors built from all sites, then cut down to the same subset.
    ancestors = tsinfer.generate_ancestors(sample_data)
    all_sites_ancestors_ts = tsinfer.match_ancestors(sample_data, ancestors)
    restricted_ancestors_ts = self.subset_sites(
        all_sites_ancestors_ts, position_subset
    )
    reduced_ancestors_ts = tsinfer.minimise(restricted_ancestors_ts).simplify()

    reduced_samples = tsinfer.SampleData.from_tree_sequence(expected_ts)
    rematched_ts = tsinfer.match_samples(reduced_samples, reduced_ancestors_ts)
    genotypes_agree = np.array_equal(
        rematched_ts.genotype_matrix(), expected_ts.genotype_matrix()
    )
    self.assertTrue(genotypes_agree)
def test_match_samples_unfinalised(self):
    """
    ``match_samples`` must raise ValueError when given a SampleData object
    that has not been finalised, and must accept the same object once
    ``finalise()`` has been called on it.
    """
    # Reference data set, built inside a ``with`` block (presumably
    # finalised when the block exits) and used to produce the ancestors.
    with tsinfer.SampleData(sequence_length=2) as reference:
        reference.add_site(1, genotypes=[0, 1, 1, 0], alleles=["G", "C"])
    ancestors_ts = tsinfer.match_ancestors(
        reference, tsinfer.generate_ancestors(reference)
    )

    # Same content, but created without the context manager and not yet
    # finalised.
    pending = tsinfer.SampleData(sequence_length=2)
    pending.add_site(1, genotypes=[0, 1, 1, 0], alleles=["G", "C"])
    with self.assertRaises(ValueError):
        tsinfer.match_samples(pending, ancestors_ts)

    pending.finalise()
    tsinfer.match_samples(pending, ancestors_ts)
def verify_example(self, subset, samples, ancestors, path_compression):
# Matches `ancestors` against `samples`, augments the resulting ancestors
# tree sequence with the samples indexed by `subset`, and checks the result
# with self.verify_augmented_ancestors. `path_compression` is forwarded to
# both match_ancestors and augment_ancestors.
# NOTE(review): this snippet has lost its indentation and appears to be cut
# off -- `t1`, `tables`, `j` and `edges` are assigned below but never read in
# the visible lines.
ancestors_ts = tsinfer.match_ancestors(
samples, ancestors, path_compression=path_compression
)
augmented_ancestors = tsinfer.augment_ancestors(
samples, ancestors_ts, subset, path_compression=path_compression
)
self.verify_augmented_ancestors(
subset, ancestors_ts, augmented_ancestors, path_compression
)
# Run the inference now
final_ts = tsinfer.match_samples(samples, augmented_ancestors, simplify=False)
# Tables of the un-augmented ancestors and of the final inference.
t1 = ancestors_ts.dump_tables()
tables = final_ts.tables
# For each sample index in `subset`, look up the corresponding node ID in
# the final tree sequence and collect the edges whose child is that node.
for j, index in enumerate(subset):
sample_id = final_ts.samples()[index]
edges = [e for e in final_ts.edges() if e.child == sample_id]
def verify_example(self, full_subset, samples, ancestors, path_compression):
# Exercises augment_ancestors on growing prefixes of `full_subset`, checking
# each result with self.verify_augmented_ancestors and then running
# match_samples against the augmented ancestors.
# NOTE(review): this snippet has lost its indentation and appears to be cut
# off; which of the statements after the `for` header belong to the loop
# body cannot be determined from the visible lines -- confirm against the
# original file.
ancestors_ts = tsinfer.match_ancestors(
samples, ancestors, path_compression=path_compression
)
# Running total of the prefix lengths processed so far; not read in the
# visible lines.
expected_sample_ancestors = 0
# Prefix lengths run from 1 to len(full_subset) - 1, so the complete
# `full_subset` itself is never used as a subset here.
for j in range(1, len(full_subset)):
subset = full_subset[:j]
expected_sample_ancestors += len(subset)
augmented_ancestors = tsinfer.augment_ancestors(
samples, ancestors_ts, subset, path_compression=path_compression
)
self.verify_augmented_ancestors(
subset, ancestors_ts, augmented_ancestors, path_compression
)
# Run the inference now
final_ts = tsinfer.match_samples(
samples, augmented_ancestors, simplify=False
)
def verify(self, samples):
# Round-trip check: build an ancestors tree sequence, match samples against
# it, then confirm that extract_ancestors recovers tables equal to those of
# the original ancestors tree sequence (ignoring provenances and
# populations).
ancestors = tsinfer.generate_ancestors(samples)
# this ancestors TS has positions mapped only to inference sites
ancestors_ts_1 = tsinfer.match_ancestors(samples, ancestors)
ts = tsinfer.match_samples(
samples, ancestors_ts_1, path_compression=False, simplify=False
)
t1 = ancestors_ts_1.dump_tables()
# extract_ancestors returns the recovered tables plus a mapping that is
# indexed below by node ID in `ts`.
t2, node_id_map = tsinfer.extract_ancestors(samples, ts)
# The extracted tables are expected to hold exactly two more provenance
# records than the original ancestors tables.
self.assertEqual(len(t2.provenances), len(t1.provenances) + 2)
# Provenances are cleared on both sides so that they do not take part in
# the equality assertion below.
t1.provenances.clear()
t2.provenances.clear()
# Population data isn't carried through in ancestors tree sequences
# for now.
t2.populations.clear()
self.assertEqual(t1, t2)
for node in ts.nodes():
# Presumably -1 marks a node of `ts` with no counterpart in the extracted
# ancestors -- TODO confirm.
# NOTE(review): the body of this `if` is not visible; the snippet ends here.
if node_id_map[node.id] != -1:
# Run the full inference pipeline once per input file and write each
# resulting tree sequence to its own output file.
for i, fn in enumerate(args.infiles):
    # Check up front and name the missing file in the error: DBMStore would
    # otherwise be asked to open a path that does not exist.
    if not os.path.isfile(fn):
        raise FileNotFoundError("Input file not found: {}".format(fn))
    if args.outfile:
        # When several inputs share one output name, append the input's
        # index so the outputs do not overwrite each other.
        ext = ('.' + str(i)) if len(args.infiles) > 1 else ''
        out_fn = args.outfile + ext
    else:
        out_fn = os.path.splitext(fn)[0] + '.hdf5'
    input_hdf5 = zarr.DBMStore(fn, open=bsddb3.btopen)
    try:
        input_root = zarr.group(store=input_hdf5)
        # Ancestors are built into a fresh group with no store argument.
        ancestors_root = zarr.group()
        tsinfer.build_ancestors(
            input_root, ancestors_root, method=method, chunk_size=16,
            compress=False, progress=args.progress)
        ancestors_ts = tsinfer.match_ancestors(
            input_root, ancestors_root, method=method,
            path_compression=path_compression, progress=args.progress)
        full_inferred_ts = tsinfer.match_samples(
            input_root, ancestors_ts, method=method,
            path_compression=path_compression, simplify=simplify,
            progress=args.progress)
        full_inferred_ts.dump(out_fn)
    finally:
        # Release the underlying database file even if inference fails, so
        # handles do not accumulate across input files.
        input_hdf5.close()
# NOTE(review): this fragment has lost its indentation, and the `if` header
# that pairs with the `else:` below lies outside the visible lines. The
# statements before `else:` read args.inject_real_ancestors_from_ts, so
# presumably that option selects this first branch -- TODO confirm.
# First branch: ancestors are built from a previously saved tree sequence
# via evaluation.build_simulated_ancestors.
orig_ts = msprime.load(args.inject_real_ancestors_from_ts)
ancestor_data = formats.AncestorData.initialise(sample_data, compressor=None)
evaluation.build_simulated_ancestors(sample_data, ancestor_data, orig_ts)
ancestor_data.finalise()
# NOTE(review): unlike the second branch, num_threads is not passed to
# match_ancestors here -- confirm whether that is intentional.
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, method=args.method,
path_compression=args.shared_recombinations)
ts = tsinfer.match_samples(
sample_data, ancestors_ts, method=args.method,
path_compression=args.shared_recombinations,
simplify=True)
else:
# Second branch: ancestors are built from the sample data itself with
# tsinfer.build_ancestors.
ancestor_data = formats.AncestorData.initialise(sample_data, compressor=None)
tsinfer.build_ancestors(sample_data, ancestor_data, method=args.method)
ancestor_data.finalise()
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, method=args.method,
num_threads=args.threads,
path_compression=args.shared_recombinations)
ts = tsinfer.match_samples(
sample_data, ancestors_ts, method=args.method,
path_compression=args.shared_recombinations,
simplify=True)
# The inferred tree sequence is written to the path given by args.output.
ts.dump(args.output)
def run_infer(
    ts, engine=tsinfer.C_ENGINE, path_compression=True, exact_ancestors=False
):
    """
    Infer a tree sequence from the sample data of ``ts`` and return it.

    When ``exact_ancestors`` is true the ancestors are filled in by
    ``tsinfer.build_simulated_ancestors`` from ``ts`` itself (``engine`` is
    not used for that step); otherwise they are produced by
    ``tsinfer.generate_ancestors``. ``path_compression`` and ``engine`` are
    forwarded to both ``match_ancestors`` and ``match_samples``.
    """
    samples = tsinfer.SampleData.from_tree_sequence(ts)
    if not exact_ancestors:
        ancestors = tsinfer.generate_ancestors(samples, engine=engine)
    else:
        ancestors = tsinfer.AncestorData(samples)
        tsinfer.build_simulated_ancestors(samples, ancestors, ts)
        ancestors.finalise()

    # Both matching stages run with the same settings.
    matching_options = dict(path_compression=path_compression, engine=engine)
    ancestors_ts = tsinfer.match_ancestors(samples, ancestors, **matching_options)
    return tsinfer.match_samples(samples, ancestors_ts, **matching_options)