Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_tree_stat_vector_interface(self):
ts = msprime.simulate(10)
tsc = msprime.TreeStatCalculator(ts)
def f(x):
return [1.0]
# Duplicated samples raise an error
self.assertRaises(ValueError, tsc.tree_stat_vector, [[1, 1]], f)
self.assertRaises(ValueError, tsc.tree_stat_vector, [[1], [2, 2]], f)
# Make sure the basic call doesn't throw an exception
tsc.tree_stat_vector([[1, 2]], f)
# Check for bad windows
for bad_start in [-1, 1, 1e-7]:
self.assertRaises(
ValueError, tsc.tree_stat_vector, [[1, 2]], f,
[bad_start, ts.sequence_length])
for bad_end in [0, ts.sequence_length - 1, ts.sequence_length + 1]:
self.assertRaises(
def check_windowization(self, ts):
samples = random.sample(ts.samples(), 2)
tsc = msprime.TreeStatCalculator(ts)
A_one = [[samples[0]], [samples[1]]]
A_many = [random.sample(ts.samples(), 2),
random.sample(ts.samples(), 2)]
some_breaks = list(set([0.0, ts.sequence_length/2, ts.sequence_length] +
random.sample(list(ts.breakpoints()), 5)))
some_breaks.sort()
tiny_breaks = ([(k / 4) * list(ts.breakpoints())[1] for k in range(4)] +
[ts.sequence_length])
wins = [[0.0, ts.sequence_length],
[0.0, ts.sequence_length/2, ts.sequence_length],
tiny_breaks,
some_breaks]
with self.assertRaises(ValueError):
tsc.tree_stat_vector(A_one, lambda x: 1.0,
windows=[0.0, 1.0, ts.sequence_length+1.1])
def check_vectorization(self, ts):
samples = random.sample(ts.samples(), 3)
tsc = msprime.TreeStatCalculator(ts)
A = [[samples[0]], [samples[1]], [samples[2]]]
def f(x):
return [float((x[0] > 0) != (x[1] > 0)),
float((x[0] > 0) != (x[2] > 0)),
float((x[1] > 0) != (x[2] > 0))]
self.assertListAlmostEqual(
tree_stat_vector_node_iter(ts, A, f, method='mutations'),
[ts.pairwise_diversity(samples=[samples[0], samples[1]]),
ts.pairwise_diversity(samples=[samples[0], samples[2]]),
ts.pairwise_diversity(samples=[samples[1], samples[2]])])
self.assertListAlmostEqual(
tree_stat_vector_node_iter(ts, A, f, method='length'),
[branch_length_diversity(ts, A[0], A[1]),
branch_length_diversity(ts, A[0], A[2]),
def check_f3_stat(self, ts):
A = [random.sample(ts.samples(), 3),
random.sample(ts.samples(), 2),
random.sample(ts.samples(), 1)]
tsc = msprime.TreeStatCalculator(ts)
windows = [0.0, ts.sequence_length/20, ts.sequence_length/2, ts.sequence_length]
ts_values = tsc.f3(A, windows)
ts_vector_values = tsc.f3_vector(A, windows, [(0, 1, 2), (1, 0, 2)])
self.assertListEqual([len(x) for x in ts_values],
[1 for _ in range(len(windows)-1)])
here_values = [[branch_length_f3(ts, A[0], A[1], A[2], begin=windows[k],
end=windows[k+1]),
branch_length_f3(ts, A[1], A[0], A[2], begin=windows[k],
end=windows[k+1])]
for k in range(len(windows)-1)]
self.assertListAlmostEqual([y[0] for y in here_values],
[x[0] for x in ts_values])
self.assertListAlmostEqual([y[0] for y in here_values],
[x[0] for x in ts_vector_values])
self.assertListAlmostEqual([y[1] for y in here_values],
[x[1] for x in ts_vector_values])
def check_f4_stat(self, ts):
A_zero = [[x] for x in random.sample(ts.samples(), 4)]
A_many = [random.sample(ts.samples(), 3),
random.sample(ts.samples(), 3),
random.sample(ts.samples(), 3),
random.sample(ts.samples(), 3)]
A_list = A_zero + A_many
tsc = msprime.TreeStatCalculator(ts)
windows = [0.0, ts.sequence_length/2.0, ts.sequence_length]
indices = [(0, 1, 2, 3), (0, 1, 4, 5), (4, 5, 6, 7)]
ts_vector = tsc.f4_vector(A_list, windows, indices)
for k in range(len(indices)):
index_list = indices[k]
A = [A_list[i] for i in index_list]
def f(x):
return ((float(x[0])/len(A[0])-float(x[1])/len(A[1]))
* (float(x[2])/len(A[2])-float(x[3])/len(A[3])))
here_values = [branch_length_f4(ts, A[0], A[1], A[2], A[3],
begin=windows[i], end=windows[i+1])
for i in range(len(windows)-1)]
self.assertListAlmostEqual(
def check_pairwise_diversity(self, ts):
samples = random.sample(ts.samples(), 2)
tsc = msprime.TreeStatCalculator(ts)
A_one = [[samples[0]], [samples[1]]]
A_many = [random.sample(ts.samples(), 2),
random.sample(ts.samples(), 2)]
for A in (A_one, A_many):
n = [len(a) for a in A]
def f(x):
return float(x[0]*(n[1]-x[1]) + (n[0]-x[0])*x[1])/float(n[0]*n[1])
self.assertAlmostEqual(
tree_stat_node_iter(ts, A, f, method='length'),
branch_length_diversity(ts, A[0], A[1]))
self.assertAlmostEqual(
tsc.tree_stat(A, tupleize(f)),
branch_length_diversity(ts, A[0], A[1]))
def check_tmrca_matrix(self, ts):
# nonoverlapping samples
samples = random.sample(ts.samples(), 6)
tsc = msprime.TreeStatCalculator(ts)
A = [samples[0:3], samples[3:5], samples[5:6]]
windows = [0.0, ts.sequence_length/2, ts.sequence_length]
ts_values = tsc.mean_pairwise_tmrca(A, windows)
ts_matrix_values = tsc.mean_pairwise_tmrca_matrix(A, windows)
self.assertListEqual([len(x) for x in ts_values], [len(samples), len(samples)])
assert(len(A[2]) == 1)
self.assertListEqual([x[5] for x in ts_values], [np.nan, np.nan])
self.assertEqual(len(ts_values), len(ts_matrix_values))
for w in range(len(ts_values)):
self.assertArrayEqual(
ts_matrix_values[w, :, :],
upper_tri_to_matrix(ts_values[w]))
here_values = np.array([[[branch_length_diversity(ts, A[i], A[j],
begin=windows[k],
end=windows[k+1])
for i in range(len(A))]
def check_f2_stat(self, ts):
A = [random.sample(ts.samples(), 3),
random.sample(ts.samples(), 2),
random.sample(ts.samples(), 2)]
tsc = msprime.TreeStatCalculator(ts)
windows = [0.0, ts.sequence_length/20, ts.sequence_length/2, ts.sequence_length]
ts_values = tsc.f2(A[0:2], windows)
ts_vector_values = tsc.f2_vector(A, windows, [(0, 1), (1, 2)])
self.assertListEqual([len(x) for x in ts_values],
[1 for _ in range(len(windows)-1)])
here_values = [[branch_length_f2(ts, A[0], A[1], begin=windows[k],
end=windows[k+1]),
branch_length_f2(ts, A[1], A[2], begin=windows[k],
end=windows[k+1])]
for k in range(len(windows)-1)]
self.assertListAlmostEqual([y[0] for y in here_values],
[x[0] for x in ts_values])
self.assertListAlmostEqual([y[0] for y in here_values],
[x[0] for x in ts_vector_values])
self.assertListAlmostEqual([y[1] for y in here_values],
[x[1] for x in ts_vector_values])