Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_example_data(self, sample_size, sequence_length, num_ancestors):
ts = msprime.simulate(
sample_size,
recombination_rate=1,
mutation_rate=10,
length=sequence_length,
random_seed=100,
)
sample_data = formats.SampleData.from_tree_sequence(ts)
num_sites = sample_data.num_sites
ancestors = []
for j in reversed(range(num_ancestors)):
haplotype = np.full(num_sites, tskit.MISSING_DATA, dtype=np.int8)
start = j
end = max(num_sites - j, start + 1)
self.assertLess(start, end)
haplotype[start:end] = 0
if start + j < end:
haplotype[start + j : end] = 1
self.assertTrue(np.all(haplotype[:start] == tskit.MISSING_DATA))
self.assertTrue(np.all(haplotype[end:] == tskit.MISSING_DATA))
focal_sites = np.array([start + k for k in range(j)], dtype=np.int32)
focal_sites = focal_sites[focal_sites < end]
haplotype[focal_sites] = 1
ancestors.append((start, end, 2 * j + 1, focal_sites, haplotype))
return sample_data, ancestors
def verify_data_round_trip(
self,
genotypes,
positions,
alleles=None,
sequence_length=None,
site_times=None,
individual_times=None,
):
genotypes = genotypes.copy()
m, n = genotypes.shape
for j in range(m):
genotypes[j, j % n] = tskit.MISSING_DATA
sample_data = self.create_sample_data(
genotypes, positions, alleles, sequence_length, site_times, individual_times
)
engines = [tsinfer.C_ENGINE, tsinfer.PY_ENGINE]
for engine in engines:
ts = tsinfer.infer(
sample_data,
recombination_rate=1e-3,
mismatch_rate=1e-6,
precision=10,
engine=engine,
)
self.assert_lossless(
ts, genotypes, positions, alleles, sample_data.sequence_length
)
def test_all_missing_at_adjacent_site(self):
u = tskit.MISSING_DATA
G = np.array(
[
[1, 1, 0, 0, 0, 0], # Site 0
[u, u, 0, 1, 1, 1], # Site 1
[1, 1, 0, 0, 0, 0], # Site 2
[u, u, 1, 0, 1, 1], # Site 3
[1, 1, 1, 1, 0, 0],
]
)
# For the haplotype focussed on sites 0 & 2, the relevant values at sites
# 1 and 3 in this example are all missing and should default to 0 in both cases
expected_hap_focal_site_0 = [1, 0, 1, 0, 1]
adp, adc = self.verify_ancestor_generator(G)
site_0_anc = [i for i, fs in enumerate(adc.ancestors_focal_sites[:]) if 0 in fs]
self.assertTrue(len(site_0_anc) == 1)
site_0_anc = site_0_anc[0]
def verify_inserted_ancestors(self, ts):
# Verifies that we can round-trip the specified tree sequence
# using the generated ancestors. NOTE: this must be an SMC
# consistent tree sequence!
with tsinfer.SampleData(sequence_length=ts.sequence_length) as sample_data:
for v in ts.variants():
sample_data.add_site(v.position, v.genotypes, v.alleles)
ancestor_data = tsinfer.AncestorData(sample_data)
tsinfer.build_simulated_ancestors(sample_data, ancestor_data, ts)
ancestor_data.finalise()
A = np.full(
(ancestor_data.num_sites, ancestor_data.num_ancestors),
tskit.MISSING_DATA,
dtype=np.int8,
)
start = ancestor_data.ancestors_start[:]
end = ancestor_data.ancestors_end[:]
ancestors = ancestor_data.ancestors_haplotype[:]
for j in range(ancestor_data.num_ancestors):
A[start[j] : end[j], j] = ancestors[j]
for engine in [tsinfer.PY_ENGINE, tsinfer.C_ENGINE]:
ancestors_ts = tsinfer.match_ancestors(
sample_data, ancestor_data, engine=engine
)
tsinfer.check_ancestors_ts(ancestors_ts)
self.assertEqual(ancestor_data.num_sites, ancestors_ts.num_sites)
self.assertEqual(ancestor_data.num_ancestors, ancestors_ts.num_samples)
self.assertTrue(np.array_equal(ancestors_ts.genotype_matrix(), A))
inferred_ts = tsinfer.match_samples(
def verify_many_trees_dense_mutations(self, ts):
A = tsinfer.get_ancestral_haplotypes(ts)
A = A[ts.num_samples :][::-1]
tsinfer.get_ancestor_descriptors(A)
ancestors, start, end, focal_sites = tsinfer.get_ancestor_descriptors(A)
n, m = ancestors.shape
self.assertEqual(m, ts.num_sites)
self.assertTrue(np.all(ancestors[0, :] == 0))
for a, s, e, focal in zip(ancestors[1:], start[1:], end[1:], focal_sites[1:]):
self.assertTrue(0 <= s < e <= m)
self.assertTrue(np.all(a[:s] == tskit.MISSING_DATA))
self.assertTrue(np.all(a[e:] == tskit.MISSING_DATA))
self.assertTrue(np.all(a[s:e] != tskit.MISSING_DATA))
for site in focal:
self.assertEqual(a[site], 1)
def get_matrix(self, ts):
"""
Simple implementation using tree traversals.
"""
A = np.full((ts.num_nodes, ts.num_sites), tskit.MISSING_DATA, dtype=np.int8)
for t in ts.trees():
for site in t.sites():
for u in t.nodes():
A[u, site.id] = 0
for mutation in site.mutations:
# Every node underneath this node will have the value set
# at this site.
for u in t.nodes(mutation.node):
A[u, site.id] = 1
return A
def imputation_accuracy_worker(args):
simulation_args, missing_proportion = args
ts = msprime.simulate(**simulation_args)
np.random.seed(simulation_args["random_seed"])
G = ts.genotype_matrix()
missing = np.random.rand(ts.num_sites, ts.num_samples) < missing_proportion
G[missing] = tskit.MISSING_DATA
with tsinfer.SampleData(ts.sequence_length) as sample_data:
for var in ts.variants():
sample_data.add_site(
var.site.position, alleles=var.alleles, genotypes=G[var.site.id]
)
ts_inferred = tsinfer.infer(sample_data)
assert ts_inferred.num_sites == ts.num_sites
total_missing = np.sum(missing)
num_correct = 0
for v1, v2 in zip(ts.variants(), ts_inferred.variants()):
site_id = v1.site.id
a1 = np.array(v1.alleles)[v1.genotypes]
a2 = np.array(v2.alleles)[v2.genotypes]
original = a1[missing[site_id]]
inferred = a2[missing[site_id]]
end = [L]
# ancestors = []
# focal_sites = []
# start = []
# end = []
mask = np.ones(L)
for a in A:
masked = np.logical_and(a == 1, mask).astype(int)
new_sites = np.where(masked)[0]
mask[new_sites] = 0
segment = np.where(a != tskit.MISSING_DATA)[0]
# Skip any ancestors that are entirely unknown
if segment.shape[0] > 0:
s = segment[0]
e = segment[-1] + 1
assert np.all(a[s:e] != tskit.MISSING_DATA)
assert np.all(a[:s] == tskit.MISSING_DATA)
assert np.all(a[e:] == tskit.MISSING_DATA)
ancestors.append(a)
focal_sites.append(new_sites)
start.append(s)
end.append(e)
return np.array(ancestors, dtype=np.int8), start, end, focal_sites
def get_ancestral_haplotypes(ts):
"""
Returns a numpy array of the haplotypes of the ancestors in the
specified tree sequence.
"""
tables = ts.dump_tables()
nodes = tables.nodes
flags = nodes.flags[:]
flags[:] = 1
nodes.set_columns(time=nodes.time, flags=flags)
sites = tables.sites.position
tsp = tables.tree_sequence()
B = tsp.genotype_matrix().T
A = np.full((ts.num_nodes, ts.num_sites), tskit.MISSING_DATA, dtype=np.int8)
for edge in ts.edges():
start = bisect.bisect_left(sites, edge.left)
end = bisect.bisect_right(sites, edge.right)
if sites[end - 1] == edge.right:
end -= 1
A[edge.parent, start:end] = B[edge.parent, start:end]
A[: ts.num_samples] = B[: ts.num_samples]
return A
if inference[site.id]:
while (
inferred_mutation < len(site_id)
and site_id[inferred_mutation] == inferred_site
):
tables.mutations.add_row(
site=site.id,
node=node[inferred_mutation],
derived_state=variant.alleles[
derived_state[inferred_mutation]
],
)
inferred_mutation += 1
inferred_site += 1
else:
if np.all(variant.genotypes == tskit.MISSING_DATA):
# Map_mutations has to have at least 1 non-missing value to work
inferred_anc_state = predefined_anc_state
mapped_mutations = []
else:
inferred_anc_state, mapped_mutations = tree.map_mutations(
variant.genotypes[sample_indexes], variant.alleles
)
if inferred_anc_state != predefined_anc_state:
# The user specified a specific ancestral state. However, the
# map_mutations method has reconstructed a different state at the
# root, so insert an extra mutation over each root to allow the
# ancestral state to be as the user specified
for root_node in tree.roots:
tables.mutations.add_row(
site=site.id,
node=root_node,