Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_provenance(self):
    # Building ancestor data must append exactly one provenance entry after
    # the entries inherited from the sample data. The new entry has to be
    # dated to the test run and name "tsinfer" as the software; the
    # inherited entries have to come first, unchanged and in order.
    sample_data, ancestors = self.get_example_data(10, 10, 40)
    # Read the clock on both sides of the build: comparing against a single
    # reading taken afterwards fails spuriously when midnight passes between
    # the moment the timestamp is recorded and the moment it is checked.
    started = datetime.datetime.now()
    ancestor_data = tsinfer.AncestorData(sample_data)
    self.verify_data_round_trip(sample_data, ancestor_data, ancestors)
    finished = datetime.datetime.now()
    self.assertEqual(ancestor_data.num_provenances, sample_data.num_provenances + 1)

    timestamp = ancestor_data.provenances_timestamp[-1]
    # NOTE(review): as in the original check, this assumes the stored
    # timestamp is a naive local-time ISO 8601 string - confirm.
    acceptable_dates = {started.date().isoformat(), finished.date().isoformat()}
    self.assertIn(timestamp.split("T")[0], acceptable_dates)

    record = ancestor_data.provenances_record[-1]
    self.assertEqual(record["software"]["name"], "tsinfer")

    # The iterator must agree with the column accessors for the last entry.
    entries = list(ancestor_data.provenances())
    self.assertEqual(entries[-1][0], timestamp)
    self.assertEqual(entries[-1][1], record)
    # Distinct loop names so the values checked above are not shadowed.
    for j, (sample_timestamp, sample_record) in enumerate(
        sample_data.provenances()
    ):
        self.assertEqual(sample_timestamp, entries[j][0])
        self.assertEqual(sample_record, entries[j][1])
# Creates sample data from the given arguments via self.create_sample_data,
# adds ancestors to a fresh AncestorData by hand, then calls match_ancestors
# and match_samples with both the C and the Python engine.
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the match_samples(...) call at the end, so the loop nesting and whatever
# checks follow the call cannot be read from here.
def verify_data_round_trip(
self,
genotypes,
positions,
alleles=None,
sequence_length=None,
site_times=None,
individual_times=None,
):
sample_data = self.create_sample_data(
genotypes, positions, alleles, sequence_length, site_times, individual_times
)
# Indexed per site below, so this holds one allele count for each site.
num_alleles = sample_data.num_alleles()
with tsinfer.AncestorData(sample_data) as ancestor_data:
# Time counter: starts one above the total allele count and counts down.
t = np.sum(num_alleles) + 1
for j in range(sample_data.num_sites):
# One ancestor for every allele index except the last one at site j.
for allele in range(num_alleles[j] - 1):
# NOTE(review): the positional arguments presumably are
# (start, end, time, focal_sites, haplotype) - confirm against add_ancestor.
ancestor_data.add_ancestor(j, j + 1, t, [j], [allele])
# NOTE(review): with the indentation lost it is unclear whether this
# decrement runs once per ancestor or once per site - confirm.
t -= 1
engines = [tsinfer.C_ENGINE, tsinfer.PY_ENGINE]
for engine in engines:
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, engine=engine
)
ts = tsinfer.match_samples(
sample_data,
ancestors_ts,
recombination_rate=1e-3,
mismatch_rate=1e-3,
engine=engine,
def match_ancestors_ancestors_unfinalised(self, path=None):
    # Checks that tsinfer.match_ancestors raises ValueError when it is given
    # ancestor data that has not been finalised. `path` is forwarded to
    # tsinfer.AncestorData; when it is not None the ancestor data is closed
    # explicitly once the check is done.
    with tsinfer.SampleData(sequence_length=2) as samples:
        samples.add_site(1, genotypes=[0, 1, 1, 0], alleles=["G", "C"])
    with tsinfer.AncestorData(samples, path=path) as ancestors:
        focal_haplotype = np.array([1], dtype=np.int8)
        ancestors.add_ancestor(
            start=0, end=1, time=2.0, focal_sites=[0], haplotype=focal_haplotype
        )
        # match_ancestors fails when ancestors unfinalised, so the call has
        # to happen before this `with` block exits.
        with self.assertRaises(ValueError):
            tsinfer.match_ancestors(samples, ancestors)
    if path is not None:
        ancestors.close()
def test_chunk_size(self):
    # For a range of chunk sizes around the example size, every ancestor
    # array must report exactly the chunk_size that was requested.
    n = 20
    array_names = (
        "ancestors_haplotype",
        "ancestors_focal_sites",
        "ancestors_start",
        "ancestors_end",
        "ancestors_time",
    )
    for chunk_size in (1, 2, 3, n - 1, n, n + 1):
        sample_data, ancestors = self.get_example_data(6, 1, n)
        ancestor_data = tsinfer.AncestorData(sample_data, chunk_size=chunk_size)
        self.verify_data_round_trip(sample_data, ancestor_data, ancestors)
        expected_chunks = (chunk_size,)
        for array_name in array_names:
            stored = getattr(ancestor_data, array_name)
            self.assertEqual(stored.chunks, expected_chunks)
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the match_ancestors(...) call at the end, so the checks made on the
# result are not shown.
def verify_inserted_ancestors(self, ts):
# Verifies that we can round-trip the specified tree sequence
# using the generated ancestors. NOTE: this must be an SMC
# consistent tree sequence!
# Copy every variant of ts (position, genotypes, alleles) into new sample data.
with tsinfer.SampleData(sequence_length=ts.sequence_length) as sample_data:
for v in ts.variants():
sample_data.add_site(v.position, v.genotypes, v.alleles)
# Fill the ancestor data from the tree sequence itself, then finalise it.
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, ancestor_data, ts)
ancestor_data.finalise()
# A is a (num_sites x num_ancestors) int8 matrix that starts out entirely
# as tskit.MISSING_DATA.
A = np.full(
(ancestor_data.num_sites, ancestor_data.num_ancestors),
tskit.MISSING_DATA,
dtype=np.int8,
)
start = ancestor_data.ancestors_start[:]
end = ancestor_data.ancestors_end[:]
ancestors = ancestor_data.ancestors_haplotype[:]
# Column j receives ancestor j's haplotype over rows start[j]:end[j]; rows
# outside that range keep the MISSING_DATA fill.
for j in range(ancestor_data.num_ancestors):
A[start[j] : end[j], j] = ancestors[j]
# Both engines are exercised, Python first.
for engine in [tsinfer.PY_ENGINE, tsinfer.C_ENGINE]:
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, engine=engine
def test_defaults_with_path(self):
    # Ancestor data written to a file path must store every array with the
    # default compressor and must compare equal to the same file loaded
    # back through tsinfer.load.
    sample_data, ancestors = self.get_example_data(10, 10, 40)
    with tempfile.TemporaryDirectory(prefix="tsinf_format_test") as workdir:
        ancestors_path = os.path.join(workdir, "ancestors.tmp")
        ancestor_data = tsinfer.AncestorData(sample_data, path=ancestors_path)
        self.verify_data_round_trip(sample_data, ancestor_data, ancestors)
        expected_compressor = formats.DEFAULT_COMPRESSOR
        for _name, stored in ancestor_data.arrays():
            self.assertEqual(stored.compressor, expected_compressor)
        with tsinfer.load(ancestors_path) as reloaded:
            self.assertEqual(reloaded, ancestor_data)
