Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
2 0.75 0
3 0.85 0
""")
mutations = six.StringIO("""\
site node derived_state parent
0 0 1 -1
0 10 1 -1
0 0 0 0
1 8 1 -1
1 2 1 -1
2 8 1 -1
2 9 0 5
""")
ts = tskit.load_text(
nodes=nodes, edges=edges, sites=sites, mutations=mutations, strict=False)
branch_tsc = tskit.BranchLengthStatCalculator(ts)
py_branch_tsc = PythonBranchLengthStatCalculator(ts)
site_tsc = tskit.SiteStatCalculator(ts)
py_site_tsc = PythonSiteStatCalculator(ts)
# divergence between 0 and 1
A = [[0], [1]]
def f(x):
return float((x[0] > 0) != (x[1] > 0))/2.0
# tree lengths:
self.assertAlmostEqual(py_branch_tsc.tree_length_diversity([0], [1]),
branch_true_diversity_01)
self.assertAlmostEqual(branch_tsc.tree_stat(A, f),
branch_true_diversity_01)
self.assertAlmostEqual(py_branch_tsc.tree_stat(A, f),
def test_windowization(self):
ts = msprime.simulate(10, random_seed=self.random_seed, recombination_rate=100)
samples = random.sample(list(ts.samples()), 2)
tsc = tskit.BranchLengthStatCalculator(ts)
py_tsc = PythonBranchLengthStatCalculator(ts)
A_one = [[samples[0]], [samples[1]]]
A_many = [random.sample(list(ts.samples()), 2),
random.sample(list(ts.samples()), 2)]
some_breaks = list(set([0.0, ts.sequence_length/2, ts.sequence_length] +
random.sample(list(ts.breakpoints()), 5)))
some_breaks.sort()
tiny_breaks = ([(k / 4) * list(ts.breakpoints())[1] for k in range(4)] +
[ts.sequence_length])
wins = [[0.0, ts.sequence_length],
[0.0, ts.sequence_length/2, ts.sequence_length],
tiny_breaks,
some_breaks]
with self.assertRaises(ValueError):
tsc.tree_stat_vector(A_one, lambda x: 1.0,
def check_divergence_matrix(self, ts):
# nonoverlapping samples
samples = random.sample(list(ts.samples()), 6)
tsc = tskit.BranchLengthStatCalculator(ts)
py_tsc = PythonBranchLengthStatCalculator(ts)
A = [samples[0:3], samples[3:5], samples[5:6]]
windows = [0.0, ts.sequence_length/2, ts.sequence_length]
ts_values = tsc.divergence(A, windows)
ts_matrix_values = tsc.divergence_matrix(A, windows)
self.assertListEqual([len(x) for x in ts_values], [len(samples), len(samples)])
assert(len(A[2]) == 1)
self.assertListEqual([x[5] for x in ts_values], [np.nan, np.nan])
self.assertEqual(len(ts_values), len(ts_matrix_values))
for w in range(len(ts_values)):
self.assertArrayEqual(
ts_matrix_values[w, :, :], upper_tri_to_matrix(ts_values[w]))
here_values = np.array([[[py_tsc.tree_length_diversity(A[i], A[j],
begin=windows[k],
end=windows[k+1])
for i in range(len(A))]
def check_pairwise_diversity(self, ts):
samples = random.sample(list(ts.samples()), 2)
tsc = tskit.BranchLengthStatCalculator(ts)
py_tsc = PythonBranchLengthStatCalculator(ts)
A_one = [[samples[0]], [samples[1]]]
A_many = [random.sample(list(ts.samples()), 2),
random.sample(list(ts.samples()), 2)]
for A in (A_one, A_many):
n = [len(a) for a in A]
def f(x):
return float(x[0]*(n[1]-x[1]))/float(n[0]*n[1])
self.assertAlmostEqual(
py_tsc.tree_stat(A, f),
py_tsc.tree_length_diversity(A[0], A[1]))
self.assertAlmostEqual(
tsc.tree_stat(A, f),
py_tsc.tree_length_diversity(A[0], A[1]))
def test_tree_stat_vector_interface(self):
ts = msprime.simulate(10)
tsc = tskit.BranchLengthStatCalculator(ts)
def f(x):
return [1.0]
# Duplicated samples raise an error
self.assertRaises(ValueError, tsc.tree_stat_vector, [[1, 1]], f)
self.assertRaises(ValueError, tsc.tree_stat_vector, [[1], [2, 2]], f)
# Make sure the basic call doesn't throw an exception
tsc.tree_stat_vector([[1, 2]], f)
# Check for bad windows
for bad_start in [-1, 1, 1e-7]:
self.assertRaises(
ValueError, tsc.tree_stat_vector, [[1, 2]], f,
[bad_start, ts.sequence_length])
for bad_end in [0, ts.sequence_length - 1, ts.sequence_length + 1]:
self.assertRaises(
def test_interface(self):
self.assertRaises(TypeError, tskit.GeneralStatCalculator)
self.assertRaises(TypeError, tskit.SiteStatCalculator)
self.assertRaises(TypeError, tskit.BranchLengthStatCalculator)
def test_sfs_interface(self):
ts = msprime.simulate(10)
tsc = tskit.BranchLengthStatCalculator(ts)
# Duplicated samples raise an error
self.assertRaises(ValueError, tsc.site_frequency_spectrum, [1, 1])
self.assertRaises(ValueError, tsc.site_frequency_spectrum, [])
self.assertRaises(ValueError, tsc.site_frequency_spectrum, [0, 11])
# Check for bad windows
for bad_start in [-1, 1, 1e-7]:
self.assertRaises(
ValueError, tsc.site_frequency_spectrum, [1, 2],
[bad_start, ts.sequence_length])
for bad_end in [0, ts.sequence_length - 1, ts.sequence_length + 1]:
self.assertRaises(
ValueError, tsc.site_frequency_spectrum, [1, 2],
[0, bad_end])
# Windows must be increasing.
self.assertRaises(
def test_small_sim(self):
orig_ts = msprime.simulate(4, random_seed=self.random_seed,
mutation_rate=0.0,
recombination_rate=3.0)
ts = tsutil.jukes_cantor(orig_ts, num_sites=3, mu=3,
multiple_per_node=True, seed=self.seed)
branch_tsc = tskit.BranchLengthStatCalculator(ts)
py_branch_tsc = PythonBranchLengthStatCalculator(ts)
site_tsc = tskit.SiteStatCalculator(ts)
py_site_tsc = PythonSiteStatCalculator(ts)
A = [[0], [1], [2]]
self.assertAlmostEqual(branch_tsc.Y3(A, [0.0, 1.0])[0][0],
py_branch_tsc.Y3(*A))
self.assertAlmostEqual(site_tsc.Y3(A, [0.0, 1.0])[0][0],
py_site_tsc.Y3(*A))
A = [[0], [1, 2]]
self.assertAlmostEqual(branch_tsc.Y2(A, [0.0, 1.0])[0][0],
py_branch_tsc.Y2(*A))
self.assertAlmostEqual(site_tsc.Y2(A, [0.0, 1.0])[0][0],
py_site_tsc.Y2(*A))