Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_migration_rate_directionality_from_ts(self):
    # Start from a three-population table collection holding one sample
    # in population 0 and one in population 1, then complete the
    # simulation with msprime.simulate(from_ts=...). The only demographic
    # events set migration matrix entries (0, 2) and (1, 2) to rate 1 at
    # change_time. The assertions pin the root to node 2 in population 2
    # and check that the samples keep the populations they were given.
    num_populations = 3
    change_time = 5

    tables = msprime.TableCollection(1)
    for _ in range(num_populations):
        tables.populations.add_row()
    # One sample node each in populations 0 and 1; population 2 has none.
    for sample_population in (0, 1):
        tables.nodes.add_row(
            flags=msprime.NODE_IS_SAMPLE, time=0, population=sample_population)

    configs = [
        msprime.PopulationConfiguration() for _ in range(num_populations)]
    rate_changes = [
        msprime.MigrationRateChange(
            time=change_time, rate=1, matrix_index=(origin, 2))
        for origin in (0, 1)
    ]
    initial_ts = tables.tree_sequence()
    ts = msprime.simulate(
        from_ts=initial_ts,
        start_time=0,
        population_configurations=configs,
        demographic_events=rate_changes,
        random_seed=1)

    first_tree = next(ts.trees())
    self.assertEqual(first_tree.root, 2)
    # The root has to be older than a quarter of the rate-change time.
    self.assertGreater(first_tree.time(2), change_time / 4)
    # Node ids 0, 1 and 2 are expected in populations 0, 1 and 2.
    for node, expected_population in enumerate((0, 1, 2)):
        self.assertEqual(first_tree.population(node), expected_population)
    for node in (0, 1):
        self.assertEqual(ts.node(node).population, node)
    self.assertEqual(list(ts.samples()), [0, 1])
    self.assertEqual(list(ts.samples(0)), [0])
def test_migration_rate_change_matrix_index(self):
    # For every ordered pair of distinct population indexes in an N x N
    # matrix (N = 1..4), the low-level representation of a
    # MigrationRateChange is expected to carry the flattened index
    # row * N + column alongside the time, type and rate.
    event_time = 51
    for num_pops in range(1, 5):
        for pair in itertools.permutations(range(num_pops), 2):
            row, column = pair
            expected = {
                "type": "migration_rate_change",
                "time": event_time,
                "migration_rate": 0,
                "matrix_index": row * num_pops + column,
            }
            change = msprime.MigrationRateChange(
                time=event_time, rate=0, matrix_index=pair)
            self.assertEqual(
                change.get_ll_representation(num_pops), expected)