# NOTE(review): fragment - the enclosing function's `def` line is not part
# of this excerpt, and `args`, `rng`, `MB` and `generate_samples` are defined
# somewhere that is not shown here.
# Seeds NumPy's global random state from the command-line seed.
np.random.seed(args.random_seed)
# Keyword arguments for msprime.simulate: the length is args.length scaled by
# MB, the model is "smc_prime", and the simulation seed is drawn from `rng`.
# NOTE(review): whether `rng` is itself tied to args.random_seed cannot be
# seen here - confirm.
sim_args = {
"sample_size": args.sample_size,
"length": args.length * MB,
"recombination_rate": args.recombination_rate,
"mutation_rate": args.mutation_rate,
"Ne": args.Ne,
"model": "smc_prime",
"random_seed": rng.randint(1, 2 ** 30),
}
ts = msprime.simulate(**sim_args)
# NOTE(review): presumably turns the simulation into sample data with
# args.error controlling injected error - generate_samples is not shown; confirm.
sample_data = generate_samples(ts, args.error)
# Ancestors inferred from the sample data alone.
inferred_anc = tsinfer.generate_ancestors(sample_data, engine=args.engine)
# Ancestors built from the simulated tree sequence `ts`, then finalised.
true_anc = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, true_anc, ts)
true_anc.finalise()
return sample_data, true_anc, inferred_anc
def visualise(
ts,
recombination_rate,
error_rate,
engine="C",
box_size=8,
perfect_ancestors=False,
path_compression=False,
time_chunking=False,
):
sample_data = tsinfer.SampleData.from_tree_sequence(ts)
if perfect_ancestors:
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(
sample_data, ancestor_data, ts, time_chunking=time_chunking
)
ancestor_data.finalise()
else:
ancestor_data = tsinfer.generate_ancestors(sample_data, engine=engine)
ancestors_ts = tsinfer.match_ancestors(
sample_data,
ancestor_data,
engine=engine,
path_compression=path_compression,
extended_checks=True,
)
inferred_ts = tsinfer.match_samples(
sample_data,