Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def verify_inserted_ancestors(self, ts):
# Verifies that we can round-trip the specified tree sequence
# using the generated ancestors. NOTE: this must be an SMC
# consistent tree sequence!
with tsinfer.SampleData(sequence_length=ts.sequence_length) as sample_data:
for v in ts.variants():
sample_data.add_site(v.position, v.genotypes, v.alleles)
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, ancestor_data, ts)
ancestor_data.finalise()
A = np.full(
(ancestor_data.num_sites, ancestor_data.num_ancestors),
tskit.MISSING_DATA,
dtype=np.int8,
)
start = ancestor_data.ancestors_start[:]
end = ancestor_data.ancestors_end[:]
ancestors = ancestor_data.ancestors_haplotype[:]
for j in range(ancestor_data.num_ancestors):
A[start[j] : end[j], j] = ancestors[j]
for engine in [tsinfer.PY_ENGINE, tsinfer.C_ENGINE]:
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, engine=engine
)
G = S.astype(np.uint8).T
#Create the ancestors
input_root = zarr.group()
tsinfer.InputFile.build(
input_root, genotypes=G,
# genotype_qualities=tsinfer.proba_to_phred(error_probability),
position=positions,
recombination_rate=rho, sequence_length=ts.sequence_length,
compress=False)
ancestors_root = zarr.group()
#tsinfer.extract_ancestors(ts, ancestors_root)
tsinfer.build_simulated_ancestors(input_root, ancestors_root, ts)
ancestors_ts = tsinfer.match_ancestors(input_root, ancestors_root)
assert ancestors_ts.sequence_length == ts.num_sites
inferred_ts = tsinfer.match_samples(
input_root, ancestors_ts, method="C",
simplify=False)
print("inferred num_edges = ", inferred_ts.num_edges)
def visualise(
ts,
recombination_rate,
error_rate,
engine="C",
box_size=8,
perfect_ancestors=False,
path_compression=False,
time_chunking=False,
):
sample_data = tsinfer.SampleData.from_tree_sequence(ts)
if perfect_ancestors:
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(
sample_data, ancestor_data, ts, time_chunking=time_chunking
)
ancestor_data.finalise()
else:
ancestor_data = tsinfer.generate_ancestors(sample_data, engine=engine)
ancestors_ts = tsinfer.match_ancestors(
sample_data,
ancestor_data,
engine=engine,
path_compression=path_compression,
extended_checks=True,
)
inferred_ts = tsinfer.match_samples(
sample_data,
ancestors_ts,
for j in range(estimated_anc.num_ancestors):
focal = focal_sites[j]
if len(focal) > 0:
estimated_anc_focal_distance[j] = pos[focal[-1]] - pos[focal[0]]
results = {
"num_sites": ts.num_sites,
"num_trees": ts.num_trees,
"estimated_anc_num": estimated_anc.num_ancestors,
"estimated_anc_mean_len": np.mean(estimated_anc_length),
"estimated_anc_mean_focal_distance": np.mean(estimated_anc_focal_distance),
}
if compute_exact:
exact_anc = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, exact_anc, ts)
exact_anc.finalise()
# Show lengths as a fraction of the total.
exact_anc_length = exact_anc.ancestors_end[:] - exact_anc.ancestors_start[:]
focal_sites = exact_anc.ancestors_focal_sites[:]
pos = np.hstack([exact_anc.sites_position[:] / ts.sequence_length] + [1])
exact_anc_focal_distance = np.zeros(exact_anc.num_ancestors)
for j in range(exact_anc.num_ancestors):
focal = focal_sites[j]
if len(focal) > 0:
exact_anc_focal_distance[j] = pos[focal[-1]] - pos[focal[0]]
results.update(
{
"exact_anc_num": exact_anc.num_ancestors,
"exact_anc_mean_len": np.mean(exact_anc_length),
"exact_anc_mean_focal_distance": np.mean(exact_anc_focal_distance),
sim_args = {
"sample_size": args.sample_size,
"length": args.length * MB,
"recombination_rate": args.recombination_rate,
"mutation_rate": args.mutation_rate,
"Ne": args.Ne,
"model": "smc_prime",
"random_seed": rng.randint(1, 2 ** 30),
}
ts = msprime.simulate(**sim_args)
sample_data = generate_samples(ts, args.error)
inferred_anc = tsinfer.generate_ancestors(sample_data, engine=args.engine)
true_anc = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, true_anc, ts)
true_anc.finalise()
return sample_data, true_anc, inferred_anc
def run_infer(
ts, engine=tsinfer.C_ENGINE, path_compression=True, exact_ancestors=False
):
"""
Runs the perfect inference process on the specified tree sequence.
"""
sample_data = tsinfer.SampleData.from_tree_sequence(ts)
if exact_ancestors:
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, ancestor_data, ts)
ancestor_data.finalise()
else:
ancestor_data = tsinfer.generate_ancestors(sample_data, engine=engine)
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, path_compression=path_compression, engine=engine
)
inferred_ts = tsinfer.match_samples(
sample_data, ancestors_ts, path_compression=path_compression, engine=engine
)
return inferred_ts