Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Training data: the first element of `a`, cast to integer symbol ids.
X = a[0]
X = X.astype(int)

# Create HMM
D = bond_dimension      # number of hidden states created per position
N = X.shape[1]          # number of positions (columns of X)
d = np.max(X + 1)       # largest symbol id + 1, used as the symbol count

# list_of_states[position][k] is the k-th state of that position.  Every
# state starts with one random weight per symbol key '0' .. str(d - 1).
# The random draws happen in the same order as the original nested loops
# (position, then state, then symbol).
list_of_states = [
    [
        pomegranate.State(pomegranate.DiscreteDistribution(
            {str(symbol): np.random.rand() for symbol in range(d)}))
        for _ in range(D)
    ]
    for _ in range(N)
]

model = pomegranate.HiddenMarkovModel()

# Connect every state of a position to every state of the next position
# with a random weight.  The loop indices deliberately avoid the name `d`
# so that the symbol count computed above is not overwritten.
for position in range(N - 1):
    for src in range(D):
        for dst in range(D):
            model.add_transition(list_of_states[position][src],
                                 list_of_states[position + 1][dst],
                                 np.random.rand())

# The model start feeds every state of the first position ...
for k in range(D):
    model.add_transition(model.start, list_of_states[0][k], np.random.rand())

# ... and every state of the last position feeds the model end.
for k in range(D):
    model.add_transition(list_of_states[N - 1][k], model.end, np.random.rand())
model.bake()

# Train HMM
# NOTE(review): `begin` is not read in the visible code; presumably it is
# used later to time the training -- confirm.
begin = time.time()
# Observations are passed as strings so they match the emission keys above.
sequencetrain = [[str(symbol) for symbol in row] for row in X]
# Re-seed NumPy's global RNG (called without an argument) before fitting.
np.random.seed()
model.fit(sequencetrain, algorithm='baum-welch', stop_threshold=1e-50,
          min_iterations=1000, max_iterations=n_iter)
def train(self, train_data):
    """Discretize ``train_data`` and learn a Bayesian network from it.

    A ``DiscretizeTransformer`` built from ``self.meta`` and the constant
    ``8`` is stored on ``self.discretizer`` and fitted on ``train_data``.
    The transformed data is then handed to
    ``BayesianNetwork.from_samples`` with ``algorithm='chow-liu'`` and the
    resulting network is stored on ``self.model``.

    NOTE(review): the ``8`` is presumably the number of bins -- confirm
    against ``DiscretizeTransformer``.
    """
    binner = self.discretizer = DiscretizeTransformer(self.meta, 8)
    binner.fit(train_data)
    self.model = BayesianNetwork.from_samples(
        binner.transform(train_data), algorithm='chow-liu')
def fit(self, data, categoricals=tuple(), ordinals=tuple()):
    """Discretize ``data`` and learn a Bayesian network from it.

    A ``DiscretizeTransformer(n_bins=15)`` is stored on
    ``self.discretizer`` and fitted on ``data`` together with
    ``categoricals`` and ``ordinals``.  The transformed data is passed to
    ``BayesianNetwork.from_samples`` with ``algorithm='chow-liu'`` and the
    resulting network is stored on ``self.model``.
    """
    binner = self.discretizer = DiscretizeTransformer(n_bins=15)
    binner.fit(data, categoricals, ordinals)
    self.model = BayesianNetwork.from_samples(
        binner.transform(data), algorithm='chow-liu')
