Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if vpaths:
alignment = get_multiple_alignment_of_repeats_from_reads(vpaths)
transitions, emissions = build_profile_hmm_pseudocounts_for_alignment(settings.MAX_ERROR_RATE, alignment)
else:
transitions, emissions = build_profile_hmm_for_repeats(patterns, settings.MAX_ERROR_RATE)
matches = [m for m in emissions.keys() if m.startswith('M')]
last_end = None
for repeat in range(copies):
insert_states = []
match_states = []
delete_states = []
for i in range(len(matches) + 1):
insert_distribution = DiscreteDistribution(emissions['I%s' % i])
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
for i in range(1, len(matches) + 1):
match_distribution = DiscreteDistribution(emissions['M%s' % i])
match_states.append(State(match_distribution, name='M%s_%s' % (str(i), repeat)))
for i in range(1, len(matches) + 1):
delete_states.append(State(None, name='D%s_%s' % (str(i), repeat)))
unit_start = State(None, name='unit_start_%s' % repeat)
unit_end = State(None, name='unit_end_%s' % repeat)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
n = len(delete_states)-1
if repeat > 0:
model.add_transition(last_end, unit_start, 1)
else:
def build_reference_repeat_finder_hmm(patterns, copies=1):
pattern = patterns[0]
model = Model(name="HMM Model")
insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
last_end = None
start_random_matches = State(insert_distribution, name='start_random_matches')
end_random_matches = State(insert_distribution, name='end_random_matches')
model.add_states([start_random_matches, end_random_matches])
for repeat in range(copies):
insert_states = []
match_states = []
delete_states = []
for i in range(len(pattern) + 1):
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
for i in range(len(pattern)):
distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
distribution_map[pattern[i]] = 0.97
match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), repeat)))
for i in range(len(pattern)):
delete_states.append(State(None, name='D%s_%s' % (str(i + 1), repeat)))
unit_start = State(None, name='unit_start_%s' % repeat)
unit_end = State(None, name='unit_end_%s' % repeat)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
last = len(delete_states)-1
if repeat > 0:
model.add_transition(last_end, unit_start, 0.5)
def make_hmm_model(emission_mat, transition_probs):
model = pomegranate.HiddenMarkovModel('ndf')
ictal_emissions = {i:emission_mat[1,i] for i in range(emission_mat.shape[1])}
baseline_emissions = {i:emission_mat[0,i] for i in range(emission_mat.shape[1])}
ictal = pomegranate.State(pomegranate.DiscreteDistribution(ictal_emissions ), name = '1')
baseline = pomegranate.State(pomegranate.DiscreteDistribution(baseline_emissions), name = '0')
model.add_state(ictal)
model.add_state(baseline)
model.add_transition( model.start, ictal, 0.05 )
model.add_transition( model.start, baseline, 99.95)
model.add_transition( baseline, baseline, transition_probs[0,0] )
model.add_transition( baseline, ictal, transition_probs[0,1] )
model.add_transition( ictal, ictal , transition_probs[1,1] )
model.add_transition( ictal, baseline, transition_probs[1,0] )
model.bake(verbose=False )
return model
def test_example_pomegranate(self):
"""
This example is taken from https://pomegranate.readthedocs.io/en/latest/HiddenMarkovModel.html
"""
from pomegranate import DiscreteDistribution, State, HiddenMarkovModel
d1 = DiscreteDistribution({'A': 0.35, 'C': 0.20, 'G': 0.05, 'T': 0.40})
d2 = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
d3 = DiscreteDistribution({'A': 0.10, 'C': 0.40, 'G': 0.40, 'T': 0.10})
s1 = State(d1, name="s1")
s2 = State(d2, name="s2")
s3 = State(d3, name="s3")
model = HiddenMarkovModel(name='example')
model.add_states([s1, s2, s3])
model.add_transition(model.start, s1, 0.90)
model.add_transition(model.start, s2, 0.10)
model.add_transition(s1, s1, 0.80)
model.add_transition(s1, s2, 0.20)
model.add_transition(s2, s2, 0.90)
model.add_transition(s2, s3, 0.10)
model.add_transition(s3, s3, 0.70)
model.add_transition(s3, model.end, 0.30)
model.bake()
answer = model.log_probability(list('ACGACTATTCGAT'))
expected = -22.73896159971087
def load_segmentation_model(modeldata):
model = HiddenMarkovModel('model')
states = {}
for s in modeldata:
if len(s['emission']) == 1:
emission = NormalDistribution(*s['emission'][0][:2])
else:
weights = np.array([w for _, _, w in s['emission']])
dists = [NormalDistribution(mu, sigma)
for mu, sigma, _ in s['emission']]
emission = GeneralMixtureModel(dists, weights=weights)
state = State(emission, name=s['name'])
states[s['name']] = state
model.add_state(state)
if 'start_prob' in s:
model.add_transition(model.start, state, s['start_prob'])
for s in modeldata:
current = states[s['name']]
for nextstate, prob in s['transition']:
model.add_transition(current, states[nextstate], prob)
model.bake()
return model
def test_example_pomegranate(self):
"""
This example is taken from https://pomegranate.readthedocs.io/en/latest/HiddenMarkovModel.html
"""
from pomegranate import DiscreteDistribution, State, HiddenMarkovModel
d1 = DiscreteDistribution({'A': 0.35, 'C': 0.20, 'G': 0.05, 'T': 0.40})
d2 = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
d3 = DiscreteDistribution({'A': 0.10, 'C': 0.40, 'G': 0.40, 'T': 0.10})
s1 = State(d1, name="s1")
s2 = State(d2, name="s2")
s3 = State(d3, name="s3")
model = HiddenMarkovModel(name='example')
model.add_states([s1, s2, s3])
model.add_transition(model.start, s1, 0.90)
model.add_transition(model.start, s2, 0.10)
model.add_transition(s1, s1, 0.80)
model.add_transition(s1, s2, 0.20)
model.add_transition(s2, s2, 0.90)
model.add_transition(s2, s3, 0.10)
model.add_transition(s3, s3, 0.70)
model.add_transition(s3, model.end, 0.30)
model.bake()
answer = model.log_probability(list('ACGACTATTCGAT'))
expected = -22.73896159971087
def get_suffix_matcher_hmm(pattern):
model = Model(name="Suffix Matcher HMM Model")
insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
insert_states = []
match_states = []
delete_states = []
hmm_name = 'suffix'
for i in range(len(pattern) + 1):
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, hmm_name)))
for i in range(len(pattern)):
distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
distribution_map[pattern[i]] = 0.97
match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), hmm_name)))
for i in range(len(pattern)):
delete_states.append(State(None, name='D%s_%s' % (str(i + 1), hmm_name)))
unit_start = State(None, name='suffix_start_%s' % hmm_name)
unit_end = State(None, name='suffix_end_%s' % hmm_name)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
last = len(delete_states)-1
model.add_transition(model.start, unit_start, 1)
def build_reference_repeat_finder_hmm(patterns, copies=1):
pattern = patterns[0]
model = Model(name="HMM Model")
insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
last_end = None
start_random_matches = State(insert_distribution, name='start_random_matches')
end_random_matches = State(insert_distribution, name='end_random_matches')
model.add_states([start_random_matches, end_random_matches])
for repeat in range(copies):
insert_states = []
match_states = []
delete_states = []
for i in range(len(pattern) + 1):
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
for i in range(len(pattern)):
distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
distribution_map[pattern[i]] = 0.97
match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), repeat)))
for i in range(len(pattern)):
delete_states.append(State(None, name='D%s_%s' % (str(i + 1), repeat)))
for repeat in range(copies):
insert_states = []
match_states = []
delete_states = []
for i in range(len(matches) + 1):
insert_distribution = DiscreteDistribution(emissions['I%s' % i])
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
for i in range(1, len(matches) + 1):
match_distribution = DiscreteDistribution(emissions['M%s' % i])
match_states.append(State(match_distribution, name='M%s_%s' % (str(i), repeat)))
for i in range(1, len(matches) + 1):
delete_states.append(State(None, name='D%s_%s' % (str(i), repeat)))
unit_start = State(None, name='unit_start_%s' % repeat)
unit_end = State(None, name='unit_end_%s' % repeat)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
n = len(delete_states)-1
if repeat > 0:
model.add_transition(last_end, unit_start, 1)
else:
model.add_transition(model.start, unit_start, 1)
if repeat == copies - 1:
model.add_transition(unit_end, model.end, 1)
model.add_transition(unit_start, match_states[0], transitions['unit_start']['M1'])
model.add_transition(unit_start, delete_states[0], transitions['unit_start']['D1'])
model.add_transition(unit_start, insert_states[0], transitions['unit_start']['I0'])