msprime.PopulationConfiguration(
sample_size=nhaps[1], initial_size=N_EU, growth_rate=r_EU),
msprime.PopulationConfiguration(
sample_size=nhaps[2], initial_size=N_AS, growth_rate=r_AS)
]
migration_matrix = [
[ 0, m_AF_EU, m_AF_AS],
[m_AF_EU, 0, m_EU_AS],
[m_AF_AS, m_EU_AS, 0],
]
demographic_events = [
# CEU and CHB merge into B with rate changes at T_EU_AS
msprime.MassMigration(
time=T_EU_AS, source=2, destination=1, proportion=1.0),
msprime.MigrationRateChange(time=T_EU_AS, rate=0),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(0, 1)),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(1, 0)),
msprime.PopulationParametersChange(
time=T_EU_AS, initial_size=N_B, growth_rate=0, population_id=1),
# Population B merges into YRI at T_B
msprime.MassMigration(
time=T_B, source=1, destination=0, proportion=1.0),
# Size changes to N_A at T_AF
msprime.PopulationParametersChange(
time=T_AF, initial_size=N_A, population_id=0)
]
# Use the demography debugger to print out the demographic history
# that we have just described.
dp = msprime.DemographyDebugger(
Ne=N_A,
msprime.PopulationConfiguration(
sample_size= nCEU, initial_size=N_EU, growth_rate=r_EU),
msprime.PopulationConfiguration(
sample_size= nCHB, initial_size=N_AS, growth_rate=r_AS)
]
migration_matrix = [
[ 0, m_AF_EU, m_AF_AS],
[m_AF_EU, 0, m_EU_AS],
[m_AF_AS, m_EU_AS, 0],
]
demographic_events = [
# CEU and CHB merge into B with rate changes at T_EU_AS
msprime.MassMigration(
time=T_EU_AS, source=2, destination=1, proportion=1.0),
msprime.MigrationRateChange(time=T_EU_AS, rate=0),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(0, 1)),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(1, 0)),
msprime.PopulationParametersChange(
time=T_EU_AS, initial_size=N_B, growth_rate=0, population_id=1),
# Population B merges into YRI at T_B
msprime.MassMigration(
time=T_B, source=1, destination=0, proportion=1.0),
# Size changes to N_A at T_AF
msprime.PopulationParametersChange(
time=T_AF, initial_size=N_A, population_id=0)
]
return dict(
population_configurations=population_configurations,
migration_matrix=migration_matrix,
raise_admixture_incompatability_error(parser, "-eM")
check_migration_rate(parser, x)
check_event_time(parser, t)
event = msprime.MigrationRateChange(
t, x / (num_populations - 1))
demographic_events.append((index, event))
for index, event in args.migration_matrix_entry_change:
t = event[0]
check_event_time(parser, t)
dest = convert_population_id(parser, event[1], num_populations)
source = convert_population_id(parser, event[2], num_populations)
if dest == source:
parser.error("Cannot set diagonal elements in migration matrix")
rate = event[3]
check_migration_rate(parser, rate)
msp_event = msprime.MigrationRateChange(t, rate, (dest, source))
demographic_events.append((index, msp_event))
for index, event in args.migration_matrix_change:
if len(event) < 3:
parser.error("Need at least three arguments to -ma")
if len(args.admixture) != 0:
raise_admixture_incompatability_error(parser, "-ema")
t = convert_float(event[0], parser)
check_event_time(parser, t)
if convert_int(event[1], parser) != num_populations:
parser.error(
"num_populations must be equal for new migration matrix")
matrix = convert_migration_matrix(parser, event[2:], num_populations)
for j in range(num_populations):
for k in range(num_populations):
if j != k:
msp_event = msprime.MigrationRateChange(
demographic_events.append((index, msp_event))
for index, event in args.migration_matrix_change:
if len(event) < 3:
parser.error("Need at least three arguments to -ma")
if len(args.admixture) != 0:
raise_admixture_incompatability_error(parser, "-ema")
t = convert_float(event[0], parser)
check_event_time(parser, t)
if convert_int(event[1], parser) != num_populations:
parser.error(
"num_populations must be equal for new migration matrix")
matrix = convert_migration_matrix(parser, event[2:], num_populations)
for j in range(num_populations):
for k in range(num_populations):
if j != k:
msp_event = msprime.MigrationRateChange(
t, matrix[j][k], (j, k))
demographic_events.append((index, msp_event))
# We've created all the events and PopulationConfiguration objects. Because
# msprime uses absolute population sizes we need to rescale these relative
# to Ne, since this is what ms does.
for _, msp_event in demographic_events:
if isinstance(msp_event, msprime.PopulationParametersChange):
if msp_event.initial_size is not None:
msp_event.initial_size /= 4
for config in population_configurations:
if config.initial_size is not None:
config.initial_size /= 4
demographic_events.sort(key=lambda x: (x[0], x[1].time))
time_sorted = sorted(demographic_events, key=lambda x: x[1].time)
demographic_events.append((index, event))
# Demographic events that affect the migration matrix
if num_populations == 1:
condition = (
len(args.migration_rate_change) > 0 or
len(args.migration_matrix_entry_change) > 0 or
len(args.migration_matrix_change) > 0)
if condition:
parser.error("Cannot change migration rates for 1 population")
for index, (t, x) in args.migration_rate_change:
if len(args.admixture) != 0:
raise_admixture_incompatability_error(parser, "-eM")
check_migration_rate(parser, x)
check_event_time(parser, t)
event = msprime.MigrationRateChange(
t, x / (num_populations - 1))
demographic_events.append((index, event))
for index, event in args.migration_matrix_entry_change:
t = event[0]
check_event_time(parser, t)
dest = convert_population_id(parser, event[1], num_populations)
source = convert_population_id(parser, event[2], num_populations)
if dest == source:
parser.error("Cannot set diagonal elements in migration matrix")
rate = event[3]
check_migration_rate(parser, rate)
msp_event = msprime.MigrationRateChange(t, rate, (dest, source))
demographic_events.append((index, msp_event))
for index, event in args.migration_matrix_change:
if len(event) < 3:
parser.error("Need at least three arguments to -ma")
sample_size=nhaps[0], initial_size=N_AF),
msprime.PopulationConfiguration(
sample_size=nhaps[1], initial_size=N_EU, growth_rate=r_EU),
msprime.PopulationConfiguration(
sample_size=nhaps[2], initial_size=N_AS, growth_rate=r_AS)
]
migration_matrix = [
[ 0, m_AF_EU, m_AF_AS],
[m_AF_EU, 0, m_EU_AS],
[m_AF_AS, m_EU_AS, 0],
]
demographic_events = [
# CEU and CHB merge into B with rate changes at T_EU_AS
msprime.MassMigration(
time=T_EU_AS, source=2, destination=1, proportion=1.0),
msprime.MigrationRateChange(time=T_EU_AS, rate=0),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(0, 1)),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(1, 0)),
msprime.PopulationParametersChange(
time=T_EU_AS, initial_size=N_B, growth_rate=0, population_id=1),
# Population B merges into YRI at T_B
msprime.MassMigration(
time=T_B, source=1, destination=0, proportion=1.0),
# Size changes to N_A at T_AF
msprime.PopulationParametersChange(
time=T_AF, initial_size=N_A, population_id=0)
]
# Use the demography debugger to print out the demographic history
# that we have just described.
dp = msprime.DemographyDebugger(
msprime.PopulationConfiguration(
sample_size= nCHB, initial_size=N_AS, growth_rate=r_AS)
]
migration_matrix = [
[ 0, m_AF_EU, m_AF_AS],
[m_AF_EU, 0, m_EU_AS],
[m_AF_AS, m_EU_AS, 0],
]
demographic_events = [
# CEU and CHB merge into B with rate changes at T_EU_AS
msprime.MassMigration(
time=T_EU_AS, source=2, destination=1, proportion=1.0),
msprime.MigrationRateChange(time=T_EU_AS, rate=0),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(0, 1)),
msprime.MigrationRateChange(
time=T_EU_AS, rate=m_AF_B, matrix_index=(1, 0)),
msprime.PopulationParametersChange(
time=T_EU_AS, initial_size=N_B, growth_rate=0, population_id=1),
# Population B merges into YRI at T_B
msprime.MassMigration(
time=T_B, source=1, destination=0, proportion=1.0),
# Size changes to N_A at T_AF
msprime.PopulationParametersChange(
time=T_AF, initial_size=N_A, population_id=0)
]
return dict(
population_configurations=population_configurations,
migration_matrix=migration_matrix,
demographic_events=demographic_events
)