pom.NormalDistribution(-1.0, stdev, frozen=False),
pom.NormalDistribution(0.0, stdev, frozen=False),
pom.NormalDistribution(0.585, stdev, frozen=False),
]
n_states = len(distributions)
# Starts -- prefer neutral
binom_coefs = scipy.special.binom(n_states - 1, range(n_states))
start_probabilities = binom_coefs / binom_coefs.sum()
# Prefer to keep the current state in each transition
# All other transitions are equally likely, to start
transition_matrix = (np.identity(n_states) * 100
+ np.ones((n_states, n_states)) / n_states)
model = pom.HiddenMarkovModel.from_matrix(transition_matrix, distributions,
start_probabilities, state_names=state_names, name=method)
model.fit(sequences=observations,
weights=[len(obs) for obs in observations],
distribution_inertia = .8, # Allow updating dists, but slowly
edge_inertia=0.1,
# lr_decay=.75,
pseudocount=5,
use_pseudocount=True,
max_iterations=100000,
n_jobs=processes,
verbose=False)
return model
# Fit a two-state ("neutral" / "alt") HMM named "loh" to the values returned
# by varr.mirrored_baf(above_half=True), then predict one state per
# observation and log a summary of the result.
#
# Parameters:
#   varr         -- object supporting len() and .mirrored_baf(above_half=True)
#   segment      -- not referenced in the visible portion of this function
#   min_variants -- the model is only built when len(varr) exceeds this value
#
# NOTE(review): no return statement is visible and indentation was lost in
# this copy; the function appears to continue beyond the last line shown.
def variants_in_segment(varr, segment, min_variants=50):
# Skip modelling entirely unless there are more than `min_variants` entries.
if len(varr) > min_variants:
observations = varr.mirrored_baf(above_half=True)
state_names = ["neutral", "alt"]
# One normal emission distribution per state: centred on 0.5 ("neutral") and
# 0.67 ("alt"), both with second argument 0.1 and created with frozen=True.
# NOTE(review): frozen=True presumably keeps these parameters fixed during
# fit(), so only the edges are trained -- confirm against pomegranate.
distributions = [
pom.NormalDistribution(0.5, .1, frozen=True),
pom.NormalDistribution(0.67, .1, frozen=True),
]
n_states = len(distributions)
# Starts -- prefer neutral
start_probabilities = [.95, .05]
# Prefer to keep the current state in each transition
# All other transitions are equally likely, to start
# With two states this gives 100.5 on the diagonal and 0.5 elsewhere; the
# rows are not normalised here.
transition_matrix = (np.identity(n_states) * 100
+ np.ones((n_states, n_states)) / n_states)
model = pom.HiddenMarkovModel.from_matrix(transition_matrix, distributions,
start_probabilities, state_names=state_names, name="loh")
# Train on the single observation sequence.
model.fit(sequences=[observations],
edge_inertia=0.1,
lr_decay=.75,
pseudocount=5,
use_pseudocount=True,
max_iterations=100000,
#n_jobs=1, # processes,
verbose=False)
# Predict a state for each observation using algorithm='map'.
states = np.array(model.predict(observations, algorithm='map'))
logging.info("Done, now finalizing")
logging.debug("Model states: %s", model.states)
# Only the first 100 predicted states are logged.
logging.debug("Predicted states: %s", states[:100])
# Count of how many observations were assigned to each state.
logging.debug(str(collections.Counter(states)))
# Start building a model named "Prefix Matcher HMM Model" with insert,
# match and delete states derived from `pattern`.
#
# Parameters:
#   pattern -- indexable sequence; pattern[i] is used as an emission key
#
# NOTE(review): the visible code stops after the first transition out of
# `unit_start`; the remaining transitions, any bake() call and the return
# value are not shown here, and indentation was lost in this copy.
def get_prefix_matcher_hmm(pattern):
model = Model(name="Prefix Matcher HMM Model")
# Insert states all share one uniform distribution over A/C/G/T.
insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
insert_states = []
match_states = []
delete_states = []
hmm_name = 'prefix'
# len(pattern) + 1 insert states, named I0_prefix .. I<len>_prefix.
for i in range(len(pattern) + 1):
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, hmm_name)))
# One match state per pattern position, named M1_prefix .. M<len>_prefix.
# Each starts from 0.01 for every base and then sets pattern[i] to 0.97.
for i in range(len(pattern)):
distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
distribution_map[pattern[i]] = 0.97
match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), hmm_name)))
# One delete state per pattern position, created without a distribution
# (State(None, ...)), named D1_prefix .. D<len>_prefix.
for i in range(len(pattern)):
delete_states.append(State(None, name='D%s_%s' % (str(i + 1), hmm_name)))
# Distribution-less entry and exit states wrapping the unit.
unit_start = State(None, name='prefix_start_%s' % hmm_name)
unit_end = State(None, name='prefix_end_%s' % hmm_name)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
# Index of the final delete state; not used in the visible portion.
last = len(delete_states)-1
model.add_transition(model.start, unit_start, 1)
model.add_transition(unit_end, model.end, 1)
# The insert and delete shares are 2/5 and 1/5 of settings.MAX_ERROR_RATE.
insert_error = settings.MAX_ERROR_RATE * 2 / 5
delete_error = settings.MAX_ERROR_RATE * 1 / 5
# Entering the first match state takes whatever weight is left over.
model.add_transition(unit_start, match_states[0], 1 - insert_error - delete_error)
if vpaths:
alignment = get_multiple_alignment_of_repeats_from_reads(vpaths)
transitions, emissions = build_profile_hmm_pseudocounts_for_alignment(settings.MAX_ERROR_RATE, alignment)
else:
transitions, emissions = build_profile_hmm_for_repeats(patterns, settings.MAX_ERROR_RATE)
matches = [m for m in emissions.keys() if m.startswith('M')]
last_end = None
for repeat in range(copies):
insert_states = []
match_states = []
delete_states = []
for i in range(len(matches) + 1):
insert_distribution = DiscreteDistribution(emissions['I%s' % i])
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
for i in range(1, len(matches) + 1):
match_distribution = DiscreteDistribution(emissions['M%s' % i])
match_states.append(State(match_distribution, name='M%s_%s' % (str(i), repeat)))
for i in range(1, len(matches) + 1):
delete_states.append(State(None, name='D%s_%s' % (str(i), repeat)))
unit_start = State(None, name='unit_start_%s' % repeat)
unit_end = State(None, name='unit_end_%s' % repeat)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
n = len(delete_states)-1
if repeat > 0:
model.add_transition(last_end, unit_start, 1)
else:
# Start building a model named "HMM Model" containing `copies` repeat units
# of insert/match/delete states derived from the first pattern, plus two
# extra states with uniform A/C/G/T emissions.
#
# Parameters:
#   patterns -- indexable; only patterns[0] is used in the visible code
#   copies   -- number of repeat units to create (default 1)
#
# NOTE(review): the visible code stops inside the per-copy loop; `last_end`
# is never reassigned in the portion shown, and no bake() or return is
# visible.  Indentation was lost in this copy.
def build_reference_repeat_finder_hmm(patterns, copies=1):
pattern = patterns[0]
model = Model(name="HMM Model")
# Shared uniform distribution used by the two random-match states and by
# every insert state.
insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
last_end = None
start_random_matches = State(insert_distribution, name='start_random_matches')
end_random_matches = State(insert_distribution, name='end_random_matches')
model.add_states([start_random_matches, end_random_matches])
# Build one unit per copy; state names carry the copy index as a suffix.
for repeat in range(copies):
insert_states = []
match_states = []
delete_states = []
# len(pattern) + 1 insert states, named I0_<repeat> .. I<len>_<repeat>.
for i in range(len(pattern) + 1):
insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
# One match state per pattern position: 0.01 for every base, then
# pattern[i] is set to 0.97.
for i in range(len(pattern)):
distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
distribution_map[pattern[i]] = 0.97
match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), repeat)))
# One delete state per pattern position, created without a distribution.
for i in range(len(pattern)):
delete_states.append(State(None, name='D%s_%s' % (str(i + 1), repeat)))
# Distribution-less entry and exit states for this unit.
unit_start = State(None, name='unit_start_%s' % repeat)
unit_end = State(None, name='unit_end_%s' % repeat)
model.add_states(insert_states + match_states + delete_states + [unit_start, unit_end])
# Index of the final delete state; not used in the visible portion.
last = len(delete_states)-1
# From the second unit onwards, link the previous unit's end to this
# unit's start with weight 0.5.
if repeat > 0:
model.add_transition(last_end, unit_start, 0.5)
def make_hmm_model(emission_mat, transition_probs):
    """Build and bake a two-state discrete HMM named ``'ndf'``.

    Row 0 of ``emission_mat`` supplies the emission weights of the state
    named ``'0'`` (baseline) and row 1 those of the state named ``'1'``
    (ictal); the emitted symbols are the column indices
    ``0 .. emission_mat.shape[1] - 1``.  ``transition_probs[i, j]`` is used
    as the weight of the edge from state ``i`` to state ``j``.

    Args:
        emission_mat: 2-D indexable with a ``shape`` attribute, one row per
            state and one column per symbol.
        transition_probs: 2-D indexable holding the 2x2 transition weights.

    Returns:
        The baked ``pomegranate.HiddenMarkovModel``.

    NOTE(review): the two start weights (0.05 and 99.95) sum to 100 rather
    than 1; presumably this relies on edge normalisation during ``bake()``
    -- confirm that this ratio is the intended one.
    """
    hmm = pomegranate.HiddenMarkovModel('ndf')
    symbol_count = emission_mat.shape[1]

    def emitting_state(row, label):
        # Map every symbol index to its weight in the requested matrix row.
        weights = {sym: emission_mat[row, sym] for sym in range(symbol_count)}
        return pomegranate.State(
            pomegranate.DiscreteDistribution(weights), name=label)

    ictal = emitting_state(1, '1')
    baseline = emitting_state(0, '0')
    for state in (ictal, baseline):
        hmm.add_state(state)

    # (source, target, weight) triples, added in this order.
    edges = (
        (hmm.start, ictal, 0.05),
        (hmm.start, baseline, 99.95),
        (baseline, baseline, transition_probs[0, 0]),
        (baseline, ictal, transition_probs[0, 1]),
        (ictal, ictal, transition_probs[1, 1]),
        (ictal, baseline, transition_probs[1, 0]),
    )
    for source, target, weight in edges:
        hmm.add_transition(source, target, weight)

    hmm.bake(verbose=False)
    return hmm
def test_example_pomegranate(self):
"""
This example is taken from https://pomegranate.readthedocs.io/en/latest/HiddenMarkovModel.html
"""
from pomegranate import DiscreteDistribution, State, HiddenMarkovModel
d1 = DiscreteDistribution({'A': 0.35, 'C': 0.20, 'G': 0.05, 'T': 0.40})
d2 = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})
d3 = DiscreteDistribution({'A': 0.10, 'C': 0.40, 'G': 0.40, 'T': 0.10})
s1 = State(d1, name="s1")
s2 = State(d2, name="s2")
s3 = State(d3, name="s3")
model = HiddenMarkovModel(name='example')
model.add_states([s1, s2, s3])
model.add_transition(model.start, s1, 0.90)
model.add_transition(model.start, s2, 0.10)
model.add_transition(s1, s1, 0.80)
model.add_transition(s1, s2, 0.20)
model.add_transition(s2, s2, 0.90)
model.add_transition(s2, s3, 0.10)
model.add_transition(s3, s3, 0.70)
model.add_transition(s3, model.end, 0.30)
model.bake()
answer = model.log_probability(list('ACGACTATTCGAT'))
expected = -22.73896159971087